use std::{
io::{self, ErrorKind},
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use crate::{
shim::sector::SectorSize,
utils::{
misc::env::is_env_truthy,
net::{download_ipfs_file_trustlessly, global_http_client},
},
};
use anyhow::{bail, Context};
use backoff::{future::retry, ExponentialBackoffBuilder};
use futures::{stream::FuturesUnordered, AsyncWriteExt, TryStreamExt};
use tokio::fs::{self};
use tracing::{debug, info, warn};
use super::parameters::{
check_parameter_file, param_dir, ParameterData, ParameterMap, DEFAULT_PARAMETERS,
PROOFS_PARAMETER_CACHE_ENV,
};
/// IPFS gateway used when the `IPFS_GATEWAY` environment variable is not set.
const DEFAULT_IPFS_GATEWAY: &str = "https://proofs.filecoin.io/ipfs/";
/// Host that parameter files are requested from first, as `https://<domain>/<file name>`.
const CLOUDFLARE_PROOF_PARAMETER_DOMAIN: &str = "filecoin-proof-parameters.chainsafe.dev";
/// Environment variable that, when truthy, skips the Cloudflare download and
/// goes straight to the IPFS gateway.
const PROOFS_ONLY_IPFS_GATEWAY_ENV: &str = "FOREST_PROOFS_ONLY_IPFS_GATEWAY";
/// Environment variable that overrides the IPFS gateway URL.
const IPFS_GATEWAY_ENV: &str = "IPFS_GATEWAY";
/// Selects which entries of a parameter manifest `get_params` should fetch.
pub enum SectorSizeOpt {
/// Fetch every entry in the manifest.
All,
/// Fetch only entries whose file name does not end with `params`.
Keys,
/// Fetch entries whose sector size matches the given size, plus every entry
/// whose file name does not end with `.params`.
Size(SectorSize),
}
/// Makes sure the key files of the default parameter set are present.
///
/// The cache directory is read from the `PROOFS_PARAMETER_CACHE_ENV`
/// environment variable.
///
/// # Errors
///
/// Fails when the variable is unset, not valid Unicode or empty, and
/// propagates any error reported by [`get_params_default`].
pub async fn ensure_params_downloaded() -> anyhow::Result<()> {
    match std::env::var(PROOFS_PARAMETER_CACHE_ENV) {
        Ok(cache_dir) if !cache_dir.is_empty() => {
            get_params_default(Path::new(&cache_dir), SectorSizeOpt::Keys, false).await
        }
        // Unset, non-Unicode and empty values are all treated as "not configured".
        _ => anyhow::bail!("Proof parameter data dir is not set"),
    }
}
/// Fetches and verifies the parameter files listed in `param_json`.
///
/// * `data_dir` - base directory handed to `param_dir` to locate the parameter directory.
/// * `param_json` - JSON manifest that deserializes into a `ParameterMap`.
/// * `storage_size` - selects which manifest entries are fetched.
/// * `dry_run` - when `true`, only prints the parameter directory and returns.
///
/// All selected entries are processed concurrently.
///
/// # Errors
///
/// Fails if the parameter directory cannot be created, the manifest cannot be
/// parsed, or any selected file fails to download or verify.
pub async fn get_params(
    data_dir: &Path,
    param_json: &str,
    storage_size: SectorSizeOpt,
    dry_run: bool,
) -> Result<(), anyhow::Error> {
    // Single definition of the suffix so every filter arm agrees on it.
    const PARAMS_SUFFIX: &str = ".params";

    if dry_run {
        println!("{}", param_dir(data_dir).to_string_lossy());
        return Ok(());
    }
    fs::create_dir_all(param_dir(data_dir)).await?;
    let params: ParameterMap = serde_json::from_str(param_json)?;
    FuturesUnordered::from_iter(
        params
            .into_iter()
            .filter(|(name, info)| match storage_size {
                SectorSizeOpt::Keys => !name.ends_with(PARAMS_SUFFIX),
                SectorSizeOpt::Size(size) => {
                    size as u64 == info.sector_size || !name.ends_with(PARAMS_SUFFIX)
                }
                SectorSizeOpt::All => true,
            })
            // `data_dir` is a `&Path` and therefore `Copy`; each future borrows
            // it directly instead of allocating its own `PathBuf`.
            .map(|(name, info)| async move {
                fetch_verify_params(data_dir, &name, Arc::new(info)).await
            }),
    )
    .try_collect::<Vec<_>>()
    .await?;
    Ok(())
}
/// Runs [`get_params`] against the bundled `DEFAULT_PARAMETERS` manifest.
///
/// `data_dir`, `storage_size` and `dry_run` are forwarded unchanged, and the
/// result of [`get_params`] is returned as-is.
#[inline]
pub async fn get_params_default(
    data_dir: &Path,
    storage_size: SectorSizeOpt,
    dry_run: bool,
) -> Result<(), anyhow::Error> {
    let bundled_manifest = DEFAULT_PARAMETERS;
    get_params(data_dir, bundled_manifest, storage_size, dry_run).await
}
async fn fetch_verify_params(
data_dir: &Path,
name: &str,
info: Arc<ParameterData>,
) -> Result<(), anyhow::Error> {
let path: PathBuf = param_dir(data_dir).join(name);
match check_parameter_file(&path, &info).await {
Ok(()) => return Ok(()),
Err(e) => {
if let Some(e) = e.downcast_ref::<io::Error>() {
if e.kind() == ErrorKind::NotFound {
}
} else {
warn!("Error checking file: {e:?}");
}
}
}
if is_env_truthy(PROOFS_ONLY_IPFS_GATEWAY_ENV) {
fetch_params_ipfs_gateway(&path, &info).await?;
} else if let Err(e) = fetch_params_cloudflare(name, &path).await {
warn!("Failed to fetch param file from Cloudflare R2: {e:?}. Falling back to IPFS gateway",);
fetch_params_ipfs_gateway(&path, &info).await?;
}
check_parameter_file(&path, &info).await?;
Ok(())
}
/// Downloads the parameter file identified by `info.cid` from an IPFS gateway
/// into `path`.
///
/// The gateway URL comes from the `IPFS_GATEWAY_ENV` environment variable and
/// defaults to `DEFAULT_IPFS_GATEWAY`. Failed attempts are retried with
/// exponential backoff for at most 30 minutes in total.
///
/// # Errors
///
/// Fails if the gateway value cannot be parsed or the download still fails
/// once the retry budget is exhausted.
async fn fetch_params_ipfs_gateway(path: &Path, info: &ParameterData) -> anyhow::Result<()> {
    let gateway = std::env::var(IPFS_GATEWAY_ENV)
        .unwrap_or_else(|_| DEFAULT_IPFS_GATEWAY.to_owned())
        .parse()?;
    info!(
        "Fetching param file {path} from {gateway}",
        path = path.display()
    );
    let backoff = ExponentialBackoffBuilder::default()
        .with_max_elapsed_time(Some(Duration::from_secs(60 * 30)))
        .build();
    let result: anyhow::Result<()> = retry(backoff, || async {
        Ok(download_ipfs_file_trustlessly(&info.cid, &gateway, path).await?)
    })
    .await;
    // Only announce completion when the download actually succeeded; a
    // failure is reported through the returned error instead.
    if result.is_ok() {
        debug!(
            "Done fetching param file {path} from {gateway}",
            path = path.display(),
        );
    }
    result
}
/// Downloads the parameter file `name` from Cloudflare R2 into `path`.
///
/// Failed attempts are retried with exponential backoff for at most
/// 30 minutes in total.
///
/// # Errors
///
/// Returns the last download error once the retry budget is exhausted.
async fn fetch_params_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
    info!("Fetching param file {name} from Cloudflare R2 {CLOUDFLARE_PROOF_PARAMETER_DOMAIN}");
    let backoff = ExponentialBackoffBuilder::default()
        .with_max_elapsed_time(Some(Duration::from_secs(60 * 30)))
        .build();
    let result: anyhow::Result<()> = retry(backoff, || async {
        Ok(download_from_cloudflare(name, path).await?)
    })
    .await;
    // Only announce completion when the download actually succeeded; a
    // failure is reported through the returned error instead.
    if result.is_ok() {
        debug!(
            "Done fetching param file {} from Cloudflare",
            path.display()
        );
    }
    result
}
/// Performs a single download attempt of `name` from Cloudflare R2.
///
/// The response body is streamed into a temporary file created in the parent
/// directory of `path`. The temporary file is persisted to `path` only after
/// the whole body has been written, flushed and closed.
///
/// # Errors
///
/// Fails if the request cannot be sent, the response status is not a success,
/// `path` has no parent directory, or any step of writing and persisting the
/// temporary file fails.
async fn download_from_cloudflare(name: &str, path: &Path) -> anyhow::Result<()> {
    let url = format!("https://{CLOUDFLARE_PROOF_PARAMETER_DOMAIN}/{name}");
    let response = global_http_client()
        .get(url)
        .send()
        .await
        .context("Failed to fetch param file from Cloudflare R2")?;
    let request_succeeded = response.status().is_success();
    if !request_succeeded {
        bail!(
            "Failed to fetch param file from Cloudflare R2: {:?}",
            response
        );
    }

    // Stage the download next to the destination so it can be persisted there.
    let parent_dir = path.parent().context("No parent dir")?;
    let staging_path = tempfile::NamedTempFile::new_in(parent_dir)
        .context("Failed to create temp file")?
        .into_temp_path();

    let body = response
        .bytes_stream()
        .map_err(std::io::Error::other)
        .into_async_read();
    let staging_file = async_fs::File::create(&staging_path).await?;
    let mut sink = futures::io::BufWriter::new(staging_file);
    futures::io::copy(body, &mut sink)
        .await
        .context("Failed to write to temp file")?;
    sink.flush().await.context("Failed to flush temp file")?;
    sink.close().await.context("Failed to close temp file")?;

    staging_path
        .persist(path)
        .context("Failed to persist temp file")?;
    Ok(())
}