shared tokio runtime

This commit is contained in:
Valmo Trindade
2024-10-16 23:43:04 -03:00
parent 319acf92b9
commit cb031d7c65
2 changed files with 33 additions and 25 deletions

View File

@@ -1,11 +1,19 @@
use std::{sync::Arc, thread};
use log::{error, info};
use tokio::{net::TcpListener, sync::{Mutex, oneshot}, task::JoinHandle, runtime::Runtime};
use tokio_tungstenite::{accept_async, tungstenite::protocol::Message, WebSocketStream};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use serde_json::json;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{error, info};
use once_cell::sync::Lazy;
use serde_json::json;
use std::{sync::Arc, thread};
use tokio::net::TcpStream;
use tokio::{
net::TcpListener,
runtime::Runtime,
sync::{oneshot, Mutex},
task::JoinHandle,
};
use tokio_tungstenite::{accept_async, tungstenite::protocol::Message, WebSocketStream};
use crate::structs::LocationPayload;
type WebSocket = WebSocketStream<TcpStream>; type WebSocket = WebSocketStream<TcpStream>;
@@ -27,21 +35,22 @@ impl ServerState {
lazy_static! { lazy_static! {
static ref SERVER_STATE: Arc<Mutex<ServerState>> = Arc::new(Mutex::new(ServerState::new())); static ref SERVER_STATE: Arc<Mutex<ServerState>> = Arc::new(Mutex::new(ServerState::new()));
static ref LOCATION_PAYLOAD: LocationPayload = LocationPayload {
latitude: 0.0,
longitude: 0.0,
altitude: 0.0,
bearing: 0.0,
};
} }
fn create_tokio_runtime() -> Runtime { pub static RUNTIME: Lazy<Runtime> =
tokio::runtime::Builder::new_multi_thread() Lazy::new(|| Runtime::new().expect("Failed to build the Tokio Runtime"));
.enable_all()
.build()
.expect("Failed to create Tokio runtime")
}
pub fn start() -> &'static str { pub fn start() -> &'static str {
let rt = create_tokio_runtime();
let server_state = SERVER_STATE.clone(); let server_state = SERVER_STATE.clone();
thread::spawn(move || { thread::spawn(move || {
rt.block_on(async { RUNTIME.block_on(async {
let mut state = server_state.lock().await; let mut state = server_state.lock().await;
if state.handle.is_some() { if state.handle.is_some() {
@@ -54,8 +63,8 @@ pub fn start() -> &'static str {
state.handle = Some(tokio::spawn(async move { state.handle = Some(tokio::spawn(async move {
info!("Starting server..."); info!("Starting server...");
let listener = TcpListener::bind("192.168.15.8:8080").await.expect("Failed to bind"); let listener = TcpListener::bind("192.168.0.43:8080").await.expect("Failed to bind");
info!("WebSocket server running on ws://192.168.15.8:8080"); info!("WebSocket server running on ws://192.168.0.43:8080");
tokio::select! { tokio::select! {
_ = async { _ = async {
@@ -90,8 +99,7 @@ async fn handle_client(client: Arc<Mutex<WebSocket>>) {
pub fn send_ping() -> &'static str { pub fn send_ping() -> &'static str {
thread::spawn(move || { thread::spawn(move || {
let rt = create_tokio_runtime(); RUNTIME.block_on(async {
rt.block_on(async {
send_to_all_clients(Message::Text("Ping".into())).await; send_to_all_clients(Message::Text("Ping".into())).await;
}); });
}); });
@@ -100,10 +108,10 @@ pub fn send_ping() -> &'static str {
} }
pub fn send_location() -> &'static str { pub fn send_location() -> &'static str {
let current_position = LOCATION_PAYLOAD.clone();
thread::spawn(move || { thread::spawn(move || {
let rt = create_tokio_runtime(); RUNTIME.block_on(async {
rt.block_on(async { let location_data = json!(current_position);
let location_data = json!({ "location": "42.3601, -71.0589" });
send_to_all_clients(Message::Text(location_data.to_string())).await; send_to_all_clients(Message::Text(location_data.to_string())).await;
}); });
}); });
@@ -112,6 +120,7 @@ pub fn send_location() -> &'static str {
} }
async fn send_to_all_clients(message: Message) { async fn send_to_all_clients(message: Message) {
info!("Sending message to all clients: {:?}", message);
let state = SERVER_STATE.lock().await; let state = SERVER_STATE.lock().await;
for client in &state.clients { for client in &state.clients {
let mut client = client.lock().await; let mut client = client.lock().await;
@@ -125,8 +134,7 @@ pub fn stop() -> &'static str {
let server_state = SERVER_STATE.clone(); let server_state = SERVER_STATE.clone();
thread::spawn(move || { thread::spawn(move || {
let rt = create_tokio_runtime(); RUNTIME.block_on(async {
rt.block_on(async {
let mut state = server_state.lock().await; let mut state = server_state.lock().await;
if let Some(stop_tx) = state.stop_sender.take() { if let Some(stop_tx) = state.stop_sender.take() {

View File

@@ -11,7 +11,7 @@ pub fn init() -> Extension {
let file_appender = FileAppender::builder() let file_appender = FileAppender::builder()
.append(true) .append(true)
.encoder(Box::new(PatternEncoder::new("{d} {t} {l} - {m}{n}"))) .encoder(Box::new(PatternEncoder::new("{d} {t} - {m}{n}")))
.build("armatak.log") .build("armatak.log")
.unwrap(); .unwrap();