From 1890b7f490c244e007348951d9bcf2194c5cbde9 Mon Sep 17 00:00:00 2001 From: Valmo Trindade Date: Wed, 16 Oct 2024 13:08:00 -0300 Subject: [PATCH] added initial version of the websocket branch --- Cargo.toml | 8 +- src/commands.rs | 210 +++++++++++++++++++++++++++++------------------- src/lib.rs | 25 ++---- src/structs.rs | 94 +++------------------- src/util.rs | 109 ------------------------- 5 files changed, 149 insertions(+), 297 deletions(-) delete mode 100644 src/util.rs diff --git a/Cargo.toml b/Cargo.toml index 915e6fc..de1bec1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,18 +3,20 @@ name = "armatak" version = "0.6.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] arma-rs = "1.10.4" +futures = "0.3.31" +futures-util = "0.3.31" +lazy_static = "1.5.0" log = "0.4.22" log4rs = "1.3.0" once_cell = "1.19.0" regex = "1.10.6" -reqwest = {version = "0.12.7", features = ["blocking"]} +reqwest = { version = "0.12.7", features = ["blocking"] } serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" tokio = { version = "1.40", features = ["full"] } +tokio-tungstenite = "0.24.0" [dependencies.uuid] version = "1.10.0" diff --git a/src/commands.rs b/src/commands.rs index 42f2e4c..93ad194 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,101 +1,145 @@ -use once_cell::sync::Lazy; -use tokio::runtime::Runtime; +use std::{sync::Arc, thread}; +use log::{error, info}; +use tokio::{net::TcpListener, sync::{Mutex, oneshot}, task::JoinHandle, runtime::Runtime}; +use tokio_tungstenite::{accept_async, tungstenite::protocol::Message, WebSocketStream}; +use futures::{SinkExt, StreamExt}; +use tokio::net::TcpStream; +use serde_json::json; +use lazy_static::lazy_static; -use crate::{ - structs::LoginPayload, - util::{blocking_fetch_auth_token, parse_login_to_payload}, -}; +type WebSocket = WebSocketStream; -pub static RUNTIME: Lazy = Lazy::new(|| { - Runtime::new().expect("Failed to build the Tokio Runtime") -}); - -pub fn get_auth_token(login_payload: LoginPayload) -> String { - let api_address = login_payload.address.clone(); - let login_info = parse_login_to_payload(login_payload); - - return blocking_fetch_auth_token(login_info, api_address); +struct ServerState { + handle: Option>, + stop_sender: Option>, + clients: Vec>>, } -pub(crate) mod markers { - use crate::{structs::Marker, util::{async_post_markers, parse_marker_to_payload}}; - use log::{error, info}; - use std::thread; - - use super::RUNTIME; - - pub fn get(placeholder: String) -> &'static str { - info!("{}", placeholder); - - return "not implemented yet"; +impl ServerState { + fn new() -> Self { + ServerState { + handle: None, + stop_sender: None, + clients: vec![], + } } +} - pub fn post(data: Vec) -> &'static str { - thread::spawn(move || { - RUNTIME.block_on(async_post_markers(data)); - }); +lazy_static! { + static ref SERVER_STATE: Arc> = Arc::new(Mutex::new(ServerState::new())); +} - "loading" - } +fn create_tokio_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime") +} - pub fn post_debug(data: Vec) -> String { - let client = reqwest::blocking::Client::new(); +pub fn start() -> &'static str { + let rt = create_tokio_runtime(); + let server_state = SERVER_STATE.clone(); - let authentication_token = data[0].api_auth_token.clone(); - let parsed_address: String = - data[0].api_address.clone() + "/api/markers?auth_token=" + &authentication_token; + thread::spawn(move || { + rt.block_on(async { + let mut state = server_state.lock().await; - let mut status: String = "fetching".to_string(); - - info!("{}", status); - - for marker in data { - let payload = parse_marker_to_payload(marker); - let request_body = serde_json::to_string(&payload).unwrap(); - - info!( - "Parsing: {}, to {} with {}", - request_body, parsed_address, authentication_token - ); - - let response = client - .post(parsed_address) - .body(request_body) - .header("Content-Type", "application/json") - .send(); - - match response { - Ok(result) => { - status = result.status().to_string(); - info!("Received: {}", result.text().unwrap()); - } - Err(error) => { - status = "fetch failed".to_string(); - error!("Error: {}", error) - } + if state.handle.is_some() { + info!("Server is already running."); + return; } - return status; + let (stop_tx, stop_rx) = oneshot::channel(); + state.stop_sender = Some(stop_tx); + + state.handle = Some(tokio::spawn(async move { + info!("Starting server..."); + let listener = TcpListener::bind("192.168.15.8:8080").await.expect("Failed to bind"); + info!("WebSocket server running on ws://192.168.15.8:8080"); + + tokio::select! { + _ = async { + while let Ok((stream, _)) = listener.accept().await { + let ws_stream = accept_async(stream).await.expect("Failed to accept WebSocket connection"); + let client = Arc::new(Mutex::new(ws_stream)); + SERVER_STATE.lock().await.clients.push(client.clone()); + + tokio::spawn(handle_client(client)); + } + } => {} + _ = stop_rx => { + info!("Shutting down WebSocket server."); + } + } + })); + }); + }); + + "Server starting..." +} + +async fn handle_client(client: Arc>) { + let mut client = client.lock().await; + + while let Some(Ok(msg)) = client.next().await { + if let Message::Text(text) = msg { + info!("Received message from client: {}", text); } - - return "ok".to_string(); - } - - pub fn delete(placeholder: String) -> &'static str { - info!("{}", placeholder); - - return "not implemented yet"; } } -pub(crate) mod casevac { - pub fn get(placeholder: String) -> String { - format!("ERROR: Not implemented yet, {}", placeholder) - } - pub fn post(placeholder: String) -> String { - format!("ERROR: Not implemented yet, {}", placeholder) - } - pub fn delete(placeholder: String) -> String { - format!("ERROR: Not implemented yet, {}", placeholder) +pub fn send_ping() -> &'static str { + thread::spawn(move || { + let rt = create_tokio_runtime(); + rt.block_on(async { + send_to_all_clients(Message::Text("Ping".into())).await; + }); + }); + + "sending ping..." +} + +pub fn send_location() -> &'static str { + thread::spawn(move || { + let rt = create_tokio_runtime(); + rt.block_on(async { + let location_data = json!({ "location": "42.3601, -71.0589" }); + send_to_all_clients(Message::Text(location_data.to_string())).await; + }); + }); + + "sending location..." +} + +async fn send_to_all_clients(message: Message) { + let state = SERVER_STATE.lock().await; + for client in &state.clients { + let mut client = client.lock().await; + if let Err(e) = client.send(message.clone()).await { + error!("Failed to send message: {:?}", e); + } } } + +pub fn stop() -> &'static str { + let server_state = SERVER_STATE.clone(); + + thread::spawn(move || { + let rt = create_tokio_runtime(); + rt.block_on(async { + let mut state = server_state.lock().await; + + if let Some(stop_tx) = state.stop_sender.take() { + let _ = stop_tx.send(()); + } + + if let Some(handle) = state.handle.take() { + let _ = handle.await; + state.clients.clear(); + info!("Server stopped."); + } + }); + }); + + "Server stopping..." +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index bccf4bc..66ce687 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,7 @@ -use arma_rs::{arma, Extension, Group}; -use util::get_uuid; +use arma_rs::{arma, Extension}; mod commands; mod structs; mod tests; -mod util; #[arma] pub fn init() -> Extension { @@ -29,22 +27,9 @@ pub fn init() -> Extension { log4rs::init_config(config).unwrap(); Extension::build() - .command("uuid", get_uuid) - .command("get_auth_token", commands::get_auth_token) - .group( - "markers", - Group::new() - .command("get", commands::markers::get) - .command("post", commands::markers::post) - .command("post_debug", commands::markers::post_debug) - .command("delete", commands::markers::delete), - ) - .group( - "casevac", - Group::new() - .command("get", commands::casevac::get) - .command("post", commands::casevac::post) - .command("delete", commands::casevac::delete), - ) + .command("start", commands::start) + .command("stop", commands::stop) + .command("ping", commands::send_ping) + .command("location", commands::send_location) .finish() } diff --git a/src/structs.rs b/src/structs.rs index d1e3f9a..bd8b95e 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -1,92 +1,22 @@ use arma_rs::{FromArma, FromArmaError}; use serde::{Deserialize, Serialize}; -#[derive(Serialize)] -pub struct Marker { - pub uid: String, - pub longitude: f64, - pub latitude: f64, - pub name: String, - pub r#type: String, - pub course: f64, - pub speed: f64, - pub hae: f64, - pub api_address: String, - pub api_auth_token: String, -} - -impl FromArma for Marker { - fn from_arma(data: String) -> Result { - let ( - uid, - latitude, - longitude, - speed, - course, - r#type, - name, - hae, - api_address, - api_auth_token, - ) = <( - String, - f64, - f64, - f64, - f64, - String, - String, - f64, - String, - String - )>::from_arma(data)?; - Ok(Self { - uid, - latitude, - longitude, - speed, - course, - r#type, - name, - hae, - api_address, - api_auth_token - }) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MarkerPayload { - pub uid: String, - pub longitude: f64, - pub latitude: f64, - pub name: String, - pub r#type: String, - pub course: f64, - pub speed: f64, - pub hae: f64, -} - #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct LoginPayload { - pub address: String, - pub username: String, - pub password: String, +pub struct LocationPayload { + pub latitude: f32, + pub longitude: f32, + pub altitude: f32, + pub bearing: f32, } -#[derive(Debug, Serialize, Deserialize)] -pub struct LoginInfo { - pub username: String, - pub password: String -} - -impl FromArma for LoginPayload { - fn from_arma(data: String) -> Result { - let (address, username, password) = <(String, String, String)>::from_arma(data)?; +impl FromArma for LocationPayload { + fn from_arma(data: String) -> Result { + let (latitude, longitude, altitude, bearing) = <(f32, f32, f32, f32)>::from_arma(data)?; Ok(Self { - address, - username, - password, + latitude, + longitude, + altitude, + bearing, }) } } diff --git a/src/util.rs b/src/util.rs deleted file mode 100644 index 260c77c..0000000 --- a/src/util.rs +++ /dev/null @@ -1,109 +0,0 @@ -use log::{error, info}; -use reqwest::Client; - -use crate::structs::{LoginInfo, LoginPayload, Marker, MarkerPayload}; - -pub fn get_uuid() -> String { - use uuid::Uuid; - - let id = Uuid::new_v4().to_string(); - - return id -} - -pub fn parse_login_to_payload(login_payload: LoginPayload) -> LoginInfo { - return LoginInfo { - username: login_payload.username.to_owned(), - password: login_payload.password.to_owned() - } -} - -pub fn parse_marker_to_payload(marker: Marker) -> MarkerPayload { - return MarkerPayload { - uid: marker.uid, - longitude: marker.longitude, - latitude: marker.latitude, - name: marker.name, - r#type: marker.r#type, - course: marker.course, - speed: marker.speed, - hae: marker.hae - } -} - -pub async fn async_post_markers(data: Vec) { - let client = Client::new(); - - let authentication_token = data[0].api_auth_token.clone(); - let parsed_address: String = - data[0].api_address.clone() + "/api/markers?auth_token=" + &authentication_token; - - for marker in data { - let payload = parse_marker_to_payload(marker); - let request_body = serde_json::to_string(&payload).unwrap(); - - let response = client - .post(&parsed_address) - .body(request_body) - .header("Content-Type", "application/json") - .send() - .await; - - match response { - Ok(result) => { - info!("Received: {}", result.text().await.unwrap()); - } - Err(error) => { - error!("Error: {}", error) - } - } - } - -} - -pub fn blocking_fetch_auth_token(payload: LoginInfo, api_address: String) -> String { - let parsed_address = api_address + "/api/login?include_auth_token"; - - let request_body = serde_json::to_string(&payload).unwrap(); - let client = reqwest::blocking::Client::new(); - let response = client - .post(&parsed_address) - .body(request_body) - .header("Content-Type", "application/json") - .send(); - - match response { - Ok(result) => { - let response_body: Result = - serde_json::from_str(&result.text().unwrap()); - - match response_body { - Ok(result) => { - let csrf_token = result["response"]["user"]["authentication_token"].as_str(); - info!("Provided JSON: {:?}", result.as_str()); - match csrf_token { - Some(result) => { - return result.to_string(); - } - None => { - let message = "ERROR: Provided JSON doesnt match a valid Authentication Token"; - error!("{}", message); - - return message.to_string(); - } - } - } - Err(error) => { - error!("ERROR: failed to parse the response body to a valid JSON: {}", error); - - return "ERROR: failed to parse the response body to a valid JSON".to_string(); - } - } - } - Err(error) => { - error!("ERROR: failed to fetch the OTS API: {}", error); - - return "ERROR: failed to fetch the OTS API".to_string(); - } - } -} \ No newline at end of file