1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{num::NonZeroUsize, sync::Arc};

use super::{
    discovery::{DerivedDiscoveryBehaviourEvent, DiscoveryEvent, PeerInfo},
    PeerManager,
};
use crate::libp2p::{
    chain_exchange::ChainExchangeBehaviour,
    config::Libp2pConfig,
    discovery::{DiscoveryBehaviour, DiscoveryConfig},
    gossip_params::{build_peer_score_params, build_peer_score_threshold},
    hello::HelloBehaviour,
};
use crate::libp2p_bitswap::BitswapBehaviour;
use crate::utils::{encoding::blake2b_256, version::FOREST_VERSION_STRING};
use ahash::{HashMap, HashSet};
use libp2p::{
    allow_block_list, connection_limits,
    gossipsub::{
        self, IdentTopic as Topic, MessageAuthenticity, MessageId, PublishError, SubscriptionError,
        ValidationMode,
    },
    identity::{Keypair, PeerId},
    kad::QueryId,
    metrics::{Metrics, Recorder},
    ping, request_response,
    swarm::NetworkBehaviour,
    Multiaddr,
};
use once_cell::sync::Lazy;
use tracing::info;

/// Libp2p behavior for the Forest node. This handles all sub protocols needed
/// for a Filecoin node.
#[derive(NetworkBehaviour)]
pub(in crate::libp2p) struct ForestBehaviour {
    // Behaviours that manage connections should come first, to get rid of some panics in debug build.
    // See <https://github.com/libp2p/rust-libp2p/issues/4773#issuecomment-2042676966>
    connection_limits: connection_limits::Behaviour,
    pub(super) blocked_peers: allow_block_list::Behaviour<allow_block_list::BlockedPeers>,
    pub(super) discovery: DiscoveryBehaviour,
    ping: ping::Behaviour,
    gossipsub: gossipsub::Behaviour,
    pub(super) hello: HelloBehaviour,
    pub(super) chain_exchange: ChainExchangeBehaviour,
    pub(super) bitswap: BitswapBehaviour,
}

impl Recorder<ForestBehaviourEvent> for Metrics {
    fn record(&self, event: &ForestBehaviourEvent) {
        match event {
            ForestBehaviourEvent::Gossipsub(e) => self.record(e),
            ForestBehaviourEvent::Ping(ping_event) => self.record(ping_event),
            ForestBehaviourEvent::Discovery(DiscoveryEvent::Discovery(e)) => match e.as_ref() {
                DerivedDiscoveryBehaviourEvent::Identify(e) => self.record(e),
                DerivedDiscoveryBehaviourEvent::Kademlia(e) => self.record(e),
                _ => {}
            },
            _ => {}
        }
    }
}

impl ForestBehaviour {
    pub fn new(
        local_key: &Keypair,
        config: &Libp2pConfig,
        network_name: &str,
        peer_manager: Arc<PeerManager>,
    ) -> anyhow::Result<Self> {
        const MAX_ESTABLISHED_PER_PEER: u32 = 4;
        static MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER: Lazy<usize> = Lazy::new(|| {
            std::env::var("FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER")
                .ok()
                .map(|it|
                    it.parse::<NonZeroUsize>()
                        .expect("Failed to parse the `FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER` environment variable value, a positive integer is expected.")
                        .get())
                .unwrap_or(10)
        });

        let max_concurrent_request_response_streams = (config.target_peer_count as usize)
            .saturating_mul(*MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER);

        let mut gs_config_builder = gossipsub::ConfigBuilder::default();
        gs_config_builder.max_transmit_size(1 << 20);
        gs_config_builder.validation_mode(ValidationMode::Strict);
        gs_config_builder.message_id_fn(|msg: &gossipsub::Message| {
            let s = blake2b_256(&msg.data);
            MessageId::from(s)
        });

        let gossipsub_config = gs_config_builder.build().unwrap();
        let mut gossipsub = gossipsub::Behaviour::new(
            MessageAuthenticity::Signed(local_key.clone()),
            gossipsub_config,
        )
        .unwrap();

        gossipsub
            .with_peer_score(
                build_peer_score_params(network_name),
                build_peer_score_threshold(),
            )
            .unwrap();

        let bitswap = BitswapBehaviour::new(
            &[
                "/chain/ipfs/bitswap/1.2.0",
                "/chain/ipfs/bitswap/1.1.0",
                "/chain/ipfs/bitswap/1.0.0",
                "/chain/ipfs/bitswap",
            ],
            request_response::Config::default()
                .with_max_concurrent_streams(max_concurrent_request_response_streams),
        );
        crate::libp2p_bitswap::register_metrics(&mut crate::metrics::default_registry());

        let discovery = DiscoveryConfig::new(local_key.public(), network_name)
            .with_mdns(config.mdns)
            .with_kademlia(config.kademlia)
            .with_user_defined(config.bootstrap_peers.clone())?
            .target_peer_count(config.target_peer_count as u64)
            .finish()?;

        let connection_limits = connection_limits::Behaviour::new(
            connection_limits::ConnectionLimits::default()
                .with_max_pending_incoming(Some(
                    config
                        .target_peer_count
                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
                ))
                .with_max_pending_outgoing(Some(
                    config
                        .target_peer_count
                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
                ))
                .with_max_established_incoming(Some(
                    config
                        .target_peer_count
                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
                ))
                .with_max_established_outgoing(Some(
                    config
                        .target_peer_count
                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
                ))
                .with_max_established_per_peer(Some(MAX_ESTABLISHED_PER_PEER)),
        );

        info!("libp2p Forest version: {}", FOREST_VERSION_STRING.as_str());
        Ok(ForestBehaviour {
            gossipsub,
            discovery,
            ping: Default::default(),
            connection_limits,
            blocked_peers: Default::default(),
            bitswap,
            hello: HelloBehaviour::new(
                request_response::Config::default()
                    .with_max_concurrent_streams(max_concurrent_request_response_streams),
                peer_manager,
            ),
            chain_exchange: ChainExchangeBehaviour::new(
                request_response::Config::default()
                    .with_max_concurrent_streams(max_concurrent_request_response_streams),
            ),
        })
    }

    /// Bootstrap Kademlia network
    pub fn bootstrap(&mut self) -> Result<QueryId, String> {
        self.discovery.bootstrap()
    }

    /// Publish data over the gossip network.
    pub fn publish(
        &mut self,
        topic: Topic,
        data: impl Into<Vec<u8>>,
    ) -> Result<MessageId, PublishError> {
        self.gossipsub.publish(topic, data)
    }

    /// Subscribe to a gossip topic.
    pub fn subscribe(&mut self, topic: &Topic) -> Result<bool, SubscriptionError> {
        self.gossipsub.subscribe(topic)
    }

    /// Returns a set of peer ids
    pub fn peers(&self) -> &HashSet<PeerId> {
        self.discovery.peers()
    }

    /// Returns a map of peer ids and their multi-addresses
    pub fn peer_addresses(&self) -> HashMap<PeerId, HashSet<Multiaddr>> {
        self.discovery.peer_addresses()
    }

    pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
        self.discovery.peer_info(peer_id)
    }
}