use self::frame::Frame;
use std::error::Error;
use std::io::{Read, Write};
use std::net::{Shutdown, TcpStream, ToSocketAddrs};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
pub mod frame;
pub mod header;
/// Fallback timeout (10 seconds) applied by `open` to both the TCP connect and socket writes
/// when the caller passes `None` as the timeout.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// A TCP connection to a STOMP broker together with the background thread that reads from it.
///
/// Instances are created by `open`.
pub struct Connection {
// Socket handle used to write outgoing frames and to shut the connection down.
tcp_stream: TcpStream,
// Join handle of the background receive thread; becomes `None` once `wait` has taken it.
receive_thread: Option<JoinHandle<()>>,
// Host name given to `open`; sent as the `host` header of the `CONNECT` frame.
host_header: String,
/// Receiving end of the channel into which the receive thread sends each parsed frame.
pub frame_receiver: Receiver<Result<Frame, String>>, }
impl Connection {
    /// Sends a `CONNECT` frame carrying the given credentials.
    ///
    /// The frame contains the headers `accept-version: 1.2`, `host` (the connection's stored
    /// host name), `heart-beat: 0,0`, `login` and `passcode`. This method only writes the
    /// frame; it does not wait for or inspect any reply.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if writing the frame to the socket fails.
    pub fn authenticate(&mut self, username: &str, password: &str) -> Result<(), Box<dyn Error>> {
        let headers = vec![
            ("accept-version", "1.2"),
            ("host", self.host_header.as_str()),
            ("heart-beat", "0,0"),
            ("login", username),
            ("passcode", password),
        ];
        let frame = frame::create("CONNECT", Some(headers), None);
        self.tcp_stream.write_all(frame.as_bytes())?;
        Ok(())
    }

    /// Sends a `SUBSCRIBE` frame for `topic`.
    ///
    /// `identifier` is sent as the `id` header, `topic` as the `destination` header, and the
    /// `ack` header is always `auto`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if writing the frame to the socket fails.
    pub fn subscribe(&mut self, identifier: u32, topic: &str) -> Result<(), Box<dyn Error>> {
        let id = identifier.to_string();
        let headers = vec![
            ("id", id.as_str()),
            ("destination", topic),
            ("ack", "auto"),
        ];
        let frame = frame::create("SUBSCRIBE", Some(headers), None);
        self.tcp_stream.write_all(frame.as_bytes())?;
        Ok(())
    }

    /// Blocks until the background receive thread has finished.
    ///
    /// Returns `Ok(())` immediately if the thread was already joined by an earlier call.
    ///
    /// # Errors
    ///
    /// Returns an error if joining fails, which happens when the receive thread panicked.
    pub fn wait(&mut self) -> Result<(), Box<dyn Error>> {
        // `take` leaves `None` behind, so the handle is joined at most once.
        match self.receive_thread.take() {
            None => Ok(()),
            Some(handle) => handle
                .join()
                .map_err(|_| "Unable to join receive thread".into()),
        }
    }

    /// Shuts down both directions of the socket and then joins the receive thread.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if shutting the socket down fails for any reason other than the
    /// socket already being disconnected, or the error from `wait` if the receive thread
    /// cannot be joined.
    pub fn close(&mut self) -> Result<(), Box<dyn Error>> {
        match self.tcp_stream.shutdown(Shutdown::Both) {
            Ok(()) => {}
            // Some platforms report `NotConnected` when the socket is already shut down
            // (for example on a repeated `close`). The connection is closed either way, so
            // still join the receive thread instead of bailing out and leaving it unjoined.
            Err(error) if error.kind() == std::io::ErrorKind::NotConnected => {}
            Err(error) => return Err(error.into()),
        }
        self.wait()
    }
}
/// Opens a TCP connection to `host:port` and starts a background thread that receives frames.
///
/// * `host` - host name or address to connect to; also stored for use as the `host` header.
/// * `port` - TCP port to connect to.
/// * `timeout` - timeout for establishing the connection and for socket writes;
///   `DEFAULT_TIMEOUT` is used when `None`. No read timeout is configured.
///
/// `host:port` is resolved to socket addresses and the last resolved address is used.
/// `TCP_NODELAY` is enabled on the socket. Frames parsed by the background thread are
/// delivered through the returned connection's `frame_receiver`.
///
/// # Errors
///
/// Returns an error if the address cannot be resolved or resolves to no address, if the
/// connection cannot be established within the timeout, if configuring the socket fails, or
/// if the socket handle cannot be cloned for the receive thread.
///
/// # Panics
///
/// This function itself does not panic on resolution failure, but the spawned receive thread
/// panics if receiving fails; `Connection::wait` then reports that as an error.
pub fn open(
    host: &str,
    port: u16,
    timeout: Option<Duration>,
) -> Result<Connection, Box<dyn Error>> {
    let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT);
    // An empty resolution result is reported as an error instead of panicking, and the
    // message is only formatted when it is actually needed.
    let address = format!("{}:{}", host, port)
        .to_socket_addrs()?
        .last()
        .ok_or_else(|| format!("Unable to convert '{}:{}' to socket address", host, port))?;
    let tcp_stream = TcpStream::connect_timeout(&address, timeout)?;
    tcp_stream.set_nodelay(true)?;
    tcp_stream.set_write_timeout(Some(timeout))?;
    let (frame_sender, frame_receiver) = channel();
    // The receive thread gets its own handle to the same underlying socket.
    let tcp_stream_clone = tcp_stream.try_clone()?;
    let receive_thread = spawn(move || {
        if let Err(reason) = receive_bytes(tcp_stream_clone, frame_sender) {
            panic!("Unable to receive bytes: {}", reason);
        }
    });
    Ok(Connection {
        tcp_stream,
        receive_thread: Some(receive_thread),
        host_header: host.to_string(),
        frame_receiver,
    })
}
/// Reads bytes from `tcp_stream` until end of stream, forwarding every complete frame.
///
/// Incoming bytes are read in chunks of up to 4096 bytes and appended to a pending buffer.
/// After each read, `frame::parse` is called repeatedly on that buffer; for every frame it
/// yields, the bytes up to and including the reported end position are removed from the
/// buffer and the frame is sent through `frame_sender` as `Ok(frame)`.
///
/// Returns `Ok(())` once a read returns zero bytes (end of stream).
///
/// # Errors
///
/// Returns an error if reading from the socket fails (interrupted reads are retried), if
/// `frame::parse` fails, or if the frame cannot be sent because the receiver was dropped.
fn receive_bytes(
    mut tcp_stream: TcpStream,
    frame_sender: Sender<Result<Frame, String>>,
) -> Result<(), Box<dyn Error>> {
    let mut receive_buffer = [0; 4096];
    let mut pending_data: Vec<u8> = Vec::new();
    loop {
        let received_byte_count = match tcp_stream.read(&mut receive_buffer) {
            Ok(count) => count,
            // An interrupted read is non-fatal and must simply be retried; treating it as
            // an error would tear down the receive loop for no reason.
            Err(error) if error.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(error) => return Err(error.into()),
        };
        if received_byte_count == 0 {
            return Ok(());
        }
        pending_data.extend_from_slice(&receive_buffer[..received_byte_count]);
        // One read may contain several frames, so keep parsing until no complete frame is left.
        while let Some((frame, end_position)) = frame::parse(&mut pending_data)? {
            pending_data.drain(..end_position + 1);
            frame_sender.send(Ok(frame))?;
        }
    }
}