From e81104f9eb79f0a7832b4490b6947b479e2c6fa0 Mon Sep 17 00:00:00 2001 From: Valmo Trindade Date: Wed, 4 Dec 2024 22:23:35 -0300 Subject: [PATCH] [WIP] added second thread with OpenTAKServer API handling --- src/api.rs | 84 +++++++++++++++++++++++++++ src/commands.rs | 146 ----------------------------------------------- src/lib.rs | 27 +++++++-- src/structs.rs | 91 +++++++++++++++++++++++++++++ src/util.rs | 145 ++++++++++++++++++++++++++++++++++++++++++++++ src/websocket.rs | 122 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 463 insertions(+), 152 deletions(-) create mode 100644 src/api.rs create mode 100644 src/util.rs create mode 100644 src/websocket.rs diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 0000000..7d2277d --- /dev/null +++ b/src/api.rs @@ -0,0 +1,84 @@ +use once_cell::sync::Lazy; +use tokio::runtime::Runtime; + +use crate::{ + structs::LoginPayload, + util::{blocking_fetch_auth_token, parse_login_to_payload}, +}; + +pub static RUNTIME: Lazy = Lazy::new(|| { + Runtime::new().expect("Failed to build the Tokio Runtime") +}); + +pub fn get_auth_token(login_payload: LoginPayload) -> String { + let api_address = login_payload.address.clone(); + let login_info = parse_login_to_payload(login_payload); + + return blocking_fetch_auth_token(login_info, api_address); +} + +pub mod markers { + use crate::{structs::Marker, util::{async_post_markers, parse_marker_to_payload}}; + use log::{error, info}; + use std::thread; + + use super::RUNTIME; + + pub fn get(placeholder: String) -> &'static str { + info!("{}", placeholder); + + return "not implemented yet"; + } + + pub fn post(data: Vec) -> &'static str { + thread::spawn(move || { + RUNTIME.block_on(async_post_markers(data)); + }); + + "loading" + } + + pub fn post_debug(data: Vec) -> String { + let client = reqwest::blocking::Client::new(); + + let authentication_token = data[0].api_auth_token.clone(); + let parsed_address: String = + data[0].api_address.clone() + "/api/markers?auth_token=" + &authentication_token; + + let mut status: String = "fetching".to_string(); + + for marker in data { + let payload = parse_marker_to_payload(marker); + let request_body = serde_json::to_string(&payload).unwrap(); + + info!( + "Parsing: {} with {}", + request_body, authentication_token + ); + + let response = client + .post(parsed_address.clone()) + .body(request_body) + .header("Content-Type", "application/json") + .send(); + + match response { + Ok(result) => { + info!("Received: {}", result.text().unwrap()); + } + Err(error) => { + status = "fetch failed".to_string(); + error!("Error: {}", error) + } + } + }; + + return status.to_string(); + } + + pub fn delete(placeholder: String) -> &'static str { + info!("{}", placeholder); + + return "not implemented yet"; + } +} diff --git a/src/commands.rs b/src/commands.rs index 1c50256..e69de29 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,146 +0,0 @@ -use lazy_static::lazy_static; -use log::info; -use std::net::{IpAddr, UdpSocket}; -use std::sync::mpsc::{self, Receiver, Sender}; -use std::sync::{Arc, Mutex}; -use std::thread; -use ws::{listen, Message, Result as WsResult}; - -use crate::structs::{IntoMessage, LocationPayload}; - -enum WsCommand { - SendMessage(String), - Stop, -} - -struct WsServer { - tx: Sender, -} - -impl WsServer { - fn start(&self, rx: Receiver) { - let clients = Arc::new(Mutex::new(Vec::new())); - let clients_clone = Arc::clone(&clients); - - thread::spawn(move || { - let mut running = true; - - let ws_thread = thread::spawn(move || { - listen("0.0.0.0:4152", |out| { - let clients_inner = Arc::clone(&clients_clone); - { - let mut clients_guard = clients_inner.lock().unwrap(); - clients_guard.push(out.clone()); - } - - move |msg: Message| -> WsResult<()> { - info!("Received: {}", msg); - Ok(()) - } - }) - .unwrap(); - }); - - while running { - match rx.recv() { - Ok(WsCommand::SendMessage(message)) => { - let clients_guard = clients.lock().unwrap(); - for client in clients_guard.iter() { - client.send(message.clone()).unwrap(); - } - } - Ok(WsCommand::Stop) => { - running = false; - info!("Stopping WebSocket server."); - } - Err(error) => { - info!("Error receiving command: {}", error.to_string()); - } - } - } - - ws_thread.join().unwrap(); - }); - } - - fn send_message(&self, payload: T) { - let message = payload.into_message(); - self.tx.send(WsCommand::SendMessage(message)).unwrap(); - } - - fn stop(&self) { - self.tx.send(WsCommand::Stop).unwrap(); - } -} - -lazy_static! { - static ref WEBSOCKET_SERVER: Arc>> = Arc::new(Mutex::new(None)); -} - -pub fn start() -> &'static str { - let (tx, rx): (Sender, Receiver) = mpsc::channel(); - - let server = WsServer { tx }; - server.start(rx); - - let mut server_guard = WEBSOCKET_SERVER.lock().unwrap(); - *server_guard = Some(server); - - info!("WebSocket server started."); - - "Starting WebSocket Server" -} - -pub fn message(payload: String) -> &'static str { - if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { - info!("Broadcasting message: {}", payload); - server.send_message(payload); - } else { - info!("WebSocket server is not running."); - } - - "Sending message to all WebSocket clients" -} - -pub fn location(payload: LocationPayload) -> &'static str { - if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { - server.send_message(payload); - } else { - info!("WebSocket server is not running."); - } - "sending location to all WebSocket clients" -} - -pub fn stop() -> &'static str { - if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { - server.stop(); - } else { - info!("WebSocket server is not running."); - } - - "Stopping WebSocket server" -} - -pub fn local_address() -> String { - fn get_local_ip() -> Result { - let socket = UdpSocket::bind("0.0.0.0:0").map_err(|e| e.to_string())?; - socket - .connect("8.8.8.8:80") - .map_err(|e| e.to_string())?; - socket - .local_addr() - .map(|addr| addr.ip()) - .map_err(|e| e.to_string()) - } - - let parsed_data = get_local_ip(); - - match parsed_data { - Ok(ip) => { - return format!("ws://{}:4152", ip.to_string()); - }, - Err(_) => { - return "not provided".to_string(); - }, - } -} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 15fda45..ee71f1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,10 @@ -use arma_rs::{arma, Extension}; +use arma_rs::{arma, Extension, Group}; mod commands; mod structs; mod tests; +mod websocket; +mod api; +mod util; #[arma] pub fn init() -> Extension { @@ -27,10 +30,22 @@ pub fn init() -> Extension { log4rs::init_config(config).unwrap(); Extension::build() - .command("start", commands::start) - .command("stop", commands::stop) - .command("local_ip", commands::local_address) - .command("message", commands::message) - .command("location", commands::location) + .group("api", Group::new() + .command("start", websocket::start) + .command("stop", websocket::stop) + .command("message", websocket::message) + .command("location", websocket::location) + ) + .command("local_ip", util::get_local_address) + .command("uuid", util::get_uuid) + .command("get_auth_token", api::get_auth_token) + .group( + "markers", + Group::new() + .command("get", api::markers::get) + .command("post", api::markers::post) + .command("post_debug", api::markers::post_debug) + .command("delete", api::markers::delete), + ) .finish() } diff --git a/src/structs.rs b/src/structs.rs index 30a4435..8460e5a 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -36,3 +36,94 @@ impl FromArma for LocationPayload { }) } } + + +#[derive(Serialize)] +pub struct Marker { + pub uid: String, + pub longitude: f64, + pub latitude: f64, + pub name: String, + pub r#type: String, + pub course: f64, + pub speed: f64, + pub hae: f64, + pub api_address: String, + pub api_auth_token: String, +} + +impl FromArma for Marker { + fn from_arma(data: String) -> Result { + let ( + uid, + latitude, + longitude, + speed, + course, + r#type, + name, + hae, + api_address, + api_auth_token, + ) = <( + String, + f64, + f64, + f64, + f64, + String, + String, + f64, + String, + String + )>::from_arma(data)?; + Ok(Self { + uid, + latitude, + longitude, + speed, + course, + r#type, + name, + hae, + api_address, + api_auth_token + }) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MarkerPayload { + pub uid: String, + pub longitude: f64, + pub latitude: f64, + pub name: String, + pub r#type: String, + pub course: f64, + pub speed: f64, + pub hae: f64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct LoginPayload { + pub address: String, + pub username: String, + pub password: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct LoginInfo { + pub username: String, + pub password: String +} + +impl FromArma for LoginPayload { + fn from_arma(data: String) -> Result { + let (address, username, password) = <(String, String, String)>::from_arma(data)?; + Ok(Self { + address, + username, + password, + }) + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..8a05f68 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,145 @@ +use log::{error, info}; +use reqwest::Client; +use std::net::{IpAddr, UdpSocket}; +use crate::structs::{LoginInfo, LoginPayload, Marker, MarkerPayload}; + +pub fn get_uuid() -> String { + use uuid::Uuid; + + let id = Uuid::new_v4().to_string(); + + return id +} + +pub fn get_local_address() -> String { + fn get_local_ip() -> Result { + let socket = UdpSocket::bind("0.0.0.0:0").map_err(|e| e.to_string())?; + socket + .connect("8.8.8.8:80") + .map_err(|e| e.to_string())?; + socket + .local_addr() + .map(|addr| addr.ip()) + .map_err(|e| e.to_string()) + } + + let parsed_data = get_local_ip(); + + match parsed_data { + Ok(ip) => { + return format!("ws://{}:4152", ip.to_string()); + }, + Err(_) => { + return "not provided".to_string(); + }, + } +} + +pub fn parse_login_to_payload(login_payload: LoginPayload) -> LoginInfo { + return LoginInfo { + username: login_payload.username.to_owned(), + password: login_payload.password.to_owned() + } +} + +pub fn parse_marker_to_payload(marker: Marker) -> MarkerPayload { + return MarkerPayload { + uid: marker.uid, + longitude: marker.longitude, + latitude: marker.latitude, + name: marker.name, + r#type: marker.r#type, + course: marker.course, + speed: marker.speed, + hae: marker.hae + } +} + +pub async fn async_post_markers(data: Vec) { + let client = Client::new(); + + let authentication_token = data[0].api_auth_token.clone(); + let parsed_address: String = + data[0].api_address.clone() + "/api/markers?auth_token=" + &authentication_token; + + let mut status: String = "fetching".to_string(); + + info!("{}", status); + + for marker in data { + let payload = parse_marker_to_payload(marker); + let request_body = serde_json::to_string(&payload).unwrap(); + + info!( + "Parsing: {}, to {} with {}", + request_body, parsed_address, authentication_token + ); + + let response = client + .post(&parsed_address) + .body(request_body) + .header("Content-Type", "application/json") + .send() + .await; + + match response { + Ok(result) => { + status = result.status().to_string(); + info!("Received: {}", result.text().await.unwrap()); + } + Err(error) => { + status = "fetch failed".to_string(); + error!("Error: {}", error) + } + } + } + + info!("Final status: {}", status); +} + +pub fn blocking_fetch_auth_token(payload: LoginInfo, api_address: String) -> String { + let parsed_address = api_address + "/api/login?include_auth_token"; + + let request_body = serde_json::to_string(&payload).unwrap(); + let client = reqwest::blocking::Client::new(); + let response = client + .post(&parsed_address) + .body(request_body) + .header("Content-Type", "application/json") + .send(); + + match response { + Ok(result) => { + let response_body: Result = + serde_json::from_str(&result.text().unwrap()); + + match response_body { + Ok(result) => { + let csrf_token = result["response"]["user"]["authentication_token"].as_str(); + info!("Provided JSON: {:?}", result.as_str()); + match csrf_token { + Some(result) => { + return result.to_string(); + } + None => { + let message = "ERROR: Provided JSON doesnt match a valid Authentication Token"; + error!("{}", message); + + return message.to_string(); + } + } + } + Err(error) => { + error!("ERROR: failed to parse the response body to a valid JSON: {}", error); + + return "ERROR: failed to parse the response body to a valid JSON".to_string(); + } + } + } + Err(error) => { + error!("ERROR: failed to fetch the OTS API: {}", error); + + return "ERROR: failed to fetch the OTS API".to_string(); + } + } +} \ No newline at end of file diff --git a/src/websocket.rs b/src/websocket.rs new file mode 100644 index 0000000..1513528 --- /dev/null +++ b/src/websocket.rs @@ -0,0 +1,122 @@ +use log::info; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::thread; +use ws::{listen, Message, Result as WsResult}; +use crate::structs::{IntoMessage, LocationPayload}; +use lazy_static::lazy_static; + +pub enum WsCommand { + SendMessage(String), + Stop, +} + +pub struct WsServer { + pub(crate) tx: Sender, +} + +impl WsServer { + pub fn start(&self, rx: Receiver) { + let clients = Arc::new(Mutex::new(Vec::new())); + let clients_clone = Arc::clone(&clients); + + thread::spawn(move || { + let mut running = true; + + let ws_thread = thread::spawn(move || { + listen("0.0.0.0:4152", |out| { + let clients_inner = Arc::clone(&clients_clone); + { + let mut clients_guard = clients_inner.lock().unwrap(); + clients_guard.push(out.clone()); + } + + move |msg: Message| -> WsResult<()> { + info!("Received: {}", msg); + Ok(()) + } + }) + .unwrap(); + }); + + while running { + match rx.recv() { + Ok(WsCommand::SendMessage(message)) => { + let clients_guard = clients.lock().unwrap(); + for client in clients_guard.iter() { + client.send(message.clone()).unwrap(); + } + } + Ok(WsCommand::Stop) => { + running = false; + info!("Stopping WebSocket server."); + } + Err(error) => { + info!("Error receiving command: {}", error.to_string()); + } + } + } + + ws_thread.join().unwrap(); + }); + } + + pub fn send_message(&self, payload: T) { + let message = payload.into_message(); + self.tx.send(WsCommand::SendMessage(message)).unwrap(); + } + + pub fn stop(&self) { + self.tx.send(WsCommand::Stop).unwrap(); + } +} + + +lazy_static! { + static ref WEBSOCKET_SERVER: Arc>> = Arc::new(Mutex::new(None)); +} + +pub fn start() -> &'static str { + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + + let server = WsServer { tx }; + server.start(rx); + + let mut server_guard = WEBSOCKET_SERVER.lock().unwrap(); + *server_guard = Some(server); + + info!("WebSocket server started."); + + "Starting WebSocket Server" +} + +pub fn message(payload: String) -> &'static str { + if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { + info!("Broadcasting message: {}", payload); + server.send_message(payload); + } else { + info!("WebSocket server is not running."); + } + + "Sending message to all WebSocket clients" +} + + +pub fn location(payload: LocationPayload) -> &'static str { + if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { + server.send_message(payload); + } else { + info!("WebSocket server is not running."); + } + "sending location to all WebSocket clients" +} + +pub fn stop() -> &'static str { + if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { + server.stop(); + } else { + info!("WebSocket server is not running."); + } + + "Stopping WebSocket server" +}