From c1b00cb050fb2bc5aad358a0175cbf8f92bc0226 Mon Sep 17 00:00:00 2001 From: Valmo Trindade Date: Fri, 16 May 2025 03:33:08 -0300 Subject: [PATCH] Switched WebSocket start motor to allow callbacks when client connects --- addons/client/XEH_postInit.sqf | 2 + src/lib.rs | 4 +- src/websocket.rs | 217 +++++++++++++++++++++------------ 3 files changed, 143 insertions(+), 80 deletions(-) diff --git a/addons/client/XEH_postInit.sqf b/addons/client/XEH_postInit.sqf index 56d01f8..e53b1b7 100644 --- a/addons/client/XEH_postInit.sqf +++ b/addons/client/XEH_postInit.sqf @@ -4,6 +4,8 @@ if (!hasInterface) exitWith {}; _local_address = "armatak" callExtension ["local_ip", []] select 0; +CALLEXT(websocket:start); + SETVAR(player,GVAR(localAddress),_local_address); [{ diff --git a/src/lib.rs b/src/lib.rs index af1b15a..811525f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,10 +31,10 @@ pub fn init() -> Extension { log4rs::init_config(config).unwrap(); - websocket::start(); - Extension::build() .group("websocket", Group::new() + .command("start", websocket::start) + .command("stop", websocket::stop) .command("message", websocket::message) .command("location", websocket::location) ) diff --git a/src/websocket.rs b/src/websocket.rs index f5c8142..e310e5c 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,114 +1,175 @@ +use arma_rs::Context; +use lazy_static::lazy_static; use log::info; use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; -use ws::{listen, Message, Result as WsResult}; +use ws::{listen, CloseCode, Handler, Handshake, Message, Result as WsResult, Sender as WsSender}; + use crate::structs::{IntoMessage, LocationPayload}; -use lazy_static::lazy_static; pub enum WsCommand { - SendMessage(String), - Stop, + SendMessage(String), + Stop, +} + +enum WsEvent { + FirstClientConnected, + LastClientDisconnected, } pub struct WsServer { - pub(crate) tx: Sender, + pub(crate) tx: Sender, +} + +struct WsHandler { + out: WsSender, + clients: Arc>>, + event_tx: Sender, +} + +impl Handler for WsHandler { + fn on_open(&mut self, _: Handshake) -> WsResult<()> { + let mut clients = self.clients.lock().unwrap(); + clients.push(self.out.clone()); + + let count = clients.len(); + info!("New client connected. Total clients: {}", count); + + if count == 1 { + let _ = self.event_tx.send(WsEvent::FirstClientConnected); + } + + Ok(()) + } + + fn on_close(&mut self, _: CloseCode, _: &str) { + let mut clients = self.clients.lock().unwrap(); + clients.retain(|client| client.token() != self.out.token()); + + let count = clients.len(); + info!("Client disconnected. Total clients: {}", count); + + if count == 0 { + let _ = self.event_tx.send(WsEvent::LastClientDisconnected); + } + } + + fn on_message(&mut self, msg: Message) -> WsResult<()> { + info!("Received: {}", msg); + Ok(()) + } } impl WsServer { - pub fn start(&self, rx: Receiver) { - if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { - server.stop(); - } + pub fn start(&self, ctx: Context, rx: Receiver) { + if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { + server.stop(); + } - let clients = Arc::new(Mutex::new(Vec::new())); - let clients_clone = Arc::clone(&clients); + let clients = Arc::new(Mutex::new(Vec::new())); + let clients_clone = Arc::clone(&clients); + let (event_tx, event_rx): (Sender, Receiver) = mpsc::channel(); - thread::spawn(move || { - let mut running = true; + // Start WebSocket listener in a background thread + thread::spawn(move || { + listen("0.0.0.0:4152", |out| WsHandler { + out, + clients: Arc::clone(&clients_clone), + event_tx: event_tx.clone(), + }).expect("WebSocket server failed to start"); + }); - let ws_thread = thread::spawn(move || { - listen("0.0.0.0:4152", |out| { - let clients_inner = Arc::clone(&clients_clone); - { - let mut clients_guard = clients_inner.lock().unwrap(); - clients_guard.push(out.clone()); - } + // This thread owns ctx and reacts to commands and events + thread::spawn(move || { + loop { + // WebSocket message commands + match rx.try_recv() { + Ok(WsCommand::SendMessage(message)) => { + let clients_guard = clients.lock().unwrap(); + for client in clients_guard.iter() { + let _ = client.send(message.clone()); + } + } + Ok(WsCommand::Stop) => { + info!("Stopping WebSocket server."); + break; + } + Err(_) => {} + } - move |msg: Message| -> WsResult<()> { - info!("Received: {}", msg); - Ok(()) - } - }) - .unwrap(); - }); + // Handle event callbacks with valid Context + match event_rx.try_recv() { + Ok(WsEvent::FirstClientConnected) => { + info!("Triggering callback: client_connected"); + let _ = ctx.callback_null("armatak_websocket", "client_connected"); + } + Ok(WsEvent::LastClientDisconnected) => { + info!("Triggering callback: client_disconnected"); + let _ = ctx.callback_null("armatak_websocket", "client_disconnected"); + } + Err(_) => {} + } - while running { - match rx.recv() { - Ok(WsCommand::SendMessage(message)) => { - let clients_guard = clients.lock().unwrap(); - for client in clients_guard.iter() { - client.send(message.clone()).unwrap(); - } - } - Ok(WsCommand::Stop) => { - running = false; - info!("Stopping WebSocket server."); - } - Err(error) => { - info!("Error receiving command: {}", error.to_string()); - } - } - } + thread::sleep(std::time::Duration::from_millis(10)); + } + }); + } - ws_thread.join().unwrap(); - }); - } + pub fn send_message(&self, payload: T) { + let message = payload.into_message(); + let _ = self.tx.send(WsCommand::SendMessage(message)); + } - pub fn send_message(&self, payload: T) { - let message = payload.into_message(); - self.tx.send(WsCommand::SendMessage(message)).unwrap(); - } - - pub fn stop(&self) { - self.tx.send(WsCommand::Stop).unwrap(); - } + pub fn stop(&self) { + let _ = self.tx.send(WsCommand::Stop); + } } lazy_static! { - static ref WEBSOCKET_SERVER: Arc>> = Arc::new(Mutex::new(None)); + static ref WEBSOCKET_SERVER: Arc>> = Arc::new(Mutex::new(None)); } -pub fn start() -> &'static str { - let (tx, rx): (Sender, Receiver) = mpsc::channel(); +pub fn start(ctx: Context) -> &'static str { + let (tx, rx): (Sender, Receiver) = mpsc::channel(); - let server = WsServer { tx }; - server.start(rx); + let server = WsServer { tx }; + server.start(ctx, rx); - let mut server_guard = WEBSOCKET_SERVER.lock().unwrap(); - *server_guard = Some(server); + let mut server_guard = WEBSOCKET_SERVER.lock().unwrap(); + *server_guard = Some(server); - info!("WebSocket server started."); - - "Starting WebSocket Server" + info!("WebSocket server started."); + "Starting WebSocket Server" } pub fn message(payload: String) -> &'static str { - if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { - info!("Broadcasting message: {}", payload); - server.send_message(payload); - } else { - info!("WebSocket server is not running."); - } + if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { + info!("Broadcasting message: {}", payload); + server.send_message(payload); + } else { + info!("WebSocket server is not running."); + } - "Sending message to all WebSocket clients" + "Sending message to all WebSocket clients" } pub fn location(payload: LocationPayload) -> &'static str { - if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { - server.send_message(payload); - } else { - info!("WebSocket server is not running."); - } - "sending location to all WebSocket clients" + if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { + server.send_message(payload); + } else { + info!("WebSocket server is not running."); + } + + "Sending location to all WebSocket clients" +} + +pub fn stop() -> &'static str { + if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { + server.stop(); + } else { + info!("WebSocket server is not running."); + } + + "Stopping WebSocket Server" }