tempo_node/rpc/consensus/
mod.rs1pub mod types;
9
10use jsonrpsee::{
11 core::RpcResult,
12 proc_macros::rpc,
13 types::{ErrorObject, error::INTERNAL_ERROR_CODE},
14};
15
16pub use types::{
17 CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError, IdentityTransition,
18 IdentityTransitionResponse, Query, TransitionProofData,
19};
20
21#[rpc(server, client, namespace = "consensus")]
23pub trait TempoConsensusApi {
24 #[method(name = "getFinalization")]
28 async fn get_finalization(&self, query: Query) -> RpcResult<Option<CertifiedBlock>>;
29
30 #[method(name = "getLatest")]
34 async fn get_latest(&self) -> RpcResult<ConsensusState>;
35
36 #[subscription(name = "subscribe" => "event", unsubscribe = "unsubscribe", item = Event)]
38 async fn subscribe_events(&self) -> jsonrpsee::core::SubscriptionResult;
39
40 #[method(name = "getIdentityTransitionProof")]
49 async fn get_identity_transition_proof(
50 &self,
51 from_epoch: Option<u64>,
52 full: Option<bool>,
53 ) -> RpcResult<IdentityTransitionResponse>;
54}
55
56#[derive(Debug, Clone)]
58pub struct TempoConsensusRpc<I> {
59 consensus_feed: I,
60}
61
62impl<I: ConsensusFeed> TempoConsensusRpc<I> {
63 pub fn new(consensus_feed: I) -> Self {
65 Self { consensus_feed }
66 }
67}
68
69#[async_trait::async_trait]
70impl<I: ConsensusFeed> TempoConsensusApiServer for TempoConsensusRpc<I> {
71 async fn get_finalization(&self, query: Query) -> RpcResult<Option<CertifiedBlock>> {
72 Ok(self.consensus_feed.get_finalization(query).await)
73 }
74
75 async fn get_latest(&self) -> RpcResult<ConsensusState> {
76 Ok(self.consensus_feed.get_latest().await)
77 }
78
79 async fn subscribe_events(
80 &self,
81 pending: jsonrpsee::PendingSubscriptionSink,
82 ) -> jsonrpsee::core::SubscriptionResult {
83 let sink = pending.accept().await?;
84 let mut rx = self.consensus_feed.subscribe().await.ok_or_else(|| {
85 ErrorObject::owned(INTERNAL_ERROR_CODE, "Failed to subscribe", None::<()>)
86 })?;
87
88 tokio::spawn(async move {
89 loop {
90 match rx.recv().await {
91 Ok(event) => {
92 let msg = jsonrpsee::SubscriptionMessage::new(
93 sink.method_name(),
94 sink.subscription_id().clone(),
95 &event,
96 )
97 .expect("Event should be serializable");
98 if sink.send(msg).await.is_err() {
99 break;
100 }
101 }
102 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
103 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
104 }
105 }
106 });
107
108 Ok(())
109 }
110
111 async fn get_identity_transition_proof(
112 &self,
113 from_epoch: Option<u64>,
114 full: Option<bool>,
115 ) -> RpcResult<IdentityTransitionResponse> {
116 self.consensus_feed
117 .get_identity_transition_proof(from_epoch, full.unwrap_or(false))
118 .await
119 .map_err(|e| ErrorObject::owned(INTERNAL_ERROR_CODE, e.to_string(), None::<()>))
120 }
121}