use std::{
convert::TryFrom,
num::NonZeroU64,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, SystemTime},
};
use crate::{
blocks::{FullTipset, Tipset, TipsetKey},
libp2p::{
chain_exchange::{
ChainExchangeRequest, ChainExchangeResponse, CompactedMessages, TipsetBundle, HEADERS,
MESSAGES,
},
hello::{HelloRequest, HelloResponse},
rpc::RequestResponseError,
NetworkMessage, PeerId, PeerManager, BITSWAP_TIMEOUT,
},
utils::{
misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
stats::Stats,
},
};
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use std::future::Future;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, trace, warn};
/// Adaptive timeout (milliseconds) for chain exchange requests. It is grown
/// via `adapt_on_failure` when all raced peers fail and shrunk via
/// `adapt_on_success` based on the measured mean response time (see
/// `handle_chain_exchange_request` below).
/// NOTE(review): the constructor argument meanings (presumably initial=5000,
/// min/step=2000, max=60000) should be confirmed against
/// `ExponentialAdaptiveValueProvider::new`.
static CHAIN_EXCHANGE_TIMEOUT_MILLIS: Lazy<ExponentialAdaptiveValueProvider<u64>> =
    Lazy::new(|| ExponentialAdaptiveValueProvider::new(5000, 2000, 60000, false));

