Grove/Protocol/
SpineConnection.rs1use std::sync::Arc;
38
39use anyhow::Result;
40use tokio::sync::RwLock;
41
42use crate::{Protocol::ProtocolConfig, dev_log};
43#[cfg(feature = "grove_echo")]
44use crate::vine::generated::vine::{
45 EchoAction,
46 EchoActionResponse,
47 echo_action_service_client::EchoActionServiceClient,
48};
49
/// Lifecycle state of the connection to the Spine endpoint.
///
/// The type is a small `Copy` tag. It implements full equality (`Eq`) and
/// `Hash` in addition to `PartialEq`, so it can be compared with `==` and
/// used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
	/// No connection is established. This is the state a new connection
	/// starts in and the state set by `Disconnect`.
	Disconnected,

	/// A connection attempt is in progress (set by `Connect` before it
	/// marks the connection as established).
	Connecting,

	/// The connection is established; `SendRequest` only proceeds in this
	/// state.
	Connected,

	/// Failure state. NOTE(review): no code visible in this module assigns
	/// it — confirm where it is expected to be set.
	Error,
}
65
/// Tunables for the connection heartbeat.
///
/// Implements `PartialEq`/`Eq` so two configurations can be compared
/// directly, e.g. against [`HeartbeatConfig::default`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeartbeatConfig {
	/// Interval between heartbeats, in seconds.
	pub interval_seconds:u64,

	/// Number of missed heartbeats tolerated.
	/// NOTE(review): presumably the threshold before the link is treated as
	/// dead — the code enforcing it is not in this file.
	pub max_missed:u32,

	/// Whether heartbeating is enabled.
	pub enabled:bool,
}

impl Default for HeartbeatConfig {
	/// Returns the default settings: enabled, one heartbeat every 30
	/// seconds, 3 missed heartbeats tolerated.
	fn default() -> Self {
		Self { interval_seconds:30, max_missed:3, enabled:true }
	}
}
83
/// Counters describing the activity of a Spine connection.
///
/// Every counter starts at zero (`Default`). `GetMetrics` hands out a cloned
/// snapshot of this struct.
#[derive(Debug, Default, Clone)]
pub struct ConnectionMetrics {
	/// Number of requests submitted through the connection.
	pub total_requests:u64,

	/// Number of requests that completed successfully.
	pub successful_requests:u64,

	/// Number of requests that failed.
	pub failed_requests:u64,

	/// Connection uptime, in seconds.
	pub uptime_seconds:u64,

	/// Number of reconnections.
	pub reconnections:u64,
}
102
103pub struct SpineConnectionImpl {
105 config:Arc<RwLock<ProtocolConfig>>,
107
108 state:Arc<RwLock<ConnectionState>>,
110
111 #[cfg(feature = "grove_echo")]
112 echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
114
115 heartbeat_config:HeartbeatConfig,
117
118 last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
120
121 metrics:Arc<RwLock<ConnectionMetrics>>,
123}
124
125impl SpineConnectionImpl {
126 pub fn new(config:ProtocolConfig) -> Self {
136 Self {
137 config:Arc::new(RwLock::new(config)),
138
139 state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
140
141 #[cfg(feature = "grove_echo")]
142 echo_client:None,
143
144 heartbeat_config:HeartbeatConfig::default(),
145
146 last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
147
148 metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
149 }
150 }
151
152 pub async fn Connect(&mut self) -> Result<()> {
154 let guard = self.config.read().await;
155
156 let url = guard.mountain_endpoint.clone();
157
158 drop(guard);
159
160 dev_log!("grpc", "Connecting to Spine at: {}", url);
161
162 *self.state.write().await = ConnectionState::Connecting;
163 *self.state.write().await = ConnectionState::Connected;
164 *self.last_heartbeat.write().await = chrono::Utc::now();
165 dev_log!("grpc", "Successfully connected to Spine");
166
167 Ok(())
168 }
169
170 pub async fn Disconnect(&mut self) -> Result<()> {
172 dev_log!("grpc", "Disconnecting from Spine");
173
174 #[cfg(feature = "grove_echo")]
175 {
176 self.echo_client = None;
177 }
178
179 *self.state.write().await = ConnectionState::Disconnected;
180 dev_log!("grpc", "Successfully disconnected from Spine");
181
182 Ok(())
183 }
184
185 pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
187
188 pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
195 if self.GetState().await != ConnectionState::Connected {
196 return Err(anyhow::anyhow!("Not connected to Spine"));
197 }
198
199 dev_log!("grpc", "Sending request: {}", method);
200
201 let mut metrics = self.metrics.write().await;
202
203 metrics.total_requests += 1;
204
205 metrics.successful_requests += 1;
206
207 Ok(Vec::new())
208 }
209
210 pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
212
213 pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
215}
216
#[cfg(feature = "grove_echo")]
impl SpineConnectionImpl {
	/// Connects the EchoAction gRPC client to the configured
	/// `mountain_endpoint` and stores it on the connection.
	///
	/// # Errors
	///
	/// Returns an error when the endpoint is not a valid URI or when the
	/// transport connection cannot be established.
	pub async fn ConnectEchoClient(&mut self) -> Result<()> {
		// `.context(..)` is an extension method from `anyhow::Context`; the
		// trait is imported locally so this block does not depend on a
		// file-level import.
		use anyhow::Context as _;

		// The read guard is a temporary and is released at the end of the
		// statement, before the network connect below.
		let url = self.config.read().await.mountain_endpoint.clone();

		dev_log!("grpc", "Connecting EchoAction client to: {}", url);

		let channel = tonic::transport::Channel::from_shared(url)
			.context("Invalid Mountain URL")?
			.connect()
			.await
			.context("Failed to connect EchoAction client")?;

		self.echo_client = Some(EchoActionServiceClient::new(channel));

		dev_log!("grpc", "EchoAction client connected");

		Ok(())
	}

