use std::{convert::TryFrom, sync::Arc};
use crate::blocks::{Block, CachingBlockHeader, FullTipset, Tipset, BLOCK_MESSAGE_LIMIT};
use crate::message::SignedMessage;
use crate::shim::message::Message;
use cid::Cid;
use nunny::Vec as NonEmpty;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};
/// Bit of `ChainExchangeRequest::options` tested by `include_blocks`.
pub const HEADERS: u64 = 0b01;
/// Bit of `ChainExchangeRequest::options` tested by `include_messages`.
pub const MESSAGES: u64 = 0b10;
/// Request message of the chain exchange protocol.
///
/// The `Serialize_tuple` / `Deserialize_tuple` derives encode the fields as a
/// sequence in declaration order, so reordering the fields changes the encoding.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ChainExchangeRequest {
/// Non-empty list of CIDs the request starts from.
// NOTE(review): presumably the block CIDs of the tipset the response should begin at — confirm.
pub start: NonEmpty<Cid>,
/// Requested length.
// NOTE(review): presumably the number of tipsets the peer should return — confirm.
pub request_len: u64,
/// Bit mask built from `HEADERS` and `MESSAGES`; see `include_blocks`,
/// `include_messages` and `is_options_valid`.
pub options: u64,
}
impl ChainExchangeRequest {
    /// Returns `true` when the `HEADERS` bit is set in `options`.
    pub fn include_blocks(&self) -> bool {
        (self.options & HEADERS) != 0
    }

    /// Returns `true` when the `MESSAGES` bit is set in `options`.
    pub fn include_messages(&self) -> bool {
        (self.options & MESSAGES) != 0
    }

    /// Returns `true` when at least one of the `HEADERS` / `MESSAGES` bits is set,
    /// i.e. the request asks for something.
    pub fn is_options_valid(&self) -> bool {
        (self.options & (HEADERS | MESSAGES)) != 0
    }
}
/// Status carried by a `ChainExchangeResponse`.
///
/// The hand-written `Serialize` / `Deserialize` impls in this file map every
/// variant to an `i32` code; the code of each variant is noted below.
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub enum ChainExchangeResponseStatus {
/// Code `0`. Treated as a non-error status by `ChainExchangeResponse::into_result`.
Success,
/// Code `101`. Treated as a non-error status by `ChainExchangeResponse::into_result`.
PartialResponse,
/// Code `201`.
BlockNotFound,
/// Code `202`.
GoAway,
/// Code `203`.
InternalError,
/// Code `204`.
BadRequest,
/// Any code without a dedicated variant; carries the raw `i32` and is
/// serialized as that value.
// In test builds the generated payload is pinned to `1`.
// NOTE(review): presumably so that an arbitrary `Other` never carries a code
// that deserializes into a dedicated variant (e.g. `Other(0)` -> `Success`),
// which would break the round-trip property test — confirm.
Other(#[cfg_attr(test, arbitrary(gen(|_| 1)))] i32),
}
impl Serialize for ChainExchangeResponseStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use ChainExchangeResponseStatus::*;
let code: i32 = match self {
Success => 0,
PartialResponse => 101,
BlockNotFound => 201,
GoAway => 202,
InternalError => 203,
BadRequest => 204,
Other(i) => *i,
};
code.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for ChainExchangeResponseStatus {
fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
where
D: Deserializer<'de>,
{
let code: i32 = Deserialize::deserialize(deserializer)?;
use ChainExchangeResponseStatus::*;
let status = match code {
0 => Success,
101 => PartialResponse,
201 => BlockNotFound,
202 => GoAway,
203 => InternalError,
204 => BadRequest,
x => Other(x),
};
Ok(status)
}
}
/// Response message of the chain exchange protocol.
///
/// The `Serialize_tuple` / `Deserialize_tuple` derives encode the fields as a
/// sequence in declaration order, so reordering the fields changes the encoding.
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
pub struct ChainExchangeResponse {
/// Outcome reported for the request.
pub status: ChainExchangeResponseStatus,
/// Free-form text accompanying `status`; `into_result` includes it in the
/// error string for failing statuses.
pub message: String,
/// Returned tipset bundles; `into_result` converts each one with `TryFrom`.
pub chain: Vec<TipsetBundle>,
}
impl ChainExchangeResponse {
    /// Converts the response into a list of `T`, one per returned tipset bundle.
    ///
    /// # Errors
    ///
    /// Returns a formatted `"Status ..: .."` string when the status is neither
    /// `Success` nor `PartialResponse`, or the first error produced while
    /// converting a bundle into `T`.
    pub fn into_result<T>(self) -> Result<Vec<T>, String>
    where
        T: TryFrom<TipsetBundle, Error = String>,
    {
        let Self {
            status,
            message,
            chain,
        } = self;

        match status {
            ChainExchangeResponseStatus::Success
            | ChainExchangeResponseStatus::PartialResponse => {
                let mut converted = Vec::with_capacity(chain.len());
                for bundle in chain {
                    converted.push(T::try_from(bundle)?);
                }
                Ok(converted)
            }
            failure => Err(format!("Status {:?}: {}", failure, message)),
        }
    }
}
/// Messages of a tipset with every message stored once, plus per-block index
/// lists that say which messages each block contains.
///
/// The `Serialize_tuple` / `Deserialize_tuple` derives encode the fields as a
/// sequence in declaration order, so reordering the fields changes the encoding.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct CompactedMessages {
/// BLS messages referenced by `bls_msg_includes`.
pub bls_msgs: Vec<Message>,
/// One entry per block header; each entry lists indexes into `bls_msgs`
/// (this is how `fts_from_bundle_parts` interprets it).
pub bls_msg_includes: Vec<Vec<u64>>,
/// Secp messages referenced by `secp_msg_includes`.
pub secp_msgs: Vec<SignedMessage>,
/// One entry per block header; each entry lists indexes into `secp_msgs`
/// (this is how `fts_from_bundle_parts` interprets it).
pub secp_msg_includes: Vec<Vec<u64>>,
}
/// One element of `ChainExchangeResponse::chain`: the block headers of a
/// tipset and, optionally, its messages.
///
/// The `Serialize_tuple` / `Deserialize_tuple` derives encode the fields as a
/// sequence in declaration order, so reordering the fields changes the encoding.
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple, Default)]
pub struct TipsetBundle {
/// Block headers; the `Tipset` and `FullTipset` conversions are built from these.
pub blocks: Vec<CachingBlockHeader>,
/// Compacted messages; when `None`, the conversions into `CompactedMessages`
/// and `FullTipset` return an error.
pub messages: Option<CompactedMessages>,
}
impl TryFrom<TipsetBundle> for Tipset {
type Error = String;
fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
Tipset::new(tsb.blocks).map_err(|e| e.to_string())
}
}
impl TryFrom<TipsetBundle> for Arc<Tipset> {
    type Error = String;

