1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::{
    networks::{ActorBundleInfo, NetworkChain, ACTOR_BUNDLES},
    utils::{
        db::{
            car_stream::{CarBlock, CarStream},
            car_util::load_car,
        },
        net::http_get,
    },
};
use ahash::HashSet;
use anyhow::ensure;
use cid::Cid;
use futures::{stream::FuturesUnordered, TryStreamExt};
use fvm_ipld_blockstore::Blockstore;
use std::mem::discriminant;
use std::{io::Cursor, path::Path};
use tracing::{info, warn};

/// Ensures the actor bundles for `network` are available in the blockstore.
///
/// When the `FOREST_ACTOR_BUNDLE_PATH` environment variable is set to a
/// non-empty value, the bundle is read from that file. Otherwise the bundles
/// listed in `ACTOR_BUNDLES` that are still missing are fetched from the
/// server.
///
/// # Errors
///
/// Propagates any error reported by the selected loading routine.
pub async fn load_actor_bundles(
    db: &impl Blockstore,
    network: &NetworkChain,
) -> anyhow::Result<()> {
    // An unset, non-unicode or empty variable all mean "no local override".
    let local_override = std::env::var("FOREST_ACTOR_BUNDLE_PATH")
        .ok()
        .filter(|path| !path.is_empty());

    match local_override {
        Some(bundle_path) => {
            info!("Loading actor bundle from {bundle_path} set by FOREST_ACTOR_BUNDLE_PATH environment variable");
            load_actor_bundles_from_path(db, network, &bundle_path).await?;
        }
        None => {
            // The CIDs of the downloaded bundles are not needed here.
            load_actor_bundles_from_server(db, network, &ACTOR_BUNDLES).await?;
        }
    }

    Ok(())
}

pub async fn load_actor_bundles_from_path(
    db: &impl Blockstore,
    network: &NetworkChain,
    bundle_path: impl AsRef<Path>,
) -> anyhow::Result<()> {
    anyhow::ensure!(
        bundle_path.as_ref().is_file(),
        "Bundle file not found at {}",
        bundle_path.as_ref().display()
    );
    let mut car_stream = CarStream::new(tokio::io::BufReader::new(
        tokio::fs::File::open(bundle_path.as_ref()).await?,
    ))
    .await?;

    // Validate the bundle
    let roots = HashSet::from_iter(car_stream.header.roots.iter());
    for ActorBundleInfo {
        manifest, network, ..
    } in ACTOR_BUNDLES.iter().filter(|bundle| {
        // Comparing only the discriminant is enough. All devnets share the same
        // actor bundle.
        discriminant(network) == discriminant(&bundle.network)
    }) {
        anyhow::ensure!(
                roots.contains(manifest),
                "actor {manifest} for {network} is missing from {}, try regenerating the bundle with `forest-tool state-migration actor-bundle`", bundle_path.as_ref().display());
    }

    // Load into DB
    while let Some(CarBlock { cid, data }) = car_stream.try_next().await? {
        db.put_keyed(&cid, &data)?;
    }

    Ok(())
}

/// Downloads the actor bundles that are missing from the blockstore and loads
/// them, returning the manifest CIDs of the bundles that were loaded.
///
/// Only entries of `bundles` that belong to `network` and whose manifest is
/// not already present in `db` are fetched. Each bundle is first requested
/// from its primary URL; if that request fails, the failure is logged and the
/// alternative URL is tried. The downloads are driven concurrently through a
/// [`FuturesUnordered`], so the returned CIDs are in no particular order.
///
/// # Errors
///
/// Fails if a bundle cannot be downloaded from either URL, if the response
/// body cannot be read or loaded as a CAR file, or if the CAR header does not
/// contain exactly one root equal to the expected manifest CID.
pub async fn load_actor_bundles_from_server(
    db: &impl Blockstore,
    network: &NetworkChain,
    bundles: &[ActorBundleInfo],
) -> anyhow::Result<Vec<Cid>> {
    FuturesUnordered::from_iter(
        bundles
            .iter()
            .filter(|bundle| {
                // A failing `has` lookup is treated as "not present", so the
                // bundle is downloaded again rather than skipped.
                !db.has(&bundle.manifest).unwrap_or(false) &&
                // Comparing only the discriminant is enough. All devnets share the same
                // actor bundle.
                discriminant(network) == discriminant(&bundle.network)
            })
            .map(
                |ActorBundleInfo {
                     manifest: root,
                     url,
                     alt_url,
                     network: _,
                     version: _,
                 }| async move {
                    let response = match http_get(url).await {
                        Ok(response) => response,
                        Err(err) => {
                            // Report why the primary source failed instead of
                            // discarding the error silently.
                            warn!("failed to download bundle {root} from primary URL ({err}), trying alternative URL");
                            http_get(alt_url).await?
                        }
                    };
                    let bytes = response.bytes().await?;
                    // NOTE(review): `load_car` is handed `db` before the root
                    // checks below run, so presumably blocks are already stored
                    // when a check fails - confirm whether that is acceptable.
                    let header = load_car(db, Cursor::new(bytes)).await?;
                    ensure!(
                        header.roots.len() == 1,
                        "expected exactly one root in the CAR file of actor bundle {root}, found {}",
                        header.roots.len()
                    );
                    ensure!(
                        header.roots.first() == root,
                        "unexpected root in downloaded actor bundle: expected {root}, found {}",
                        header.roots.first()
                    );
                    Ok(*header.roots.first())
                },
            ),
    )
    .try_collect::<Vec<_>>()
    .await
}