viral32111_stomp/
frame.rs

1use flate2::read::GzDecoder;
2use std::{error::Error, io::Read, str::from_utf8};
3
4use crate::header::Headers;
5
6// https://stomp.github.io/stomp-specification-1.2.html
7
8/// Represents a STOMP frame.
9pub struct Frame {
10	pub command: String,
11	pub headers: Vec<(String, String)>,
12	pub body: Option<String>,
13}
14
15/// Creates a STOMP frame.
16pub fn create(command: &str, headers: Option<Vec<(&str, &str)>>, body: Option<&str>) -> String {
17	// Just command & body if there aren't any headers
18	if headers.is_none() {
19		return format!("{}\n\n{}\0", command, body.unwrap_or(""));
20	}
21
22	// Convert headers into colon delimited key-value pairs between line feeds
23	let header_lines = headers
24		.unwrap()
25		.iter()
26		.map(|(name, value)| format!("{}:{}", name, value))
27		.collect::<Vec<String>>()
28		.join("\n");
29
30	// Include headers in the frame
31	return format!("{}\n{}\n\n{}\0", command, header_lines, body.unwrap_or(""));
32}
33
34/// Attempts to parse the first STOMP frame in a byte buffer.
35pub fn parse(buffer: &mut Vec<u8>) -> Result<Option<(Frame, usize)>, Box<dyn Error>> {
36	// TODO: This implementation does not account for optional CR before each LF
37
38	// Can't continue until we have at least a NT + LF
39	if buffer.len() < 2 {
40		return Ok(None); // Wait for more data
41	}
42
43	// Locate the double LF between headers & body
44	let separator_position = buffer.windows(2).position(|bytes| bytes == [b'\n', b'\n']);
45	if separator_position.is_none() {
46		return Ok(None); // Wait for more data
47	}
48
49	// Extract the command from the first line
50	let command_end_position = buffer.iter().position(|&byte| byte == b'\n').unwrap();
51	let command = from_utf8(&buffer[..command_end_position])?
52		.trim_end() // Strip trailing CR/LF
53		.to_string();
54
55	// Extract the headers hereafter until the double LF
56	let headers_start_position = command_end_position + 1;
57	let headers_end_position = separator_position.unwrap() + 1;
58	if headers_end_position > buffer.len() {
59		return Ok(None); // Wait for more data
60	}
61	let headers = from_utf8(&buffer[headers_start_position..headers_end_position])?
62		.lines()
63		.filter_map(|line| {
64			// Skip empty lines
65			if line.is_empty() {
66				return None;
67			}
68
69			// Headers are colon delimited key-value pairs
70			let (name, value) = line.split_once(":")?;
71
72			// Ignore headers with no name
73			if name.is_empty() {
74				return None;
75			}
76
77			// Force name to lowercase
78			let name = name.to_lowercase();
79
80			// Apply transformations to value
81			let value = value
82				.replace("\\r", "\r")
83				.replace("\\n", "\n")
84				.replace("\\c", ":")
85				.replace("\\\\", "\\");
86
87			// Force name to lowercase
88			Some((name, value.to_string()))
89		})
90		.collect::<Vec<(String, String)>>();
91
92	// Find the size of the body
93	let content_length = headers.iter().find_map(|(name, value)| {
94		if name.eq(Headers::ContentLength.as_str()) {
95			return value.parse::<usize>().ok();
96		}
97
98		None
99	});
100
101	// Frame is finished if we don't have a body
102	if content_length.is_none() {
103		// Ensure we're terminated with a NT + LF
104		if buffer.len() < headers_end_position + 2 {
105			return Ok(None); // Wait for more data
106		}
107		if buffer[headers_end_position + 1] != 0x00 {
108			return Err("Frame not null terminated".into());
109		}
110		if buffer[headers_end_position + 2] != b'\n' {
111			return Err("Frame not terminated with a new line".into());
112		}
113
114		// Return the frame & the position of where this frame ends
115		return Ok(Some((
116			Frame {
117				command,
118				headers,
119				body: None,
120			},
121			headers_end_position + 2, // Skip the double LF
122		)));
123	}
124
125	// Decompress the body
126	let body_start_position = headers_end_position + 1; // Move past the double LF
127	let body_length = content_length.unwrap();
128	let body_end_position = body_start_position + body_length;
129	if body_end_position > buffer.len() {
130		return Ok(None); // Wait for more data
131	}
132	let mut decompressor = GzDecoder::new(&buffer[body_start_position..body_end_position]);
133	let mut body = String::new();
134	decompressor.read_to_string(&mut body)?;
135
136	// Ensure we're terminated with a NT + LF
137	if buffer.len() < body_end_position + 2 {
138		return Ok(None); // Wait for more data
139	}
140	if buffer[body_end_position] != 0x00 {
141		return Err("Frame not null terminated".into());
142	}
143	if buffer[body_end_position + 1] != b'\n' {
144		return Err("Frame not terminated with a new line".into());
145	}
146
147	// Return the frame & the position of where this frame ends
148	Ok(Some((
149		Frame {
150			command,
151			headers,
152			body: Some(body),
153		},
154		body_end_position + 1, // Skip the NT + LF
155	)))
156}