    /// Same as the `Tipset` conversion, with the result wrapped in an `Arc`.
    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
        let tipset = Tipset::try_from(tsb)?;
        Ok(Arc::new(tipset))
    }
}
impl TryFrom<TipsetBundle> for CompactedMessages {
    type Error = String;

    /// Extracts the bundle's messages, failing when the bundle carries none.
    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
        match tsb.messages {
            Some(compacted) => Ok(compacted),
            None => Err("Request contained no messages".to_string()),
        }
    }
}
impl TryFrom<TipsetBundle> for FullTipset {
type Error = String;
fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
}
}
impl TryFrom<&TipsetBundle> for FullTipset {
    type Error = String;

    /// Assembles a `FullTipset` from a borrowed bundle via `fts_from_bundle_parts`;
    /// the headers are cloned because the helper takes them by value.
    fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
        let owned_headers = tsb.blocks.clone();
        let compacted = tsb.messages.as_ref();
        fts_from_bundle_parts(owned_headers, compacted)
    }
}
fn fts_from_bundle_parts(
headers: Vec<CachingBlockHeader>,
messages: Option<&CompactedMessages>,
) -> Result<FullTipset, String> {
let CompactedMessages {
bls_msgs,
bls_msg_includes,
secp_msg_includes,
secp_msgs,
} = messages.ok_or("Tipset bundle did not contain message bundle")?;
if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
return Err(
format!("Invalid formed Tipset bundle, lengths of includes does not match blocks. Header len: {}, bls_msg len: {}, secp_msg len: {}", headers.len(), bls_msg_includes.len(), secp_msg_includes.len()),
);
}
let zipped = headers
.into_iter()
.zip(bls_msg_includes.iter())
.zip(secp_msg_includes.iter());
fn values_from_indexes<T: Clone>(indexes: &[u64], values: &[T]) -> Result<Vec<T>, String> {
indexes
.iter()
.map(|idx| {
values
.get(*idx as usize)
.cloned()
.ok_or_else(|| "Invalid message index".to_string())
})
.collect()
}
let blocks = zipped
.enumerate()
.map(|(i, ((header, bls_msg_include), secp_msg_include))| {
let message_count = bls_msg_include.len() + secp_msg_include.len();
if message_count > BLOCK_MESSAGE_LIMIT {
return Err(format!(
"Block {i} in bundle has too many messages ({message_count} > {BLOCK_MESSAGE_LIMIT})"
));
}
let bls_messages = values_from_indexes(bls_msg_include, bls_msgs)?;
let secp_messages = values_from_indexes(secp_msg_include, secp_msgs)?;
Ok(Block {
header,
bls_messages,
secp_messages,
})
})
.collect::<Result<Vec<_>, _>>()?;
FullTipset::new(blocks).map_err(|e| e.to_string())
}
#[cfg(test)]
mod tests {
    use super::*;
    use quickcheck_macros::quickcheck;
    use serde_json;

    /// Property: a status survives a JSON serialize/deserialize round trip unchanged.
    #[quickcheck]
    fn chain_exchange_response_status_roundtrip(status: ChainExchangeResponseStatus) {
        let json = serde_json::to_string(&status).unwrap();
        let decoded: ChainExchangeResponseStatus = serde_json::from_str(&json).unwrap();
        assert_eq!(status, decoded);
    }
}