darwin_push_port_live_feed/
main.rs1use env_file_reader::read_file;
2use std::{error::Error, path::Path, process::exit};
3use viral32111_stomp::{frame::Frame, header::Headers, open};
4use viral32111_xml::parse;
5
6fn main() -> Result<(), Box<dyn Error>> {
21 if !Path::new(".env").exists() {
22 eprintln!("The '.env' file does not exist in the current directory!");
23 exit(1);
24 }
25
26 let environment_variables = read_file(".env")?;
27
28 let host = environment_variables
29 .get("DARWIN_HOST")
30 .expect("Environment variable 'DARWIN_HOST' not present in .env file");
31 let port = environment_variables
32 .get("DARWIN_PORT")
33 .expect("Environment variable 'DARWIN_PORT' not present in .env file")
34 .parse::<u16>()?;
35 let username = environment_variables
36 .get("DARWIN_USERNAME")
37 .expect("Environment variable 'DARWIN_USERNAME' not present in .env file");
38 let password = environment_variables
39 .get("DARWIN_PASSWORD")
40 .expect("Environment variable 'DARWIN_PASSWORD' not present in .env file");
41
42 let mut connection = open(host, port, None)?;
43 connection.authenticate(username, password)?;
44 connection.subscribe(0, "/topic/darwin.pushport-v16")?;
45 for frame in connection.frame_receiver.iter() {
48 match frame {
49 Ok(frame) => {
50 handle_stomp_frame(frame)?;
51 }
52 Err(error) => {
53 eprintln!("Unable to receive STOMP frame! ({})\nAre the 'DARWIN_USERNAME' and 'DARWIN_PASSWORD' environment variables correct?", error);
54 }
55 }
56 }
57
58 connection.wait()?;
59 connection.close()?;
60
61 Ok(())
62}
63
64fn handle_stomp_frame(frame: Frame) -> Result<(), Box<dyn Error>> {
65 if frame.command == "CONNECTED" {
66 println!("Connected to STOMP server!");
67 return Ok(());
68 }
69
70 if frame.command == "MESSAGE" && frame.body.is_some() {
71 let body = frame.body.unwrap();
72
73 let content_type_option = frame.headers.iter().find_map(|(name, value)| {
74 if name.eq(Headers::ContentType.as_str()) {
75 return Some(value.to_string());
76 }
77
78 None
79 });
80
81 if content_type_option.is_none() {
82 return Err("No content type specified in message!".into());
83 }
84
85 let content_type = content_type_option.unwrap();
86 if !content_type.eq("application/xml") {
87 return Err(format!("Unexpected content type '{}'", content_type).into());
88 }
89
90 handle_stomp_xml_body(body)?;
91
92 return Ok(());
93 }
94
95 println!("{}", frame.command);
97 for (name, value) in frame.headers.clone() {
98 println!("{}: {}", name, value);
99 }
100 println!("");
101 if frame.body.is_some() {
102 println!("{}", frame.body.unwrap());
103 }
104
105 Ok(())
106}
107
108fn handle_stomp_xml_body(body: String) -> Result<(), Box<dyn std::error::Error>> {
109 let document = parse(&body)?;
110
111 let root_name = document.root.name.as_ref();
112 if root_name.is_none() {
113 return Err("Root element has no name!".into());
114 }
115 let root_name = root_name.unwrap();
116
117 let root_attributes = document.root.attributes.as_ref();
118 if root_attributes.is_none() {
119 return Err("Root element has no attributes!".into());
120 }
121
122 let root_children = document.root.children.as_ref();
123 if root_children.is_none() {
124 return Err("Root element has no children!".into());
125 }
126
127 if root_name != "Pport" {
128 return Err("Root element is not 'Pport'!".into());
129 }
130
131 let timestamp_iso8601 = root_attributes.as_ref().unwrap().get("ts").unwrap();
132
133 std::fs::write(format!("data/{}.xml", timestamp_iso8601), body)?;
134
135 let pport_elements = root_children.as_ref().unwrap();
136 if pport_elements.len() != 1 {
137 return Err("Pport element has more than 1 child!".into());
138 }
139
140 let ur = pport_elements.first().unwrap();
141 let ur_name = ur.name.as_ref();
142 if ur_name.is_none() {
143 return Err("Pport -> uR element has no name!".into());
144 }
145 let ur_name = ur_name.unwrap();
146 if ur_name != "uR" {
147 return Err("Pport -> uR element is not 'uR'!".into());
148 }
149
150 Ok(())
153}