Skip to main content

Grove/Protocol/
SpineConnection.rs

1//! Spine Connection Module
2//!  ☀️ 🟡 MOUNTAIN_GROVE_WASM - WASM+Rhai extension host connection
3//!
4//! This module provides gRPC-based communication for extension host
5//! integration. Maintains full backwards compatibility while adding optional
6//! EchoAction support.
7//!
8//! ## Architecture (Backwards Compatible)
9//!
10//! - **Legacy RPC Layer**: Original gRPC client (unchanged)
11//! - **New EchoAction Layer**: Optional bidirectional actions (feature-gated)
12//! - **Dual Protocol**: Both can be used simultaneously
13//!
14//! ## Feature Gates
15//!
16//! - `grove_rpc` (default) - Enable legacy RPC layer
17//! - `grove_echo` (new, feature-gated) - Enable EchoAction layer
18//!
//! ## Usage
//!
//! ### Legacy (Unchanged)
//!
//! ```rust,ignore
//! use crate::Protocol::ProtocolConfig;
//!
//! let mut connection = SpineConnectionImpl::new(config);
//! connection.Connect().await?;
//! let response = connection.SendRequest("method_name", payload).await?;
//! ```
//!
//! ### With EchoAction (New, Optional; requires the `grove_echo` feature)
//!
//! ```rust,ignore
//! let mut connection = SpineConnectionImpl::new(config);
//! connection.Connect().await?;
//! connection.ConnectEchoClient().await?;
//!
//! // Use either method
//! let response = connection.SendRequest("method_name", payload).await?; // OLD: works
//! let echo_response = connection.SendEchoAction(action).await?; // NEW: optional
//! ```
36
37use std::sync::Arc;
38
39use anyhow::Result;
40use tokio::sync::RwLock;
41
42use crate::{Protocol::ProtocolConfig, dev_log};
43#[cfg(feature = "grove_echo")]
44use crate::vine::generated::vine::{
45	EchoAction,
46	EchoActionResponse,
47	echo_action_service_client::EchoActionServiceClient,
48};
49
/// Lifecycle state of the Spine connection.
///
/// The type is `Copy`, so it can be read out of a lock by value, and it
/// implements `Eq` + `Hash` so it can be used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
	/// Disconnected from Spine
	Disconnected,

	/// Currently connecting to Spine
	Connecting,

	/// Connected to Spine
	Connected,

	/// Error state
	Error,
}
65
/// Heartbeat configuration for connection monitoring
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeartbeatConfig {
	/// Interval between heartbeats in seconds
	pub interval_seconds:u64,

	/// Maximum number of missed heartbeats before considering connection lost
	pub max_missed:u32,

	/// Whether heartbeat is enabled
	pub enabled:bool,
}

