Grove/Transport/
gRPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12
13use crate::{
14 Transport::{
15 Strategy::{TransportStats, TransportStrategy, TransportType},
16 TransportConfig,
17 },
18 dev_log,
19};
20
21#[derive(Clone, Debug)]
23pub struct gRPCTransport {
24 Endpoint:String,
26
27 Channel:Arc<RwLock<Option<Channel>>>,
29
30 Configuration:TransportConfig,
32
33 Connected:Arc<RwLock<bool>>,
35
36 Statistics:Arc<RwLock<TransportStats>>,
38}
39
40impl gRPCTransport {
41 pub fn New(Address:&str) -> anyhow::Result<Self> {
43 Ok(Self {
44 Endpoint:Address.to_string(),
45 Channel:Arc::new(RwLock::new(None)),
46 Configuration:TransportConfig::default(),
47 Connected:Arc::new(RwLock::new(false)),
48 Statistics:Arc::new(RwLock::new(TransportStats::default())),
49 })
50 }
51
52 pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
54 Ok(Self {
55 Endpoint:Address.to_string(),
56 Channel:Arc::new(RwLock::new(None)),
57 Configuration,
58 Connected:Arc::new(RwLock::new(false)),
59 Statistics:Arc::new(RwLock::new(TransportStats::default())),
60 })
61 }
62
63 pub fn Address(&self) -> &str { &self.Endpoint }
65
66 pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
68 self.Channel
69 .read()
70 .await
71 .as_ref()
72 .cloned()
73 .ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
74 }
75
76 pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
78
79 fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
81 let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
82 .timeout(self.Configuration.ConnectionTimeout)
83 .connect_timeout(self.Configuration.ConnectionTimeout)
84 .tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
85
86 Ok(EndpointValue)
87 }
88}
89
90#[async_trait]
91impl TransportStrategy for gRPCTransport {
92 type Error = gRPCTransportError;
93
94 async fn connect(&self) -> Result<(), Self::Error> {
95 dev_log!("grpc", "Connecting to gRPC endpoint: {}", self.Endpoint);
96
97 let EndpointValue = self
98 .BuildEndpoint()
99 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
100
101 let ChannelValue = EndpointValue
102 .connect()
103 .await
104 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
105
106 *self.Channel.write().await = Some(ChannelValue);
107 *self.Connected.write().await = true;
108
109 dev_log!("grpc", "gRPC connection established: {}", self.Endpoint);
110
111 Ok(())
112 }
113
114 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
115 let Start = std::time::Instant::now();
116
117 if !self.is_connected() {
118 return Err(gRPCTransportError::NotConnected);
119 }
120
121 dev_log!("grpc", "Sending gRPC request ({} bytes)", request.len());
122
123 let Response:Vec<u8> = vec![];
124
125 let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
126
127 let mut Stats = self.Statistics.write().await;
128
129 Stats.record_sent(request.len() as u64, LatencyMicroseconds);
130
131 Stats.record_received(Response.len() as u64);
132
133 dev_log!("grpc", "gRPC request completed in {}µs", LatencyMicroseconds);
134
135 Ok(Response)
136 }
137
138 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
139 if !self.is_connected() {
140 return Err(gRPCTransportError::NotConnected);
141 }
142
143 dev_log!("grpc", "Sending gRPC notification ({} bytes)", data.len());
144
145 let mut Stats = self.Statistics.write().await;
146
147 Stats.record_sent(data.len() as u64, 0);
148
149 Ok(())
150 }
151
152 async fn close(&self) -> Result<(), Self::Error> {
153 dev_log!("grpc", "Closing gRPC connection: {}", self.Endpoint);
154
155 *self.Channel.write().await = None;
156 *self.Connected.write().await = false;
157 dev_log!("grpc", "gRPC connection closed: {}", self.Endpoint);
158
159 Ok(())
160 }
161
162 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
163
164 fn transport_type(&self) -> TransportType { TransportType::gRPC }
165}
166
167#[derive(Debug, thiserror::Error)]
169pub enum gRPCTransportError {
170 #[error("Connection failed: {0}")]
172 ConnectionFailed(String),
173
174 #[error("Send failed: {0}")]
176 SendFailed(String),
177
178 #[error("Receive failed: {0}")]
180 ReceiveFailed(String),
181
182 #[error("Not connected")]
184 NotConnected,
185
186 #[error("Timeout")]
188 Timeout,
189
190 #[error("gRPC error: {0}")]
192 Error(String),
193}
194
195impl From<tonic::transport::Error> for gRPCTransportError {
196 fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
197}
198
199impl From<tonic::Status> for gRPCTransportError {
200 fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
201}
202
203#[cfg(test)]
204mod tests {
205
206 use super::*;
207
208 #[test]
209 fn TestgRPCTransportCreation() {
210 let Result = gRPCTransport::New("127.0.0.1:50050");
211
212 assert!(Result.is_ok());
213
214 let Transport = Result.unwrap();
215
216 assert_eq!(Transport.Address(), "127.0.0.1:50050");
217 }
218
219 #[tokio::test]
220 async fn TestgRPCTransportNotConnected() {
221 let Transport = gRPCTransport::New("127.0.0.1:50050").unwrap();
222
223 assert!(!Transport.is_connected());
224 }
225}