use std::{
cmp,
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use libp2p::{
autonat,
core::Multiaddr,
identify,
identity::{PeerId, PublicKey},
kad,
mdns::{tokio::Behaviour as Mdns, Event as MdnsEvent},
multiaddr::Protocol,
swarm::{
behaviour::toggle::Toggle,
derive_prelude::*,
dial_opts::{DialOpts, PeerCondition},
NetworkBehaviour, ToSwarm,
},
upnp, StreamProtocol,
};
use tokio::time::Interval;
use tracing::{debug, info, trace, warn};
use crate::utils::version::FOREST_VERSION_STRING;
#[derive(NetworkBehaviour)]
pub struct DerivedDiscoveryBehaviour {
kademlia: Toggle<kad::Behaviour<kad::store::MemoryStore>>,
mdns: Toggle<Mdns>,
identify: identify::Behaviour,
autonat: autonat::Behaviour,
upnp: upnp::tokio::Behaviour,
}
#[derive(Debug)]
pub enum DiscoveryEvent {
PeerConnected(PeerId),
PeerDisconnected(PeerId),
Discovery(Box<DerivedDiscoveryBehaviourEvent>),
}
pub struct DiscoveryConfig<'a> {
local_peer_id: PeerId,
local_public_key: PublicKey,
user_defined: Vec<(PeerId, Multiaddr)>,
target_peer_count: u64,
enable_mdns: bool,
enable_kademlia: bool,
network_name: &'a str,
}
impl<'a> DiscoveryConfig<'a> {
pub fn new(local_public_key: PublicKey, network_name: &'a str) -> Self {
DiscoveryConfig {
local_peer_id: local_public_key.to_peer_id(),
local_public_key,
user_defined: Vec::new(),
target_peer_count: u64::MAX,
enable_mdns: false,
enable_kademlia: true,
network_name,
}
}
pub fn target_peer_count(mut self, limit: u64) -> Self {
self.target_peer_count = limit;
self
}
pub fn with_user_defined(
mut self,
user_defined: impl IntoIterator<Item = Multiaddr>,
) -> anyhow::Result<Self> {
for mut addr in user_defined.into_iter() {
if let Some(Protocol::P2p(peer_id)) = addr.pop() {
self.user_defined.push((peer_id, addr))
} else {
anyhow::bail!("Failed to parse peer id from {addr}")
}
}
Ok(self)
}
pub fn with_mdns(mut self, value: bool) -> Self {
self.enable_mdns = value;
self
}
pub fn with_kademlia(mut self, value: bool) -> Self {
self.enable_kademlia = value;
self
}
pub fn finish(self) -> anyhow::Result<DiscoveryBehaviour> {
let DiscoveryConfig {
local_peer_id,
local_public_key,
user_defined,
target_peer_count,
enable_mdns,
enable_kademlia,
network_name,
} = self;
let mut peers = HashSet::new();
let kademlia_opt = if enable_kademlia {
let mut kademlia = new_kademlia(
local_peer_id,
StreamProtocol::try_from_owned(format!("/fil/kad/{network_name}/kad/1.0.0"))?,
);
for (peer_id, addr) in &user_defined {
kademlia.add_address(peer_id, addr.clone());
peers.insert(*peer_id);
}
if let Err(e) = kademlia.bootstrap() {
warn!("Kademlia bootstrap failed: {}", e);
}
Some(kademlia)
} else {
None
};
let mdns_opt = if enable_mdns {
Some(Mdns::new(Default::default(), local_peer_id).expect("Could not start mDNS"))
} else {
None
};
Ok(DiscoveryBehaviour {
discovery: DerivedDiscoveryBehaviour {
kademlia: kademlia_opt.into(),
mdns: mdns_opt.into(),
identify: identify::Behaviour::new(
identify::Config::new("ipfs/0.1.0".into(), local_public_key)
.with_agent_version(format!("forest-{}", FOREST_VERSION_STRING.as_str()))
.with_push_listen_addr_updates(true),
),
autonat: autonat::Behaviour::new(local_peer_id, Default::default()),
upnp: Default::default(),
},
next_kad_random_query: tokio::time::interval(Duration::from_secs(1)),
duration_to_next_kad: Duration::from_secs(1),
pending_events: VecDeque::new(),
n_node_connected: 0,
peers,
peer_info: HashMap::new(),
target_peer_count,
custom_seed_peers: user_defined,
pending_dial_opts: VecDeque::new(),
})
}
}
pub fn new_kademlia(
peer_id: PeerId,
protocol: StreamProtocol,
) -> kad::Behaviour<kad::store::MemoryStore> {
let store = kad::store::MemoryStore::new(peer_id);
let kad_config = kad::Config::new(protocol);
let mut kademlia = kad::Behaviour::with_config(peer_id, store, kad_config);
kademlia.set_mode(Some(kad::Mode::Server));
kademlia
}
pub struct DiscoveryBehaviour {
discovery: DerivedDiscoveryBehaviour,
next_kad_random_query: Interval,
duration_to_next_kad: Duration,
pending_events: VecDeque<DiscoveryEvent>,
n_node_connected: u64,
peers: HashSet<PeerId>,
peer_info: HashMap<PeerId, PeerInfo>,
target_peer_count: u64,
custom_seed_peers: Vec<(PeerId, Multiaddr)>,
pending_dial_opts: VecDeque<DialOpts>,
}
#[derive(Default)]
pub struct PeerInfo {
pub addresses: HashSet<Multiaddr>,
pub identify_info: Option<identify::Info>,
}
impl DiscoveryBehaviour {
pub fn peers(&self) -> &HashSet<PeerId> {
&self.peers
}
pub fn peer_addresses(&self) -> HashMap<PeerId, HashSet<Multiaddr>> {
self.peer_info
.iter()
.map(|(peer_id, info)| (*peer_id, info.addresses.clone()))
.collect()
}
pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
self.peer_info.get(peer_id)
}
pub fn bootstrap(&mut self) -> Result<kad::QueryId, String> {
if let Some(active_kad) = self.discovery.kademlia.as_mut() {
active_kad.bootstrap().map_err(|e| e.to_string())
} else {
for (peer_id, address) in &self.custom_seed_peers {
self.pending_dial_opts.push_back(
DialOpts::peer_id(*peer_id)
.condition(PeerCondition::Disconnected)
.addresses(vec![address.clone()])
.build(),
);
}
Err("Kademlia is not activated".to_string())
}
}
pub fn nat_status(&self) -> autonat::NatStatus {
self.discovery.autonat.nat_status()
}
}
impl NetworkBehaviour for DiscoveryBehaviour {
type ConnectionHandler = <DerivedDiscoveryBehaviour as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = DiscoveryEvent;
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &libp2p::Multiaddr,
remote_addr: &libp2p::Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.peer_info
.entry(peer)
.or_default()
.addresses
.insert(remote_addr.clone());
self.discovery.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &libp2p::Multiaddr,
role_override: libp2p::core::Endpoint,
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
self.peer_info
.entry(peer)
.or_default()
.addresses
.insert(addr.clone());
self.discovery.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
)
}
fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &libp2p::Multiaddr,
remote_addr: &libp2p::Multiaddr,
) -> Result<(), ConnectionDenied> {
self.discovery
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}
fn handle_pending_outbound_connection(
&mut self,
connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[libp2p::Multiaddr],
effective_role: libp2p::core::Endpoint,
) -> Result<Vec<libp2p::Multiaddr>, ConnectionDenied> {
self.discovery.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}
fn on_swarm_event(&mut self, event: FromSwarm) {
match &event {
FromSwarm::ConnectionEstablished(e) => {
if e.other_established == 0 {
self.n_node_connected += 1;
self.peers.insert(e.peer_id);
self.pending_events
.push_back(DiscoveryEvent::PeerConnected(e.peer_id));
}
}
FromSwarm::ConnectionClosed(e) => {
if e.remaining_established == 0 {
self.n_node_connected -= 1;
self.peers.remove(&e.peer_id);
self.peer_info.remove(&e.peer_id);
self.pending_events
.push_back(DiscoveryEvent::PeerDisconnected(e.peer_id));
}
}
_ => {}
};
self.discovery.on_swarm_event(event)
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.discovery
.on_connection_handler_event(peer_id, connection, event);
}
#[allow(clippy::type_complexity)]
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(ev));
}
if let Some(opts) = self.pending_dial_opts.pop_front() {
return Poll::Ready(ToSwarm::Dial { opts });
}
while self.next_kad_random_query.poll_tick(cx).is_ready() {
if self.n_node_connected < self.target_peer_count {
let random_peer_id = PeerId::random();
debug!(
"Libp2p <= Starting random Kademlia request for {:?}",
random_peer_id
);
if let Some(kademlia) = self.discovery.kademlia.as_mut() {
kademlia.get_closest_peers(random_peer_id);
}
}
self.next_kad_random_query = tokio::time::interval(self.duration_to_next_kad);
self.next_kad_random_query.reset();
self.duration_to_next_kad =
cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
}
while let Poll::Ready(ev) = self.discovery.poll(cx) {
match ev {
ToSwarm::GenerateEvent(ev) => {
match &ev {
DerivedDiscoveryBehaviourEvent::Identify(ev) => {
if let identify::Event::Received { peer_id, info, .. } = ev {
self.peer_info.entry(*peer_id).or_default().identify_info =
Some(info.clone());
if let Some(kademlia) = self.discovery.kademlia.as_mut() {
for address in &info.listen_addrs {
kademlia.add_address(peer_id, address.clone());
}
}
}
}
DerivedDiscoveryBehaviourEvent::Autonat(_) => {}
DerivedDiscoveryBehaviourEvent::Upnp(ev) => match ev {
upnp::Event::NewExternalAddr(addr) => {
info!("UPnP NewExternalAddr: {addr}");
}
upnp::Event::ExpiredExternalAddr(addr) => {
info!("UPnP ExpiredExternalAddr: {addr}");
}
upnp::Event::GatewayNotFound => {
info!("UPnP GatewayNotFound");
}
upnp::Event::NonRoutableGateway => {
info!("UPnP NonRoutableGateway");
}
},
DerivedDiscoveryBehaviourEvent::Kademlia(ev) => match ev {
kad::Event::RoutingUpdated { .. } => {}
kad::Event::RoutablePeer { .. } => {}
kad::Event::PendingRoutablePeer { .. } => {
}
other => {
trace!("Libp2p => Unhandled Kademlia event: {:?}", other)
}
},
DerivedDiscoveryBehaviourEvent::Mdns(ev) => match ev {
MdnsEvent::Discovered(list) => {
if self.n_node_connected >= self.target_peer_count {
continue;
}
for (peer_id, multiaddr) in list {
if let Some(kad) = self.discovery.kademlia.as_mut() {
kad.add_address(peer_id, multiaddr.clone());
}
}
}
MdnsEvent::Expired(_) => {}
},
}
self.pending_events
.push_back(DiscoveryEvent::Discovery(Box::new(ev)));
}
ToSwarm::Dial { opts } => {
return Poll::Ready(ToSwarm::Dial { opts });
}
ToSwarm::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler,
event,
})
}
ToSwarm::CloseConnection {
peer_id,
connection,
} => {
return Poll::Ready(ToSwarm::CloseConnection {
peer_id,
connection,
})
}
ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
ToSwarm::RemoveListener { id } => {
return Poll::Ready(ToSwarm::RemoveListener { id })
}
ToSwarm::NewExternalAddrCandidate(addr) => {
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr))
}
ToSwarm::ExternalAddrConfirmed(addr) => {
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr))
}
ToSwarm::ExternalAddrExpired(addr) => {
return Poll::Ready(ToSwarm::ExternalAddrExpired(addr))
}
_ => {}
}
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use libp2p::{identity::Keypair, swarm::SwarmEvent, Swarm};
use libp2p_swarm_test::SwarmExt as _;
use super::*;
#[tokio::test]
async fn kademlia_test() {
fn new_discovery(
keypair: Keypair,
seed_peers: impl IntoIterator<Item = Multiaddr>,
) -> DiscoveryBehaviour {
DiscoveryConfig::new(keypair.public(), "calibnet")
.with_mdns(false)
.with_kademlia(true)
.with_user_defined(seed_peers)
.unwrap()
.target_peer_count(128)
.finish()
.unwrap()
}
let mut b = Swarm::new_ephemeral(|k| new_discovery(k, vec![]));
b.listen().with_memory_addr_external().await;
let b_peer_id = *b.local_peer_id();
let b_addresses: Vec<_> = b
.external_addresses()
.map(|addr| {
let mut addr = addr.clone();
addr.push(multiaddr::Protocol::P2p(b_peer_id));
addr
})
.collect();
let mut c = Swarm::new_ephemeral(|k| new_discovery(k, vec![]));
c.listen().with_memory_addr_external().await;
let c_peer_id = *c.local_peer_id();
if let Some(c_kad) = c.behaviour_mut().discovery.kademlia.as_mut() {
for addr in b.external_addresses() {
c_kad.add_address(&b_peer_id, addr.clone());
}
}
let mut a = Swarm::new_ephemeral(|k| new_discovery(k, b_addresses));
a.behaviour_mut().bootstrap().unwrap();
c.behaviour_mut().bootstrap().unwrap();
tokio::spawn(b.loop_on_next());
tokio::spawn(c.loop_on_next());
a.wait(|e| match e {
SwarmEvent::Behaviour(DiscoveryEvent::PeerConnected(peer_id)) => {
if peer_id == c_peer_id {
Some(())
} else {
None
}
}
_ => None,
})
.await;
}
}