Switched WebSocket start motor to allow callbacks when client connects

This commit is contained in:
Valmo Trindade
2025-05-16 03:33:08 -03:00
parent 319ea9b32a
commit c1b00cb050
3 changed files with 143 additions and 80 deletions

View File

@@ -4,6 +4,8 @@ if (!hasInterface) exitWith {};
_local_address = "armatak" callExtension ["local_ip", []] select 0; _local_address = "armatak" callExtension ["local_ip", []] select 0;
CALLEXT(websocket:start);
SETVAR(player,GVAR(localAddress),_local_address); SETVAR(player,GVAR(localAddress),_local_address);
[{ [{

View File

@@ -31,10 +31,10 @@ pub fn init() -> Extension {
log4rs::init_config(config).unwrap(); log4rs::init_config(config).unwrap();
websocket::start();
Extension::build() Extension::build()
.group("websocket", Group::new() .group("websocket", Group::new()
.command("start", websocket::start)
.command("stop", websocket::stop)
.command("message", websocket::message) .command("message", websocket::message)
.command("location", websocket::location) .command("location", websocket::location)
) )

View File

@@ -1,114 +1,175 @@
use arma_rs::Context;
use lazy_static::lazy_static;
use log::info; use log::info;
use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use ws::{listen, Message, Result as WsResult}; use ws::{listen, CloseCode, Handler, Handshake, Message, Result as WsResult, Sender as WsSender};
use crate::structs::{IntoMessage, LocationPayload}; use crate::structs::{IntoMessage, LocationPayload};
use lazy_static::lazy_static;
pub enum WsCommand { pub enum WsCommand {
SendMessage(String), SendMessage(String),
Stop, Stop,
}
enum WsEvent {
FirstClientConnected,
LastClientDisconnected,
} }
pub struct WsServer { pub struct WsServer {
pub(crate) tx: Sender<WsCommand>, pub(crate) tx: Sender<WsCommand>,
}
struct WsHandler {
out: WsSender,
clients: Arc<Mutex<Vec<WsSender>>>,
event_tx: Sender<WsEvent>,
}
impl Handler for WsHandler {
fn on_open(&mut self, _: Handshake) -> WsResult<()> {
let mut clients = self.clients.lock().unwrap();
clients.push(self.out.clone());
let count = clients.len();
info!("New client connected. Total clients: {}", count);
if count == 1 {
let _ = self.event_tx.send(WsEvent::FirstClientConnected);
}
Ok(())
}
fn on_close(&mut self, _: CloseCode, _: &str) {
let mut clients = self.clients.lock().unwrap();
clients.retain(|client| client.token() != self.out.token());
let count = clients.len();
info!("Client disconnected. Total clients: {}", count);
if count == 0 {
let _ = self.event_tx.send(WsEvent::LastClientDisconnected);
}
}
fn on_message(&mut self, msg: Message) -> WsResult<()> {
info!("Received: {}", msg);
Ok(())
}
} }
impl WsServer { impl WsServer {
pub fn start(&self, rx: Receiver<WsCommand>) { pub fn start(&self, ctx: Context, rx: Receiver<WsCommand>) {
if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() {
server.stop(); server.stop();
} }
let clients = Arc::new(Mutex::new(Vec::new())); let clients = Arc::new(Mutex::new(Vec::new()));
let clients_clone = Arc::clone(&clients); let clients_clone = Arc::clone(&clients);
let (event_tx, event_rx): (Sender<WsEvent>, Receiver<WsEvent>) = mpsc::channel();
thread::spawn(move || { // Start WebSocket listener in a background thread
let mut running = true; thread::spawn(move || {
listen("0.0.0.0:4152", |out| WsHandler {
out,
clients: Arc::clone(&clients_clone),
event_tx: event_tx.clone(),
}).expect("WebSocket server failed to start");
});
let ws_thread = thread::spawn(move || { // This thread owns ctx and reacts to commands and events
listen("0.0.0.0:4152", |out| { thread::spawn(move || {
let clients_inner = Arc::clone(&clients_clone); loop {
{ // WebSocket message commands
let mut clients_guard = clients_inner.lock().unwrap(); match rx.try_recv() {
clients_guard.push(out.clone()); Ok(WsCommand::SendMessage(message)) => {
} let clients_guard = clients.lock().unwrap();
for client in clients_guard.iter() {
let _ = client.send(message.clone());
}
}
Ok(WsCommand::Stop) => {
info!("Stopping WebSocket server.");
break;
}
Err(_) => {}
}
move |msg: Message| -> WsResult<()> { // Handle event callbacks with valid Context
info!("Received: {}", msg); match event_rx.try_recv() {
Ok(()) Ok(WsEvent::FirstClientConnected) => {
} info!("Triggering callback: client_connected");
}) let _ = ctx.callback_null("armatak_websocket", "client_connected");
.unwrap(); }
}); Ok(WsEvent::LastClientDisconnected) => {
info!("Triggering callback: client_disconnected");
let _ = ctx.callback_null("armatak_websocket", "client_disconnected");
}
Err(_) => {}
}
while running { thread::sleep(std::time::Duration::from_millis(10));
match rx.recv() { }
Ok(WsCommand::SendMessage(message)) => { });
let clients_guard = clients.lock().unwrap(); }
for client in clients_guard.iter() {
client.send(message.clone()).unwrap();
}
}
Ok(WsCommand::Stop) => {
running = false;
info!("Stopping WebSocket server.");
}
Err(error) => {
info!("Error receiving command: {}", error.to_string());
}
}
}
ws_thread.join().unwrap(); pub fn send_message<T: IntoMessage>(&self, payload: T) {
}); let message = payload.into_message();
} let _ = self.tx.send(WsCommand::SendMessage(message));
}
pub fn send_message<T: IntoMessage>(&self, payload: T) { pub fn stop(&self) {
let message = payload.into_message(); let _ = self.tx.send(WsCommand::Stop);
self.tx.send(WsCommand::SendMessage(message)).unwrap(); }
}
pub fn stop(&self) {
self.tx.send(WsCommand::Stop).unwrap();
}
} }
lazy_static! { lazy_static! {
static ref WEBSOCKET_SERVER: Arc<Mutex<Option<WsServer>>> = Arc::new(Mutex::new(None)); static ref WEBSOCKET_SERVER: Arc<Mutex<Option<WsServer>>> = Arc::new(Mutex::new(None));
} }
pub fn start() -> &'static str { pub fn start(ctx: Context) -> &'static str {
let (tx, rx): (Sender<WsCommand>, Receiver<WsCommand>) = mpsc::channel(); let (tx, rx): (Sender<WsCommand>, Receiver<WsCommand>) = mpsc::channel();
let server = WsServer { tx }; let server = WsServer { tx };
server.start(rx); server.start(ctx, rx);
let mut server_guard = WEBSOCKET_SERVER.lock().unwrap(); let mut server_guard = WEBSOCKET_SERVER.lock().unwrap();
*server_guard = Some(server); *server_guard = Some(server);
info!("WebSocket server started."); info!("WebSocket server started.");
"Starting WebSocket Server"
"Starting WebSocket Server"
} }
pub fn message(payload: String) -> &'static str { pub fn message(payload: String) -> &'static str {
if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() {
info!("Broadcasting message: {}", payload); info!("Broadcasting message: {}", payload);
server.send_message(payload); server.send_message(payload);
} else { } else {
info!("WebSocket server is not running."); info!("WebSocket server is not running.");
} }
"Sending message to all WebSocket clients" "Sending message to all WebSocket clients"
} }
pub fn location(payload: LocationPayload) -> &'static str { pub fn location(payload: LocationPayload) -> &'static str {
if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() { if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() {
server.send_message(payload); server.send_message(payload);
} else { } else {
info!("WebSocket server is not running."); info!("WebSocket server is not running.");
} }
"sending location to all WebSocket clients"
"Sending location to all WebSocket clients"
}
pub fn stop() -> &'static str {
if let Some(ref server) = *WEBSOCKET_SERVER.lock().unwrap() {
server.stop();
} else {
info!("WebSocket server is not running.");
}
"Stopping WebSocket Server"
} }