added initial version of the websocket branch

This commit is contained in:
Valmo Trindade
2024-10-16 13:08:00 -03:00
parent 57d633e5a8
commit 1890b7f490
5 changed files with 149 additions and 297 deletions

View File

@@ -3,18 +3,20 @@ name = "armatak"
version = "0.6.0" version = "0.6.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
arma-rs = "1.10.4" arma-rs = "1.10.4"
futures = "0.3.31"
futures-util = "0.3.31"
lazy_static = "1.5.0"
log = "0.4.22" log = "0.4.22"
log4rs = "1.3.0" log4rs = "1.3.0"
once_cell = "1.19.0" once_cell = "1.19.0"
regex = "1.10.6" regex = "1.10.6"
reqwest = {version = "0.12.7", features = ["blocking"]} reqwest = { version = "0.12.7", features = ["blocking"] }
serde = { version = "1.0.210", features = ["derive"] } serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128" serde_json = "1.0.128"
tokio = { version = "1.40", features = ["full"] } tokio = { version = "1.40", features = ["full"] }
tokio-tungstenite = "0.24.0"
[dependencies.uuid] [dependencies.uuid]
version = "1.10.0" version = "1.10.0"

View File

@@ -1,101 +1,145 @@
use once_cell::sync::Lazy; use std::{sync::Arc, thread};
use tokio::runtime::Runtime; use log::{error, info};
use tokio::{net::TcpListener, sync::{Mutex, oneshot}, task::JoinHandle, runtime::Runtime};
use tokio_tungstenite::{accept_async, tungstenite::protocol::Message, WebSocketStream};
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use serde_json::json;
use lazy_static::lazy_static;
use crate::{ type WebSocket = WebSocketStream<TcpStream>;
structs::LoginPayload,
util::{blocking_fetch_auth_token, parse_login_to_payload},
};
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| { struct ServerState {
Runtime::new().expect("Failed to build the Tokio Runtime") handle: Option<JoinHandle<()>>,
}); stop_sender: Option<oneshot::Sender<()>>,
clients: Vec<Arc<Mutex<WebSocket>>>,
pub fn get_auth_token(login_payload: LoginPayload) -> String {
let api_address = login_payload.address.clone();
let login_info = parse_login_to_payload(login_payload);
return blocking_fetch_auth_token(login_info, api_address);
} }
pub(crate) mod markers { impl ServerState {
use crate::{structs::Marker, util::{async_post_markers, parse_marker_to_payload}}; fn new() -> Self {
use log::{error, info}; ServerState {
use std::thread; handle: None,
stop_sender: None,
use super::RUNTIME; clients: vec![],
}
pub fn get(placeholder: String) -> &'static str {
info!("{}", placeholder);
return "not implemented yet";
} }
}
pub fn post(data: Vec<Marker>) -> &'static str { lazy_static! {
thread::spawn(move || { static ref SERVER_STATE: Arc<Mutex<ServerState>> = Arc::new(Mutex::new(ServerState::new()));
RUNTIME.block_on(async_post_markers(data)); }
});
"loading" fn create_tokio_runtime() -> Runtime {
} tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime")
}
pub fn post_debug(data: Vec<Marker>) -> String { pub fn start() -> &'static str {
let client = reqwest::blocking::Client::new(); let rt = create_tokio_runtime();
let server_state = SERVER_STATE.clone();
let authentication_token = data[0].api_auth_token.clone(); thread::spawn(move || {
let parsed_address: String = rt.block_on(async {
data[0].api_address.clone() + "/api/markers?auth_token=" + &authentication_token; let mut state = server_state.lock().await;
let mut status: String = "fetching".to_string(); if state.handle.is_some() {
info!("Server is already running.");
info!("{}", status); return;
for marker in data {
let payload = parse_marker_to_payload(marker);
let request_body = serde_json::to_string(&payload).unwrap();
info!(
"Parsing: {}, to {} with {}",
request_body, parsed_address, authentication_token
);
let response = client
.post(parsed_address)
.body(request_body)
.header("Content-Type", "application/json")
.send();
match response {
Ok(result) => {
status = result.status().to_string();
info!("Received: {}", result.text().unwrap());
}
Err(error) => {
status = "fetch failed".to_string();
error!("Error: {}", error)
}
} }
return status; let (stop_tx, stop_rx) = oneshot::channel();
state.stop_sender = Some(stop_tx);
state.handle = Some(tokio::spawn(async move {
info!("Starting server...");
let listener = TcpListener::bind("192.168.15.8:8080").await.expect("Failed to bind");
info!("WebSocket server running on ws://192.168.15.8:8080");
tokio::select! {
_ = async {
while let Ok((stream, _)) = listener.accept().await {
let ws_stream = accept_async(stream).await.expect("Failed to accept WebSocket connection");
let client = Arc::new(Mutex::new(ws_stream));
SERVER_STATE.lock().await.clients.push(client.clone());
tokio::spawn(handle_client(client));
}
} => {}
_ = stop_rx => {
info!("Shutting down WebSocket server.");
}
}
}));
});
});
"Server starting..."
}
async fn handle_client(client: Arc<Mutex<WebSocket>>) {
let mut client = client.lock().await;
while let Some(Ok(msg)) = client.next().await {
if let Message::Text(text) = msg {
info!("Received message from client: {}", text);
} }
return "ok".to_string();
}
pub fn delete(placeholder: String) -> &'static str {
info!("{}", placeholder);
return "not implemented yet";
} }
} }
pub(crate) mod casevac { pub fn send_ping() -> &'static str {
pub fn get(placeholder: String) -> String { thread::spawn(move || {
format!("ERROR: Not implemented yet, {}", placeholder) let rt = create_tokio_runtime();
} rt.block_on(async {
pub fn post(placeholder: String) -> String { send_to_all_clients(Message::Text("Ping".into())).await;
format!("ERROR: Not implemented yet, {}", placeholder) });
} });
pub fn delete(placeholder: String) -> String {
format!("ERROR: Not implemented yet, {}", placeholder) "sending ping..."
}
pub fn send_location() -> &'static str {
thread::spawn(move || {
let rt = create_tokio_runtime();
rt.block_on(async {
let location_data = json!({ "location": "42.3601, -71.0589" });
send_to_all_clients(Message::Text(location_data.to_string())).await;
});
});
"sending location..."
}
async fn send_to_all_clients(message: Message) {
let state = SERVER_STATE.lock().await;
for client in &state.clients {
let mut client = client.lock().await;
if let Err(e) = client.send(message.clone()).await {
error!("Failed to send message: {:?}", e);
}
} }
} }
pub fn stop() -> &'static str {
let server_state = SERVER_STATE.clone();
thread::spawn(move || {
let rt = create_tokio_runtime();
rt.block_on(async {
let mut state = server_state.lock().await;
if let Some(stop_tx) = state.stop_sender.take() {
let _ = stop_tx.send(());
}
if let Some(handle) = state.handle.take() {
let _ = handle.await;
state.clients.clear();
info!("Server stopped.");
}
});
});
"Server stopping..."
}

