Skip to main content

tempo_node/rpc/consensus/
mod.rs

//! Consensus namespace RPC implementation.
//!
//! Provides query methods and subscriptions for consensus data:
//! - `consensus_getFinalization(query)` - Get finalization by height from marshal archive
//! - `consensus_getLatest()` - Get the current consensus state snapshot
//! - `consensus_subscribe()` - Subscribe to consensus events stream
//! - `consensus_getIdentityTransitionProof(from_epoch, full)` - Get identity transition proofs
7
8pub mod types;
9
10use jsonrpsee::{
11    core::RpcResult,
12    proc_macros::rpc,
13    types::{ErrorObject, error::INTERNAL_ERROR_CODE},
14};
15
16pub use types::{
17    CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError, IdentityTransition,
18    IdentityTransitionResponse, Query, TransitionProofData,
19};
20
21/// Consensus namespace RPC trait.
22#[rpc(server, client, namespace = "consensus")]
23pub trait TempoConsensusApi {
24    /// Get finalization by height query.
25    ///
26    /// Use `"latest"` to get the most recent finalization, or `{"height": N}` for a specific height.
27    #[method(name = "getFinalization")]
28    async fn get_finalization(&self, query: Query) -> RpcResult<Option<CertifiedBlock>>;
29
30    /// Get the current consensus state snapshot.
31    ///
32    /// Returns the latest finalized block and the latest notarized block (if not yet finalized).
33    #[method(name = "getLatest")]
34    async fn get_latest(&self) -> RpcResult<ConsensusState>;
35
36    /// Subscribe to all consensus events (Notarized, Finalized, Nullified).
37    #[subscription(name = "subscribe" => "event", unsubscribe = "unsubscribe", item = Event)]
38    async fn subscribe_events(&self) -> jsonrpsee::core::SubscriptionResult;
39
40    /// Get identity transition proofs (full DKG events).
41    ///
42    /// Each proof contains the block header with the new DKG outcome, and a BLS certificate from the OLD
43    /// network identity that signs the block.
44    ///
45    /// - `from_epoch`: Optional epoch to start searching from (defaults to latest finalized)
46    /// - `full = false` (default): Returns only the most recent transition
47    /// - `full = true`: Returns all transitions from the starting epoch back to genesis
48    #[method(name = "getIdentityTransitionProof")]
49    async fn get_identity_transition_proof(
50        &self,
51        from_epoch: Option<u64>,
52        full: Option<bool>,
53    ) -> RpcResult<IdentityTransitionResponse>;
54}
55
/// RPC handler for the `consensus` namespace, generic over the feed `I` that supplies its data.
#[derive(Clone, Debug)]
pub struct TempoConsensusRpc<I> {
    /// Source of consensus data that the RPC methods delegate to.
    consensus_feed: I,
}
61
62impl<I: ConsensusFeed> TempoConsensusRpc<I> {
63    /// Create a new consensus RPC handler.
64    pub fn new(consensus_feed: I) -> Self {
65        Self { consensus_feed }
66    }
67}
68
69#[async_trait::async_trait]
70impl<I: ConsensusFeed> TempoConsensusApiServer for TempoConsensusRpc<I> {
71    async fn get_finalization(&self, query: Query) -> RpcResult<Option<CertifiedBlock>> {
72        Ok(self.consensus_feed.get_finalization(query).await)
73    }
74
75    async fn get_latest(&self) -> RpcResult<ConsensusState> {
76        Ok(self.consensus_feed.get_latest().await)
77    }
78
79    async fn subscribe_events(
80        &self,
81        pending: jsonrpsee::PendingSubscriptionSink,
82    ) -> jsonrpsee::core::SubscriptionResult {
83        let sink = pending.accept().await?;
84        let mut rx = self.consensus_feed.subscribe().await.ok_or_else(|| {
85            ErrorObject::owned(INTERNAL_ERROR_CODE, "Failed to subscribe", None::<()>)
86        })?;
87
88        tokio::spawn(async move {
89            loop {
90                match rx.recv().await {
91                    Ok(event) => {
92                        let msg = jsonrpsee::SubscriptionMessage::new(
93                            sink.method_name(),
94                            sink.subscription_id().clone(),
95                            &event,
96                        )
97                        .expect("Event should be serializable");
98                        if sink.send(msg).await.is_err() {
99                            break;
100                        }
101                    }
102                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
103                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
104                }
105            }
106        });
107
108        Ok(())
109    }
110
111    async fn get_identity_transition_proof(
112        &self,
113        from_epoch: Option<u64>,
114        full: Option<bool>,
115    ) -> RpcResult<IdentityTransitionResponse> {
116        self.consensus_feed
117            .get_identity_transition_proof(from_epoch, full.unwrap_or(false))
118            .await
119            .map_err(|e| ErrorObject::owned(INTERNAL_ERROR_CODE, e.to_string(), None::<()>))
120    }
121}