use std::sync::Arc;
use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
use crate::chain::HeadChange;
use crate::message::{ChainMessage, SignedMessage};
use crate::message_pool::msg_pool::{
MAX_ACTOR_PENDING_MESSAGES, MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES,
};
use crate::networks::Height;
use crate::shim::{
address::Address,
econ::TokenAmount,
message::Message,
state_tree::{ActorState, StateTree},
};
use crate::state_manager::StateManager;
use crate::utils::db::CborStoreExt;
use async_trait::async_trait;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher};
use crate::message_pool::errors::Error;
#[async_trait]
pub trait Provider {
fn subscribe_head_changes(&self) -> Subscriber<HeadChange>;
fn get_heaviest_tipset(&self) -> Arc<Tipset>;
fn put_message(&self, msg: &ChainMessage) -> Result<Cid, Error>;
fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result<ActorState, Error>;
fn messages_for_block(
&self,
h: &CachingBlockHeader,
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>;
fn load_tipset(&self, tsk: &TipsetKey) -> Result<Arc<Tipset>, Error>;
fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error>;
fn max_actor_pending_messages(&self) -> u64 {
MAX_ACTOR_PENDING_MESSAGES
}
fn max_untrusted_actor_pending_messages(&self) -> u64 {
MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES
}
}
pub struct MpoolRpcProvider<DB> {
subscriber: Publisher<HeadChange>,
sm: Arc<StateManager<DB>>,
}
impl<DB> MpoolRpcProvider<DB>
where
DB: Blockstore,
{
pub fn new(subscriber: Publisher<HeadChange>, sm: Arc<StateManager<DB>>) -> Self {
MpoolRpcProvider { subscriber, sm }
}
}
#[async_trait]
impl<DB> Provider for MpoolRpcProvider<DB>
where
DB: Blockstore + Sync + Send + 'static,
{
fn subscribe_head_changes(&self) -> Subscriber<HeadChange> {
self.subscriber.subscribe()
}
fn get_heaviest_tipset(&self) -> Arc<Tipset> {
self.sm.chain_store().heaviest_tipset()
}
fn put_message(&self, msg: &ChainMessage) -> Result<Cid, Error> {
let cid = self
.sm
.blockstore()
.put_cbor_default(msg)
.map_err(|err| Error::Other(err.to_string()))?;
Ok(cid)
}
fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result<ActorState, Error> {
let state = StateTree::new_from_root(self.sm.blockstore_owned(), ts.parent_state())
.map_err(|e| Error::Other(e.to_string()))?;
Ok(state.get_required_actor(addr)?)
}
fn messages_for_block(
&self,
h: &CachingBlockHeader,
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error> {
crate::chain::block_messages(self.sm.blockstore(), h).map_err(|err| err.into())
}
fn load_tipset(&self, tsk: &TipsetKey) -> Result<Arc<Tipset>, Error> {
Ok(self
.sm
.chain_store()
.chain_index
.load_required_tipset(tsk)?)
}
fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error> {
let smoke_height = self.sm.chain_config().epoch(Height::Smoke);
crate::chain::compute_base_fee(self.sm.blockstore(), ts, smoke_height)
.map_err(|err| err.into())
.map(Into::into)
}
}