1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
//! This module contains the logic for fetching the proofs parameters from the network.
//! As a general rule, the parameters are first fetched from ChainSafe's Cloudflare R2 bucket. If
//! that fails (or if [`PROOFS_ONLY_IPFS_GATEWAY_ENV`] is set), the IPFS gateway is used instead.
//!
//! The reason for this is that the IPFS gateway is less reliable and less performant than the
//! centralized solution, which has caused issues in CI in the past.

use std::{
    io::{self, ErrorKind},
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use crate::{
    shim::sector::SectorSize,
    utils::{
        misc::env::is_env_truthy,
        net::{download_ipfs_file_trustlessly, global_http_client},
    },
};
use anyhow::{bail, Context};
use backoff::{future::retry, ExponentialBackoffBuilder};
use futures::{stream::FuturesUnordered, AsyncWriteExt, TryStreamExt};
use tokio::fs::{self};
use tracing::{debug, info, warn};

use super::parameters::{
    check_parameter_file, param_dir, ParameterData, ParameterMap, DEFAULT_PARAMETERS,
    PROOFS_PARAMETER_CACHE_ENV,
};

/// Default IPFS gateway to use for fetching parameters.
/// Can be overridden via the [`IPFS_GATEWAY_ENV`] environment variable.
const DEFAULT_IPFS_GATEWAY: &str = "https://proofs.filecoin.io/ipfs/";
/// Domain bound to the Cloudflare R2 bucket that hosts the parameter files.
/// Files are requested as `https://<domain>/<file name>`.
const CLOUDFLARE_PROOF_PARAMETER_DOMAIN: &str = "filecoin-proof-parameters.chainsafe.dev";

/// If set to 1 (or another value accepted by `is_env_truthy`), enforce using the IPFS gateway
/// for fetching parameters and skip Cloudflare R2 entirely.
const PROOFS_ONLY_IPFS_GATEWAY_ENV: &str = "FOREST_PROOFS_ONLY_IPFS_GATEWAY";

/// Overrides [`DEFAULT_IPFS_GATEWAY`].
///
/// Running Forest requires downloading the chain's proof parameters. These are large files
/// which, by default, are hosted outside of China and are very slow to download there.
/// To work around that, users in China should set this variable to:
/// <https://proof-parameters.s3.cn-south-1.jdcloud-oss.com/ipfs/>
const IPFS_GATEWAY_ENV: &str = "IPFS_GATEWAY";

/// Sector size options for fetching.
/// Selects which entries of the parameter manifest should be fetched.
pub enum SectorSizeOpt {
    /// Every entry in the manifest: all verification keys and all proof-generation
    /// (`.params`) files, regardless of sector size.
    All,
    /// Only the verification keys; proof-generation `.params` files are skipped.
    Keys,
    /// All verification keys, plus only those proof-generation `.params` files whose
    /// sector size matches the given one.
    Size(SectorSize),
}

/// Ensures the parameter files are downloaded to cache dir
/// Makes sure the verification keys are present (and valid) in the proof parameter cache.
///
/// The cache directory is taken from the [`PROOFS_PARAMETER_CACHE_ENV`] environment variable.
/// An unset, empty or non-UTF-8 value is reported as an error instead of falling back to
/// some default location.
pub async fn ensure_params_downloaded() -> anyhow::Result<()> {
    let cache_dir = match std::env::var(PROOFS_PARAMETER_CACHE_ENV) {
        Ok(dir) if !dir.is_empty() => dir,
        _ => anyhow::bail!("Proof parameter data dir is not set"),
    };
    get_params_default(Path::new(&cache_dir), SectorSizeOpt::Keys, false).await
}

/// Get proofs parameters and all verification keys for a given sector size
/// given a parameter JSON manifest.
/// Get proofs parameters and all verification keys for a given sector size
/// given a parameter JSON manifest.
///
/// * `data_dir` - base data directory; files are stored under [`param_dir`] of it.
/// * `param_json` - JSON manifest mapping file names to their [`ParameterData`].
/// * `storage_size` - which manifest entries to fetch, see [`SectorSizeOpt`].
/// * `dry_run` - when `true`, only prints the parameter directory and returns.
///
/// All selected files are checked/fetched concurrently; the first failure aborts the whole
/// operation.
///
/// # Errors
/// Fails if the parameter directory cannot be created, the manifest is not valid JSON, or any
/// selected file cannot be fetched and verified.
pub async fn get_params(
    data_dir: &Path,
    param_json: &str,
    storage_size: SectorSizeOpt,
    dry_run: bool,
) -> Result<(), anyhow::Error> {
    // Just print out the parameters download directory path and exit.
    if dry_run {
        println!("{}", param_dir(data_dir).to_string_lossy());
        return Ok(());
    }

    fs::create_dir_all(param_dir(data_dir)).await?;

    let params: ParameterMap = serde_json::from_str(param_json)?;

    FuturesUnordered::from_iter(
        params
            .into_iter()
            .filter(|(name, info)| {
                // Proof-generation parameters are the `.params` files; everything else
                // (the verification keys) is wanted for every option. Both the `Keys` and the
                // `Size` arms use the same suffix test so they agree on what a params file is.
                let is_proof_params = name.ends_with(".params");
                match storage_size {
                    SectorSizeOpt::All => true,
                    SectorSizeOpt::Keys => !is_proof_params,
                    SectorSizeOpt::Size(size) => {
                        !is_proof_params || size as u64 == info.sector_size
                    }
                }
            })
            // `data_dir` is a `Copy` reference that outlives every future (they are all awaited
            // below), so there is no need to clone it into an owned `PathBuf` per file.
            .map(|(name, info)| async move {
                fetch_verify_params(data_dir, &name, Arc::new(info)).await
            }),
    )
    .try_collect::<Vec<_>>()
    .await?;

    Ok(())
}

/// Get proofs parameters and all verification keys for a given sector size
/// using default manifest.
#[inline]
pub async fn get_params_default(
    data_dir: &Path,
    storage_size: SectorSizeOpt,
    dry_run: bool,
) -> Result<(), anyhow::Error> {
    get_params(data_dir, DEFAULT_PARAMETERS, storage_size, dry_run).await
}

async fn fetch_verify_params(
    data_dir: &Path,
    name: &str,
    info: Arc<ParameterData>,
) -> Result<(), anyhow::Error> {
    let path: PathBuf = param_dir(data_dir).join(name);

    match check_parameter_file(&path, &info).await {
        Ok(()) => return Ok(()),
        Err(e) => {
            if let Some(e) = e.downcast_ref::<io::Error>() {
                if e.kind() == ErrorKind::NotFound {
                    // File is missing, download it
                }
            } else {
                warn!("Error checking file: {e:?}");
            }
        }
    }

    if is_env_truthy(PROOFS_ONLY_IPFS_GATEWAY_ENV) {
        fetch_params_ipfs_gateway(&path, &info).await?;
    } else if let Err(e) = fetch_params_cloudflare(name, &path).await {
        warn!("Failed to fetch param file from Cloudflare R2: {e:?}. Falling back to IPFS gateway",);
        fetch_params_ipfs_gateway(&path, &info).await?;
    }

    check_parameter_file(&path, &info).await?;
    Ok(())
}

/// Downloads the parameter file identified by `info.cid` from an IPFS gateway into `path`.
///
/// The gateway is [`DEFAULT_IPFS_GATEWAY`] unless overridden by [`IPFS_GATEWAY_ENV`]. Failed
/// attempts are retried with exponential backoff for up to 30 minutes in total.
///
/// # Errors
/// Fails if the gateway URL cannot be parsed, or if no attempt succeeded within the retry
/// window; the error names the file and gateway involved.
async fn fetch_params_ipfs_gateway(path: &Path, info: &ParameterData) -> anyhow::Result<()> {
    let gateway = std::env::var(IPFS_GATEWAY_ENV)
        .unwrap_or_else(|_| DEFAULT_IPFS_GATEWAY.to_owned())
        .parse()?;
    info!(
        "Fetching param file {path} from {gateway}",
        path = path.display()
    );
    let backoff = ExponentialBackoffBuilder::default()
        // Keep retrying for at most 30 minutes in total. This may be drastic, but the gateway
        // proved to be unreliable at times, so transient failures are retried; the cap makes
        // sure we don't get stuck here forever.
        .with_max_elapsed_time(Some(Duration::from_secs(60 * 30)))
        .build();
    let result: anyhow::Result<()> = retry(backoff, || async {
        Ok(download_ipfs_file_trustlessly(&info.cid, &gateway, path).await?)
    })
    .await;
    // Only report completion on success; on failure the (contextualized) error is returned.
    result.with_context(|| {
        format!(
            "Failed to fetch param file {} from {gateway}",
            path.display()
        )
    })?;
    debug!(
        "Done fetching param file {path} from {gateway}",
        path = path.display(),
    );
    Ok(())
}

/// Downloads the parameter file from Cloudflare R2 to the given path. It wraps the [`download_from_cloudflare`] function with a retry and timeout mechanisms.
/// Downloads the parameter file `name` from Cloudflare R2 into `path`.
///
/// Wraps [`download_from_cloudflare`] in a retry loop with exponential backoff: failed
/// attempts are retried until 30 minutes have elapsed in total, after which the error is
/// returned to the caller.
async fn fetch_params_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
    info!("Fetching param file {name} from Cloudflare R2 {CLOUDFLARE_PROOF_PARAMETER_DOMAIN}");
    let backoff = ExponentialBackoffBuilder::default()
        .with_max_elapsed_time(Some(Duration::from_secs(60 * 30)))
        .build();
    let result: anyhow::Result<()> = retry(backoff, || async {
        Ok(download_from_cloudflare(name, path).await?)
    })
    .await;
    // Only report completion when the download actually succeeded; failures are logged by
    // the caller before it falls back to the IPFS gateway.
    if result.is_ok() {
        debug!(
            "Done fetching param file {} from Cloudflare",
            path.display()
        );
    }
    result
}

