viral32111_stomp/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use self::frame::Frame;
use std::error::Error;
use std::io::{Read, Write};
use std::net::{Shutdown, TcpStream, ToSocketAddrs};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;

pub mod frame;
pub mod header;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

/// Represents a connection to a STOMP server.
pub struct Connection {
	tcp_stream: TcpStream,
	receive_thread: Option<JoinHandle<()>>,
	host_header: String,
	pub frame_receiver: Receiver<Result<Frame, String>>, // String instead of Box<dyn Error> as the latter doesn't implement Send trait
}

impl Connection {
	// Sends the CONNECT frame to the STOMP server.
	pub fn authenticate(&mut self, username: &str, password: &str) -> Result<(), Box<dyn Error>> {
		let headers = vec![
			("accept-version", "1.2"),
			("host", self.host_header.as_str()),
			("heart-beat", "0,0"), // TODO: Implement heart-beating
			("login", username),
			("passcode", password),
		];

		let frame = frame::create("CONNECT", Some(headers), None);

		self.tcp_stream.write_all(frame.as_bytes())?;

		Ok(())
	}

	/// Subscribes to a topic on the STOMP server.
	pub fn subscribe(&mut self, identifier: u32, topic: &str) -> Result<(), Box<dyn Error>> {
		let id = identifier.to_string();

		let headers = vec![
			("id", id.as_str()),
			("destination", topic),
			("ack", "auto"), // TODO: Implement acknowledgements
		];

		let frame = frame::create("SUBSCRIBE", Some(headers), None);

		self.tcp_stream.write_all(frame.as_bytes())?;

		Ok(())
	}

	/// Waits for the connection to close.
	pub fn wait(&mut self) -> Result<(), Box<dyn Error>> {
		// Don't bother if the thread no longer exists
		if self.receive_thread.is_none() {
			return Ok(());
		}

		// Yoink the thread handle & wait for it to finish
		let result = self.receive_thread.take().unwrap().join();
		if result.is_err() {
			return Err("Unable to join receive thread".into());
		}

		Ok(())
	}

	/// Closes the connection to the STOMP server.
	pub fn close(&mut self) -> Result<(), Box<dyn Error>> {
		self.tcp_stream.shutdown(Shutdown::Both)?;

		self.wait()?;

		Ok(())
	}
}

/// Establishes a connection to a STOMP server.
pub fn open(
	host: &str,
	port: u16,
	timeout: Option<Duration>,
) -> Result<Connection, Box<dyn Error>> {
	// Convert the host name & port number into a usable socket address
	let address = format!("{}:{}", host, port)
		.to_socket_addrs()?
		.last()
		.expect(format!("Unable to convert '{}:{}' to socket address", host, port).as_str());

	// Open a TCP stream to the this address
	let tcp_stream = TcpStream::connect_timeout(&address, timeout.unwrap_or(DEFAULT_TIMEOUT))?;

	// Configure this stream
	tcp_stream.set_nodelay(true)?;
	tcp_stream.set_write_timeout(timeout.or(Some(DEFAULT_TIMEOUT)))?;

	let (frame_sender, frame_receiver) = channel();

	// Spawn a thread to listen for incoming bytes
	let tcp_stream_clone = tcp_stream.try_clone()?;
	let receive_thread = spawn(move || {
		let result = receive_bytes(tcp_stream_clone, frame_sender); // Blocks until the TCP stream is closed

		if result.is_err() {
			let reason = result.err().unwrap_or("Unknown error".into()).to_string();
			panic!("Unable to receive bytes: {}", reason);
		}
	});

	// Give the caller a handle to this connection
	Ok(Connection {
		tcp_stream,
		receive_thread: Some(receive_thread),
		host_header: host.to_string(),
		frame_receiver,
	})
}

/// Continuously waits for bytes from the STOMP server.
fn receive_bytes(
	mut tcp_stream: TcpStream,
	frame_sender: Sender<Result<Frame, String>>,
) -> Result<(), Box<dyn Error>> {
	let mut receive_buffer = [0; 4096]; // 4 KiB
	let mut pending_data: Vec<u8> = Vec::new(); // Infinite

	loop {
		// Try to receive some bytes
		let received_byte_count = tcp_stream.read(&mut receive_buffer)?;
		if received_byte_count == 0 {
			return Ok(()); // Give up, there's nothing left to receive
		}

		// Append the received bytes to the unprocessed data
		pending_data.extend_from_slice(&receive_buffer[..received_byte_count]);

		// Remove any complete frames from the unprocessed data
		while let Some((frame, end_position)) = frame::parse(&mut pending_data)? {
			pending_data.drain(..end_position + 1);
			frame_sender.send(Ok(frame))?;
		}
	}
}

/*
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn it_works() {
		let result = add(2, 2);
		assert_eq!(result, 4);
	}
}
*/