use std::sync::Arc;
use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta};
use crate::fil_cns;
use crate::interpreter::BlockMessages;
use crate::interpreter::VMTrace;
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
use crate::networks::{ChainConfig, Height};
use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
use crate::shim::clock::ChainEpoch;
use crate::shim::{
address::Address, econ::TokenAmount, executor::Receipt, message::Message,
state_tree::StateTree, version::NetworkVersion,
};
use crate::utils::db::{BlockstoreExt, CborStoreExt};
use ahash::{HashMap, HashMapExt, HashSet};
use anyhow::Context;
use cid::Cid;
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use itertools::Itertools;
use nunny::vec as nonempty;
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::broadcast::{self, Sender as Publisher};
use tracing::{debug, info, trace, warn};
use super::{
index::{ChainIndex, ResolveNullTipset},
tipset_tracker::TipsetTracker,
Error,
};
use crate::db::setting_keys::HEAD_KEY;
use crate::db::{EthMappingsStore, EthMappingsStoreExt, SettingsStore, SettingsStoreExt};
/// Capacity of the broadcast channel used to publish [`HeadChange`] events.
const SINK_CAP: usize = 200;

/// A difference between two chain epochs (shares the epoch scalar type).
pub type ChainEpochDelta = ChainEpoch;

/// Event published on the chain store's channel whenever the heaviest tipset
/// changes.
#[derive(Clone, Debug)]
pub enum HeadChange {
    /// The contained tipset became the new chain head.
    Apply(Arc<Tipset>),
}
/// Stores chain data such as block headers, messages, and tipsets, layered on
/// top of a generic blockstore `DB`.
pub struct ChainStore<DB> {
    /// Broadcast channel announcing [`HeadChange`] events to subscribers.
    publisher: Publisher<HeadChange>,
    /// Underlying IPLD blockstore holding chain objects.
    pub db: Arc<DB>,
    /// Key-value settings store; persists `HEAD_KEY` among others.
    settings: Arc<dyn SettingsStore + Sync + Send>,
    /// Index used to resolve and cache tipsets by key and by height.
    pub chain_index: Arc<ChainIndex<Arc<DB>>>,
    /// Tracks block headers per epoch so partial tipsets can be expanded.
    tipset_tracker: TipsetTracker<DB>,
    /// The genesis block header this store was initialized with.
    genesis_block_header: CachingBlockHeader,
    /// In-memory cache of block CIDs that already passed validation.
    validated_blocks: Mutex<HashSet<Cid>>,
    /// Store for Ethereum hash -> Filecoin object mappings (RPC support).
    eth_mappings: Arc<dyn EthMappingsStore + Sync + Send>,
    /// Network/chain parameters (epochs of upgrades, policy, etc.).
    pub chain_config: Arc<ChainConfig>,
}
/// Read-side bitswap access simply delegates to the underlying blockstore.
impl<DB> BitswapStoreRead for ChainStore<DB>
where
    DB: BitswapStoreRead,
{
    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
        self.db.contains(cid)
    }

    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        self.db.get(cid)
    }
}
/// Write-side bitswap access delegates to the underlying blockstore as well.
impl<DB> BitswapStoreReadWrite for ChainStore<DB>
where
    DB: BitswapStoreReadWrite,
{
    /// Reuse the store parameters of the wrapped blockstore.
    type Params = <DB as BitswapStoreReadWrite>::Params;

    fn insert(&self, block: &libipld::Block<Self::Params>) -> anyhow::Result<()> {
        self.db.insert(block)
    }
}
impl<DB> ChainStore<DB>
where
    DB: Blockstore,
{
    /// Creates a `ChainStore` and guarantees that the persisted `HEAD_KEY`
    /// points at a loadable tipset, resetting it to the genesis tipset when
    /// the recorded head is missing or cannot be loaded from `db`.
    pub fn new(
        db: Arc<DB>,
        settings: Arc<dyn SettingsStore + Sync + Send>,
        eth_mappings: Arc<dyn EthMappingsStore + Sync + Send>,
        chain_config: Arc<ChainConfig>,
        genesis_block_header: CachingBlockHeader,
    ) -> anyhow::Result<Self> {
        let (publisher, _) = broadcast::channel(SINK_CAP);
        let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db)));
        // Reset HEAD_KEY to genesis when it is absent or points at a tipset
        // that can no longer be loaded.
        if !settings
            .read_obj::<TipsetKey>(HEAD_KEY)?
            .is_some_and(|tipset_keys| chain_index.load_tipset(&tipset_keys).is_ok())
        {
            let tipset_keys = TipsetKey::from(nonempty![*genesis_block_header.cid()]);
            settings.write_obj(HEAD_KEY, &tipset_keys)?;
        }
        let validated_blocks = Mutex::new(HashSet::default());
        let cs = Self {
            publisher,
            chain_index,
            tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()),
            db,
            settings,
            genesis_block_header,
            validated_blocks,
            eth_mappings,
            chain_config,
        };
        Ok(cs)
    }

    /// Persists `ts` as the new head under `HEAD_KEY` and notifies all
    /// subscribers. A failed send only means there are no active receivers,
    /// so it is logged at debug level and ignored.
    pub fn set_heaviest_tipset(&self, ts: Arc<Tipset>) -> Result<(), Error> {
        self.settings.write_obj(HEAD_KEY, ts.key())?;
        if self.publisher.send(HeadChange::Apply(ts)).is_err() {
            debug!("did not publish head change, no active receivers");
        }
        Ok(())
    }

    /// Records `header` in the tipset tracker so it can later be grouped with
    /// sibling blocks of the same epoch.
    pub fn add_to_tipset_tracker(&self, header: &CachingBlockHeader) {
        self.tipset_tracker.add(header);
    }

    /// Persists the tipset's headers, expands the tipset with known sibling
    /// blocks, stores its Ethereum hash mapping, and promotes it to head if
    /// it is heavier than the current head.
    pub fn put_tipset(&self, ts: &Tipset) -> Result<(), Error> {
        persist_objects(self.blockstore(), ts.block_headers().iter())?;
        let expanded = self.expand_tipset(ts.min_ticket_block().clone())?;
        self.put_tipset_key(expanded.key())?;
        self.update_heaviest(Arc::new(expanded))?;
        Ok(())
    }

    /// Writes the `hash(tipset key) -> tipset key` entry to the Ethereum
    /// mappings store.
    pub fn put_tipset_key(&self, tsk: &TipsetKey) -> Result<(), Error> {
        let hash = tsk.cid()?.into();
        self.eth_mappings.write_obj(&hash, tsk)?;
        Ok(())
    }

    /// Collects all delegated (Ethereum-style) messages from `headers` and
    /// persists their Ethereum transaction hash mappings.
    pub fn put_delegated_message_hashes<'a>(
        &self,
        headers: impl Iterator<Item = &'a CachingBlockHeader>,
    ) -> Result<(), Error> {
        tracing::debug!("persist eth mapping");
        let delegated_messages = self.headers_delegated_messages(headers)?;
        self.process_signed_messages(&delegated_messages)?;
        Ok(())
    }

    /// Looks up the tipset key stored under the given Ethereum block hash,
    /// erroring when no mapping exists.
    pub fn get_required_tipset_key(&self, hash: &EthHash) -> Result<TipsetKey, Error> {
        let tsk = self
            .eth_mappings
            .read_obj::<TipsetKey>(hash)?
            .with_context(|| format!("cannot find tipset with hash {}", hash))?;
        Ok(tsk)
    }

    /// Stores an `Ethereum tx hash -> (message CID, timestamp)` mapping.
    pub fn put_mapping(&self, k: EthHash, v: Cid, timestamp: u64) -> Result<(), Error> {
        self.eth_mappings.write_obj(&k, &(v, timestamp))?;
        Ok(())
    }

    /// Fetches the message CID mapped to an Ethereum tx hash, discarding the
    /// stored timestamp.
    pub fn get_mapping(&self, hash: &EthHash) -> Result<Option<Cid>, Error> {
        Ok(self
            .eth_mappings
            .read_obj::<(Cid, u64)>(hash)?
            .map(|(cid, _)| cid))
    }

    /// Expands `header` into a full tipset using sibling blocks known to the
    /// tipset tracker.
    fn expand_tipset(&self, header: CachingBlockHeader) -> Result<Tipset, Error> {
        self.tipset_tracker.expand(header)
    }

    /// Returns the genesis block header this store was created with.
    pub fn genesis_block_header(&self) -> &CachingBlockHeader {
        &self.genesis_block_header
    }

    /// Loads the current heaviest tipset recorded under `HEAD_KEY`.
    ///
    /// # Panics
    /// Panics if the head key or its tipset cannot be loaded; `new` keeps the
    /// key valid, so a failure here indicates store corruption.
    pub fn heaviest_tipset(&self) -> Arc<Tipset> {
        self.chain_index
            .load_required_tipset(
                &self
                    .settings
                    .require_obj::<TipsetKey>(HEAD_KEY)
                    .expect("failed to load heaviest tipset"),
            )
            .expect("failed to load heaviest tipset")
    }

    /// Returns the head-change broadcast channel (for subscribing).
    pub fn publisher(&self) -> &Publisher<HeadChange> {
        &self.publisher
    }

    /// Returns a reference to the underlying blockstore.
    pub fn blockstore(&self) -> &DB {
        &self.db
    }

    /// Loads the tipset for `maybe_key`, or the current heaviest tipset when
    /// no key is given.
    #[tracing::instrument(skip_all)]
    pub fn load_required_tipset_or_heaviest<'a>(
        &self,
        maybe_key: impl Into<Option<&'a TipsetKey>>,
    ) -> Result<Arc<Tipset>, Error> {
        match maybe_key.into() {
            Some(key) => self.chain_index.load_required_tipset(key),
            None => Ok(self.heaviest_tipset()),
        }
    }

    /// Promotes `ts` to the new head iff its consensus weight strictly
    /// exceeds the current head's weight.
    fn update_heaviest(&self, ts: Arc<Tipset>) -> Result<(), Error> {
        let heaviest_weight = fil_cns::weight(self.blockstore(), &self.heaviest_tipset())?;
        let new_weight = fil_cns::weight(self.blockstore(), ts.as_ref())?;
        // NOTE(review): `curr_weight` is a redundant rebinding of
        // `heaviest_weight`; kept as-is.
        let curr_weight = heaviest_weight;
        if new_weight > curr_weight {
            info!("New heaviest tipset! {} (EPOCH = {})", ts.key(), ts.epoch());
            self.set_heaviest_tipset(ts)?;
        }
        Ok(())
    }

    /// Returns whether `cid` was previously marked as validated in this
    /// process (in-memory cache only).
    pub fn is_block_validated(&self, cid: &Cid) -> bool {
        let validated = self.validated_blocks.lock().contains(cid);
        if validated {
            trace!("Block {cid} was previously validated");
        }
        validated
    }

    /// Marks `cid` as validated in the in-memory cache.
    pub fn mark_block_as_validated(&self, cid: &Cid) {
        let mut file = self.validated_blocks.lock();
        file.insert(*cid);
    }

    /// Removes `cid` from the in-memory validated cache (no-op if absent).
    pub fn unmark_block_as_validated(&self, cid: &Cid) {
        let mut file = self.validated_blocks.lock();
        let _did_work = file.remove(cid);
    }

    /// Returns all messages (signed and unsigned) from every block of `ts`,
    /// flattened in block order, without nonce/balance deduplication.
    pub fn messages_for_tipset(&self, ts: &Tipset) -> Result<Vec<ChainMessage>, Error> {
        let bmsgs = BlockMessages::for_tipset(&self.db, ts)?;
        Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect())
    }

    /// Returns the lookback tipset (and its parent state root) for a given
    /// `round`.
    ///
    /// The lookback distance is 10 epochs up to network version 3 and the
    /// configured chain finality afterwards. When the lookback round is at or
    /// beyond the heaviest tipset's epoch, the heaviest tipset's state is
    /// computed instead of walking back.
    pub fn get_lookback_tipset_for_round(
        chain_index: Arc<ChainIndex<Arc<DB>>>,
        chain_config: Arc<ChainConfig>,
        heaviest_tipset: Arc<Tipset>,
        round: ChainEpoch,
    ) -> Result<(Arc<Tipset>, Cid), Error>
    where
        DB: Send + Sync + 'static,
    {
        let version = chain_config.network_version(round);
        let lb = if version <= NetworkVersion::V3 {
            ChainEpoch::from(10)
        } else {
            chain_config.policy.chain_finality
        };
        // Clamp at genesis: never look back past epoch 0.
        let lbr = (round - lb).max(0);
        if lbr >= heaviest_tipset.epoch() {
            // The lookback round has not yet been reached: evaluate the
            // heaviest tipset's messages to obtain its resulting state root.
            let genesis_timestamp = heaviest_tipset.genesis(&chain_index.db)?.timestamp;
            let beacon = Arc::new(chain_config.get_beacon_schedule(genesis_timestamp));
            let (state, _) = crate::state_manager::apply_block_messages(
                genesis_timestamp,
                Arc::clone(&chain_index),
                Arc::clone(&chain_config),
                beacon,
                &crate::shim::machine::MultiEngine::default(),
                Arc::clone(&heaviest_tipset),
                crate::state_manager::NO_CALLBACK,
                VMTrace::NotTraced,
            )
            .map_err(|e| Error::Other(e.to_string()))?;
            return Ok((heaviest_tipset, state));
        }
        // Find the first non-null tipset at or after lbr + 1, then take its
        // parents as the lookback tipset.
        let next_ts = chain_index
            .tipset_by_height(
                lbr + 1,
                heaviest_tipset.clone(),
                ResolveNullTipset::TakeNewer,
            )
            .map_err(|e| Error::Other(format!("Could not get tipset by height {e:?}")))?;
        if lbr > next_ts.epoch() {
            return Err(Error::Other(format!(
                "failed to find non-null tipset {:?} {} which is known to exist, found {:?} {}",
                heaviest_tipset.key(),
                heaviest_tipset.epoch(),
                next_ts.key(),
                next_ts.epoch()
            )));
        }
        let lbts = chain_index
            .load_required_tipset(next_ts.parents())
            .map_err(|e| Error::Other(format!("Could not get tipset from keys {e:?}")))?;
        Ok((lbts, *next_ts.parent_state()))
    }

    /// Returns a shared handle to the settings store.
    pub fn settings(&self) -> Arc<dyn SettingsStore + Sync + Send> {
        self.settings.clone()
    }

    /// Converts each signed message into its Ethereum transaction hash and
    /// persists `hash -> (message CID, timestamp)` mappings, keeping only the
    /// first (lowest-index) occurrence per hash. Messages that fail the
    /// Ethereum conversion or hashing are silently skipped.
    pub fn process_signed_messages(&self, messages: &[(SignedMessage, u64)]) -> anyhow::Result<()>
    where
        DB: fvm_ipld_blockstore::Blockstore,
    {
        let eth_txs: Vec<(EthHash, Cid, u64, usize)> = messages
            .iter()
            .enumerate()
            .filter_map(|(i, (smsg, timestamp))| {
                if let Ok((_, tx)) =
                    eth_tx_from_signed_eth_message(smsg, self.chain_config.eth_chain_id)
                {
                    if let Ok(hash) = tx.eth_hash() {
                        // The index `i` disambiguates duplicate hashes below.
                        Some((hash.into(), smsg.cid(), *timestamp, i))
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .collect();
        let filtered = filter_lowest_index(eth_txs);
        let num_entries = filtered.len();
        for (k, v, timestamp) in filtered.into_iter() {
            tracing::trace!("Insert mapping {} => {}", k, v);
            self.put_mapping(k, v, timestamp)?;
        }
        tracing::debug!("Wrote {} entries in Ethereum mapping", num_entries);
        Ok(())
    }

    /// Extracts all delegated messages (paired with their block's timestamp)
    /// from headers at or after the Hygge upgrade epoch. Headers whose
    /// messages cannot be read are skipped.
    pub fn headers_delegated_messages<'a>(
        &self,
        headers: impl Iterator<Item = &'a CachingBlockHeader>,
    ) -> anyhow::Result<Vec<(SignedMessage, u64)>>
    where
        DB: fvm_ipld_blockstore::Blockstore,
    {
        let mut delegated_messages = vec![];
        // Delegated (Ethereum-style) messages only exist from Hygge onwards.
        let filtered_headers =
            headers.filter(|bh| bh.epoch >= self.chain_config.epoch(Height::Hygge));
        for bh in filtered_headers {
            if let Ok((_, secp_cids)) = block_messages(self.blockstore(), bh) {
                let mut messages: Vec<_> = secp_cids
                    .into_iter()
                    .filter(|msg| msg.is_delegated())
                    .map(|m| (m, bh.timestamp))
                    .collect();
                delegated_messages.append(&mut messages);
            }
        }
        Ok(delegated_messages)
    }
}
/// Deduplicates Ethereum transaction hashes, keeping for each hash the entry
/// with the lowest original index (i.e. the first occurrence in the source
/// message list). Output order is unspecified (hash-map iteration order).
fn filter_lowest_index(values: Vec<(EthHash, Cid, u64, usize)>) -> Vec<(EthHash, Cid, u64)> {
    let map: HashMap<EthHash, (Cid, u64, usize)> = values.into_iter().fold(
        HashMap::default(),
        |mut acc, (hash, cid, timestamp, index)| {
            acc.entry(hash)
                .and_modify(|entry| {
                    // BUG FIX: the original updated only the stored index,
                    // leaving the cid/timestamp of a *different* occurrence.
                    // Replace the whole triple so the winning entry stays
                    // consistent.
                    if index < entry.2 {
                        *entry = (cid, timestamp, index);
                    }
                })
                .or_insert((cid, timestamp, index));
            acc
        },
    );
    map.into_iter()
        .map(|(hash, (cid, timestamp, _))| (hash, cid, timestamp))
        .collect()
}
/// Loads the BLS (unsigned) and secp (signed) messages referenced by a block
/// header: resolves the header's message root into two CID lists, then loads
/// each message from the store.
pub fn block_messages<DB>(
    db: &DB,
    bh: &CachingBlockHeader,
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>
where
    DB: Blockstore,
{
    let (bls_cids, secp_cids) = read_msg_cids(db, &bh.messages)?;
    Ok((
        messages_from_cids(db, &bls_cids)?,
        messages_from_cids(db, &secp_cids)?,
    ))
}
/// Loads unsigned (BLS) and signed (secp) messages from explicit CID lists.
pub fn block_messages_from_cids<DB>(
    db: &DB,
    bls_cids: &[Cid],
    secp_cids: &[Cid],
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>
where
    DB: Blockstore,
{
    let bls = messages_from_cids::<_, Message>(db, bls_cids)?;
    let secp = messages_from_cids::<_, SignedMessage>(db, secp_cids)?;
    Ok((bls, secp))
}
pub fn read_msg_cids<DB>(db: &DB, msg_cid: &Cid) -> Result<(Vec<Cid>, Vec<Cid>), Error>
where
DB: Blockstore,
{
if let Some(roots) = db.get_cbor::<TxMeta>(msg_cid)? {
let bls_cids = read_amt_cids(db, &roots.bls_message_root)?;
let secpk_cids = read_amt_cids(db, &roots.secp_message_root)?;
Ok((bls_cids, secpk_cids))
} else {
Err(Error::UndefinedKey(format!(
"no msg root with cid {msg_cid}"
)))
}
}
/// Serializes and persists a stream of objects to the blockstore in chunks of
/// 256, using the store's default multihash code.
pub fn persist_objects<'a, DB, C>(
    db: &DB,
    headers: impl Iterator<Item = &'a C>,
) -> Result<(), Error>
where
    DB: Blockstore,
    C: 'a + Serialize,
{
    // Chunking bounds memory usage and lets the store batch writes.
    for chunk in &headers.chunks(256) {
        db.bulk_put(chunk, DB::default_code())?;
    }
    Ok(())
}
/// Walks the AMT rooted at `root` and collects every present CID in index
/// order. Absent (null) slots are skipped.
fn read_amt_cids<DB>(db: &DB, root: &Cid) -> Result<Vec<Cid>, Error>
where
    DB: Blockstore,
{
    let amt = Amt::<Cid, _>::load(root, db)?;
    // Preallocate for the common dense case; `count()` bounds the result
    // length (cast is safe: message counts are far below usize::MAX).
    let mut cids = Vec::with_capacity(amt.count() as usize);
    for i in 0..amt.count() {
        if let Some(c) = amt.get(i)? {
            cids.push(*c);
        }
    }
    Ok(cids)
}
pub fn get_chain_message<DB>(db: &DB, key: &Cid) -> Result<ChainMessage, Error>
where
DB: Blockstore,
{
db.get_cbor(key)?
.ok_or_else(|| Error::UndefinedKey(key.to_string()))
}
/// Returns the messages of a tipset, deduplicated and filtered the way they
/// would be applied: per sender, only messages with consecutive nonces
/// (starting at the sender's on-chain sequence) and affordable
/// `required_funds` are kept; everything else is dropped.
pub fn messages_for_tipset<DB>(db: Arc<DB>, ts: &Tipset) -> Result<Vec<ChainMessage>, Error>
where
    DB: Blockstore,
{
    // Per-sender running state: next expected nonce, and remaining balance.
    let mut applied: HashMap<Address, u64> = HashMap::new();
    let mut balances: HashMap<Address, TokenAmount> = HashMap::new();
    let state = StateTree::new_from_tipset(Arc::clone(&db), ts)?;
    let mut get_message_for_block_header =
        |b: &CachingBlockHeader| -> Result<Vec<ChainMessage>, Error> {
            let (unsigned, signed) = block_messages(&db, b)?;
            let mut messages = Vec::with_capacity(unsigned.len() + signed.len());
            let unsigned_box = unsigned.into_iter().map(ChainMessage::Unsigned);
            let signed_box = signed.into_iter().map(ChainMessage::Signed);
            for message in unsigned_box.chain(signed_box) {
                let from_address = &message.from();
                // On first sight of a sender, seed its expected nonce and
                // spendable balance from the tipset's state tree.
                // BUG FIX: the original condition lacked `!`, so sender state
                // was never initialized and every message was skipped.
                if !applied.contains_key(from_address) {
                    let actor_state = state
                        .get_actor(from_address)?
                        .ok_or_else(|| Error::Other("Actor state not found".to_string()))?;
                    applied.insert(*from_address, actor_state.sequence);
                    balances.insert(*from_address, actor_state.balance.clone().into());
                }
                // Keep only the message with the exact expected nonce, then
                // advance it.
                if let Some(seq) = applied.get_mut(from_address) {
                    if *seq != message.sequence() {
                        continue;
                    }
                    *seq += 1;
                } else {
                    continue;
                }
                // Skip messages the sender cannot afford; deduct otherwise.
                if let Some(bal) = balances.get_mut(from_address) {
                    if *bal < message.required_funds() {
                        continue;
                    }
                    *bal -= message.required_funds();
                } else {
                    continue;
                }
                messages.push(message)
            }
            Ok(messages)
        };
    ts.block_headers()
        .iter()
        .try_fold(Vec::new(), |mut message_vec, b| {
            let mut messages = get_message_for_block_header(b)?;
            message_vec.append(&mut messages);
            Ok(message_vec)
        })
}
/// Loads and deserializes one message per CID, failing fast on the first
/// missing or undecodable entry.
pub fn messages_from_cids<DB, T>(db: &DB, keys: &[Cid]) -> Result<Vec<T>, Error>
where
    DB: Blockstore,
    T: DeserializeOwned,
{
    let mut out = Vec::with_capacity(keys.len());
    for key in keys {
        out.push(message_from_cid(db, key)?);
    }
    Ok(out)
}
pub fn message_from_cid<DB, T>(db: &DB, key: &Cid) -> Result<T, Error>
where
DB: Blockstore,
T: DeserializeOwned,
{
db.get_cbor(key)?
.ok_or_else(|| Error::UndefinedKey(key.to_string()))
}
/// Fetches the `i`-th receipt from a block header's parent-receipts AMT, or
/// `None` when the index is absent.
pub fn get_parent_receipt(
    db: &impl Blockstore,
    block_header: &CachingBlockHeader,
    i: usize,
) -> Result<Option<Receipt>, Error> {
    let receipt = Receipt::get_receipt(db, &block_header.message_receipts, i as u64)?;
    Ok(receipt)
}
/// JSON (Lotus-compatible) serialization for [`HeadChange`] events.
pub mod headchange_json {
    use serde::{Deserialize, Serialize};

    use super::*;

    /// Wire representation of a head change, tagged as
    /// `{"type": "apply", "val": <tipset>}`.
    #[derive(Deserialize, Serialize)]
    #[serde(rename_all = "lowercase")]
    #[serde(tag = "type", content = "val")]
    pub enum HeadChangeJson {
        #[serde(with = "crate::lotus_json")]
        Apply(Tipset),
    }

    impl From<HeadChange> for HeadChangeJson {
        fn from(wrapper: HeadChange) -> Self {
            match wrapper {
                // Clone out of the Arc: the JSON form owns its tipset.
                HeadChange::Apply(arc) => Self::Apply((*arc).clone()),
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::{blocks::RawBlockHeader, shim::address::Address};
    use cid::{
        multihash::{
            Code::{Blake2b256, Identity},
            MultihashDigest,
        },
        Cid,
    };
    use fvm_ipld_encoding::DAG_CBOR;

    use super::*;

    /// The store should retain the genesis header it was constructed with.
    #[test]
    fn genesis_test() {
        let db = Arc::new(crate::db::MemoryDB::default());
        let chain_config = Arc::new(ChainConfig::default());
        let gen_block = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            state_root: Cid::new_v1(DAG_CBOR, Identity.digest(&[])),
            epoch: 1,
            weight: 2u32.into(),
            messages: Cid::new_v1(DAG_CBOR, Identity.digest(&[])),
            message_receipts: Cid::new_v1(DAG_CBOR, Identity.digest(&[])),
            ..Default::default()
        });
        // MemoryDB implements all three store roles (blockstore, settings,
        // eth mappings), hence the repeated clones.
        let cs =
            ChainStore::new(db.clone(), db.clone(), db, chain_config, gen_block.clone()).unwrap();
        assert_eq!(cs.genesis_block_header(), &gen_block);
    }

    /// Marking a block as validated should be observable via the cache.
    #[test]
    fn block_validation_cache_basic() {
        let db = Arc::new(crate::db::MemoryDB::default());
        let chain_config = Arc::new(ChainConfig::default());
        let gen_block = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            ..Default::default()
        });
        let cs = ChainStore::new(db.clone(), db.clone(), db, chain_config, gen_block).unwrap();

        let cid = Cid::new_v1(DAG_CBOR, Blake2b256.digest(&[1, 2, 3]));
        assert!(!cs.is_block_validated(&cid));
        cs.mark_block_as_validated(&cid);
        assert!(cs.is_block_validated(&cid));
    }
}