mirror of
https://github.com/valmojr/armatak.git
synced 2026-06-13 15:23:28 +00:00
refactored the code to get and send messages from a websocket server
This commit is contained in:
637
Cargo.lock
generated
637
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -7,6 +7,7 @@ edition = "2021"
|
|||||||
arma-rs = "1.10.4"
|
arma-rs = "1.10.4"
|
||||||
futures = "0.3.31"
|
futures = "0.3.31"
|
||||||
futures-util = "0.3.31"
|
futures-util = "0.3.31"
|
||||||
|
http = "1.1.0"
|
||||||
lazy_static = "1.5.0"
|
lazy_static = "1.5.0"
|
||||||
log = "0.4.22"
|
log = "0.4.22"
|
||||||
log4rs = "1.3.0"
|
log4rs = "1.3.0"
|
||||||
@@ -17,6 +18,7 @@ serde = { version = "1.0.210", features = ["derive"] }
|
|||||||
serde_json = "1.0.128"
|
serde_json = "1.0.128"
|
||||||
tokio = { version = "1.40", features = ["full"] }
|
tokio = { version = "1.40", features = ["full"] }
|
||||||
tokio-tungstenite = "0.24.0"
|
tokio-tungstenite = "0.24.0"
|
||||||
|
ws = "0.9.2"
|
||||||
|
|
||||||
[dependencies.uuid]
|
[dependencies.uuid]
|
||||||
version = "1.10.0"
|
version = "1.10.0"
|
||||||
|
|||||||
215
src/commands.rs
215
src/commands.rs
@@ -1,153 +1,108 @@
|
|||||||
use futures::{SinkExt, StreamExt};
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::sync::mpsc::{self, Sender, Receiver};
|
||||||
|
use std::thread;
|
||||||
|
use log::info;
|
||||||
|
use ws::{listen, Message, Result as WsResult};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::{error, info};
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use serde_json::json;
|
|
||||||
use std::{sync::Arc, thread};
|
|
||||||
use tokio::net::TcpStream;
|
|
||||||
use tokio::{
|
|
||||||
net::TcpListener,
|
|
||||||
runtime::Runtime,
|
|
||||||
sync::{oneshot, Mutex},
|
|
||||||
task::JoinHandle,
|
|
||||||
};
|
|
||||||
use tokio_tungstenite::{accept_async, tungstenite::protocol::Message, WebSocketStream};
|
|
||||||
|
|
||||||
use crate::structs::LocationPayload;
|
enum WsCommand {
|
||||||
|
SendMessage(String),
|
||||||
type WebSocket = WebSocketStream<TcpStream>;
|
Stop,
|
||||||
|
|
||||||
struct ServerState {
|
|
||||||
handle: Option<JoinHandle<()>>,
|
|
||||||
stop_sender: Option<oneshot::Sender<()>>,
|
|
||||||
clients: Vec<Arc<Mutex<WebSocket>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerState {
|
struct WsServer {
|
||||||
fn new() -> Self {
|
tx: Sender<WsCommand>,
|
||||||
ServerState {
|
}
|
||||||
handle: None,
|
|
||||||
stop_sender: None,
|
impl WsServer {
|
||||||
clients: vec![],
|
fn start(&self, rx: Receiver<WsCommand>) {
|
||||||
}
|
let clients = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let clients_clone = Arc::clone(&clients);
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut running = true;
|
||||||
|
|
||||||
|
let ws_thread = thread::spawn(move || {
|
||||||
|
listen("127.0.0.1:3012", |out| {
|
||||||
|
let clients_inner = Arc::clone(&clients_clone);
|
||||||
|
{
|
||||||
|
let mut clients_guard = clients_inner.lock().unwrap();
|
||||||
|
clients_guard.push(out.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
move |msg: Message| -> WsResult<()> {
|
||||||
|
info!("Received: {}", msg);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
while running {
|
||||||
|
match rx.recv() {
|
||||||
|
Ok(WsCommand::SendMessage(message)) => {
|
||||||
|
let clients_guard = clients.lock().unwrap();
|
||||||
|
for client in clients_guard.iter() {
|
||||||
|
client.send(message.clone()).unwrap();
|
||||||
|
}
|
||||||
|
info!("Broadcasting message: {}", message);
|
||||||
|
}
|
||||||
|
Ok(WsCommand::Stop) => {
|
||||||
|
running = false;
|
||||||
|
info!("Stopping WebSocket server.");
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
info!("Error receiving command.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ws_thread.join().unwrap();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_message(&self, message: String) {
|
||||||
|
self.tx.send(WsCommand::SendMessage(message)).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop(&self) {
|
||||||
|
self.tx.send(WsCommand::Stop).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref SERVER_STATE: Arc<Mutex<ServerState>> = Arc::new(Mutex::new(ServerState::new()));
|
static ref WEBSOCKET_SERVER: Arc<Mutex<Option<WsServer>>> = Arc::new(Mutex::new(None));
|
||||||
static ref LOCATION_PAYLOAD: LocationPayload = LocationPayload {
|
|
||||||
latitude: 0.0,
|
|
||||||
longitude: 0.0,
|
|
||||||
altitude: 0.0,
|
|
||||||
bearing: 0.0,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub static RUNTIME: Lazy<Runtime> =
|
|
||||||
Lazy::new(|| Runtime::new().expect("Failed to build the Tokio Runtime"));
|
|
||||||
|
|
||||||
pub fn start() -> &'static str {
|
pub fn start() -> &'static str {
|
||||||
let server_state = SERVER_STATE.clone();
|
let (tx, rx): (Sender<WsCommand>, Receiver<WsCommand>) = mpsc::channel();
|
||||||
|
|
||||||
thread::spawn(move || {
|
let server = WsServer { tx };
|
||||||
RUNTIME.block_on(async {
|
server.start(rx);
|
||||||
let mut state = server_state.lock().await;
|
|
||||||
|
|
||||||
if state.handle.is_some() {
|
let mut server_guard = WEBSOCKET_SERVER.lock().unwrap();
|
||||||
info!("Server is already running.");
|
*server_guard = Some(server);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let (stop_tx, stop_rx) = oneshot::channel();
|
info!("WebSocket server started.");
|
||||||
state.stop_sender = Some(stop_tx);
|
|
||||||
|
|
||||||
state.handle = Some(tokio::spawn(async move {
|
"Starting WebSocket Server"
|
||||||
info!("Starting server...");
|
|
||||||
let listener = TcpListener::bind("192.168.0.43:8080").await.expect("Failed to bind");
|
|
||||||
info!("WebSocket server running on ws://192.168.0.43:8080");
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
_ = async {
|
|
||||||
while let Ok((stream, _)) = listener.accept().await {
|
|
||||||
let ws_stream = accept_async(stream).await.expect("Failed to accept WebSocket connection");
|
|
||||||
let client = Arc::new(Mutex::new(ws_stream));
|
|
||||||
SERVER_STATE.lock().await.clients.push(client.clone());
|
|
||||||
|
|
||||||
tokio::spawn(handle_client(client));
|
|
||||||
}
|
|
||||||
} => {}
|
|
||||||
_ = stop_rx => {
|
|
||||||
info!("Shutting down WebSocket server.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
"Server starting..."
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_client(client: Arc<Mutex<WebSocket>>) {
|
pub fn message(payload: String) -> &'static str {
|
||||||
let mut client = client.lock().await;
|
if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() {
|
||||||
|
server.send_message(payload);
|
||||||
while let Some(Ok(msg)) = client.next().await {
|
} else {
|
||||||
if let Message::Text(text) = msg {
|
info!("WebSocket server is not running.");
|
||||||
info!("Received message from client: {}", text);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_ping() -> &'static str {
|
"Sending message to all WebSocket clients"
|
||||||
thread::spawn(move || {
|
|
||||||
RUNTIME.block_on(async {
|
|
||||||
send_to_all_clients(Message::Text("Ping".into())).await;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
"sending ping..."
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_location() -> &'static str {
|
|
||||||
let current_position = LOCATION_PAYLOAD.clone();
|
|
||||||
thread::spawn(move || {
|
|
||||||
RUNTIME.block_on(async {
|
|
||||||
let location_data = json!(current_position);
|
|
||||||
send_to_all_clients(Message::Text(location_data.to_string())).await;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
"sending location..."
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_to_all_clients(message: Message) {
|
|
||||||
info!("Sending message to all clients: {:?}", message);
|
|
||||||
let state = SERVER_STATE.lock().await;
|
|
||||||
for client in &state.clients {
|
|
||||||
let mut client = client.lock().await;
|
|
||||||
if let Err(e) = client.send(message.clone()).await {
|
|
||||||
error!("Failed to send message: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop() -> &'static str {
|
pub fn stop() -> &'static str {
|
||||||
let server_state = SERVER_STATE.clone();
|
if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() {
|
||||||
|
server.stop();
|
||||||
|
} else {
|
||||||
|
info!("WebSocket server is not running.");
|
||||||
|
}
|
||||||
|
|
||||||
thread::spawn(move || {
|
"Stopping WebSocket server"
|
||||||
RUNTIME.block_on(async {
|
}
|
||||||
let mut state = server_state.lock().await;
|
|
||||||
|
|
||||||
if let Some(stop_tx) = state.stop_sender.take() {
|
|
||||||
let _ = stop_tx.send(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(handle) = state.handle.take() {
|
|
||||||
let _ = handle.await;
|
|
||||||
state.clients.clear();
|
|
||||||
info!("Server stopped.");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
"Server stopping..."
|
|
||||||
}
|
|
||||||
@@ -25,11 +25,10 @@ pub fn init() -> Extension {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
log4rs::init_config(config).unwrap();
|
log4rs::init_config(config).unwrap();
|
||||||
|
|
||||||
Extension::build()
|
Extension::build()
|
||||||
.command("start", commands::start)
|
.command("start", commands::start)
|
||||||
.command("stop", commands::stop)
|
.command("stop", commands::stop)
|
||||||
.command("ping", commands::send_ping)
|
.command("message", commands::message)
|
||||||
.command("location", commands::send_location)
|
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user