/// Default heartbeat settings: enabled, one heartbeat every 30 seconds, and
/// up to 3 missed heartbeats tolerated.
impl Default for HeartbeatConfig {
	fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
}
83
/// Connection metrics for monitoring
///
/// All counters start at zero via `Default`. The type is comparable so that
/// two snapshots can be checked for equality.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectionMetrics {
	/// Total number of requests sent
	pub total_requests:u64,

	/// Number of successful requests
	pub successful_requests:u64,

	/// Number of failed requests
	pub failed_requests:u64,

	/// Connection uptime in seconds
	pub uptime_seconds:u64,

	/// Number of reconnection attempts
	pub reconnections:u64,
}
102
103/// Spine connection implementation
104pub struct SpineConnectionImpl {
105	/// Protocol configuration
106	config:Arc<RwLock<ProtocolConfig>>,
107
108	/// Current connection state
109	state:Arc<RwLock<ConnectionState>>,
110
111	#[cfg(feature = "grove_echo")]
112	/// Echo client for testing
113	echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
114
115	/// Heartbeat configuration
116	heartbeat_config:HeartbeatConfig,
117
118	/// Timestamp of the last heartbeat
119	last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
120
121	/// Connection metrics
122	metrics:Arc<RwLock<ConnectionMetrics>>,
123}
124
125impl SpineConnectionImpl {
126	/// Create a new Spine connection
127	///
128	/// # Arguments
129	///
130	/// * `config` - Protocol configuration
131	///
132	/// # Returns
133	///
134	/// A new SpineConnectionImpl instance
135	pub fn new(config:ProtocolConfig) -> Self {
136		Self {
137			config:Arc::new(RwLock::new(config)),
138
139			state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
140
141			#[cfg(feature = "grove_echo")]
142			echo_client:None,
143
144			heartbeat_config:HeartbeatConfig::default(),
145
146			last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
147
148			metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
149		}
150	}
151
152	/// Connect to the Spine service
153	pub async fn Connect(&mut self) -> Result<()> {
154		let guard = self.config.read().await;
155
156		let url = guard.mountain_endpoint.clone();
157
158		drop(guard);
159
160		dev_log!("grpc", "Connecting to Spine at: {}", url);
161
162		*self.state.write().await = ConnectionState::Connecting;
163		*self.state.write().await = ConnectionState::Connected;
164		*self.last_heartbeat.write().await = chrono::Utc::now();
165		dev_log!("grpc", "Successfully connected to Spine");
166
167		Ok(())
168	}
169
170	/// Disconnect from the Spine service
171	pub async fn Disconnect(&mut self) -> Result<()> {
172		dev_log!("grpc", "Disconnecting from Spine");
173
174		#[cfg(feature = "grove_echo")]
175		{
176			self.echo_client = None;
177		}
178
179		*self.state.write().await = ConnectionState::Disconnected;
180		dev_log!("grpc", "Successfully disconnected from Spine");
181
182		Ok(())
183	}
184
185	/// Get the current connection state
186	pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
187
188	/// Send a request to the Spine service
189	///
190	/// # Arguments
191	///
192	/// * `method` - The method name to call
193	/// * `payload` - The request payload
194	pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
195		if self.GetState().await != ConnectionState::Connected {
196			return Err(anyhow::anyhow!("Not connected to Spine"));
197		}
198
199		dev_log!("grpc", "Sending request: {}", method);
200
201		let mut metrics = self.metrics.write().await;
202
203		metrics.total_requests += 1;
204
205		metrics.successful_requests += 1;
206
207		Ok(Vec::new())
208	}
209
210	/// Get the connection metrics
211	pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
212
213	/// Set the heartbeat configuration
214	pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
215}
216
#[cfg(feature = "grove_echo")]
impl SpineConnectionImpl {
	/// Open the gRPC channel used for EchoAction calls and store the client.
	///
	/// The channel is opened against the configured `mountain_endpoint`.
	///
	/// # Errors
	///
	/// Returns an error when the endpoint is not a valid URI or the channel
	/// cannot be established.
	pub async fn ConnectEchoClient(&mut self) -> Result<()> {
		// `Context` is not imported at file level; bring the extension trait
		// into scope locally so `.context(..)` resolves.
		use anyhow::Context as _;

		let guard = self.config.read().await;

		let url = guard.mountain_endpoint.clone();

		drop(guard);

		dev_log!("grpc", "Connecting EchoAction client to: {}", url);

		let channel = tonic::transport::Channel::from_shared(url)
			.context("Invalid Mountain URL")?
			.connect()
			.await
			.context("Failed to connect EchoAction client")?;

		self.echo_client = Some(EchoActionServiceClient::new(channel));

		dev_log!("grpc", "EchoAction client connected");

		Ok(())
	}

	/// Send a single `EchoAction` and return the service's response.
	///
	/// # Errors
	///
	/// Returns an error when the connection state is not `Connected`, when no
	/// EchoAction client has been set up via `ConnectEchoClient`, when the
	/// gRPC call fails, or when the response reports `success == false`.
	pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
		use anyhow::Context as _;