/// Downloads the parameter file from Cloudflare R2 to the given path. In case of an error,
/// the file is not written to the final path to avoid corrupted files.
/// Streams the parameter file `name` from the Cloudflare R2 bucket into `path`.
///
/// The response body is first written to a temporary file created next to `path`. It is renamed
/// onto `path` only after it has been fully written, flushed and closed. If any step fails, the
/// temporary path is dropped instead, so `path` never ends up holding a partial download.
async fn download_from_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
    let url = format!("https://{CLOUDFLARE_PROOF_PARAMETER_DOMAIN}/{name}");
    let response = global_http_client()
        .get(url)
        .send()
        .await
        .context("Failed to fetch param file from Cloudflare R2")?;

    if !response.status().is_success() {
        bail!(
            "Failed to fetch param file from Cloudflare R2: {:?}",
            response
        );
    }

    // Staging file in the destination directory, so the final rename stays on one filesystem.
    let target_dir = path.parent().context("No parent dir")?;
    let staging = tempfile::NamedTempFile::new_in(target_dir)
        .context("Failed to create temp file")?
        .into_temp_path();

    let body = response
        .bytes_stream()
        .map_err(std::io::Error::other)
        .into_async_read();
    let staging_file = async_fs::File::create(&staging).await?;
    let mut sink = futures::io::BufWriter::new(staging_file);

    futures::io::copy(body, &mut sink)
        .await
        .context("Failed to write to temp file")?;
    sink.flush().await.context("Failed to flush temp file")?;
    sink.close().await.context("Failed to close temp file")?;

    staging
        .persist(path)
        .context("Failed to persist temp file")?;
    Ok(())
}