/// Upper bound on how many chain exchange requests are raced concurrently in
/// a [`RaceBatch`] when no specific peer was requested.
const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
/// Context used by chain sync to issue network requests (chain exchange,
/// hello, bitswap) and to record per-peer success/failure statistics.
pub(in crate::chain_sync) struct SyncNetworkContext<DB> {
    // Channel for dispatching messages to the p2p service.
    network_send: flume::Sender<NetworkMessage>,
    // Tracks peers to send requests to and their request statistics.
    peer_manager: Arc<PeerManager>,
    // Local blockstore; used to short-circuit bitswap requests for content
    // that is already present, and re-checked after a bitswap round-trip.
    db: Arc<DB>,
}
// Manual `Clone` implementation: `#[derive(Clone)]` would add a `DB: Clone`
// bound, which is unnecessary here since every field is a cheaply clonable
// handle (a flume sender and two `Arc`s).
impl<DB> Clone for SyncNetworkContext<DB> {
    fn clone(&self) -> Self {
        Self {
            network_send: self.network_send.clone(),
            peer_manager: self.peer_manager.clone(),
            db: self.db.clone(),
        }
    }
}
/// Races a set of spawned tasks against each other, bounding how many run
/// concurrently; the first task to produce a valid `Ok` value wins
/// (see [`RaceBatch::get_ok_validated`]).
struct RaceBatch<T> {
    // All spawned tasks; each resolves to `Ok(T)` or a `String` error.
    tasks: JoinSet<Result<T, String>>,
    // Limits the number of tasks making progress at any one time.
    semaphore: Arc<Semaphore>,
}
impl<T> RaceBatch<T>
where
T: Send + 'static,
{
pub fn new(max_concurrent_jobs: usize) -> Self {
RaceBatch {
tasks: JoinSet::new(),
semaphore: Arc::new(Semaphore::new(max_concurrent_jobs)),
}
}
pub fn add(&mut self, future: impl Future<Output = Result<T, String>> + Send + 'static) {
let sem = self.semaphore.clone();
self.tasks.spawn(async move {
let permit = sem
.acquire_owned()
.await
.map_err(|_| "Semaphore unexpectedly closed")?;
let result = future.await;
drop(permit);
result
});
}
pub async fn get_ok_validated<F>(mut self, validate: F) -> Option<T>
where
F: Fn(&T) -> bool,
{
while let Some(result) = self.tasks.join_next().await {
if let Ok(Ok(value)) = result {
if validate(&value) {
return Some(value);
}
}
}
None
}
}
impl<DB> SyncNetworkContext<DB>
where
    DB: Blockstore,
{
    /// Creates a new network context over the p2p sender, peer manager and
    /// local blockstore.
    pub fn new(
        network_send: flume::Sender<NetworkMessage>,
        peer_manager: Arc<PeerManager>,
        db: Arc<DB>,
    ) -> Self {
        Self {
            network_send,
            peer_manager,
            db,
        }
    }

    /// Returns a reference to the peer manager of the network context.
    pub fn peer_manager(&self) -> &PeerManager {
        self.peer_manager.as_ref()
    }

    /// Requests `count` tipset headers starting at `tsk` (walking backwards
    /// through parents). The response is accepted only if it passes
    /// [`validate_network_tipsets`] (start-key match and parent linkage).
    pub async fn chain_exchange_headers(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
        count: NonZeroU64,
    ) -> Result<Vec<Arc<Tipset>>, String> {
        self.handle_chain_exchange_request(
            peer_id,
            tsk,
            count,
            HEADERS,
            |tipsets: &Vec<Arc<Tipset>>| validate_network_tipsets(tipsets, tsk),
        )
        .await
    }

    /// Requests compacted messages for the given tipsets. `tipsets` must be
    /// non-empty; the request starts at the key of the last element (the
    /// head) and asks for `tipsets.len()` entries.
    pub async fn chain_exchange_messages(
        &self,
        peer_id: Option<PeerId>,
        tipsets: &[Arc<Tipset>],
    ) -> Result<Vec<CompactedMessages>, String> {
        let head = tipsets
            .last()
            .ok_or_else(|| "tipsets cannot be empty".to_owned())?;
        let tsk = head.key();
        tracing::trace!(
            "ChainExchange message sync tipsets: epoch: {}, len: {}",
            head.epoch(),
            tipsets.len()
        );
        self.handle_chain_exchange_request(
            peer_id,
            tsk,
            // `tipsets` was checked non-empty above, so the length is non-zero.
            NonZeroU64::new(tipsets.len() as _).expect("Infallible"),
            MESSAGES,
            |compacted_messages_vec: &Vec<CompactedMessages>| {
                // The request starts at the head and walks backwards, so the
                // response is zipped against the tipsets in reverse order.
                // Each compacted-messages entry must carry one include-list
                // per block header of its tipset.
                // NOTE(review): `zip` stops at the shorter side, so a response
                // with fewer entries than `tipsets` would pass this check —
                // confirm the length is validated elsewhere.
                for (msg, ts) in compacted_messages_vec.iter().zip(tipsets.iter().rev()) {
                    let header_len = ts.block_headers().len();
                    if header_len != msg.bls_msg_includes.len()
                        || header_len != msg.secp_msg_includes.len()
                    {
                        tracing::warn!(
                            "header_len: {header_len}, msg.bls_msg_includes.len(): {}, msg.secp_msg_includes.len(): {}",
                            msg.bls_msg_includes.len(),
                            msg.secp_msg_includes.len()
                        );
                        return false;
                    }
                }
                true
            },
        )
        .await
    }

    /// Requests a single full tipset (headers + messages) identified by `tsk`.
    pub async fn chain_exchange_fts(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
    ) -> Result<FullTipset, String> {
        let mut fts = self
            .handle_chain_exchange_request(
                peer_id,
                tsk,
                NonZeroU64::new(1).expect("Infallible"),
                HEADERS | MESSAGES,
                |_| true,
            )
            .await?;
        // Exactly one tipset was requested; anything else is a protocol error.
        if fts.len() != 1 {
            return Err(format!(
                "Full tipset request returned {} tipsets",
                fts.len()
            ));
        }
        Ok(fts.remove(0))
    }

    /// Fetches `content` via bitswap. The local blockstore is checked first;
    /// on a miss, a bitswap request is sent and the store is re-checked after
    /// the request completes or times out.
    pub async fn bitswap_get<TMessage: DeserializeOwned>(
        &self,
        content: Cid,
        epoch: Option<i64>,
    ) -> Result<TMessage, String> {
        // Fast path: already present locally.
        if let Some(b) = self.db.get_cbor(&content).map_err(|e| e.to_string())? {
            return Ok(b);
        }
        let (tx, rx) = flume::bounded(1);
        self.network_send
            .send_async(NetworkMessage::BitswapRequest {
                cid: content,
                response_channel: tx,
                epoch,
            })
            .await
            .map_err(|_| "failed to send bitswap request, network receiver dropped")?;
        // Block on the response (or timeout) off the async runtime.
        // NOTE(review): the value received on `rx` is discarded; `success`
        // only reflects whether the `spawn_blocking` task joined cleanly, not
        // whether bitswap actually answered — confirm this is intended.
        let success = tokio::task::spawn_blocking(move || {
            rx.recv_timeout(BITSWAP_TIMEOUT).unwrap_or_default()
        })
        .await
        .is_ok();
        // The bitswap service writes fetched blocks into `db`, so the
        // authoritative check is a second store lookup.
        match self.db.get_cbor(&content) {
            Ok(Some(b)) => Ok(b),
            Ok(None) => Err(format!(
                "Not found in db, bitswap. success: {success} cid, {content:?}"
            )),
            Err(e) => Err(format!(
                "Error retrieving from db. success: {success} cid, {content:?}, {e}"
            )),
        }
    }

    /// Sends a chain exchange request for the given options. If `peer_id` is
    /// `Some`, only that peer is queried; otherwise the request is raced
    /// across the top shuffled peers (bounded by
    /// `MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS`) and the first response that
    /// passes `validate` wins. Also drives the adaptive timeout and global
    /// peer statistics.
    async fn handle_chain_exchange_request<T, F>(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
        request_len: NonZeroU64,
        options: u64,
        validate: F,
    ) -> Result<Vec<T>, String>
    where
        T: TryFrom<TipsetBundle, Error = String> + Send + Sync + 'static,
        F: Fn(&Vec<T>) -> bool,
    {
        let request = ChainExchangeRequest {
            start: tsk.to_cids(),
            request_len: request_len.get(),
            options,
        };

        let global_pre_time = SystemTime::now();
        let network_failures = Arc::new(AtomicU64::new(0));
        let lookup_failures = Arc::new(AtomicU64::new(0));
        let chain_exchange_result = match peer_id {
            // Targeted request: no racing, no validation callback.
            Some(id) => Self::chain_exchange_request(
                self.peer_manager.clone(),
                self.network_send.clone(),
                id,
                request,
            )
            .await?
            .into_result()?,
            None => {
                let peers = self.peer_manager.top_peers_shuffled();
                if peers.is_empty() {
                    return Err("chain exchange failed: no peers are available".into());
                }
                let n_peers = peers.len();
                let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
                let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
                for peer_id in peers.into_iter() {
                    let peer_manager = self.peer_manager.clone();
                    let network_send = self.network_send.clone();
                    let request = request.clone();
                    let network_failures = network_failures.clone();
                    let lookup_failures = lookup_failures.clone();
                    let success_time_cost_millis_stats = success_time_cost_millis_stats.clone();
                    batch.add(async move {
                        let start = chrono::Utc::now();
                        match Self::chain_exchange_request(
                            peer_manager,
                            network_send,
                            peer_id,
                            request,
                        )
                        .await
                        {
                            Ok(chain_exchange_result) => {
                                match chain_exchange_result.into_result::<T>() {
                                    Ok(r) => {
                                        // Record the round-trip time to feed the
                                        // adaptive timeout.
                                        success_time_cost_millis_stats.lock().update(
                                            (chrono::Utc::now() - start).num_milliseconds(),
                                        );
                                        Ok(r)
                                    }
                                    Err(error) => {
                                        // Peer answered but the payload was bad.
                                        lookup_failures.fetch_add(1, Ordering::Relaxed);
                                        debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
                                        Err(error)
                                    }
                                }
                            }
                            Err(error) => {
                                // Transport-level failure.
                                network_failures.fetch_add(1, Ordering::Relaxed);
                                debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
                                Err(error)
                            }
                        }
                    });
                }

                // Lazily built only when every raced peer failed; also bumps
                // the adaptive timeout as a side effect.
                let make_failure_message = || {
                    CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_failure();
                    tracing::info!(
                        "Increased chain exchange timeout to {}ms",
                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()
                    );
                    let mut message = String::new();
                    message.push_str("ChainExchange request failed for all top peers. ");
                    message.push_str(&format!(
                        "{} network failures, ",
                        network_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!(
                        "{} lookup failures, ",
                        lookup_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!("request:\n{request:?}",));
                    message
                };

                let v = batch
                    .get_ok_validated(validate)
                    .await
                    .ok_or_else(make_failure_message)?;
                // On success, let the mean observed latency shrink the timeout.
                if let Ok(mean) = success_time_cost_millis_stats.lock().mean() {
                    if CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_success(mean as _) {
                        tracing::info!(
                            "Decreased chain exchange timeout to {}ms. Current average: {}ms",
                            CHAIN_EXCHANGE_TIMEOUT_MILLIS.get(),
                            mean,
                        );
                    }
                }
                trace!("Succeed: handle_chain_exchange_request");
                v
            }
        };

        // Log the overall request duration for global peer statistics.
        match SystemTime::now().duration_since(global_pre_time) {
            Ok(t) => self.peer_manager.log_global_success(t),
            Err(e) => {
                warn!("logged time less than before request: {}", e);
            }
        }

        Ok(chain_exchange_result)
    }

    /// Sends a chain exchange request to a single peer and waits for the
    /// response under the adaptive timeout. Updates the peer's score:
    /// success/failure durations are logged; peers speaking an unsupported
    /// protocol are banned; closed-connection/dial failures mark the peer bad.
    async fn chain_exchange_request(
        peer_manager: Arc<PeerManager>,
        network_send: flume::Sender<NetworkMessage>,
        peer_id: PeerId,
        request: ChainExchangeRequest,
    ) -> Result<ChainExchangeResponse, String> {
        trace!("Sending ChainExchange Request to {peer_id}");

        let req_pre_time = SystemTime::now();

        let (tx, rx) = flume::bounded(1);
        if network_send
            .send_async(NetworkMessage::ChainExchangeRequest {
                peer_id,
                request,
                response_channel: tx,
            })
            .await
            .is_err()
        {
            return Err("Failed to send chain exchange request to network".to_string());
        };

        // Receive off the async runtime with the current adaptive timeout.
        let res = tokio::task::spawn_blocking(move || {
            rx.recv_timeout(Duration::from_millis(CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()))
        })
        .await;
        let res_duration = SystemTime::now()
            .duration_since(req_pre_time)
            .unwrap_or_default();
        match res {
            Ok(Ok(Ok(bs_res))) => {
                peer_manager.log_success(&peer_id, res_duration);
                trace!("Succeeded: ChainExchange Request to {peer_id}");
                Ok(bs_res)
            }
            Ok(Ok(Err(e))) => {
                // The request reached libp2p but failed; score the peer
                // according to the failure class.
                match e {
                    RequestResponseError::UnsupportedProtocols => {
                        peer_manager
                            .ban_peer_with_default_duration(
                                peer_id,
                                "ChainExchange protocol unsupported",
                            )
                            .await;
                    }
                    RequestResponseError::ConnectionClosed | RequestResponseError::DialFailure => {
                        peer_manager.mark_peer_bad(peer_id, format!("chain exchange error {e:?}"));
                    }
                    RequestResponseError::Timeout | RequestResponseError::Io(_) => {
                        peer_manager.log_failure(&peer_id, res_duration);
                    }
                }
                debug!("Failed: ChainExchange Request to {peer_id}");
                Err(format!("Internal libp2p error: {e:?}"))
            }
            // Channel recv timed out, or the blocking task failed to join.
            Ok(Err(_)) | Err(_) => {
                peer_manager.log_failure(&peer_id, res_duration);
                debug!("Timeout: ChainExchange Request to {peer_id}");
                Err(format!("Chain exchange request to {peer_id} timed out"))
            }
        }
    }

    /// Sends a hello request to a peer. Returns the peer id, the time the
    /// request was sent, and the response (`None` on timeout or error).
    pub async fn hello_request(
        &self,
        peer_id: PeerId,
        request: HelloRequest,
    ) -> anyhow::Result<(PeerId, SystemTime, Option<HelloResponse>)> {
        trace!("Sending Hello Message to {}", peer_id);
        let (tx, rx) = flume::bounded(1);
        self.network_send
            .send_async(NetworkMessage::HelloRequest {
                peer_id,
                request,
                response_channel: tx,
            })
            .await
            .context("Failed to send hello request: receiver dropped")?;

        const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
        let sent = SystemTime::now();
        // A recv timeout is mapped to `None`; a join failure is propagated.
        let res = tokio::task::spawn_blocking(move || rx.recv_timeout(HELLO_TIMEOUT))
            .await?
            .ok();
        Ok((peer_id, sent, res))
    }
}
/// Validates a chain-exchange headers response: the slice must be non-empty,
/// its first tipset must match `start_tipset_key`, and every tipset must name
/// the following (older) tipset in the slice as its parent. Logs a warning
/// and returns `false` at the first inconsistency.
fn validate_network_tipsets(tipsets: &[Arc<Tipset>], start_tipset_key: &TipsetKey) -> bool {
    match tipsets.first() {
        None => {
            tracing::warn!("invalid empty chain_exchange_headers response");
            false
        }
        Some(start) if start.key() != start_tipset_key => {
            tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch");
            false
        }
        Some(_) => {
            // Walk adjacent pairs (child, parent) and verify the linkage.
            tipsets.windows(2).all(|pair| {
                let (ts, pts) = (&pair[0], &pair[1]);
                if ts.parents() == pts.key() {
                    true
                } else {
                    tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain");
                    false
                }
            })
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize};

    // Test-only convenience: race without any validation.
    impl<T> RaceBatch<T>
    where
        T: Send + 'static,
    {
        pub async fn get_ok(self) -> Option<T> {
            self.get_ok_validated(|_| true).await
        }
    }

    // The first `Ok` value wins even when other tasks fail.
    #[tokio::test]
    async fn race_batch_ok() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move { Ok(1) });
        batch.add(async move { Err("kaboom".into()) });
        assert_eq!(batch.get_ok().await, Some(1));
    }

    // A fast `Ok` beats a slow one; the slow task never needs to finish.
    #[tokio::test]
    async fn race_batch_ok_faster() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move {
            tokio::time::sleep(Duration::from_secs(100)).await;
            Ok(1)
        });
        batch.add(async move { Ok(2) });
        batch.add(async move { Err("kaboom".into()) });
        assert_eq!(batch.get_ok().await, Some(2));
    }

    // All tasks failing yields `None`.
    #[tokio::test]
    async fn race_batch_none() {
        let mut batch: RaceBatch<i32> = RaceBatch::new(3);
        batch.add(async move { Err("kaboom".into()) });
        batch.add(async move { Err("banana".into()) });
        assert_eq!(batch.get_ok().await, None);
    }

    // The semaphore must keep the number of concurrently running tasks at or
    // below the configured limit.
    #[tokio::test]
    async fn race_batch_semaphore() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));
        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }
                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);
                Err("banana".into())
            });
        }
        assert_eq!(batch.get_ok().await, None);
        assert!(!exceeded.load(Ordering::Relaxed));
    }

    // Sanity check for the test above: with a limit of `MAX_JOBS + 1` the
    // `MAX_JOBS` threshold *is* exceeded, proving the bound is tight.
    #[tokio::test]
    async fn race_batch_semaphore_exceeded() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));
        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS + 1);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }
                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);
                Err("banana".into())
            });
        }
        assert_eq!(batch.get_ok().await, None);
        assert!(exceeded.load(Ordering::Relaxed));
    }

    // Builds a small linked chain t0..t4 with `chain4u` and checks:
    // a full, correctly keyed chain validates; a wrong start key fails;
    // a chain with a gap (t3 missing) fails.
    #[test]
    #[allow(unused_variables)]
    fn validate_network_tipsets_tests() {
        use crate::blocks::{chain4u, Chain4U};

        let c4u = Chain4U::new();
        chain4u! {
            in c4u;
            t0 @ [genesis_header]
            -> t1 @ [first_header]
            -> t2 @ [second_left, second_right]
            -> t3 @ [third]
            -> t4 @ [fourth]
        };
        let t0 = Arc::new(t0.clone());
        let t1 = Arc::new(t1.clone());
        let t2 = Arc::new(t2.clone());
        let t3 = Arc::new(t3.clone());
        let t4 = Arc::new(t4.clone());
        // Valid chain, newest first.
        assert!(validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        // Start key does not match the first tipset.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t3.key()
        ));
        // Broken linkage: t3 is missing between t4 and t2.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
    }
}