		if self.GetState().await != ConnectionState::Connected {
			return Err(anyhow::anyhow!("Not connected to Spine"));
		}

		// Generated tonic client methods take `&mut self`, which a shared
		// borrow of the stored client cannot provide. Clone the client instead;
		// the clone shares the same underlying channel.
		let mut client = self
			.echo_client
			.clone()
			.ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;

		let response = client
			.send_echo_action(action)
			.await
			.context("Failed to send EchoAction")?
			.into_inner();

		if !response.success {
			anyhow::bail!("EchoAction failed: {}", response.error);
		}

		Ok(response)
	}

	/// Send an RPC-style call through the EchoAction channel.
	///
	/// # Arguments
	///
	/// * `method` - RPC method name, added to the headers as `rpc_method`
	/// * `payload` - Raw request payload
	/// * `metadata` - Extra headers to send with the action
	///
	/// # Returns
	///
	/// The `result` bytes of the response.
	///
	/// # Errors
	///
	/// Propagates any error from `SendEchoAction`.
	pub async fn SendRpcViaEcho(
		&self,

		method:&str,

		payload:Vec<u8>,

		metadata:std::collections::HashMap<String, String>,
	) -> Result<Vec<u8>> {
		let action = Self::BuildEchoAction("rpc", "rpc_method", method, payload, metadata);

		let response = self.SendEchoAction(action).await?;

		Ok(response.result)
	}

	/// Send an event through the EchoAction channel.
	///
	/// # Arguments
	///
	/// * `event_name` - Event name, added to the headers as `event_name`
	/// * `payload` - Raw event payload
	/// * `metadata` - Extra headers to send with the action
	///
	/// # Errors
	///
	/// Propagates any error from `SendEchoAction`.
	pub async fn SendEventViaEcho(
		&self,

		event_name:&str,

		payload:Vec<u8>,

		metadata:std::collections::HashMap<String, String>,
	) -> Result<()> {
		let action = Self::BuildEchoAction("event", "event_name", event_name, payload, metadata);

		self.SendEchoAction(action).await?;

		Ok(())
	}

	/// Report whether an EchoAction client has been set up.
	pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }

	/// Assemble an `EchoAction` from "grove" to "mountain" with a fresh UUID,
	/// the current UTC timestamp, and one extra header entry.
	fn BuildEchoAction(
		action_type:&str,

		header_key:&str,

		header_value:&str,

		payload:Vec<u8>,

		metadata:std::collections::HashMap<String, String>,
	) -> EchoAction {
		let mut headers = metadata;

		// The routing header overrides any caller-supplied entry with the
		// same key.
		headers.insert(header_key.to_string(), header_value.to_string());

		EchoAction {
			action_id:uuid::Uuid::new_v4().to_string(),

			source:"grove".to_string(),

			target:"mountain".to_string(),

			action_type:action_type.to_string(),

			payload,

			headers,

			timestamp:chrono::Utc::now().timestamp(),

			nested_actions:vec![],
		}
	}
}
338
#[cfg(test)]
mod tests {

	use super::*;

	/// A state value compares equal to the same variant.
	#[test]
	fn test_connection_state() {
		let observed = ConnectionState::Connected;

		assert_eq!(observed, ConnectionState::Connected);
	}

	/// The default heartbeat config is enabled with a 30 second interval.
	#[test]
	fn test_heartbeat_config_default() {
		let HeartbeatConfig { interval_seconds, enabled, .. } = HeartbeatConfig::default();

		assert_eq!(interval_seconds, 30);

		assert!(enabled);
	}

	/// A freshly constructed connection reports `Disconnected`.
	#[tokio::test]
	async fn test_spine_connection_creation() {
		let endpoint = "http://127.0.0.1:50051".to_string();

		let connection =
			SpineConnectionImpl::new(ProtocolConfig { mountain_endpoint:endpoint, ..Default::default() });

		let observed = connection.GetState().await;

		assert_eq!(observed, ConnectionState::Disconnected);
	}
}
368}