	/// Sends a single `EchoAction` and returns the response.
	///
	/// # Errors
	///
	/// Returns an error when the connection state is not `Connected`, when
	/// no echo client has been connected, when the RPC itself fails, or when
	/// the response reports `success == false`.
	pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
		use anyhow::Context as _;

		if self.GetState().await != ConnectionState::Connected {
			return Err(anyhow::anyhow!("Not connected to Spine"));
		}

		// Generated tonic client methods take `&mut self`. Cloning the
		// client yields an owned, mutable handle (clones share the
		// underlying channel) so this method can keep taking `&self`.
		let mut client = self
			.echo_client
			.clone()
			.ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;

		let response = client
			.send_echo_action(action)
			.await
			.context("Failed to send EchoAction")?
			.into_inner();

		if !response.success {
			anyhow::bail!("EchoAction failed: {}", response.error);
		}

		Ok(response)
	}

	/// Sends an RPC call wrapped in an `EchoAction` of type `"rpc"`.
	///
	/// `method` is added to the headers under `"rpc_method"` (overwriting
	/// any caller-supplied entry with that key). Returns the `result` bytes
	/// of the response.
	///
	/// # Errors
	///
	/// Propagates every error of [`Self::SendEchoAction`].
	pub async fn SendRpcViaEcho(
		&self,

		method:&str,

		payload:Vec<u8>,

		metadata:std::collections::HashMap<String, String>,
	) -> Result<Vec<u8>> {
		let action = Self::BuildEchoAction("rpc", "rpc_method", method, payload, metadata);

		let response = self.SendEchoAction(action).await?;

		Ok(response.result)
	}

	/// Sends an event wrapped in an `EchoAction` of type `"event"`.
	///
	/// `event_name` is added to the headers under `"event_name"`
	/// (overwriting any caller-supplied entry with that key). The response
	/// body is discarded.
	///
	/// # Errors
	///
	/// Propagates every error of [`Self::SendEchoAction`].
	pub async fn SendEventViaEcho(
		&self,

		event_name:&str,

		payload:Vec<u8>,

		metadata:std::collections::HashMap<String, String>,
	) -> Result<()> {
		let action = Self::BuildEchoAction("event", "event_name", event_name, payload, metadata);

		self.SendEchoAction(action).await?;

		Ok(())
	}

	/// Returns `true` when an echo client is currently stored on the
	/// connection.
	pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }

	/// Builds a grove-to-mountain `EchoAction` with a fresh v4 UUID, the
	/// current UTC timestamp, and `header_key -> header_value` inserted into
	/// the caller-supplied headers.
	fn BuildEchoAction(
		action_type:&str,

		header_key:&str,

		header_value:&str,

		payload:Vec<u8>,

		mut headers:std::collections::HashMap<String, String>,
	) -> EchoAction {
		headers.insert(header_key.to_string(), header_value.to_string());

		EchoAction {
			action_id:uuid::Uuid::new_v4().to_string(),

			source:"grove".to_string(),

			target:"mountain".to_string(),

			action_type:action_type.to_string(),

			payload,

			headers,

			timestamp:chrono::Utc::now().timestamp(),

			nested_actions:vec![],
		}
	}
}
338
#[cfg(test)]
mod tests {
	use super::*;

	/// A state value compares equal to the same variant.
	#[test]
	fn test_connection_state() {
		let current = ConnectionState::Connected;

		assert_eq!(current, ConnectionState::Connected);
	}

	/// The default heartbeat configuration is enabled with a 30 second
	/// interval.
	#[test]
	fn test_heartbeat_config_default() {
		let HeartbeatConfig { interval_seconds, enabled, .. } = HeartbeatConfig::default();

		assert_eq!(interval_seconds, 30);

		assert!(enabled);
	}

	/// A freshly constructed connection reports `Disconnected`.
	#[tokio::test]
	async fn test_spine_connection_creation() {
		let endpoint = "http://127.0.0.1:50051".to_string();

		let spine = SpineConnectionImpl::new(ProtocolConfig { mountain_endpoint:endpoint, ..Default::default() });

		assert_eq!(spine.GetState().await, ConnectionState::Disconnected);
	}
}