Skip to main content

Grove/Transport/
Strategy.rs

//! Transport Strategy Module
//!
//! Defines the transport strategy trait and the supporting types for the
//! different communication methods (gRPC, IPC, WASM).
5
6use std::fmt;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde::{Deserialize, Serialize};
11
12use crate::Transport::{IPCTransport::IPCTransport, WASMTransport::WASMTransportImpl, gRPCTransport::gRPCTransport};
13
/// Transport strategy trait.
///
/// Common interface that every transport implementation provides for
/// connecting, sending data, and closing the connection. Implementors must
/// be `Send + Sync`.
#[async_trait]
pub trait TransportStrategy: Send + Sync {
	/// Error type returned by this transport's fallible operations.
	type Error: std::error::Error + Send + Sync + 'static;

	/// Connect to the transport endpoint.
	async fn connect(&self) -> Result<(), Self::Error>;

	/// Send a request and receive a response.
	///
	/// `request` is the raw request payload; on success the raw response
	/// bytes are returned.
	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error>;

	/// Send data without expecting a response (fire and forget).
	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error>;

	/// Close the transport connection.
	async fn close(&self) -> Result<(), Self::Error>;

	/// Check if the transport is connected.
	fn is_connected(&self) -> bool;

	/// Get the transport type identifier.
	fn transport_type(&self) -> TransportType;
}
41
/// Transport type enumeration.
///
/// Small copyable tag naming the kind of transport in use. It supports
/// equality comparison and serde (de)serialization through its derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransportType {
	/// gRPC transport.
	gRPC,

	/// Inter-process communication (IPC) transport.
	IPC,

	/// Direct WASM module communication.
	WASM,

	/// Unknown or unspecified transport.
	Unknown,
}
57
58impl fmt::Display for TransportType {
59	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result {
60		match self {
61			Self::gRPC => write!(f, "grpc"),
62
63			Self::IPC => write!(f, "ipc"),
64
65			Self::WASM => write!(f, "wasm"),
66
67			Self::Unknown => write!(f, "unknown"),
68		}
69	}
70}
71
72impl std::str::FromStr for TransportType {
73	type Err = anyhow::Error;
74
75	fn from_str(s:&str) -> Result<Self, Self::Err> {
76		match s.to_lowercase().as_str() {
77			"grpc" => Ok(Self::gRPC),
78
79			"ipc" => Ok(Self::IPC),
80
81			"wasm" => Ok(Self::WASM),
82
83			_ => Err(anyhow::anyhow!("Unknown transport type: {}", s)),
84		}
85	}
86}
87
/// Transport enumeration.
///
/// Union type wrapping all supported transport implementations. Each
/// variant owns exactly one concrete transport value.
#[derive(Debug)]
pub enum Transport {
	/// gRPC-based transport (Mountain/Air communication).
	gRPC(gRPCTransport),

	/// IPC transport (same-machine process communication).
	IPC(IPCTransport),

