use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use crate::blocks::{CachingBlockHeader, Tipset};
use crate::chain::{HeadChange, MINIMUM_BASE_FEE};
#[cfg(test)]
use crate::db::SettingsStore;
use crate::eth::is_valid_eth_tx_for_sending;
use crate::libp2p::{NetworkMessage, Topic, PUBSUB_MSG_STR};
use crate::message::{valid_for_block_inclusion, ChainMessage, Message, SignedMessage};
use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
use crate::shim::{
address::Address,
crypto::{Signature, SignatureType},
econ::TokenAmount,
gas::{price_list_by_network_version, Gas},
};
use crate::state_manager::is_valid_for_sending;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use anyhow::Context as _;
use cid::Cid;
use futures::StreamExt;
use fvm_ipld_encoding::to_vec;
use itertools::Itertools;
use lru::LruCache;
use nonzero_ext::nonzero;
use parking_lot::{Mutex, RwLock as SyncRwLock};
use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
use tracing::warn;
use crate::message_pool::{
config::MpoolConfig,
errors::Error,
head_change, metrics,
msgpool::{
recover_sig, republish_pending_messages, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE,
RBF_DENOM, RBF_NUM,
},
provider::Provider,
utils::get_base_fee_lower_bound,
};
/// Capacity of the LRU cache mapping message CID -> BLS signature, used to
/// recover signatures for BLS messages pulled out of blocks (see `recover_sig`).
const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
/// Capacity of the LRU cache of message CIDs whose signatures have already
/// been verified, so repeat verification can be skipped.
const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);
/// Per-actor cap on pending messages accepted from trusted (local) sources.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000;
/// Per-actor cap on pending messages accepted from untrusted sources
/// (e.g. gossip).
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Set of pending messages belonging to a single sender actor, keyed by the
/// message sequence number (nonce).
#[derive(Clone, Default, Debug)]
pub struct MsgSet {
    // Pending messages indexed by their sequence number.
    pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
    // Next sequence expected for this actor; maintained by `add` and `rm`.
    next_sequence: u64,
}
impl MsgSet {
    /// Creates a new message set whose next expected sequence starts at
    /// `sequence`.
    pub fn new(sequence: u64) -> Self {
        MsgSet {
            msgs: HashMap::new(),
            next_sequence: sequence,
        }
    }

    /// Adds a message from a trusted (local) source, using the larger
    /// per-actor pending-message limit.
    pub fn add_trusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
    where
        T: Provider,
    {
        self.add(api, m, true)
    }

    /// Adds a message from an untrusted source, using the stricter
    /// per-actor pending-message limit.
    #[allow(dead_code)]
    pub fn add_untrusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
    where
        T: Provider,
    {
        self.add(api, m, false)
    }

    /// Inserts a message into the set, enforcing replace-by-fee and capacity
    /// rules.
    ///
    /// # Errors
    /// * [`Error::DuplicateSequence`] — a message with the same sequence and
    ///   identical CID is already present.
    /// * [`Error::GasPriceTooLow`] — a different message occupies this
    ///   sequence and the new premium does not strictly exceed the
    ///   replace-by-fee minimum.
    /// * [`Error::TooManyPendingMessages`] — the per-actor capacity
    ///   (trusted or untrusted) has been reached.
    fn add<T>(&mut self, api: &T, m: SignedMessage, trusted: bool) -> Result<(), Error>
    where
        T: Provider,
    {
        let max_actor_pending_messages = if trusted {
            api.max_actor_pending_messages()
        } else {
            api.max_untrusted_actor_pending_messages()
        };
        // Advance (or, when the set is empty, reset) the next expected
        // sequence so it is always one past the highest accepted message.
        if self.msgs.is_empty() || m.sequence() >= self.next_sequence {
            self.next_sequence = m.sequence() + 1;
        }
        if let Some(exms) = self.msgs.get(&m.sequence()) {
            if m.cid() != exms.cid() {
                // Replace-by-fee: the replacement's premium must strictly
                // exceed the existing premium plus RBF_NUM/RBF_DENOM of it,
                // plus one atto.
                let premium = &exms.message().gas_premium;
                let min_price = premium.clone()
                    + ((premium * RBF_NUM).div_floor(RBF_DENOM))
                    + TokenAmount::from_atto(1u8);
                if m.message().gas_premium <= min_price {
                    return Err(Error::GasPriceTooLow);
                }
            } else {
                return Err(Error::DuplicateSequence);
            }
        }
        // NOTE(review): this capacity check also rejects a replace-by-fee
        // replacement when the set is exactly at capacity, even though a
        // replacement would not grow the map — confirm this is intended.
        if self.msgs.len() as u64 >= max_actor_pending_messages {
            return Err(Error::TooManyPendingMessages(
                m.message.from().to_string(),
                trusted,
            ));
        }
        // Only count newly-inserted messages; replacements keep the total
        // unchanged.
        if self.msgs.insert(m.sequence(), m).is_none() {
            metrics::MPOOL_MESSAGE_TOTAL.inc();
        }
        Ok(())
    }

