Skip to main content

tempo_node/rpc/consensus/
mod.rs

1//! Consensus namespace RPC implementation.
2//!
3//! Provides query methods and subscriptions for consensus data:
4//! - `consensus_getFinalization(query)` - Get finalization by height from marshal archive
5//! - `consensus_getLatest()` - Get the current consensus state snapshot
6//! - `consensus_subscribe()` - Subscribe to consensus events stream
7
8pub mod types;
9
10use jsonrpsee::{
11    core::RpcResult,
12    proc_macros::rpc,
13    types::{ErrorObject, error::INTERNAL_ERROR_CODE},
14};
15
16pub use types::{
17    CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError, IdentityTransition,
18    IdentityTransitionResponse, Query, TransitionProofData,
19};
20
21/// Custom error codes for the consensus RPC.
22#[derive(Copy, Clone, PartialEq, Eq)]
23#[repr(i32)]
24pub enum ErrorCode {
25    NoContent = 204,
26    ServiceUnavailable = 503,
27}
28
29impl ErrorCode {
30    fn msg(self) -> &'static str {
31        match self {
32            Self::NoContent => "the requested content was not available",
33            Self::ServiceUnavailable => {
34                "the consensus subservice was not available, but the request can be retried later"
35            }
36        }
37    }
38}
39
40impl<T> From<types::Response<T>> for RpcResult<T> {
41    fn from(value: types::Response<T>) -> Self {
42        match value {
43            types::Response::Success(val) => Ok(val),
44            types::Response::NotReady => Err(ErrorObject::owned(
45                ErrorCode::NoContent as i32,
46                ErrorCode::NoContent.msg(),
47                None::<()>,
48            )),
49            types::Response::Missing(msg) => Err(ErrorObject::owned(
50                ErrorCode::ServiceUnavailable as i32,
51                ErrorCode::ServiceUnavailable.msg(),
52                Some(msg),
53            )),
54        }
55    }
56}
57
58/// Consensus namespace RPC trait.
59#[rpc(server, client, namespace = "consensus")]
60pub trait TempoConsensusApi {
61    /// Get finalization by height query.
62    ///
63    /// Use `"latest"` to get the most recent finalization, or `{"height": N}` for a specific height.
64    #[method(name = "getFinalization")]
65    async fn get_finalization(&self, query: Query) -> RpcResult<CertifiedBlock>;
66
67    /// Get the current consensus state snapshot.
68    ///
69    /// Returns the latest finalized block and the latest notarized block (if not yet finalized).
70    #[method(name = "getLatest")]
71    async fn get_latest(&self) -> RpcResult<ConsensusState>;
72
73    /// Subscribe to all consensus events (Notarized, Finalized, Nullified).
74    #[subscription(name = "subscribe" => "event", unsubscribe = "unsubscribe", item = Event)]
75    async fn subscribe_events(&self) -> jsonrpsee::core::SubscriptionResult;
76
77    /// Get identity transition proofs (full DKG events).
78    ///
79    /// Each proof contains the block header with the new DKG outcome, and a BLS certificate from the OLD
80    /// network identity that signs the block.
81    ///
82    /// - `from_epoch`: Optional epoch to start searching from (defaults to latest finalized)
83    /// - `full = false` (default): Returns only the most recent transition
84    /// - `full = true`: Returns all transitions from the starting epoch back to genesis
85    #[method(name = "getIdentityTransitionProof")]
86    async fn get_identity_transition_proof(
87        &self,
88        from_epoch: Option<u64>,
89        full: Option<bool>,
90    ) -> RpcResult<IdentityTransitionResponse>;
91}
92
93/// Tempo consensus RPC implementation.
94#[derive(Debug, Clone)]
95pub struct TempoConsensusRpc<I> {
96    consensus_feed: I,
97}
98
99impl<I: ConsensusFeed> TempoConsensusRpc<I> {
100    /// Create a new consensus RPC handler.
101    pub fn new(consensus_feed: I) -> Self {
102        Self { consensus_feed }
103    }
104}
105
106#[async_trait::async_trait]
107impl<I: ConsensusFeed> TempoConsensusApiServer for TempoConsensusRpc<I> {
108    async fn get_finalization(&self, query: Query) -> RpcResult<CertifiedBlock> {
109        match self.consensus_feed.get_finalization(query).await {
110            types::Response::Success(obj) => Ok(obj),
111            types::Response::NotReady => todo!(),
112            types::Response::Missing(_) => todo!(),
113        }
114    }
115
116    async fn get_latest(&self) -> RpcResult<ConsensusState> {
117        Ok(self.consensus_feed.get_latest().await)
118    }
119
120    async fn subscribe_events(
121        &self,
122        pending: jsonrpsee::PendingSubscriptionSink,
123    ) -> jsonrpsee::core::SubscriptionResult {
124        let sink = pending.accept().await?;
125        let mut rx = self.consensus_feed.subscribe().await.ok_or_else(|| {
126            ErrorObject::owned(INTERNAL_ERROR_CODE, "Failed to subscribe", None::<()>)
127        })?;
128
129        tokio::spawn(async move {
130            loop {
131                match rx.recv().await {
132                    Ok(event) => {
133                        let msg = jsonrpsee::SubscriptionMessage::new(
134                            sink.method_name(),
135                            sink.subscription_id().clone(),
136                            &event,
137                        )
138                        .expect("Event should be serializable");
139                        if sink.send(msg).await.is_err() {
140                            break;
141                        }
142                    }
143                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
144                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
145                }
146            }
147        });
148
149        Ok(())
150    }
151
152    async fn get_identity_transition_proof(
153        &self,
154        from_epoch: Option<u64>,
155        full: Option<bool>,
156    ) -> RpcResult<IdentityTransitionResponse> {
157        self.consensus_feed
158            .get_identity_transition_proof(from_epoch, full.unwrap_or(false))
159            .await
160            .map_err(|e| ErrorObject::owned(INTERNAL_ERROR_CODE, e.to_string(), None::<()>))
161    }
162}