tempo_node/rpc/consensus/
mod.rs1pub mod types;
9
10use jsonrpsee::{
11 core::RpcResult,
12 proc_macros::rpc,
13 types::{ErrorObject, error::INTERNAL_ERROR_CODE},
14};
15
16pub use types::{
17 CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError, IdentityTransition,
18 IdentityTransitionResponse, Query, TransitionProofData,
19};
20
/// Numeric error codes used by the consensus RPC namespace.
///
/// The enum is `#[repr(i32)]`, so `code as i32` yields the discriminant
/// written next to each variant.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(i32)]
pub enum ErrorCode {
    /// Code `204`.
    NoContent = 204,
    /// Code `503`.
    ServiceUnavailable = 503,
}
28
29impl ErrorCode {
30 fn msg(self) -> &'static str {
31 match self {
32 Self::NoContent => "the requested content was not available",
33 Self::ServiceUnavailable => {
34 "the consensus subservice was not available, but the request can be retried later"
35 }
36 }
37 }
38}
39
40impl<T> From<types::Response<T>> for RpcResult<T> {
41 fn from(value: types::Response<T>) -> Self {
42 match value {
43 types::Response::Success(val) => Ok(val),
44 types::Response::NotReady => Err(ErrorObject::owned(
45 ErrorCode::NoContent as i32,
46 ErrorCode::NoContent.msg(),
47 None::<()>,
48 )),
49 types::Response::Missing(msg) => Err(ErrorObject::owned(
50 ErrorCode::ServiceUnavailable as i32,
51 ErrorCode::ServiceUnavailable.msg(),
52 Some(msg),
53 )),
54 }
55 }
56}
57
58#[rpc(server, client, namespace = "consensus")]
60pub trait TempoConsensusApi {
61 #[method(name = "getFinalization")]
65 async fn get_finalization(&self, query: Query) -> RpcResult<CertifiedBlock>;
66
67 #[method(name = "getLatest")]
71 async fn get_latest(&self) -> RpcResult<ConsensusState>;
72
73 #[subscription(name = "subscribe" => "event", unsubscribe = "unsubscribe", item = Event)]
75 async fn subscribe_events(&self) -> jsonrpsee::core::SubscriptionResult;
76
77 #[method(name = "getIdentityTransitionProof")]
86 async fn get_identity_transition_proof(
87 &self,
88 from_epoch: Option<u64>,
89 full: Option<bool>,
90 ) -> RpcResult<IdentityTransitionResponse>;
91}
92
/// Handler for the `consensus` RPC namespace.
///
/// Wraps a single value of type `I`; the trait bounds on `I` are stated on
/// the `impl` blocks rather than on the struct itself.
#[derive(Clone, Debug)]
pub struct TempoConsensusRpc<I> {
    /// Source that the RPC methods delegate to.
    consensus_feed: I,
}
98
99impl<I: ConsensusFeed> TempoConsensusRpc<I> {
100 pub fn new(consensus_feed: I) -> Self {
102 Self { consensus_feed }
103 }
104}
105
106#[async_trait::async_trait]
107impl<I: ConsensusFeed> TempoConsensusApiServer for TempoConsensusRpc<I> {
108 async fn get_finalization(&self, query: Query) -> RpcResult<CertifiedBlock> {
109 match self.consensus_feed.get_finalization(query).await {
110 types::Response::Success(obj) => Ok(obj),
111 types::Response::NotReady => todo!(),
112 types::Response::Missing(_) => todo!(),
113 }
114 }
115
116 async fn get_latest(&self) -> RpcResult<ConsensusState> {
117 Ok(self.consensus_feed.get_latest().await)
118 }
119
120 async fn subscribe_events(
121 &self,
122 pending: jsonrpsee::PendingSubscriptionSink,
123 ) -> jsonrpsee::core::SubscriptionResult {
124 let sink = pending.accept().await?;
125 let mut rx = self.consensus_feed.subscribe().await.ok_or_else(|| {
126 ErrorObject::owned(INTERNAL_ERROR_CODE, "Failed to subscribe", None::<()>)
127 })?;
128
129 tokio::spawn(async move {
130 loop {
131 match rx.recv().await {
132 Ok(event) => {
133 let msg = jsonrpsee::SubscriptionMessage::new(
134 sink.method_name(),
135 sink.subscription_id().clone(),
136 &event,
137 )
138 .expect("Event should be serializable");
139 if sink.send(msg).await.is_err() {
140 break;
141 }
142 }
143 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
144 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
145 }
146 }
147 });
148
149 Ok(())
150 }
151
152 async fn get_identity_transition_proof(
153 &self,
154 from_epoch: Option<u64>,
155 full: Option<bool>,
156 ) -> RpcResult<IdentityTransitionResponse> {
157 self.consensus_feed
158 .get_identity_transition_proof(from_epoch, full.unwrap_or(false))
159 .await
160 .map_err(|e| ErrorObject::owned(INTERNAL_ERROR_CODE, e.to_string(), None::<()>))
161 }
162}