View File

@@ -1,9 +1,7 @@
use arma_rs::{arma, Extension, Group}; use arma_rs::{arma, Extension};
use util::get_uuid;
mod commands; mod commands;
mod structs; mod structs;
mod tests; mod tests;
mod util;
#[arma] #[arma]
pub fn init() -> Extension { pub fn init() -> Extension {
@@ -29,22 +27,9 @@ pub fn init() -> Extension {
log4rs::init_config(config).unwrap(); log4rs::init_config(config).unwrap();
Extension::build() Extension::build()
.command("uuid", get_uuid) .command("start", commands::start)
.command("get_auth_token", commands::get_auth_token) .command("stop", commands::stop)
.group( .command("ping", commands::send_ping)
"markers", .command("location", commands::send_location)
Group::new()
.command("get", commands::markers::get)
.command("post", commands::markers::post)
.command("post_debug", commands::markers::post_debug)
.command("delete", commands::markers::delete),
)
.group(
"casevac",
Group::new()
.command("get", commands::casevac::get)
.command("post", commands::casevac::post)
.command("delete", commands::casevac::delete),
)
.finish() .finish()
} }

View File

@@ -1,92 +1,22 @@
use arma_rs::{FromArma, FromArmaError}; use arma_rs::{FromArma, FromArmaError};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize)]
pub struct Marker {
pub uid: String,
pub longitude: f64,
pub latitude: f64,
pub name: String,
pub r#type: String,
pub course: f64,
pub speed: f64,
pub hae: f64,
pub api_address: String,
pub api_auth_token: String,
}
impl FromArma for Marker {
fn from_arma(data: String) -> Result<Marker, FromArmaError> {
let (
uid,
latitude,
longitude,
speed,
course,
r#type,
name,
hae,
api_address,
api_auth_token,
) = <(
String,
f64,
f64,
f64,
f64,
String,
String,
f64,
String,
String
)>::from_arma(data)?;
Ok(Self {
uid,
latitude,
longitude,
speed,
course,
r#type,
name,
hae,
api_address,
api_auth_token
})
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MarkerPayload {
pub uid: String,
pub longitude: f64,
pub latitude: f64,
pub name: String,
pub r#type: String,
pub course: f64,
pub speed: f64,
pub hae: f64,
}
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LoginPayload { pub struct LocationPayload {
pub address: String, pub latitude: f32,
pub username: String, pub longitude: f32,
pub password: String, pub altitude: f32,
pub bearing: f32,
} }
#[derive(Debug, Serialize, Deserialize)] impl FromArma for LocationPayload {
pub struct LoginInfo { fn from_arma(data: String) -> Result<LocationPayload, FromArmaError> {
pub username: String, let (latitude, longitude, altitude, bearing) = <(f32, f32, f32, f32)>::from_arma(data)?;
pub password: String
}
impl FromArma for LoginPayload {
fn from_arma(data: String) -> Result<LoginPayload, FromArmaError> {
let (address, username, password) = <(String, String, String)>::from_arma(data)?;
Ok(Self { Ok(Self {
address, latitude,
username, longitude,
password, altitude,
bearing,
}) })
} }
} }

View File

@@ -1,109 +0,0 @@
use log::{error, info};
use reqwest::Client;
use crate::structs::{LoginInfo, LoginPayload, Marker, MarkerPayload};
pub fn get_uuid() -> String {
use uuid::Uuid;
let id = Uuid::new_v4().to_string();
return id
}
pub fn parse_login_to_payload(login_payload: LoginPayload) -> LoginInfo {
return LoginInfo {
username: login_payload.username.to_owned(),
password: login_payload.password.to_owned()
}
}
pub fn parse_marker_to_payload(marker: Marker) -> MarkerPayload {
return MarkerPayload {
uid: marker.uid,
longitude: marker.longitude,
latitude: marker.latitude,
name: marker.name,
r#type: marker.r#type,
course: marker.course,
speed: marker.speed,
hae: marker.hae
}
}
pub async fn async_post_markers(data: Vec<Marker>) {
let client = Client::new();
let authentication_token = data[0].api_auth_token.clone();
let parsed_address: String =
data[0].api_address.clone() + "/api/markers?auth_token=" + &authentication_token;
for marker in data {
let payload = parse_marker_to_payload(marker);
let request_body = serde_json::to_string(&payload).unwrap();
let response = client
.post(&parsed_address)
.body(request_body)
.header("Content-Type", "application/json")
.send()
.await;
match response {
Ok(result) => {
info!("Received: {}", result.text().await.unwrap());
}
Err(error) => {
error!("Error: {}", error)
}
}
}
}
pub fn blocking_fetch_auth_token(payload: LoginInfo, api_address: String) -> String {
let parsed_address = api_address + "/api/login?include_auth_token";
let request_body = serde_json::to_string(&payload).unwrap();
let client = reqwest::blocking::Client::new();
let response = client
.post(&parsed_address)
.body(request_body)
.header("Content-Type", "application/json")
.send();
match response {
Ok(result) => {
let response_body: Result<serde_json::Value, _> =
serde_json::from_str(&result.text().unwrap());
match response_body {
Ok(result) => {
let csrf_token = result["response"]["user"]["authentication_token"].as_str();
info!("Provided JSON: {:?}", result.as_str());
match csrf_token {
Some(result) => {
return result.to_string();
}
None => {
let message = "ERROR: Provided JSON doesnt match a valid Authentication Token";
error!("{}", message);
return message.to_string();
}
}
}
Err(error) => {
error!("ERROR: failed to parse the response body to a valid JSON: {}", error);
return "ERROR: failed to parse the response body to a valid JSON".to_string();
}
}
}
Err(error) => {
error!("ERROR: failed to fetch the OTS API: {}", error);
return "ERROR: failed to fetch the OTS API".to_string();
}
}
}