1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{convert::TryFrom, sync::Arc};

use crate::blocks::{Block, CachingBlockHeader, FullTipset, Tipset, BLOCK_MESSAGE_LIMIT};
use crate::message::SignedMessage;
use crate::shim::message::Message;
use cid::Cid;
use nunny::Vec as NonEmpty;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};

/// `ChainExchange` Filecoin header set bit.
///
/// Lowest bit (`0b01`) of [`ChainExchangeRequest::options`]; tested by
/// [`ChainExchangeRequest::include_blocks`].
pub const HEADERS: u64 = 0b01;
/// `ChainExchange` Filecoin messages set bit.
///
/// Second bit (`0b10`) of [`ChainExchangeRequest::options`]; tested by
/// [`ChainExchangeRequest::include_messages`].
pub const MESSAGES: u64 = 0b10;

/// The payload that gets sent to another node to request for blocks and
/// messages.
///
/// Derives `Serialize_tuple` / `Deserialize_tuple`, so the value is encoded as
/// a tuple of its fields in declaration order; reordering the fields changes
/// the encoding.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ChainExchangeRequest {
    /// The tipset [Cid] to start the request from.
    ///
    /// Typed as a non-empty list, so a request always names at least one CID.
    pub start: NonEmpty<Cid>,
    /// The amount of tipsets to request.
    pub request_len: u64,
    /// Bit field combining [HEADERS] and [MESSAGES]:
    /// 1 for Block only, 2 for Messages only, 3 for Blocks and Messages.
    pub options: u64,
}

impl ChainExchangeRequest {
    /// Returns `true` when the [HEADERS] bit of `options` is set, i.e. the
    /// request asks for Filecoin block headers.
    pub fn include_blocks(&self) -> bool {
        (self.options & HEADERS) != 0
    }

    /// Returns `true` when the [MESSAGES] bit of `options` is set, i.e. the
    /// request asks for the messages of a block.
    pub fn include_messages(&self) -> bool {
        (self.options & MESSAGES) != 0
    }

    /// Returns `true` when at least one of the [HEADERS] or [MESSAGES] bits is
    /// set. Any other bits in `options` are ignored by this check.
    pub fn is_options_valid(&self) -> bool {
        (self.options & (HEADERS | MESSAGES)) != 0
    }
}

/// Status codes of a `chain_exchange` response.
///
/// Serialized as a bare `i32`; the numeric code of each variant is noted on
/// the variant below and is defined by the `Serialize` / `Deserialize`
/// implementations for this type.
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub enum ChainExchangeResponseStatus {
    /// All is well. Wire code `0`.
    Success,
    /// We could not fetch all blocks requested (but at least we returned
    /// the `Head` requested). Not considered an error. Wire code `101`.
    PartialResponse,
    /// Request.Start not found. Wire code `201`.
    BlockNotFound,
    /// Requester is making too many requests. Wire code `202`.
    GoAway,
    /// Internal error occurred. Wire code `203`.
    InternalError,
    /// Request was bad. Wire code `204`.
    BadRequest,
    /// Other undefined response code; carries the raw code as received.
    ///
    /// In tests the generated payload is pinned to `1`: an `Other` holding one
    /// of the known codes above would deserialize back into that named
    /// variant and would not round-trip.
    Other(#[cfg_attr(test, arbitrary(gen(|_| 1)))] i32),
}

impl Serialize for ChainExchangeResponseStatus {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        use ChainExchangeResponseStatus::*;
        let code: i32 = match self {
            Success => 0,
            PartialResponse => 101,
            BlockNotFound => 201,
            GoAway => 202,
            InternalError => 203,
            BadRequest => 204,
            Other(i) => *i,
        };
        code.serialize(serializer)
    }
}

impl<'de> Deserialize<'de> for ChainExchangeResponseStatus {
    fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
    where
        D: Deserializer<'de>,
    {
        let code: i32 = Deserialize::deserialize(deserializer)?;

        use ChainExchangeResponseStatus::*;
        let status = match code {
            0 => Success,
            101 => PartialResponse,
            201 => BlockNotFound,
            202 => GoAway,
            203 => InternalError,
            204 => BadRequest,
            x => Other(x),
        };
        Ok(status)
    }
}

/// The response to a `ChainExchange` request.
///
/// Derives `Serialize_tuple` / `Deserialize_tuple`, so the value is encoded as
/// a tuple of its fields in declaration order.
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
pub struct ChainExchangeResponse {
    /// Status code of the response.
    pub status: ChainExchangeResponseStatus,
    /// Status message indicating failure reason.
    ///
    /// Included in the error text produced by
    /// [`ChainExchangeResponse::into_result`] for failing statuses.
    pub message: String,
    /// The tipsets requested, one [`TipsetBundle`] per tipset.
    pub chain: Vec<TipsetBundle>,
}