	/// Direct WASM module transport (browser).
	WASM(WASMTransportImpl),
}
102
103impl Transport {
104	/// Get the transport type
105	pub fn transport_type(&self) -> TransportType {
106		match self {
107			Self::gRPC(_) => TransportType::gRPC,
108
109			Self::IPC(_) => TransportType::IPC,
110
111			Self::WASM(_) => TransportType::WASM,
112		}
113	}
114
115	/// Connect to the transport
116	pub async fn connect(&self) -> anyhow::Result<()> {
117		match self {
118			Self::gRPC(transport) => {
119				transport
120					.connect()
121					.await
122					.map_err(|e| anyhow::anyhow!("gRPC connect error: {}", e))
123			},
124
125			Self::IPC(transport) => {
126				transport
127					.connect()
128					.await
129					.map_err(|e| anyhow::anyhow!("IPC connect error: {}", e))
130			},
131
132			Self::WASM(transport) => {
133				transport
134					.connect()
135					.await
136					.map_err(|e| anyhow::anyhow!("WASM connect error: {}", e))
137			},
138		}
139	}
140
141	/// Send a request and receive a response
142	pub async fn send(&self, request:&[u8]) -> anyhow::Result<Vec<u8>> {
143		match self {
144			Self::gRPC(transport) => {
145				transport
146					.send(request)
147					.await
148					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
149			},
150
151			Self::IPC(transport) => {
152				transport
153					.send(request)
154					.await
155					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
156			},
157
158			Self::WASM(transport) => {
159				transport
160					.send(request)
161					.await
162					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
163			},
164		}
165	}
166
167	/// Send data without expecting a response
168	pub async fn send_no_response(&self, data:&[u8]) -> anyhow::Result<()> {
169		match self {
170			Self::gRPC(transport) => {
171				transport
172					.send_no_response(data)
173					.await
174					.map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
175			},
176
177			Self::IPC(transport) => {
178				transport
179					.send_no_response(data)
180					.await
181					.map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
182			},
183
184			Self::WASM(transport) => {
185				transport
186					.send_no_response(data)
187					.await
188					.map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
189			},
190		}
191	}
192
193	/// Close the transport
194	pub async fn close(&self) -> anyhow::Result<()> {
195		match self {
196			Self::gRPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("gRPC close error: {}", e)),
197
198			Self::IPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("IPC close error: {}", e)),
199
200			Self::WASM(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("WASM close error: {}", e)),
201		}
202	}
203
204	/// Check if the transport is connected
205	pub fn is_connected(&self) -> bool {
206		match self {
207			Self::gRPC(transport) => transport.is_connected(),
208
209			Self::IPC(transport) => transport.is_connected(),
210
211			Self::WASM(transport) => transport.is_connected(),
212		}
213	}
214
215	/// Get gRPC transport reference (if applicable)
216	pub fn AsgRPC(&self) -> Option<&gRPCTransport> {
217		match self {
218			Self::gRPC(Transport) => Some(Transport),
219
220			_ => None,
221		}
222	}
223
224	/// Returns the IPC transport reference if this is an IPC transport.
225	pub fn AsIPC(&self) -> Option<&IPCTransport> {
226		match self {
227			Self::IPC(Transport) => Some(Transport),
228
229			_ => None,
230		}
231	}
232
233	/// Get WASM transport reference (if applicable)
234	pub fn as_wasm(&self) -> Option<&WASMTransportImpl> {
235		match self {
236			Self::WASM(transport) => Some(transport),
237
238			_ => None,
239		}
240	}
241}
242
243impl Default for Transport {
244	fn default() -> Self {
245		Self::gRPC(
246			gRPCTransport::New("127.0.0.1:50050").unwrap_or_else(|_| {
247				gRPCTransport::New("0.0.0.0:50050").expect("Failed to create default gRPC transport")
248			}),
249		)
250	}
251}
252
253impl fmt::Display for Transport {
254	fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Transport({})", self.transport_type()) }
255}
256
/// Transport message wrapper.
///
/// Envelope around a raw payload that carries a type tag, a correlation ID,
/// a creation timestamp and optional JSON metadata. Serializable and
/// deserializable through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportMessage {
	/// Message type identifier.
	pub message_type:String,

	/// Message ID for correlation (a v4 UUID string when built with
	/// `TransportMessage::new`).
	pub message_id:String,

	/// Timestamp relative to the Unix epoch (whole seconds when built with
	/// `TransportMessage::new`).
	pub timestamp:u64,

	/// Message payload.
	pub payload:Bytes,

	/// Optional metadata as an arbitrary JSON value.
	pub metadata:Option<serde_json::Value>,
}
275
276impl TransportMessage {
277	/// Create a new transport message
278	pub fn new(message_type:impl Into<String>, payload:Bytes) -> Self {
279		Self {
280			message_type:message_type.into(),
281
282			message_id:uuid::Uuid::new_v4().to_string(),
283
284			timestamp:std::time::SystemTime::now()
285				.duration_since(std::time::UNIX_EPOCH)
286				.map(|d| d.as_secs())
287				.unwrap_or(0),
288
289			payload,
290
291			metadata:None,
292		}
293	}
294
295	/// Set metadata for the message
296	pub fn with_metadata(mut self, metadata:serde_json::Value) -> Self {
297		self.metadata = Some(metadata);
298
299		self
300	}
301
302	/// Serialize the message to bytes
303	pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
304		serde_json::to_vec(self).map(Bytes::from).map_err(|e| anyhow::anyhow!("{}", e))
305	}
306
307	/// Deserialize message from bytes
308	pub fn from_bytes(bytes:&[u8]) -> anyhow::Result<Self> {
309		serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!("{}", e))
310	}
311}
312
/// Transport statistics.
///
/// Plain counters describing traffic over a transport. `Default` yields all
/// fields set to zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TransportStats {
	/// Number of messages sent.
	pub messages_sent:u64,

	/// Number of messages received.
	pub messages_received:u64,

	/// Number of errors encountered.
	pub errors:u64,

	/// Total bytes sent.
	pub bytes_sent:u64,

	/// Total bytes received.
	pub bytes_received:u64,

	/// Average latency in microseconds (running average over sent messages).
	pub avg_latency_us:u64,

	/// Connection uptime in seconds.
	pub uptime_seconds:u64,
}
337
338impl TransportStats {
339	/// Update statistics with a sent message
340	pub fn record_sent(&mut self, bytes:u64, latency_us:u64) {
341		self.messages_sent += 1;
342
343		self.bytes_sent += bytes;
344
345		// Update average latency
346		if self.messages_sent > 0 {
347			self.avg_latency_us = (self.avg_latency_us * (self.messages_sent - 1) + latency_us) / self.messages_sent;
348		}
349	}
350
351	/// Update statistics with a received message
352	pub fn record_received(&mut self, bytes:u64) {
353		self.messages_received += 1;
354
355		self.bytes_received += bytes;
356	}
357
358	/// Record an error
359	pub fn record_error(&mut self) { self.errors += 1; }
360}
361
#[cfg(test)]
mod tests {

