From a9f09b6ce6d6ee1db0f8e66216ad9c1709346610 Mon Sep 17 00:00:00 2001 From: Valmo Trindade Date: Tue, 5 May 2026 12:19:14 -0300 Subject: [PATCH] Added extension piece of code for handling the mavlink mocker --- src/cot/cot.rs | 17 +-- src/cot/mod.rs | 2 + src/cot/uas.rs | 330 ++++++++++++++++++++++++++++++++++++++++++++ src/cot/video.rs | 53 +++++++ src/lib.rs | 17 ++- src/mavlink_mock.rs | 239 ++++++++++++++++++++++++++++++++ src/mdns.rs | 207 +++++++++++++++++++++++++++ src/tcp/cot.rs | 32 +++++ src/udp_socket.rs | 57 ++++++-- src/video_stream.rs | 240 +++++++++++++++++++++++--------- 10 files changed, 1098 insertions(+), 96 deletions(-) create mode 100644 src/cot/uas.rs create mode 100644 src/cot/video.rs create mode 100644 src/mavlink_mock.rs create mode 100644 src/mdns.rs diff --git a/src/cot/cot.rs b/src/cot/cot.rs index 0177905..c214f6f 100644 --- a/src/cot/cot.rs +++ b/src/cot/cot.rs @@ -1,3 +1,4 @@ +use super::video::video_detail_xml; use chrono::{Duration, SecondsFormat, Utc}; use uuid::Uuid; @@ -20,14 +21,6 @@ pub struct CursorOverTime { } impl CursorOverTime { - fn escape_xml_attribute(value: &str) -> String { - value - .replace('&', "&") - .replace('"', """) - .replace('<', "<") - .replace('>', ">") - } - pub fn convert_to_xml(&self) -> String { let uuid = match &self.uuid { Some(uuid) => uuid, @@ -118,13 +111,7 @@ impl CursorOverTime { if let Some(video_url) = &self.video_url { if !video_url.trim().is_empty() { - xml.push_str( - format!( - "<__video url=\"{}\" />", - Self::escape_xml_attribute(video_url.trim()) - ) - .as_str(), - ); + xml.push_str(&video_detail_xml(video_url, uuid, &self.contact_callsign)); } } diff --git a/src/cot/mod.rs b/src/cot/mod.rs index 1f23edf..dc732b2 100644 --- a/src/cot/mod.rs +++ b/src/cot/mod.rs @@ -5,3 +5,5 @@ pub mod eud; pub mod gps; pub mod message; pub mod nato; +pub mod uas; +pub mod video; diff --git a/src/cot/uas.rs b/src/cot/uas.rs new file mode 100644 index 0000000..af0663a --- /dev/null +++ b/src/cot/uas.rs @@ -0,0 +1,330 @@ +use super::video::video_detail_xml; +use arma_rs::{FromArma, FromArmaError}; +use chrono::{Duration, SecondsFormat, Utc}; + +fn escape_xml(value: &str) -> String { + value + .replace('&', "&") + .replace('"', """) + .replace('<', "<") + .replace('>', ">") + .replace('\'', "'") +} + +fn parse_rtsp_url(url: &str) -> Option<(String, String, String)> { + let without_proto = url.strip_prefix("rtsp://")?; + let slash_pos = without_proto.find('/')?; + let host_port = &without_proto[..slash_pos]; + let path = &without_proto[slash_pos..]; + let colon_pos = host_port.rfind(':')?; + let address = host_port[..colon_pos].to_string(); + let port = host_port[colon_pos + 1..].to_string(); + Some((address, port, path.to_string())) +} + +pub struct UasPlatformCoTPayload { + pub uid: String, + pub cot_type: String, + pub callsign: String, + pub point_lat: f64, + pub point_lon: f64, + pub point_hae: f32, + pub track_course: i32, + pub track_speed: f32, + pub sensor_azimuth: i32, + pub sensor_elevation: i32, + pub sensor_fov: i32, + pub sensor_vfov: i32, + pub sensor_range: i32, + pub attitude_yaw: i32, + pub attitude_pitch: f32, + pub attitude_roll: f32, + pub hal: f32, + pub vehicle_type_tag: String, + pub is_flying: i32, + pub link_uid: String, +} + +impl FromArma for UasPlatformCoTPayload { + fn from_arma(data: String) -> Result { + let ( + uid, + cot_type, + callsign, + point_lat, + point_lon, + point_hae, + track_course, + track_speed, + sensor_azimuth, + sensor_elevation, + sensor_fov, + sensor_vfov, + sensor_range, + attitude_yaw, + attitude_pitch, + attitude_roll, + hal, + vehicle_type_tag, + is_flying, + link_uid, + ) = <( + String, + String, + String, + f64, + f64, + f32, + i32, + f32, + i32, + i32, + i32, + i32, + i32, + i32, + f32, + f32, + f32, + String, + i32, + String, + )>::from_arma(data)?; + + Ok(Self { + uid, + cot_type, + callsign, + point_lat, + point_lon, + point_hae, + track_course, + track_speed, + sensor_azimuth, + sensor_elevation, + sensor_fov, + sensor_vfov, + sensor_range, + attitude_yaw, + attitude_pitch, + attitude_roll, + hal, + vehicle_type_tag, + is_flying, + link_uid, + }) + } +} + +impl UasPlatformCoTPayload { + pub fn to_xml(&self) -> String { + let uid = escape_xml(&self.uid); + let cot_type = escape_xml(&self.cot_type); + let callsign = escape_xml(&self.callsign); + let link_uid = escape_xml(&self.link_uid); + let (vehicle_type_tag, video_url) = + match self.vehicle_type_tag.split_once("|armatak_video_url=") { + Some((vehicle_type_tag, video_url)) => ( + escape_xml(vehicle_type_tag), + Some(escape_xml(video_url.trim())).filter(|value| !value.is_empty()), + ), + None => (escape_xml(&self.vehicle_type_tag), None), + }; + let now = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true); + let stale = + (Utc::now() + Duration::milliseconds(3500)).to_rfc3339_opts(SecondsFormat::Millis, true); + + let mut xml = String::new(); + xml.push_str(""); + xml.push_str(&format!( + "", + cot_type = cot_type, + uid = uid, + now = now, + stale = stale, + )); + xml.push_str(&format!( + "", + lat = self.point_lat, + lon = self.point_lon, + hae = self.point_hae, + )); + xml.push_str(""); + xml.push_str("<_uastool extendedCot=\"true\" activeRoute=\"false\"/>"); + xml.push_str(&format!( + "", + self.track_course, + self.track_speed, + )); + xml.push_str(&format!( + "", + self.sensor_elevation, + self.sensor_vfov, + self.sensor_range, + self.sensor_azimuth, + self.sensor_fov, + )); + xml.push_str(&format!( + "", + self.attitude_roll, + self.attitude_pitch, + self.attitude_yaw, + )); + xml.push_str(&format!( + "", + self.hal, + vehicle_type_tag, + if self.is_flying != 0 { "true" } else { "false" }, + )); + xml.push_str("<_radio rssi=\"-2147483648\" gps=\"false\"/>"); + xml.push_str(&format!("", callsign)); + xml.push_str(""); + xml.push_str(&format!("<_route sender=\"{}\"/>", link_uid)); + xml.push_str(""); + if let Some(video_url) = video_url { + xml.push_str(&video_detail_xml(&video_url, &self.uid, &self.callsign)); + } else { + xml.push_str("<__video>"); + } + xml.push_str(&format!("", link_uid)); + xml.push_str(""); + xml + } +} + +pub struct UasVideoCoTPayload { + pub uid: String, + pub callsign: String, + pub video_url: String, +} + +impl FromArma for UasVideoCoTPayload { + fn from_arma(data: String) -> Result { + let (uid, callsign, video_url) = <(String, String, String)>::from_arma(data)?; + Ok(Self { + uid, + callsign, + video_url, + }) + } +} + +impl UasVideoCoTPayload { + pub fn to_xml(&self) -> String { + let (address, port, path) = match parse_rtsp_url(&self.video_url) { + Some(parts) => parts, + None => { + log::warn!( + "UasVideoCoTPayload: could not parse RTSP URL: {}", + self.video_url + ); + return String::new(); + } + }; + let callsign = escape_xml(&self.callsign); + let uid = escape_xml(&self.uid); + let address = escape_xml(&address); + let path = escape_xml(&path); + + let now = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true); + let stale = + (Utc::now() + Duration::seconds(3600)).to_rfc3339_opts(SecondsFormat::Millis, true); + + let mut xml = String::new(); + xml.push_str(""); + xml.push_str(&format!( + "", + uid = uid, + now = now, + stale = stale + )); + xml.push_str( + "", + ); + xml.push_str(""); + xml.push_str("<__video>"); + xml.push_str(&format!( + "", + path = path, + address = address, + port = port, + uid = uid, + callsign = callsign, + )); + xml.push_str(""); + xml.push_str(&format!("", callsign)); + xml.push_str(""); + xml.push_str(""); + xml + } +} + +pub struct UasSensorCoTPayload { + pub uid: String, + pub video_uid: String, + pub callsign: String, + pub point_lat: f64, + pub point_lon: f64, + pub point_hae: f32, + pub azimuth: i32, + pub fov: i32, + pub range: i32, +} + +impl FromArma for UasSensorCoTPayload { + fn from_arma(data: String) -> Result { + let (uid, video_uid, callsign, point_lat, point_lon, point_hae, azimuth, fov, range) = + <(String, String, String, f64, f64, f32, i32, i32, i32)>::from_arma(data)?; + Ok(Self { + uid, + video_uid, + callsign, + point_lat, + point_lon, + point_hae, + azimuth, + fov, + range, + }) + } +} + +impl UasSensorCoTPayload { + pub fn to_xml(&self) -> String { + let uid = escape_xml(&self.uid); + let video_uid = escape_xml(&self.video_uid); + let callsign = escape_xml(&self.callsign); + let now = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true); + let stale = + (Utc::now() + Duration::seconds(60)).to_rfc3339_opts(SecondsFormat::Millis, true); + + let mut xml = String::new(); + xml.push_str(""); + xml.push_str(&format!( + "", + uid = uid, + now = now, + stale = stale, + )); + xml.push_str(&format!( + "", + lat = self.point_lat, + lon = self.point_lon, + hae = self.point_hae, + )); + xml.push_str(""); + xml.push_str(&format!( + "", + fov = self.fov, + range = self.range, + az = self.azimuth, + )); + xml.push_str(&format!("<__video uid=\"{}\"/>", video_uid)); + xml.push_str(&format!("", callsign)); + xml.push_str(""); + xml.push_str(""); + xml + } +} + + diff --git a/src/cot/video.rs b/src/cot/video.rs new file mode 100644 index 0000000..68296e0 --- /dev/null +++ b/src/cot/video.rs @@ -0,0 +1,53 @@ +fn escape_xml_attribute(value: &str) -> String { + value + .replace('&', "&") + .replace('"', """) + .replace('<', "<") + .replace('>', ">") + .replace('\'', "'") +} + +fn parse_video_url(url: &str) -> Option<(String, String, String, String)> { + let (protocol, rest) = url.trim().split_once("://")?; + let (authority, path) = match rest.split_once('/') { + Some((authority, path)) => (authority, format!("/{}", path)), + None => (rest, String::new()), + }; + let host_port = authority.rsplit_once('@').map_or(authority, |(_, host_port)| host_port); + let (address, port) = host_port.rsplit_once(':')?; + + if protocol.is_empty() || address.is_empty() || port.is_empty() { + return None; + } + + Some(( + protocol.to_ascii_lowercase(), + address.to_string(), + port.to_string(), + path, + )) +} + +pub fn video_detail_xml(video_url: &str, uid: &str, callsign: &str) -> String { + let trimmed_url = video_url.trim(); + if trimmed_url.is_empty() { + return "<__video>".to_string(); + } + + let Some((protocol, address, port, path)) = parse_video_url(trimmed_url) else { + return format!( + "<__video url=\"{}\"/>", + escape_xml_attribute(trimmed_url) + ); + }; + + format!( + "<__video>", + escape_xml_attribute(&protocol), + escape_xml_attribute(&path), + escape_xml_attribute(&address), + escape_xml_attribute(&port), + escape_xml_attribute(uid), + escape_xml_attribute(callsign), + ) +} diff --git a/src/lib.rs b/src/lib.rs index f3cfb53..fae878d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ use arma_rs::{arma, Extension, Group}; use rustls::crypto::aws_lc_rs; +mod mavlink_mock; +mod mdns; mod structs; mod tcp; mod tests; @@ -39,6 +41,16 @@ pub fn init() -> Extension { .command("local_ip", utils::address::get_local_address) .command("uuid", utils::uuid::get_uuid) .command("log", utils::log::log_info) + .group( + "mavlink_mock", + Group::new().command("send_uas_telemetry", mavlink_mock::send_uas_telemetry), + ) + .group( + "mdns", + Group::new() + .command("start_uas_advertisement", mdns::start_uas_advertisement) + .command("stop", mdns::stop), + ) .group( "udp_socket", Group::new() @@ -61,7 +73,10 @@ pub fn init() -> Extension { .command("eud", tcp::cot::send_eud_cot) .command("marker", tcp::cot::send_marker_cot) .command("digital_pointer", tcp::cot::send_digital_pointer_cot) - .command("chat", tcp::cot::send_message_cot), + .command("chat", tcp::cot::send_message_cot) + .command("uas_platform", tcp::cot::send_uas_platform_cot) + .command("uas_video", tcp::cot::send_uas_video_cot) + .command("uas_sensor", tcp::cot::send_uas_sensor_cot), ) .group( "draw", diff --git a/src/mavlink_mock.rs b/src/mavlink_mock.rs new file mode 100644 index 0000000..b781fce --- /dev/null +++ b/src/mavlink_mock.rs @@ -0,0 +1,239 @@ +use arma_rs::{Context, FromArma, FromArmaError}; +use chrono::Utc; +use log::info; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicU8, Ordering}; + +static MAVLINK_SEQUENCE: AtomicU8 = AtomicU8::new(0); + +pub struct UasTelemetryPayload { + pub address: String, + pub system_id: u8, + pub component_id: u8, + pub vehicle_type: u8, + pub lat_deg: f64, + pub lon_deg: f64, + pub alt_msl_m: f32, + pub rel_alt_m: f32, + pub heading_deg: f32, + pub groundspeed_mps: f32, + pub roll_deg: f32, + pub pitch_deg: f32, + pub yaw_deg: f32, + pub flying: bool, +} + +impl FromArma for UasTelemetryPayload { + fn from_arma(data: String) -> Result { + let ( + address, + system_id, + component_id, + vehicle_type, + lat_deg, + lon_deg, + alt_msl_m, + rel_alt_m, + heading_deg, + groundspeed_mps, + roll_deg, + pitch_deg, + yaw_deg, + flying, + ) = <( + String, + i32, + i32, + i32, + f64, + f64, + f32, + f32, + f32, + f32, + f32, + f32, + f32, + i32, + )>::from_arma(data)?; + + Ok(Self { + address, + system_id: system_id.clamp(1, 255) as u8, + component_id: component_id.clamp(1, 255) as u8, + vehicle_type: vehicle_type.clamp(0, 255) as u8, + lat_deg, + lon_deg, + alt_msl_m, + rel_alt_m, + heading_deg, + groundspeed_mps, + roll_deg, + pitch_deg, + yaw_deg, + flying: flying != 0, + }) + } +} + +fn crc_accumulate(byte: u8, crc: &mut u16) { + let mut tmp = byte ^ (*crc as u8); + tmp ^= tmp << 4; + *crc = (*crc >> 8) ^ ((tmp as u16) << 8) ^ ((tmp as u16) << 3) ^ ((tmp as u16) >> 4); +} + +fn mavlink_crc(header_and_payload: &[u8], crc_extra: u8) -> u16 { + let mut crc = 0xFFFFu16; + for byte in header_and_payload { + crc_accumulate(*byte, &mut crc); + } + crc_accumulate(crc_extra, &mut crc); + crc +} + +fn build_v1_packet(system_id: u8, component_id: u8, msg_id: u8, payload: &[u8], crc_extra: u8) -> Vec { + let seq = MAVLINK_SEQUENCE.fetch_add(1, Ordering::Relaxed); + let mut packet = Vec::with_capacity(payload.len() + 8); + packet.push(0xFE); + packet.push(payload.len() as u8); + packet.push(seq); + packet.push(system_id); + packet.push(component_id); + packet.push(msg_id); + packet.extend_from_slice(payload); + + let crc = mavlink_crc(&packet[1..], crc_extra); + packet.push((crc & 0xFF) as u8); + packet.push((crc >> 8) as u8); + packet +} + +fn heartbeat_packet(payload: &UasTelemetryPayload) -> Vec { + let mut msg = Vec::with_capacity(9); + msg.extend_from_slice(&0u32.to_le_bytes()); + msg.push(payload.vehicle_type); + msg.push(0); + msg.push(if payload.flying { 0x81 } else { 0x01 }); + msg.push(if payload.flying { 4 } else { 3 }); + msg.push(3); + build_v1_packet(payload.system_id, payload.component_id, 0, &msg, 50) +} + +fn gps_raw_int_packet(payload: &UasTelemetryPayload) -> Vec { + let now_us = Utc::now().timestamp_micros().max(0) as u64; + let lat = (payload.lat_deg * 1e7).round() as i32; + let lon = (payload.lon_deg * 1e7).round() as i32; + let alt = (payload.alt_msl_m * 1000.0).round() as i32; + let speed_cms = (payload.groundspeed_mps.max(0.0) * 100.0).round() as u16; + let cog_cdeg = (((payload.heading_deg % 360.0 + 360.0) % 360.0) * 100.0).round() as u16; + + let mut msg = Vec::with_capacity(30); + msg.extend_from_slice(&now_us.to_le_bytes()); + msg.extend_from_slice(&lat.to_le_bytes()); + msg.extend_from_slice(&lon.to_le_bytes()); + msg.extend_from_slice(&alt.to_le_bytes()); + msg.extend_from_slice(&100u16.to_le_bytes()); + msg.extend_from_slice(&100u16.to_le_bytes()); + msg.extend_from_slice(&speed_cms.to_le_bytes()); + msg.extend_from_slice(&cog_cdeg.to_le_bytes()); + msg.push(3); + msg.push(10); + build_v1_packet(payload.system_id, payload.component_id, 24, &msg, 24) +} + +fn global_position_int_packet(payload: &UasTelemetryPayload) -> Vec { + let now_ms = Utc::now().timestamp_millis().max(0) as u32; + let lat = (payload.lat_deg * 1e7).round() as i32; + let lon = (payload.lon_deg * 1e7).round() as i32; + let alt = (payload.alt_msl_m * 1000.0).round() as i32; + let rel_alt = (payload.rel_alt_m.max(0.0) * 1000.0).round() as i32; + let speed_cms = (payload.groundspeed_mps.max(0.0) * 100.0).round() as i16; + let hdg_cdeg = (((payload.heading_deg % 360.0 + 360.0) % 360.0) * 100.0).round() as u16; + + let mut msg = Vec::with_capacity(28); + msg.extend_from_slice(&now_ms.to_le_bytes()); + msg.extend_from_slice(&lat.to_le_bytes()); + msg.extend_from_slice(&lon.to_le_bytes()); + msg.extend_from_slice(&alt.to_le_bytes()); + msg.extend_from_slice(&rel_alt.to_le_bytes()); + msg.extend_from_slice(&speed_cms.to_le_bytes()); + msg.extend_from_slice(&0i16.to_le_bytes()); + msg.extend_from_slice(&0i16.to_le_bytes()); + msg.extend_from_slice(&hdg_cdeg.to_le_bytes()); + build_v1_packet(payload.system_id, payload.component_id, 33, &msg, 104) +} + +fn attitude_packet(payload: &UasTelemetryPayload) -> Vec { + let now_ms = Utc::now().timestamp_millis().max(0) as u32; + let roll = payload.roll_deg.to_radians(); + let pitch = payload.pitch_deg.to_radians(); + let yaw = payload.yaw_deg.to_radians(); + + let mut msg = Vec::with_capacity(28); + msg.extend_from_slice(&now_ms.to_le_bytes()); + msg.extend_from_slice(&roll.to_le_bytes()); + msg.extend_from_slice(&pitch.to_le_bytes()); + msg.extend_from_slice(&yaw.to_le_bytes()); + msg.extend_from_slice(&0f32.to_le_bytes()); + msg.extend_from_slice(&0f32.to_le_bytes()); + msg.extend_from_slice(&0f32.to_le_bytes()); + build_v1_packet(payload.system_id, payload.component_id, 30, &msg, 39) +} + +fn vfr_hud_packet(payload: &UasTelemetryPayload) -> Vec { + let heading = (((payload.heading_deg % 360.0 + 360.0) % 360.0).round()) as i16; + let throttle = if payload.flying { 50u16 } else { 0u16 }; + + let mut msg = Vec::with_capacity(20); + msg.extend_from_slice(&payload.groundspeed_mps.to_le_bytes()); + msg.extend_from_slice(&payload.groundspeed_mps.to_le_bytes()); + msg.extend_from_slice(&payload.alt_msl_m.to_le_bytes()); + msg.extend_from_slice(&0f32.to_le_bytes()); + msg.extend_from_slice(&heading.to_le_bytes()); + msg.extend_from_slice(&throttle.to_le_bytes()); + build_v1_packet(payload.system_id, payload.component_id, 74, &msg, 20) +} + +pub fn send_uas_telemetry(ctx: Context, payload: UasTelemetryPayload) -> &'static str { + info!( + "MAVLink mock send requested to {} sysid={} compid={} lat={} lon={} alt_msl={} rel_alt={} heading={} speed={} flying={}", + payload.address, + payload.system_id, + payload.component_id, + payload.lat_deg, + payload.lon_deg, + payload.alt_msl_m, + payload.rel_alt_m, + payload.heading_deg, + payload.groundspeed_mps, + payload.flying + ); + + let socket = match UdpSocket::bind("0.0.0.0:0") { + Ok(socket) => socket, + Err(error) => { + let _ = ctx.callback_data("MAVLINK MOCK ERROR", "Failed to bind UDP socket", error.to_string()); + info!("MAVLink mock failed to bind UDP socket: {}", error); + return "Failed to bind MAVLink mock socket"; + } + }; + + let packets = [ + heartbeat_packet(&payload), + gps_raw_int_packet(&payload), + global_position_int_packet(&payload), + attitude_packet(&payload), + vfr_hud_packet(&payload), + ]; + + for (index, packet) in packets.iter().enumerate() { + if let Err(error) = socket.send_to(packet, &payload.address) { + let _ = ctx.callback_data("MAVLINK MOCK ERROR", "Failed to send MAVLink packet", error.to_string()); + info!("MAVLink mock failed sending packet {} to {}: {}", index, payload.address, error); + return "Failed to send MAVLink mock telemetry"; + } + } + + info!("MAVLink mock sent {} packets to {}", packets.len(), payload.address); + "Sent MAVLink mock telemetry" +} diff --git a/src/mdns.rs b/src/mdns.rs new file mode 100644 index 0000000..c56362b --- /dev/null +++ b/src/mdns.rs @@ -0,0 +1,207 @@ +use arma_rs::Context; +use lazy_static::lazy_static; +use log::info; +use std::net::{Ipv4Addr, SocketAddrV4, UdpSocket}; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::Mutex; +use std::thread; +use std::time::Duration; + +lazy_static! { + static ref MDNS_CTRL: Mutex>> = Mutex::new(None); +} + +fn detect_local_ipv4() -> Result { + let socket = UdpSocket::bind("0.0.0.0:0").map_err(|e| e.to_string())?; + socket.connect("8.8.8.8:80").map_err(|e| e.to_string())?; + match socket.local_addr().map_err(|e| e.to_string())? { + std::net::SocketAddr::V4(addr) => Ok(*addr.ip()), + std::net::SocketAddr::V6(_) => Err("Local address is not IPv4".to_string()), + } +} + +fn sanitize_label(value: &str, fallback: &str) -> String { + let mut sanitized = value + .chars() + .map(|c| if c.is_ascii_alphanumeric() || c == '-' { c } else { '-' }) + .collect::() + .trim_matches('-') + .to_string(); + + if sanitized.is_empty() { + sanitized = fallback.to_string(); + } + + if sanitized.len() > 63 { + sanitized.truncate(63); + } + + sanitized +} + +fn encode_name(name: &str) -> Vec { + let mut encoded = Vec::new(); + for label in name.split('.') { + let bytes = label.as_bytes(); + encoded.push(bytes.len() as u8); + encoded.extend_from_slice(bytes); + } + encoded.push(0); + encoded +} + +fn push_u16(buf: &mut Vec, value: u16) { + buf.extend_from_slice(&value.to_be_bytes()); +} + +fn push_u32(buf: &mut Vec, value: u32) { + buf.extend_from_slice(&value.to_be_bytes()); +} + +fn push_record(buf: &mut Vec, name: &str, rr_type: u16, rr_class: u16, ttl: u32, rdata: &[u8]) { + buf.extend_from_slice(&encode_name(name)); + push_u16(buf, rr_type); + push_u16(buf, rr_class); + push_u32(buf, ttl); + push_u16(buf, rdata.len() as u16); + buf.extend_from_slice(rdata); +} + +fn build_mdns_packet(instance_name: &str, host_name: &str, ip: Ipv4Addr, port: u16, video_uri: &str) -> Vec { + let service_type = "_mavlink._udp.local"; + let instance_fqdn = format!("{}.{}", instance_name, service_type); + let host_fqdn = format!("{}.local", host_name); + + let mut packet = Vec::new(); + push_u16(&mut packet, 0); + push_u16(&mut packet, 0x8400); + push_u16(&mut packet, 0); + push_u16(&mut packet, 4); + push_u16(&mut packet, 0); + push_u16(&mut packet, 0); + + let ptr_rdata = encode_name(&instance_fqdn); + push_record(&mut packet, service_type, 12, 0x0001, 120, &ptr_rdata); + + let mut srv_rdata = Vec::new(); + push_u16(&mut srv_rdata, 0); + push_u16(&mut srv_rdata, 0); + push_u16(&mut srv_rdata, port); + srv_rdata.extend_from_slice(&encode_name(&host_fqdn)); + push_record(&mut packet, &instance_fqdn, 33, 0x8001, 120, &srv_rdata); + + let txt_value = format!("uri={}", video_uri); + let txt_bytes = txt_value.as_bytes(); + let mut txt_rdata = Vec::new(); + txt_rdata.push(txt_bytes.len() as u8); + txt_rdata.extend_from_slice(txt_bytes); + push_record(&mut packet, &instance_fqdn, 16, 0x8001, 120, &txt_rdata); + + let a_rdata = ip.octets(); + push_record(&mut packet, &host_fqdn, 1, 0x8001, 120, &a_rdata); + + packet +} + +fn stop_existing() { + if let Ok(mut lock) = MDNS_CTRL.lock() { + if let Some(tx) = lock.take() { + let _ = tx.send(()); + } + } +} + +pub fn start_uas_advertisement( + ctx: Context, + instance_name: String, + mavlink_port: i32, + video_uri: String, +) -> &'static str { + stop_existing(); + + let local_ip = match detect_local_ipv4() { + Ok(ip) => ip, + Err(error) => { + let _ = ctx.callback_data("MDNS ERROR", "Failed to determine local IPv4", error.clone()); + return "mdns local IPv4 error"; + } + }; + + let port = mavlink_port.clamp(1, 65535) as u16; + let safe_instance = sanitize_label(&instance_name, "ArmaTAK-UAS"); + let host_label = sanitize_label( + &format!("armatak-{}", safe_instance.to_lowercase()), + "armatak-uas-host", + ); + let packet = build_mdns_packet(&safe_instance, &host_label, local_ip, port, &video_uri); + let callback_video_uri = video_uri.clone(); + let multicast_addr = SocketAddrV4::new(Ipv4Addr::new(224, 0, 0, 251), 5353); + + let (stop_tx, stop_rx): (Sender<()>, Receiver<()>) = mpsc::channel(); + if let Ok(mut lock) = MDNS_CTRL.lock() { + *lock = Some(stop_tx); + } + + thread::spawn(move || { + let socket = match UdpSocket::bind("0.0.0.0:0") { + Ok(socket) => socket, + Err(error) => { + info!("mDNS failed to bind UDP socket: {}", error); + return; + } + }; + + let _ = socket.set_multicast_ttl_v4(255); + let _ = socket.set_multicast_loop_v4(true); + + info!( + "Starting mDNS UAS advertisement instance={} host={} ip={} port={} video_uri={}", + safe_instance, host_label, local_ip, port, video_uri + ); + + loop { + match socket.send_to(&packet, multicast_addr) { + Ok(size) => info!("Sent mDNS UAS advertisement ({} bytes) to {}", size, multicast_addr), + Err(error) => info!("Failed sending mDNS UAS advertisement: {}", error), + } + + match stop_rx.recv_timeout(Duration::from_secs(5)) { + Ok(_) => break, + Err(mpsc::RecvTimeoutError::Timeout) => {} + Err(_) => break, + } + } + + info!("Stopped mDNS UAS advertisement for instance={}", safe_instance); + }); + + let _ = ctx.callback_data( + "MDNS", + "UAS advertisement started", + format!("{}:{} | {}", local_ip, port, callback_video_uri), + ); + + "starting mdns uas advertisement" +} + +pub fn stop(ctx: Context) -> &'static str { + let had_running = match MDNS_CTRL.lock() { + Ok(mut lock) => { + if let Some(tx) = lock.take() { + let _ = tx.send(()); + true + } else { + false + } + } + Err(_) => false, + }; + + if had_running { + let _ = ctx.callback_null("MDNS", "UAS advertisement stopped"); + "stopping mdns advertisement" + } else { + let _ = ctx.callback_null("MDNS ERROR", "No mDNS advertisement is running"); + "no mdns advertisement running" + } +} diff --git a/src/tcp/cot.rs b/src/tcp/cot.rs index a61a82f..ff863cc 100644 --- a/src/tcp/cot.rs +++ b/src/tcp/cot.rs @@ -39,3 +39,35 @@ pub fn send_message_cot( "Sending Message CoT to TCP server" } + +pub fn send_uas_platform_cot( + ctx: Context, + payload: cot::uas::UasPlatformCoTPayload, +) -> &'static str { + let xml = payload.to_xml(); + send_payload(ctx, xml); + + "Sending UAS Platform main CoT to TCP server" +} + +pub fn send_uas_video_cot( + ctx: Context, + payload: cot::uas::UasVideoCoTPayload, +) -> &'static str { + let xml = payload.to_xml(); + if !xml.is_empty() { + send_payload(ctx, xml); + } + + "Sending UAS Video (b-i-v) CoT to TCP server" +} + +pub fn send_uas_sensor_cot( + ctx: Context, + payload: cot::uas::UasSensorCoTPayload, +) -> &'static str { + let xml = payload.to_xml(); + send_payload(ctx, xml); + + "Sending UAS Sensor (b-m-p-s-p-loc) CoT to TCP server" +} diff --git a/src/udp_socket.rs b/src/udp_socket.rs index a135ac2..ea0c812 100644 --- a/src/udp_socket.rs +++ b/src/udp_socket.rs @@ -15,15 +15,14 @@ pub enum UdpCommand { pub struct UdpClient { pub(crate) tx: Sender, + pub(crate) address: String, } impl UdpClient { pub fn start(&self, address: String, rx: Receiver, ctx: Context) { - if let Some(ref client) = *UDP_CLIENT.lock().unwrap() { - client.stop(); - } - thread::spawn(move || { + info!("Starting UDP client thread for destination {}", address); + let socket = match UdpSocket::bind("0.0.0.0:0") { Ok(s) => s, Err(e) => { @@ -32,19 +31,28 @@ impl UdpClient { "Failed to bind UDP socket", e.to_string(), ); - info!("Failed to bind UDP socket: {}", e); + info!("Failed to bind UDP socket for {}: {}", address, e); return; } }; + if let Ok(local_addr) = socket.local_addr() { + info!( + "UDP client bound local socket {} for destination {}", + local_addr, address + ); + } + let _ = ctx.callback_data("UDP SOCKET", "EUD Connected", address.clone()); + info!("UDP client reported EUD Connected for {}", address); let mut running = true; while running { match rx.recv() { Ok(UdpCommand::SendMessage(message, context)) => { + info!("UDP client sending {} bytes to {}", message.len(), address); if let Err(e) = socket.send_to(message.as_bytes(), &address) { - info!("Failed to send UDP message: {}", e); + info!("Failed to send UDP message to {}: {}", address, e); let _ = context.callback_data( "UDP SOCKET ERROR", "Failed to send UDP message", @@ -54,13 +62,15 @@ impl UdpClient { } Ok(UdpCommand::Stop) => { running = false; - info!("Stopping UDP client."); + info!("Stopping UDP client for {}", address); } Err(error) => { - info!("Error receiving command: {}", error.to_string()); + info!("Error receiving UDP command for {}: {}", address, error); } } } + + info!("UDP client thread exited for {}", address); }); } @@ -73,7 +83,9 @@ impl UdpClient { pub fn stop(&self) { let tx = self.tx.clone(); + let address = self.address.clone(); thread::spawn(move || { + info!("Queueing stop for UDP client {}", address); tx.send(UdpCommand::Stop).unwrap(); }); } @@ -84,13 +96,31 @@ lazy_static! { } pub fn start(ctx: Context, address: String) -> &'static str { + info!("UDP socket start requested for {}", address); + let (tx, rx): (Sender, Receiver) = mpsc::channel(); - let client = UdpClient { tx }; - client.start(address, rx, ctx); + let client = UdpClient { + tx, + address: address.clone(), + }; - let mut client_guard = UDP_CLIENT.lock().unwrap(); - *client_guard = Some(client); + { + let mut client_guard = UDP_CLIENT.lock().unwrap(); + if let Some(ref existing_client) = *client_guard { + info!( + "Stopping previous UDP client {} before starting {}", + existing_client.address, address + ); + existing_client.stop(); + } + *client_guard = Some(UdpClient { + tx: client.tx.clone(), + address: client.address.clone(), + }); + } + + client.start(address, rx, ctx); "Starting UDP Client" } @@ -100,6 +130,7 @@ pub fn send_payload(ctx: Context, payload: String) -> &'static str { client.send_payload(ctx, payload); } else { let _ = ctx.callback_null("UDP SOCKET ERROR", "UDP Socket is not running"); + info!("UDP send requested while socket was not running"); } "Sending payload to UDP server" @@ -117,10 +148,12 @@ pub fn send_gps_cot( pub fn stop(ctx: Context) -> &'static str { if let Some(ref client) = *UDP_CLIENT.lock().unwrap() { + info!("UDP socket stop requested for {}", client.address); client.stop(); let _ = ctx.callback_null("UDP SOCKET", "EUD Disconnected"); } else { let _ = ctx.callback_null("UDP SOCKET ERROR", "UDP Socket is not running"); + info!("UDP stop requested while socket was not running"); } "Stopping UDP Client" diff --git a/src/video_stream.rs b/src/video_stream.rs index ec059e9..0670576 100644 --- a/src/video_stream.rs +++ b/src/video_stream.rs @@ -1,5 +1,6 @@ use arma_rs::Context; use lazy_static::lazy_static; +use log::info; use std::process::Command; use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::Mutex; @@ -16,53 +17,34 @@ lazy_static! { #[cfg(target_os = "windows")] const CREATE_NO_WINDOW: u32 = 0x08000000; -fn build_rtsp_url( - address: &str, - port: &str, - stream_path: &str, - username: &str, - password: &str, -) -> String { - if username.is_empty() || password.is_empty() { - format!("rtsp://{}:{}/{}", address, port, stream_path) - } else { - format!( - "rtsp://{}:{}@{}:{}/{}", - username, password, address, port, stream_path - ) +fn stop_existing_stream() { + if let Ok(mut lock) = STREAM_CTRL.lock() { + if let Some(tx) = lock.take() { + let _ = tx.send(()); + } } } -#[cfg(any(target_os = "windows", target_os = "linux"))] -fn spawn_ffmpeg(rtsp_url: String, stop_rx: Receiver<()>, status_tx: Sender>) { +fn spawn_ffmpeg_with_args( + mut cmd: Command, + stop_rx: Receiver<()>, + status_tx: Sender>, + description: String, +) { thread::spawn(move || { - let mut cmd = Command::new("ffmpeg"); - cmd.args(&[ - "-f", - "x11grab", - "-framerate", - "30", - "-video_size", - "1920x1080", - "-i", - ":0", - "-f", - "rtsp", - "-rtsp_transport", - "tcp", - &rtsp_url, - ]); + info!("Starting FFmpeg video stream: {}", description); #[cfg(target_os = "windows")] let child_result = cmd.creation_flags(CREATE_NO_WINDOW).spawn(); - #[cfg(target_os = "linux")] + #[cfg(not(target_os = "windows"))] let child_result = cmd.spawn(); match child_result { Ok(mut child) => { let _ = status_tx.send(Ok(())); let _ = stop_rx.recv(); + info!("Stopping FFmpeg video stream: {}", description); let _ = child.kill(); } Err(e) => { @@ -80,47 +62,169 @@ pub fn start_stream( username: String, password: String, ) -> &'static str { - #[cfg(any(target_os = "windows", target_os = "linux"))] - { - let (stop_tx, stop_rx) = mpsc::channel(); - let (status_tx, status_rx) = mpsc::channel(); + stop_existing_stream(); - let rtsp_url = build_rtsp_url(&address, &port, &stream_path, &username, &password); + let (stop_tx, stop_rx) = mpsc::channel(); + let (status_tx, status_rx) = mpsc::channel(); - spawn_ffmpeg(rtsp_url, stop_rx, status_tx); + let rtsp_url = if username.is_empty() || password.is_empty() { + format!("rtsp://{}:{}/{}", address, port, stream_path) + } else { + format!("rtsp://{}:{}@{}:{}/{}", username, password, address, port, stream_path) + }; - match STREAM_CTRL.lock() { - Ok(mut lock) => *lock = Some(stop_tx), - Err(e) => { - let _ = ctx.callback_data( - "VIDEO ERROR", - "Failed to acquire lock for stream control", - e.to_string(), - ); - return "stream control lock error"; - } - } + let mut cmd = Command::new("ffmpeg"); + #[cfg(target_os = "windows")] + cmd.args([ + "-f", + "gdigrab", + "-framerate", + "15", + "-i", + "desktop", + "-an", + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-pix_fmt", + "yuv420p", + "-f", + "rtsp", + "-rtsp_transport", + "tcp", + &rtsp_url, + ]); - match status_rx.recv_timeout(Duration::from_secs(2)) { - Ok(Ok(())) => { - let _ = ctx.callback_null("VIDEO", "FFmpeg started successfully"); - "starting video stream" - } - Ok(Err(e)) => { - let _ = ctx.callback_data("VIDEO ERROR", "FFmpeg failed to start", e); - "ffmpeg failed to start" - } - Err(_) => { - let _ = ctx.callback_null("VIDEO ERROR", "FFmpeg did not respond in time"); - "ffmpeg did not respond" - } - } + #[cfg(target_os = "linux")] + cmd.args([ + "-f", + "x11grab", + "-framerate", + "15", + "-video_size", + "1280x720", + "-i", + ":0.0", + "-an", + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-pix_fmt", + "yuv420p", + "-f", + "rtsp", + "-rtsp_transport", + "tcp", + &rtsp_url, + ]); + + spawn_ffmpeg_with_args(cmd, stop_rx, status_tx, format!("RTSP {}", rtsp_url)); + + if let Ok(mut lock) = STREAM_CTRL.lock() { + *lock = Some(stop_tx); } - #[cfg(not(any(target_os = "windows", target_os = "linux")))] - { - ctx.callback_null("VIDEO ERROR", "Screen capture is only supported on Windows"); - "unsupported platform" + match status_rx.recv_timeout(Duration::from_secs(2)) { + Ok(Ok(())) => { + let _ = ctx.callback_null("VIDEO", "FFmpeg RTSP stream started successfully"); + "starting video stream" + } + Ok(Err(e)) => { + let _ = ctx.callback_data("VIDEO ERROR", "FFmpeg failed to start", e); + "ffmpeg failed to start" + } + Err(_) => { + let _ = ctx.callback_null("VIDEO ERROR", "FFmpeg did not respond in time"); + "ffmpeg did not respond" + } + } +} + +pub fn start_rtp_stream(ctx: Context, address: String, port: String) -> &'static str { + stop_existing_stream(); + + let (stop_tx, stop_rx) = mpsc::channel(); + let (status_tx, status_rx) = mpsc::channel(); + let rtp_url = format!("rtp://{}:{}", address, port); + + let mut cmd = Command::new("ffmpeg"); + + #[cfg(target_os = "windows")] + cmd.args([ + "-f", + "gdigrab", + "-framerate", + "15", + "-i", + "desktop", + "-an", + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-pix_fmt", + "yuv420p", + "-g", + "30", + "-f", + "rtp", + &rtp_url, + ]); + + #[cfg(target_os = "linux")] + cmd.args([ + "-f", + "x11grab", + "-framerate", + "15", + "-video_size", + "1280x720", + "-i", + ":0.0", + "-an", + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-pix_fmt", + "yuv420p", + "-g", + "30", + "-f", + "rtp", + &rtp_url, + ]); + + spawn_ffmpeg_with_args(cmd, stop_rx, status_tx, format!("RTP {}", rtp_url)); + + if let Ok(mut lock) = STREAM_CTRL.lock() { + *lock = Some(stop_tx); + } + + match status_rx.recv_timeout(Duration::from_secs(2)) { + Ok(Ok(())) => { + info!("Started RTP video stream toward {}", rtp_url); + let _ = ctx.callback_null("VIDEO", "FFmpeg RTP stream started successfully"); + "starting RTP video stream" + } + Ok(Err(e)) => { + let _ = ctx.callback_data("VIDEO ERROR", "FFmpeg failed to start RTP stream", e); + "ffmpeg failed to start RTP stream" + } + Err(_) => { + let _ = ctx.callback_null("VIDEO ERROR", "FFmpeg RTP stream did not respond in time"); + "ffmpeg RTP stream did not respond" + } } }