impl ChainExchangeResponse {
    /// Converts a `chain_exchange` response into a `Result`.
    ///
    /// A status of `Success` or `PartialResponse` counts as success: every
    /// [`TipsetBundle`] in `chain` is converted into `T` through its `TryFrom`
    /// implementation, and the first conversion error (if any) is returned.
    ///
    /// # Errors
    ///
    /// Any other status yields an error string containing the status and the
    /// response message, without looking at `chain`.
    pub fn into_result<T>(self) -> Result<Vec<T>, String>
    where
        T: TryFrom<TipsetBundle, Error = String>,
    {
        let Self {
            status,
            message,
            chain,
        } = self;

        match status {
            ChainExchangeResponseStatus::Success
            | ChainExchangeResponseStatus::PartialResponse => {
                chain.into_iter().map(T::try_from).collect()
            }
            failure => Err(format!("Status {:?}: {}", failure, message)),
        }
    }
}
/// Contains all BLS and SECP messages and their indexes per block
///
/// Messages are stored once in `bls_msgs` / `secp_msgs`; the `*_includes`
/// tables say, per block, which of those messages the block contains.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct CompactedMessages {
    /// Unsigned BLS messages.
    pub bls_msgs: Vec<Message>,
    /// Describes which block each message belongs to.
    /// if `bls_msg_includes[2] = vec![5]` then `TipsetBundle.blocks[2]` contains `bls_msgs[5]`
    ///
    /// The outer vector is indexed by block position; each inner value is an
    /// index into `bls_msgs`.
    pub bls_msg_includes: Vec<Vec<u64>>,

    /// Signed SECP messages.
    pub secp_msgs: Vec<SignedMessage>,
    /// Describes which block each message belongs to.
    ///
    /// Same layout as `bls_msg_includes`, with inner values indexing into
    /// `secp_msgs`.
    pub secp_msg_includes: Vec<Vec<u64>>,
}

/// Contains the blocks and messages in a particular tipset
///
/// Can be converted (via `TryFrom`) into a `Tipset`, `Arc<Tipset>`,
/// [`CompactedMessages`] or `FullTipset`; the last two require `messages` to
/// be present.
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple, Default)]
pub struct TipsetBundle {
    /// The blocks in the tipset.
    pub blocks: Vec<CachingBlockHeader>,

    /// Compressed messages format.
    ///
    /// `None` when the bundle carries no messages.
    pub messages: Option<CompactedMessages>,
}

impl TryFrom<TipsetBundle> for Tipset {
    type Error = String;

    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
        Tipset::new(tsb.blocks).map_err(|e| e.to_string())
    }
}

impl TryFrom<TipsetBundle> for Arc<Tipset> {
    type Error = String;

    /// Same as the conversion into [`Tipset`], with the result wrapped in an
    /// [`Arc`].
    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
        let tipset = Tipset::try_from(tsb)?;
        Ok(Arc::new(tipset))
    }
}

impl TryFrom<TipsetBundle> for CompactedMessages {
    type Error = String;

    /// Extracts the compacted messages from the bundle, dropping the headers.
    /// Fails when the bundle has no `messages`.
    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
        match tsb.messages {
            Some(compacted) => Ok(compacted),
            None => Err(String::from("Request contained no messages")),
        }
    }
}

impl TryFrom<TipsetBundle> for FullTipset {
    type Error = String;

    fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
        fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
    }
}

impl TryFrom<&TipsetBundle> for FullTipset {
    type Error = String;

    /// Assembles a [`FullTipset`] from a borrowed bundle. The headers are
    /// cloned because the builder takes them by value; the messages are only
    /// borrowed.
    fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
        let headers = tsb.blocks.clone();
        let compacted = tsb.messages.as_ref();
        fts_from_bundle_parts(headers, compacted)
    }
}

/// Constructs a [`FullTipset`] from headers and compacted messages from a
/// bundle.
fn fts_from_bundle_parts(
    headers: Vec<CachingBlockHeader>,
    messages: Option<&CompactedMessages>,
) -> Result<FullTipset, String> {
    let CompactedMessages {
        bls_msgs,
        bls_msg_includes,
        secp_msg_includes,
        secp_msgs,
    } = messages.ok_or("Tipset bundle did not contain message bundle")?;

    if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
        return Err(
            format!("Invalid formed Tipset bundle, lengths of includes does not match blocks. Header len: {}, bls_msg len: {}, secp_msg len: {}", headers.len(), bls_msg_includes.len(), secp_msg_includes.len()),
        );
    }
    let zipped = headers
        .into_iter()
        .zip(bls_msg_includes.iter())
        .zip(secp_msg_includes.iter());

    fn values_from_indexes<T: Clone>(indexes: &[u64], values: &[T]) -> Result<Vec<T>, String> {
        indexes
            .iter()
            .map(|idx| {
                values
                    .get(*idx as usize)
                    .cloned()
                    .ok_or_else(|| "Invalid message index".to_string())
            })
            .collect()
    }

    let blocks = zipped
        .enumerate()
        .map(|(i, ((header, bls_msg_include), secp_msg_include))| {
            let message_count = bls_msg_include.len() + secp_msg_include.len();
            if message_count > BLOCK_MESSAGE_LIMIT {
                return Err(format!(
                    "Block {i} in bundle has too many messages ({message_count} > {BLOCK_MESSAGE_LIMIT})"
                ));
            }
            let bls_messages = values_from_indexes(bls_msg_include, bls_msgs)?;
            let secp_messages = values_from_indexes(secp_msg_include, secp_msgs)?;

            Ok(Block {
                header,
                bls_messages,
                secp_messages,
            })
        })
        .collect::<Result<Vec<_>, _>>()?;

    FullTipset::new(blocks).map_err(|e| e.to_string())
}

#[cfg(test)]
mod tests {
    use quickcheck_macros::quickcheck;
    use serde_json;

    use super::*;

    /// A status encoded to JSON and decoded again must equal the original.
    #[quickcheck]
    fn chain_exchange_response_status_roundtrip(status: ChainExchangeResponseStatus) {
        let encoded = serde_json::to_string(&status).unwrap();
        let decoded: ChainExchangeResponseStatus = serde_json::from_str(&encoded).unwrap();
        assert_eq!(status, decoded);
    }
}