tempo_node/rpc/consensus/
mod.rs1pub mod types;
9
10use jsonrpsee::{
11 core::RpcResult,
12 proc_macros::rpc,
13 types::{ErrorObject, error::INTERNAL_ERROR_CODE},
14};
15
16pub use types::{
17 CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError, IdentityTransition,
18 IdentityTransitionResponse, Query, TransitionProofData,
19};
20
/// Error codes reported by the `consensus` RPC namespace.
///
/// The numeric values match the HTTP status codes of the same names. A variant
/// is turned into the JSON-RPC error code with an `as i32` cast.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(i32)]
pub enum ErrorCode {
    /// The requested content was not available.
    NoContent = 204,
    /// The consensus subservice was not available; the request can be retried later.
    ServiceUnavailable = 503,
}
28
29impl ErrorCode {
30 fn msg(self) -> &'static str {
31 match self {
32 Self::NoContent => "the requested content was not available",
33 Self::ServiceUnavailable => {
34 "the consensus subservice was not available, but the request can be retried later"
35 }
36 }
37 }
38}
39
40impl<T> From<types::Response<T>> for RpcResult<T> {
41 fn from(value: types::Response<T>) -> Self {
42 match value {
43 types::Response::Success(val) => Ok(val),
44 types::Response::NotReady => Err(ErrorObject::owned(
45 ErrorCode::NoContent as i32,
46 ErrorCode::NoContent.msg(),
47 None::<()>,
48 )),
49 types::Response::Missing(msg) => Err(ErrorObject::owned(
50 ErrorCode::ServiceUnavailable as i32,
51 ErrorCode::ServiceUnavailable.msg(),
52 Some(msg),
53 )),
54 }
55 }
56}
57
58#[rpc(server, client, namespace = "consensus")]
60pub trait TempoConsensusApi {
61 #[method(name = "getFinalization")]
65 async fn get_finalization(&self, query: Query) -> RpcResult<CertifiedBlock>;
66
67 #[method(name = "getLatest")]
71 async fn get_latest(&self) -> RpcResult<ConsensusState>;
72
73 #[subscription(name = "subscribe" => "event", unsubscribe = "unsubscribe", item = Event)]
75 async fn subscribe_events(&self) -> jsonrpsee::core::SubscriptionResult;
76
77 #[method(name = "getIdentityTransitionProof")]
86 async fn get_identity_transition_proof(
87 &self,
88 from_epoch: Option<u64>,
89 full: Option<bool>,
90 ) -> RpcResult<IdentityTransitionResponse>;
91}
92
/// RPC handler for the `consensus` namespace, generic over the feed `I` it
/// reads from.
#[derive(Clone, Debug)]
pub struct TempoConsensusRpc<I> {
    /// Source that every RPC method of this handler delegates to.
    consensus_feed: I,
}
98
99impl<I: ConsensusFeed> TempoConsensusRpc<I> {
100 pub fn new(consensus_feed: I) -> Self {
102 Self { consensus_feed }
103 }
104}
105
106#[async_trait::async_trait]
107impl<I: ConsensusFeed> TempoConsensusApiServer for TempoConsensusRpc<I> {
108 async fn get_finalization(&self, query: Query) -> RpcResult<CertifiedBlock> {
109 self.consensus_feed.get_finalization(query).await.into()
110 }
111
112 async fn get_latest(&self) -> RpcResult<ConsensusState> {
113 Ok(self.consensus_feed.get_latest().await)
114 }
115
116 async fn subscribe_events(
117 &self,
118 pending: jsonrpsee::PendingSubscriptionSink,
119 ) -> jsonrpsee::core::SubscriptionResult {
120 let sink = pending.accept().await?;
121 let mut rx = self.consensus_feed.subscribe().await.ok_or_else(|| {
122 ErrorObject::owned(INTERNAL_ERROR_CODE, "Failed to subscribe", None::<()>)
123 })?;
124
125 tokio::spawn(async move {
126 loop {
127 match rx.recv().await {
128 Ok(event) => {
129 let msg = jsonrpsee::SubscriptionMessage::new(
130 sink.method_name(),
131 sink.subscription_id().clone(),
132 &event,
133 )
134 .expect("Event should be serializable");
135 if sink.send(msg).await.is_err() {
136 break;
137 }
138 }
139 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
140 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
141 }
142 }
143 });
144
145 Ok(())
146 }
147
148 async fn get_identity_transition_proof(
149 &self,
150 from_epoch: Option<u64>,
151 full: Option<bool>,
152 ) -> RpcResult<IdentityTransitionResponse> {
153 self.consensus_feed
154 .get_identity_transition_proof(from_epoch, full.unwrap_or(false))
155 .await
156 .map_err(|e| ErrorObject::owned(INTERNAL_ERROR_CODE, e.to_string(), None::<()>))
157 }
158}