use crate::blocks::{Tipset, TipsetKey};
use crate::chain::{ChainStore, Error as ChainError};
use ahash::{HashMap, HashMapExt};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use super::{
ChainExchangeRequest, ChainExchangeResponse, ChainExchangeResponseStatus, CompactedMessages,
TipsetBundle,
};
/// Builds the [`ChainExchangeResponse`] answering `request` from the data reachable
/// through `cs`.
///
/// The response status is decided as follows:
/// - `BadRequest` when `request.is_options_valid()` returns `false`; the message
///   echoes the offending options.
/// - `BlockNotFound` when the tipset keyed by `request.start` cannot be loaded.
/// - `InternalError` when loading the start tipset or compacting messages fails;
///   the message carries the error text.
/// - `PartialResponse` when fewer than `request.request_len` bundles were produced,
///   `Success` otherwise. Both use the message `"Success"`.
///
/// Each bundle carries compacted messages only if `request.include_messages()` and
/// block headers only if `request.include_blocks()`.
pub fn make_chain_exchange_response<DB>(
    cs: &ChainStore<DB>,
    request: &ChainExchangeRequest,
) -> ChainExchangeResponse
where
    DB: Blockstore + Send + Sync + 'static,
{
    // Reject malformed requests before touching the store.
    if !request.is_options_valid() {
        return ChainExchangeResponse {
            chain: Default::default(),
            status: ChainExchangeResponseStatus::BadRequest,
            message: format!("Invalid options {}", request.options),
        };
    }

    // Everything fallible lives in this closure so that any error can be mapped to a
    // single `InternalError` response below.
    let try_build = move || -> anyhow::Result<ChainExchangeResponse> {
        let start_key = TipsetKey::from(request.start.clone());
        let Some(root) = cs.chain_index.load_tipset(&start_key)? else {
            return Ok(ChainExchangeResponse {
                status: ChainExchangeResponseStatus::BlockNotFound,
                chain: Default::default(),
                message: "Start tipset was not found in the database".into(),
            });
        };

        // Walk the chain from the start tipset, stopping after `request_len` tipsets
        // or at the first failure.
        let bundles: Vec<_> = cs
            .chain_index
            .chain(root)
            .take(request.request_len as _)
            .map(|tipset| {
                let mut bundle = TipsetBundle::default();
                if request.include_messages() {
                    let compacted = compact_messages(cs.blockstore(), &tipset)?;
                    bundle.messages = Some(compacted);
                }
                if request.include_blocks() {
                    bundle.blocks = tipset.block_headers().iter().cloned().collect_vec();
                }
                anyhow::Ok(bundle)
            })
            .try_collect()?;

        // Fewer bundles than requested means the chain iterator ran out early.
        let status = if (bundles.len() as u64) < request.request_len {
            ChainExchangeResponseStatus::PartialResponse
        } else {
            ChainExchangeResponseStatus::Success
        };

        Ok(ChainExchangeResponse {
            status,
            chain: bundles,
            message: "Success".into(),
        })
    };

    try_build().unwrap_or_else(|err| ChainExchangeResponse {
        chain: Default::default(),
        status: ChainExchangeResponseStatus::InternalError,
        message: err.to_string(),
    })
}
/// Builds the [`CompactedMessages`] for `tipset`, de-duplicating message CIDs across
/// its blocks.
///
/// For every block header in `tipset`, the BLS and secp message CIDs are read via
/// `crate::chain::read_msg_cids`. Each distinct CID is recorded once, in
/// first-seen order, in a combined list per message kind. The `*_msg_includes`
/// vectors hold one entry per block header, listing for each of that block's
/// messages its index into the matching combined list. The messages themselves are
/// then loaded from the combined CID lists with
/// `crate::chain::block_messages_from_cids`.
///
/// # Errors
///
/// Propagates any [`ChainError`] returned while reading the message CIDs or while
/// loading the messages.
fn compact_messages<DB>(db: &DB, tipset: &Tipset) -> Result<CompactedMessages, ChainError>
where
    DB: Blockstore,
{
    // Translates one block's CIDs into indexes into `combined`. A CID seen for the
    // first time is appended to `combined` and its index remembered in `positions`;
    // the `entry` API keeps this to a single hash lookup per CID.
    fn index_cids(
        cids: impl IntoIterator<Item = Cid>,
        positions: &mut HashMap<Cid, u64>,
        combined: &mut Vec<Cid>,
    ) -> Vec<u64> {
        cids.into_iter()
            .map(|cid| {
                *positions.entry(cid).or_insert_with(|| {
                    let position = combined.len() as u64;
                    combined.push(cid);
                    position
                })
            })
            .collect()
    }

    let mut bls_positions: HashMap<Cid, u64> = HashMap::new();
    let mut secp_positions: HashMap<Cid, u64> = HashMap::new();
    let mut bls_cids_combined: Vec<Cid> = Vec::new();
    let mut secp_cids_combined: Vec<Cid> = Vec::new();
    let mut bls_msg_includes: Vec<Vec<u64>> = Vec::new();
    let mut secp_msg_includes: Vec<Vec<u64>> = Vec::new();

    for block_header in tipset.block_headers().iter() {
        let (bls_cids, secp_cids) = crate::chain::read_msg_cids(db, &block_header.messages)?;
        bls_msg_includes.push(index_cids(
            bls_cids,
            &mut bls_positions,
            &mut bls_cids_combined,
        ));
        secp_msg_includes.push(index_cids(
            secp_cids,
            &mut secp_positions,
            &mut secp_cids_combined,
        ));
    }

    // Load each distinct message exactly once, in combined-list order.
    let (bls_msgs, secp_msgs) =
        crate::chain::block_messages_from_cids(db, &bls_cids_combined, &secp_cids_combined)?;

    Ok(CompactedMessages {
        bls_msgs,
        bls_msg_includes,
        secp_msgs,
        secp_msg_includes,
    })
}
#[cfg(test)]
mod tests {
    use super::{
        super::{HEADERS, MESSAGES},
        *,
    };
    use crate::blocks::{CachingBlockHeader, RawBlockHeader};
    use crate::db::MemoryDB;
    use crate::genesis::EXPORT_SR_40;
    use crate::networks::ChainConfig;
    use crate::shim::address::Address;
    use crate::utils::db::car_util::load_car;
    use nunny::Vec as NonEmpty;
    use std::sync::Arc;