	use super::*;

	/// Each concrete transport type renders as its lowercase identifier.
	#[test]
	fn test_transport_type_to_string() {
		let expectations = [(TransportType::gRPC, "grpc"), (TransportType::IPC, "ipc"), (TransportType::WASM, "wasm")];

		for (kind, text) in expectations.iter() {
			assert_eq!(kind.to_string(), *text);
		}
	}

	/// Known names parse to their variant; `unknown` is rejected.
	#[test]
	fn test_transport_type_from_str() {
		let expectations = [("grpc", TransportType::gRPC), ("ipc", TransportType::IPC), ("wasm", TransportType::WASM)];

		for (text, kind) in expectations.iter() {
			let parsed:TransportType = text.parse().unwrap();

			assert_eq!(parsed, *kind);
		}

		assert!("unknown".parse::<TransportType>().is_err());
	}

	/// The display form of a transport mentions `Transport`.
	#[test]
	fn test_transport_display() {
		// The default transport stands in for a real one here; only the
		// Display implementation is under test.
		let rendered = Transport::default().to_string();

		assert!(rendered.contains("Transport"));
	}

	/// A new message keeps its type and payload and gets a non-empty ID.
	#[test]
	fn test_transport_message_creation() {
		let created = TransportMessage::new("test_type", Bytes::from("hello"));

		assert_eq!(created.message_type, "test_type");

		assert_eq!(created.payload, Bytes::from("hello"));

		assert!(!created.message_id.is_empty());
	}

	/// A message survives a to_bytes/from_bytes round trip.
	#[test]
	fn test_transport_message_serialization() {
		let original = TransportMessage::new("test", Bytes::from("data"));

		let encoded = original.to_bytes().unwrap();

		let decoded = TransportMessage::from_bytes(&encoded).unwrap();

		assert_eq!(decoded.message_type, original.message_type);

		assert_eq!(decoded.payload, original.payload);
	}

	/// One sent message, one received message and one error are each
	/// reflected in the matching counters.
	#[test]
	fn test_transport_stats() {
		let mut stats = TransportStats::default();

		stats.record_sent(100, 1000);

		stats.record_received(50);

		stats.record_error();

		assert_eq!((stats.messages_sent, stats.messages_received, stats.errors), (1, 1, 1));

		assert_eq!((stats.bytes_sent, stats.bytes_received), (100, 50));

		assert_eq!(stats.avg_latency_us, 1000);
	}
}