    /// Removes the message with the given `sequence` and updates the
    /// `next_sequence` bookkeeping.
    ///
    /// `applied` indicates the message was applied on chain (as opposed to
    /// being reverted or evicted); in that case `next_sequence` advances past
    /// it even when the message was never tracked in this set.
    pub fn rm(&mut self, sequence: u64, applied: bool) {
        if self.msgs.remove(&sequence).is_none() {
            // Not tracked here. If it was applied on chain, still advance
            // past it and past any contiguous run of tracked messages.
            if applied && sequence >= self.next_sequence {
                self.next_sequence = sequence + 1;
                while self.msgs.contains_key(&self.next_sequence) {
                    self.next_sequence += 1;
                }
            }
            return;
        }
        metrics::MPOOL_MESSAGE_TOTAL.dec();
        if applied {
            // Message landed on chain: next expected sequence is at least
            // one past it.
            if sequence >= self.next_sequence {
                self.next_sequence = sequence + 1;
            }
            return;
        }
        // Message was removed without being applied (e.g. revert): roll
        // next_sequence back so the slot can be filled again.
        if sequence < self.next_sequence {
            self.next_sequence = sequence;
        }
    }
}
/// The message pool: tracks pending messages per sender, validates and
/// publishes locally-submitted messages, and follows chain head changes via
/// the provider `T`.
pub struct MessagePool<T> {
    // Addresses that have submitted messages locally; used for republishing.
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    // Pending message sets keyed by sender address.
    pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
    // The current chain head the pool validates against.
    pub cur_tipset: Arc<Mutex<Arc<Tipset>>>,
    // Provider abstraction over chain/state access.
    pub api: Arc<T>,
    // Network name, used to build the pubsub topic for message broadcast.
    pub network_name: String,
    // Channel for publishing messages to the libp2p layer.
    pub network_sender: flume::Sender<NetworkMessage>,
    // CID -> BLS signature cache, for recovering signatures of BLS messages
    // taken from blocks.
    pub bls_sig_cache: Arc<Mutex<LruCache<Cid, Signature>>>,
    // CIDs of messages whose signatures were already verified.
    pub sig_val_cache: Arc<Mutex<LruCache<Cid, ()>>>,
    // CIDs republished in the last republish round.
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    // Trigger to run the republish task outside its regular interval.
    pub repub_trigger: flume::Sender<()>,
    // Locally-submitted messages, persisted across head changes.
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    pub config: MpoolConfig,
    pub chain_config: Arc<ChainConfig>,
}
impl<T> MessagePool<T>
where
    T: Provider,
{
    /// Records `m` as a locally-originated message so it can be reloaded and
    /// republished later; the sender is also appended to `local_addrs`.
    /// NOTE(review): the sender address is pushed unconditionally, so
    /// `local_addrs` may contain duplicates — confirm downstream consumers
    /// tolerate that.
    fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
        self.local_addrs.write().push(m.from());
        self.local_msgs.write().insert(m);
        Ok(())
    }

    /// Pushes a locally-submitted message into the pool and, when the
    /// validation in `add_tipset` says it should be published, broadcasts it
    /// on the message pubsub topic. Returns the message's CID.
    pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
        self.check_message(&msg)?;
        let cid = msg.cid();
        let cur_ts = self.cur_tipset.lock().clone();
        let publish = self.add_tipset(msg.clone(), &cur_ts, true)?;
        let msg_ser = to_vec(&msg)?;
        self.add_local(msg)?;
        if publish {
            self.network_sender
                .send_async(NetworkMessage::PubsubMessage {
                    topic: Topic::new(format!("{}/{}", PUBSUB_MSG_STR, self.network_name)),
                    message: msg_ser,
                })
                .await
                .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
        }
        Ok(cid)
    }

    /// Runs stateless sanity checks on a message: serialized size (max 32
    /// KiB), block-inclusion validity, value not above total FIL supply, gas
    /// fee cap at least the minimum base fee, and a valid signature.
    fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
        if to_vec(msg)?.len() > 32 * 1024 {
            return Err(Error::MessageTooBig);
        }
        valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
        if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
            return Err(Error::MessageValueTooHigh);
        }
        if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
            return Err(Error::GasFeeCapTooLow);
        }
        self.verify_msg_sig(msg)
    }

    /// Adds a (non-local) message to the pool after validating it against the
    /// current head.
    pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
        self.check_message(&msg)?;
        let tip = self.cur_tipset.lock().clone();
        self.add_tipset(msg, &tip, false)?;
        Ok(())
    }

    /// Verifies the message signature, consulting and updating the
    /// signature-validation cache so each CID is only verified once.
    fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
        let cid = msg.cid();
        if let Some(()) = self.sig_val_cache.lock().get(&cid) {
            return Ok(());
        }
        msg.verify().map_err(Error::Other)?;
        self.sig_val_cache.lock().put(cid, ());
        Ok(())
    }

    /// Validates `msg` against the state at `cur_ts` (sequence not stale,
    /// sender actor allowed to send, delegated-signature rules, sufficient
    /// balance) and inserts it into the pending set.
    ///
    /// Returns `true` when the caller should publish the message on pubsub
    /// (see `verify_msg_before_add`).
    fn add_tipset(&self, msg: SignedMessage, cur_ts: &Tipset, local: bool) -> Result<bool, Error> {
        let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
        if sequence > msg.message().sequence {
            return Err(Error::SequenceTooLow);
        }
        let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
        // Validate against the network version of the *next* epoch, since
        // that's the earliest the message could land in a block.
        let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
        let eth_chain_id = self.chain_config.eth_chain_id;
        if msg.signature().signature_type() == SignatureType::Delegated
            && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
        {
            return Err(Error::Other(
                "Invalid Ethereum message for the current network version".to_owned(),
            ));
        }
        if !is_valid_for_sending(nv, &sender_actor) {
            return Err(Error::Other(
                "Sender actor is not a valid top-level sender".to_owned(),
            ));
        }
        let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
        let balance = self.get_state_balance(&msg.from(), cur_ts)?;
        let msg_balance = msg.required_funds();
        if balance < msg_balance {
            return Err(Error::NotEnoughFunds);
        }
        self.add_helper(msg)?;
        Ok(publish)
    }

    /// Delegates to the free-standing [`add_helper`], seeding it with the
    /// sender's on-chain sequence at the current head.
    fn add_helper(&self, msg: SignedMessage) -> Result<(), Error> {
        let from = msg.from();
        let cur_ts = self.cur_tipset.lock().clone();
        add_helper(
            self.api.as_ref(),
            self.bls_sig_cache.as_ref(),
            self.pending.as_ref(),
            msg,
            self.get_state_sequence(&from, &cur_ts)?,
        )
    }

    /// Returns the next usable sequence for `addr`: the larger of the
    /// on-chain state sequence and the pending set's next sequence.
    pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
        let cur_ts = self.cur_tipset.lock().clone();
        let sequence = self.get_state_sequence(addr, &cur_ts)?;
        let pending = self.pending.read();
        let msgset = pending.get(addr);
        match msgset {
            Some(mset) => {
                if sequence > mset.next_sequence {
                    return Ok(sequence);
                }
                Ok(mset.next_sequence)
            }
            None => Ok(sequence),
        }
    }

    /// Looks up the actor's sequence in the state resulting from `cur_ts`.
    fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
        let actor = self.api.get_actor_after(addr, cur_ts)?;
        Ok(actor.sequence)
    }

    /// Looks up the actor's balance in the state resulting from `ts`.
    fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
        let actor = self.api.get_actor_after(addr, ts)?;
        Ok(TokenAmount::from(&actor.balance))
    }

    /// Returns all pending messages (each sender's messages sorted by
    /// sequence) together with the current tipset they were validated
    /// against.
    pub fn pending(&self) -> Result<(Vec<SignedMessage>, Arc<Tipset>), Error> {
        let mut out: Vec<SignedMessage> = Vec::new();
        // Clone the map so the read lock is not held while re-entering
        // `pending_for` below.
        let pending = self.pending.read().clone();
        for (addr, _) in pending {
            out.append(
                self.pending_for(&addr)
                    .ok_or(Error::InvalidFromAddr)?
                    .as_mut(),
            )
        }
        let cur_ts = self.cur_tipset.lock().clone();
        Ok((out, cur_ts))
    }

    /// Returns the pending messages for sender `a`, sorted by sequence, or
    /// `None` when the sender has no (non-empty) pending set.
    pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
        let pending = self.pending.read();
        let mset = pending.get(a)?;
        if mset.msgs.is_empty() {
            return None;
        }
        Some(
            mset.msgs
                .values()
                .cloned()
                .sorted_by_key(|v| v.message().sequence)
                .collect(),
        )
    }

    /// Collects all signed messages referenced by the given block headers,
    /// re-attaching cached BLS signatures to unsigned (BLS) messages.
    pub fn messages_for_blocks<'a>(
        &self,
        blks: impl Iterator<Item = &'a CachingBlockHeader>,
    ) -> Result<Vec<SignedMessage>, Error> {
        let mut msg_vec: Vec<SignedMessage> = Vec::new();
        for block in blks {
            let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
            msg_vec.append(smsgs.as_mut());
            for msg in umsg {
                let smsg = recover_sig(&mut self.bls_sig_cache.lock(), msg)?;
                msg_vec.push(smsg)
            }
        }
        Ok(msg_vec)
    }

    /// Re-adds previously stored local messages to the pool (e.g. after a
    /// restart), pruning those whose sequence is now stale.
    /// NOTE(review): errors other than `SequenceTooLow` are silently
    /// swallowed here — confirm that is intentional.
    pub fn load_local(&mut self) -> Result<(), Error> {
        let mut local_msgs = self.local_msgs.write();
        for k in local_msgs.iter().cloned().collect::<Vec<SignedMessage>>() {
            self.add(k.clone()).unwrap_or_else(|err| {
                if err == Error::SequenceTooLow {
                    warn!("error adding message: {:?}", err);
                    local_msgs.remove(&k);
                }
            })
        }
        Ok(())
    }

    /// Test-only accessor for the pool configuration.
    #[cfg(test)]
    pub fn get_config(&self) -> &MpoolConfig {
        &self.config
    }

    /// Test-only setter that persists the configuration to `db` before
    /// applying it.
    #[cfg(test)]
    pub fn set_config<DB: SettingsStore>(
        &mut self,
        db: &DB,
        cfg: MpoolConfig,
    ) -> Result<(), Error> {
        cfg.save_config(db)
            .map_err(|e| Error::Other(e.to_string()))?;
        self.config = cfg;
        Ok(())
    }
}
impl<T> MessagePool<T>
where
    T: Provider + Send + Sync + 'static,
{
    /// Creates a new message pool, loads any persisted local messages, and
    /// spawns two background services onto `services`:
    ///
    /// 1. a head-change subscriber that applies new tipsets to the pool via
    ///    `head_change`;
    /// 2. a republish loop that re-broadcasts pending local messages on a
    ///    timer or when triggered via `repub_trigger`.
    pub fn new(
        api: T,
        network_name: String,
        network_sender: flume::Sender<NetworkMessage>,
        config: MpoolConfig,
        chain_config: Arc<ChainConfig>,
        services: &mut JoinSet<anyhow::Result<()>>,
    ) -> Result<MessagePool<T>, Error>
    // NOTE(review): this `where` clause is redundant — the impl already
    // bounds `T: Provider`.
    where
        T: Provider,
    {
        let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
        let pending = Arc::new(SyncRwLock::new(HashMap::new()));
        let tipset = Arc::new(Mutex::new(api.get_heaviest_tipset()));
        let bls_sig_cache = Arc::new(Mutex::new(LruCache::new(BLS_SIG_CACHE_SIZE)));
        let sig_val_cache = Arc::new(Mutex::new(LruCache::new(SIG_VAL_CACHE_SIZE)));
        let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
        let republished = Arc::new(SyncRwLock::new(HashSet::new()));
        let block_delay = chain_config.block_delay_secs;
        // Small bound: a pending trigger is enough, extra ones can be dropped.
        let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
        let mut mp = MessagePool {
            local_addrs,
            pending,
            cur_tipset: tipset,
            api: Arc::new(api),
            network_name,
            bls_sig_cache,
            sig_val_cache,
            local_msgs,
            republished,
            config,
            network_sender,
            repub_trigger,
            chain_config: Arc::clone(&chain_config),
        };
        mp.load_local()?;

        // Service 1: apply chain head changes to the pool.
        let mut subscriber = mp.api.subscribe_head_changes();
        let api = mp.api.clone();
        let bls_sig_cache = mp.bls_sig_cache.clone();
        let pending = mp.pending.clone();
        let republished = mp.republished.clone();
        let cur_tipset = mp.cur_tipset.clone();
        let repub_trigger = Arc::new(mp.repub_trigger.clone());
        services.spawn(async move {
            loop {
                match subscriber.recv().await {
                    Ok(ts) => {
                        // `HeadChange` only has an `Apply` variant here, so
                        // the revert list passed to `head_change` is empty.
                        let (cur, rev, app) = match ts {
                            HeadChange::Apply(tipset) => (
                                cur_tipset.clone(),
                                Vec::new(),
                                vec![tipset.as_ref().clone()],
                            ),
                        };
                        head_change(
                            api.as_ref(),
                            bls_sig_cache.as_ref(),
                            repub_trigger.clone(),
                            republished.as_ref(),
                            pending.as_ref(),
                            cur.as_ref(),
                            rev,
                            app,
                        )
                        .await
                        .context("Error changing head")?;
                    }
                    Err(RecvError::Lagged(e)) => {
                        // Missed broadcast events; keep going with the next
                        // available head change.
                        warn!("Head change subscriber lagged: skipping {} events", e);
                    }
                    Err(RecvError::Closed) => {
                        break Ok(());
                    }
                }
            }
        });

        // Service 2: periodically (or on demand) republish pending local
        // messages.
        let api = mp.api.clone();
        let pending = mp.pending.clone();
        let cur_tipset = mp.cur_tipset.clone();
        let republished = mp.republished.clone();
        let local_addrs = mp.local_addrs.clone();
        let network_sender = Arc::new(mp.network_sender.clone());
        let network_name = mp.network_name.clone();
        // Republish every ten block delays plus the propagation delay.
        let republish_interval = (10 * block_delay + chain_config.propagation_delay_secs) as u64;
        services.spawn(async move {
            let mut repub_trigger_rx = repub_trigger_rx.stream();
            let mut interval = interval(Duration::from_secs(republish_interval));
            loop {
                // Wake on either the timer tick or an explicit trigger.
                tokio::select! {
                    _ = interval.tick() => (),
                    _ = repub_trigger_rx.next() => (),
                }
                if let Err(e) = republish_pending_messages(
                    api.as_ref(),
                    network_sender.as_ref(),
                    network_name.as_ref(),
                    pending.as_ref(),
                    cur_tipset.as_ref(),
                    republished.as_ref(),
                    local_addrs.as_ref(),
                    &chain_config,
                )
                .await
                {
                    warn!("Failed to republish pending messages: {}", e.to_string());
                }
            }
        });
        Ok(mp)
    }
}
/// Finishes adding a signed message to the pool: caches its BLS signature
/// (if any), persists both its signed and unsigned forms via the provider,
/// and inserts it into the sender's pending [`MsgSet`].
///
/// `sequence` is the sender's on-chain sequence, used to seed a fresh
/// [`MsgSet`] when this is the sender's first pending message.
///
/// # Errors
/// Returns [`Error::Other`] when the gas limit exceeds the hard cap, or
/// propagates errors from persistence and from [`MsgSet::add_trusted`].
pub(in crate::message_pool) fn add_helper<T>(
    api: &T,
    bls_sig_cache: &Mutex<LruCache<Cid, Signature>>,
    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
    msg: SignedMessage,
    sequence: u64,
) -> Result<(), Error>
where
    T: Provider,
{
    // Reject over-limit gas *before* any caching or persistence, so a
    // doomed message leaves no side effects behind. (Previously the BLS
    // signature was cached even for messages rejected here.)
    if msg.message().gas_limit > 100_000_000 {
        return Err(Error::Other(
            "given message has too high of a gas limit".to_string(),
        ));
    }
    // Cache BLS signatures by CID so they can be recovered for BLS messages
    // taken from blocks (see `recover_sig`).
    if msg.signature().signature_type() == SignatureType::Bls {
        bls_sig_cache.lock().put(msg.cid(), msg.signature().clone());
    }
    // Persist both the signed and the unsigned form of the message.
    api.put_message(&ChainMessage::Signed(msg.clone()))?;
    api.put_message(&ChainMessage::Unsigned(msg.message().clone()))?;

    let mut pending = pending.write();
    match pending.get_mut(&msg.from()) {
        Some(mset) => mset.add_trusted(api, msg)?,
        None => {
            // First pending message for this sender: seed a new set with the
            // sender's on-chain sequence. The set is only inserted into the
            // map if `add_trusted` succeeds.
            let mut mset = MsgSet::new(sequence);
            let from = msg.from();
            mset.add_trusted(api, msg)?;
            pending.insert(from, mset);
        }
    }
    Ok(())
}
fn verify_msg_before_add(
m: &SignedMessage,
cur_ts: &Tipset,
local: bool,
chain_config: &ChainConfig,
) -> Result<bool, Error> {
let epoch = cur_ts.epoch();
let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
.on_chain_message(to_vec(m)?.len());
valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
if !cur_ts.block_headers().is_empty() {
let base_fee = &cur_ts.block_headers().first().parent_base_fee;
let base_fee_lower_bound =
get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
if m.gas_fee_cap() < base_fee_lower_bound {
if local {
warn!("local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",m.gas_fee_cap(), base_fee_lower_bound);
return Ok(false);
}
return Err(Error::SoftValidationFailure(format!("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
m.gas_fee_cap(), base_fee_lower_bound)));
}
}
Ok(local)
}
/// Removes the message with the given `sequence` for sender `from` from the
/// pending map, dropping the sender's entry entirely once its set is empty.
/// `applied` is forwarded to [`MsgSet::rm`] and controls the sequence
/// bookkeeping. A missing sender entry is not an error.
pub fn remove(
    from: &Address,
    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
    sequence: u64,
    applied: bool,
) -> Result<(), Error> {
    let mut pending = pending.write();
    if let Some(mset) = pending.get_mut(from) {
        mset.rm(sequence, applied);
        // Garbage-collect senders with no remaining pending messages.
        if mset.msgs.is_empty() {
            pending.remove(from);
        }
    }
    Ok(())
}