    /// Loads the `EXPORT_SR_40` CAR data into a fresh in-memory database and returns
    /// the roots of the loaded CAR header together with that database.
    async fn populate_db() -> (NonEmpty<Cid>, Arc<MemoryDB>) {
        let store = Arc::new(MemoryDB::default());
        let car_header = load_car(&store, EXPORT_SR_40).await.unwrap();
        (car_header.roots, store)
    }

    /// Requests two tipsets (headers and messages) starting at the CAR roots and
    /// checks the shape of the compacted messages in each returned bundle.
    #[tokio::test]
    async fn compact_messages_test() {
        let (start, db) = populate_db().await;

        let genesis_header = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            ..Default::default()
        });
        let chain_store = ChainStore::new(
            db.clone(),
            db.clone(),
            db,
            Arc::new(ChainConfig::default()),
            genesis_header,
        )
        .unwrap();
        let request = ChainExchangeRequest {
            start,
            request_len: 2,
            options: HEADERS | MESSAGES,
        };

        let response = make_chain_exchange_response(&chain_store, &request);

        // Both requested tipsets must be present.
        assert_eq!(response.chain.len(), 2);

        // The first bundle is the start tipset, the second the next one yielded by
        // the chain iterator (labelled tipsets 39 and 38 respectively).
        let bundle_39 = &response.chain[0];
        let bundle_38 = &response.chain[1];
        let msgs_39 = bundle_39.messages.as_ref().unwrap();
        let msgs_38 = bundle_38.messages.as_ref().unwrap();

        // First bundle: a single include list per kind, covering every message.
        assert_eq!(msgs_39.secp_msgs.len(), 22);
        assert_eq!(msgs_39.bls_msgs.len(), 12);
        assert_eq!(msgs_39.secp_msg_includes.len(), 1);
        assert_eq!(msgs_39.bls_msg_includes.len(), 1);
        assert_eq!(msgs_39.secp_msg_includes[0].len(), 22);
        assert_eq!(msgs_39.bls_msg_includes[0].len(), 12);

        // Second bundle: two blocks whose include lists have the same lengths as
        // the de-duplicated message lists.
        assert_eq!(bundle_38.blocks.len(), 2);
        assert_eq!(msgs_38.secp_msgs.len(), 1);
        assert_eq!(msgs_38.bls_msgs.len(), 11);
        assert_eq!(msgs_38.secp_msg_includes.len(), 2);
        assert_eq!(msgs_38.bls_msg_includes.len(), 2);
        assert_eq!(msgs_38.secp_msg_includes[0].len(), 1);
        assert_eq!(msgs_38.bls_msg_includes[0].len(), 11);
        assert_eq!(msgs_38.secp_msg_includes[1].len(), 1);
        assert_eq!(msgs_38.bls_msg_includes[1].len(), 11);
    }
}