use super::{CacheKey, ZstdFrameCache};
use crate::blocks::{Tipset, TipsetKey};
use crate::db::car::plain::write_skip_frame_header_async;
use crate::db::car::RandomAccessFileReader;
use crate::utils::db::car_stream::{CarBlock, CarHeader};
use crate::utils::encoding::from_slice_with_fallback;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use ahash::{HashMap, HashMapExt};
use bytes::{buf::Writer, BufMut as _, Bytes, BytesMut};
use cid::Cid;
use futures::{Stream, TryStream, TryStreamExt as _};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::to_vec;
use nunny::Vec as NonEmpty;
use parking_lot::{Mutex, RwLock};
use positioned_io::{Cursor, ReadAt, SizeCursor};
use std::io::{Seek, SeekFrom};
use std::path::Path;
use std::sync::Arc;
use std::task::Poll;
use std::{
io,
io::{Read, Write},
};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{Decoder, Encoder as _};
use unsigned_varint::codec::UviBytes;
#[cfg(feature = "benchmark-private")]
pub mod index;
#[cfg(not(feature = "benchmark-private"))]
mod index;
/// File extension identifying the indexed, zstd-compressed CAR format.
pub const FOREST_CAR_FILE_EXTENSION: &str = ".forest.car.zst";
/// Default tripwire (8 KiB) on the compressed size of an in-progress zstd
/// frame; once exceeded, the frame is finalized and a new one is started.
pub const DEFAULT_FOREST_CAR_FRAME_SIZE: usize = 8000_usize.next_power_of_two();
/// Default zstd compression level used when writing forest CAR files.
pub const DEFAULT_FOREST_CAR_COMPRESSION_LEVEL: u16 = zstd::DEFAULT_COMPRESSION_LEVEL as _;
// Length of a zstd skippable-frame header (4-byte magic + 4-byte size).
const ZSTD_SKIP_FRAME_LEN: u64 = 8;
/// A read-only, zstd-compressed CAR file with an embedded index
/// (`*.forest.car.zst`). Reads consult a shared frame cache; writes via
/// `put_keyed` go to an in-memory overlay and never touch the file.
pub struct ForestCar<ReaderT> {
    // Identifies this file inside the (possibly shared) `frame_cache`.
    cache_key: CacheKey,
    // Index over the file tail starting at `footer.index`, mapping CIDs to
    // byte offsets of the zstd frame containing them.
    indexed: index::Reader<positioned_io::Slice<ReaderT>>,
    // Cache of decompressed zstd frames; shared when set via `with_cache`.
    frame_cache: Arc<Mutex<ZstdFrameCache>>,
    // In-memory overlay for inserted blocks; checked before the index in `get`.
    write_cache: Arc<RwLock<ahash::HashMap<Cid, Vec<u8>>>>,
    // Root CIDs parsed from the CAR header.
    roots: NonEmpty<Cid>,
}
impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
    /// Validates `reader` as a forest CAR file (footer + header) and loads
    /// its embedded index.
    ///
    /// Returns an `InvalidData` error if the footer or header do not parse.
    pub fn new(reader: ReaderT) -> io::Result<ForestCar<ReaderT>> {
        let (header, footer) = Self::validate_car(&reader)?;
        // The index occupies the tail of the file, starting at `footer.index`.
        let indexed = index::Reader::new(positioned_io::Slice::new(reader, footer.index, None))?;
        Ok(ForestCar {
            cache_key: 0,
            indexed,
            frame_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
            write_cache: Arc::new(RwLock::new(ahash::HashMap::default())),
            roots: header.roots,
        })
    }

    /// Returns `true` if `reader` parses as a forest CAR file.
    pub fn is_valid(reader: &ReaderT) -> bool {
        Self::validate_car(reader).is_ok()
    }

    /// Reads the fixed-size footer from the end of the file and the CAR
    /// header from the first zstd frame at the start, returning both.
    fn validate_car(reader: &ReaderT) -> io::Result<(CarHeader, ForestCarFooter)> {
        // Seek to the footer, which sits in the last `ForestCarFooter::SIZE` bytes.
        let mut cursor = SizeCursor::new(&reader);
        cursor.seek(SeekFrom::End(-(ForestCarFooter::SIZE as i64)))?;
        let mut footer_buffer = [0; ForestCarFooter::SIZE];
        cursor.read_exact(&mut footer_buffer)?;
        let footer = ForestCarFooter::try_from_le_bytes(footer_buffer).ok_or_else(|| {
            invalid_data(format!(
                "not recognizable as a `{}` file",
                FOREST_CAR_FILE_EXTENSION
            ))
        })?;
        // The header is the first varint-delimited entry of the first zstd frame.
        let cursor = Cursor::new_pos(&reader, 0);
        let mut header_zstd_frame = decode_zstd_single_frame(cursor)?;
        let block_frame = UviBytes::<Bytes>::default()
            .decode(&mut header_zstd_frame)?
            .ok_or_else(|| invalid_data("malformed uvibytes"))?;
        let header = from_slice_with_fallback::<CarHeader>(&block_frame)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        Ok((header, footer))
    }

    /// Root CIDs from the CAR header.
    pub fn roots(&self) -> &NonEmpty<Cid> {
        &self.roots
    }

    /// Loads the tipset identified by the header roots, erroring if any root
    /// block is missing from this store.
    pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
        Tipset::load_required(self, &TipsetKey::from(self.roots().clone()))
    }

    /// Type-erases the underlying reader so differently-backed cars can share
    /// a single concrete type.
    pub fn into_dyn(self) -> ForestCar<Box<dyn super::RandomAccessFileReader>> {
        ForestCar {
            cache_key: self.cache_key,
            indexed: self.indexed.map(|slice| {
                // Re-wrap the boxed reader in a slice at the same offset.
                let offset = slice.offset();
                positioned_io::Slice::new(
                    Box::new(slice.into_inner()) as Box<dyn RandomAccessFileReader>,
                    offset,
                    None,
                )
            }),
            frame_cache: self.frame_cache,
            write_cache: self.write_cache,
            roots: self.roots,
        }
    }

    /// Replaces the frame cache (e.g. with one shared across several files)
    /// and the key identifying this file within it.
    pub fn with_cache(self, cache: Arc<Mutex<ZstdFrameCache>>, key: CacheKey) -> Self {
        Self {
            cache_key: key,
            frame_cache: cache,
            ..self
        }
    }
}
/// Opens the file at `path` via [`EitherMmapOrRandomAccessFile`] and
/// validates it as a forest CAR.
impl TryFrom<&Path> for ForestCar<EitherMmapOrRandomAccessFile> {
    type Error = std::io::Error;

    fn try_from(path: &Path) -> std::io::Result<Self> {
        let reader = EitherMmapOrRandomAccessFile::open(path)?;
        Self::new(reader)
    }
}
impl<ReaderT> Blockstore for ForestCar<ReaderT>
where
    ReaderT: ReadAt,
{
    /// Looks up `k`: first in the in-memory write cache, then in the indexed,
    /// zstd-compressed body of the file. Returns `Ok(None)` if absent.
    #[tracing::instrument(level = "trace", skip(self))]
    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        // Blocks inserted via `put_keyed` shadow anything on disk.
        if let Some(value) = self.write_cache.read().get(k) {
            return Ok(Some(value.clone()));
        }
        let indexed = &self.indexed;
        // The index may yield several candidate frame offsets for one key.
        for position in indexed.get(*k)? {
            let cache_query = self.frame_cache.lock().get(position, self.cache_key, *k);
            match cache_query {
                Some(Some(val)) => return Ok(Some(val)),
                // Cached frame is known not to contain `k`; try the next candidate.
                Some(None) => {}
                None => {
                    // Cache miss: decompress the frame at `position`, decode
                    // every block in it, and populate the cache.
                    let entire_file = indexed.reader().get_ref();
                    let cursor = Cursor::new_pos(entire_file, position);
                    let mut zstd_frame = decode_zstd_single_frame(cursor)?;
                    let mut block_map = HashMap::new();
                    while let Some(block_frame) =
                        UviBytes::<Bytes>::default().decode_eof(&mut zstd_frame)?
                    {
                        let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?;
                        block_map.insert(cid, data);
                    }
                    // Grab the answer before the map is moved into the cache.
                    let get_result = block_map.get(k).cloned();
                    self.frame_cache
                        .lock()
                        .put(position, self.cache_key, block_map);
                    if let Some(value) = get_result {
                        return Ok(Some(value));
                    }
                }
            }
        }
        Ok(None)
    }

    /// Buffers `block` under `k` in the in-memory write cache; nothing is
    /// written to the underlying file.
    #[tracing::instrument(level = "trace", skip(self, block))]
    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
        // Debug-only sanity check: the CID must actually match the data.
        debug_assert!(CarBlock {
            cid: *k,
            data: block.to_vec()
        }
        .valid());
        self.write_cache.write().insert(*k, Vec::from(block));
        Ok(())
    }
}
fn decode_zstd_single_frame<ReaderT: Read>(reader: ReaderT) -> io::Result<BytesMut> {
let mut zstd_frame = vec![];
zstd::Decoder::new(reader)?
.single_frame()
.read_to_end(&mut zstd_frame)?;
Ok(zstd_frame.into_iter().collect())
}
/// Streaming writer for the `.forest.car.zst` format: a zstd-compressed CAR
/// header, a sequence of zstd frames of varint-delimited blocks, an index
/// wrapped in a zstd skippable frame, and a fixed-size [`ForestCarFooter`].
pub struct Encoder {}

impl Encoder {
    /// Writes a complete forest CAR file to `sink`.
    ///
    /// `stream` yields `(cids, zstd_frame)` pairs, as produced by
    /// [`Self::compress_stream`]; every CID is recorded in the index at the
    /// byte offset of the frame containing its block.
    pub async fn write(
        mut sink: impl AsyncWrite + Unpin,
        roots: NonEmpty<Cid>,
        mut stream: impl TryStream<Ok = (Vec<Cid>, Bytes), Error = anyhow::Error> + Unpin,
    ) -> anyhow::Result<()> {
        // Running byte offset into `sink`, used for index entries and the footer.
        let mut offset = 0;

        // Encode the CAR header in its own zstd frame. Use the crate default
        // level rather than a magic number.
        let mut header_encoder = new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
        let header = CarHeader { roots, version: 1 };
        let mut header_uvi_frame = BytesMut::new();
        UviBytes::default().encode(Bytes::from(to_vec(&header)?), &mut header_uvi_frame)?;
        header_encoder.write_all(&header_uvi_frame)?;
        let header_bytes = header_encoder.finish()?.into_inner().freeze();
        sink.write_all(&header_bytes).await?;
        let header_len = header_bytes.len();
        offset += header_len;

        // Write the pre-compressed data frames, indexing each CID at the
        // offset of the frame it lives in.
        let mut builder = index::Builder::new();
        while let Some((cids, zstd_frame)) = stream.try_next().await? {
            builder.extend(cids.into_iter().map(|cid| (cid, offset as u64)));
            sink.write_all(&zstd_frame).await?;
            offset += zstd_frame.len()
        }

        // The index lives inside a zstd skippable frame so that plain zstd
        // tools can still decompress the file.
        let writer = builder.into_writer();
        write_skip_frame_header_async(&mut sink, writer.written_len().try_into().unwrap()).await?;
        writer.write_into(&mut sink).await?;

        // The footer records where the index payload starts (just past the
        // skippable-frame header).
        let footer = ForestCarFooter {
            index: offset as u64 + ZSTD_SKIP_FRAME_LEN,
        };
        sink.write_all(&footer.to_le_bytes()).await?;
        Ok(())
    }

    /// [`Self::compress_stream`] with the default frame size and compression
    /// level.
    pub fn compress_stream_default(
        stream: impl TryStream<Ok = CarBlock, Error = anyhow::Error>,
    ) -> impl TryStream<Ok = (Vec<Cid>, Bytes), Error = anyhow::Error> {
        Self::compress_stream(
            DEFAULT_FOREST_CAR_FRAME_SIZE,
            DEFAULT_FOREST_CAR_COMPRESSION_LEVEL,
            stream,
        )
    }

    /// Compresses a stream of [`CarBlock`]s into zstd frames, finalizing a
    /// frame whenever its compressed size exceeds `zstd_frame_size_tripwire`.
    /// Each yielded item pairs a finished frame with the CIDs of the blocks
    /// it contains.
    pub fn compress_stream(
        zstd_frame_size_tripwire: usize,
        zstd_compression_level: u16,
        stream: impl TryStream<Ok = CarBlock, Error = anyhow::Error>,
    ) -> impl TryStream<Ok = (Vec<Cid>, Bytes), Error = anyhow::Error> {
        let mut encoder_store = new_encoder(zstd_compression_level);
        let mut frame_cids = vec![];
        let mut stream = Box::pin(stream.into_stream());
        futures::stream::poll_fn(move |cx| {
            let encoder = match encoder_store.as_mut() {
                Err(e) => {
                    // Encoder construction failed: surface the error once and
                    // leave a placeholder so later polls don't re-report it.
                    let dummy_error = io::Error::other("Error already consumed.");
                    return Poll::Ready(Some(Err(anyhow::Error::from(std::mem::replace(
                        e,
                        dummy_error,
                    )))));
                }
                Ok(encoder) => encoder,
            };
            loop {
                // Emit a frame as soon as the compressed data passes the tripwire.
                if compressed_len(encoder) > zstd_frame_size_tripwire {
                    let cids = std::mem::take(&mut frame_cids);
                    let frame = finalize_frame(zstd_compression_level, encoder)?;
                    return Poll::Ready(Some(Ok((cids, frame))));
                }
                let ret = futures::ready!(stream.as_mut().poll_next(cx));
                match ret {
                    None => {
                        // Input exhausted: flush any partial frame, then end.
                        if compressed_len(encoder) > 0 {
                            let cids = std::mem::take(&mut frame_cids);
                            let frame = finalize_frame(zstd_compression_level, encoder)?;
                            return Poll::Ready(Some(Ok((cids, frame))));
                        } else {
                            return Poll::Ready(None);
                        }
                    }
                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                    Some(Ok(block)) => {
                        frame_cids.push(block.cid);
                        block.write(encoder)?;
                        // Flush so `compressed_len` reflects this block.
                        encoder.flush()?;
                    }
                }
            }
        })
    }
}
/// Wraps any error-like payload in an [`io::Error`] of kind
/// [`io::ErrorKind::InvalidData`].
fn invalid_data(inner: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, inner)
}
/// Number of compressed bytes currently buffered in `encoder`'s in-memory
/// output.
fn compressed_len(encoder: &zstd::Encoder<'static, Writer<BytesMut>>) -> usize {
    let writer = encoder.get_ref();
    writer.get_ref().len()
}
/// Finishes the current zstd frame, swapping in a fresh encoder, and returns
/// the completed frame's bytes.
fn finalize_frame(
    zstd_compression_level: u16,
    encoder: &mut zstd::Encoder<'static, Writer<BytesMut>>,
) -> io::Result<Bytes> {
    let fresh = new_encoder(zstd_compression_level)?;
    let finished = std::mem::replace(encoder, fresh).finish()?;
    Ok(finished.into_inner().freeze())
}
/// Creates a zstd encoder that compresses into an in-memory [`BytesMut`]
/// buffer at the given compression level.
fn new_encoder(
    zstd_compression_level: u16,
) -> io::Result<zstd::Encoder<'static, Writer<BytesMut>>> {
    let sink = BytesMut::new().writer();
    zstd::Encoder::new(sink, i32::from(zstd_compression_level))
}
/// Fixed-size footer stored in the last 16 bytes of a forest CAR file,
/// formatted as a zstd skippable frame so generic zstd tools ignore it.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
struct ForestCarFooter {
    // Byte offset of the index payload from the start of the file.
    index: u64,
}
impl ForestCarFooter {
    /// Serialized size in bytes: 4-byte magic + 4-byte length + 8-byte index.
    pub const SIZE: usize = 16;

    /// Serializes the footer as a little-endian zstd skippable frame whose
    /// 8-byte payload is the index offset.
    pub fn to_le_bytes(&self) -> [u8; Self::SIZE] {
        // 0x184D2A50 (little-endian): a zstd skippable-frame magic number.
        let magic: [u8; 4] = [0x50, 0x2A, 0x4D, 0x18];
        let footer_data_len: u32 = 8;
        let mut buffer = [0u8; Self::SIZE];
        buffer[..4].copy_from_slice(&magic);
        buffer[4..8].copy_from_slice(&footer_data_len.to_le_bytes());
        buffer[8..].copy_from_slice(&self.index.to_le_bytes());
        buffer
    }

    /// Parses a footer, returning `None` unless the magic number and length
    /// fields match exactly.
    pub fn try_from_le_bytes(bytes: [u8; Self::SIZE]) -> Option<ForestCarFooter> {
        let index = u64::from_le_bytes(bytes[8..16].try_into().expect("infallible"));
        let candidate = ForestCarFooter { index };
        // Re-encoding and comparing validates the magic/length prefix too.
        (bytes == candidate.to_le_bytes()).then_some(candidate)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::block_on;
    use nunny::vec as nonempty;
    use quickcheck_macros::quickcheck;

    /// Encodes `blocks` with the given `roots` into an in-memory forest CAR
    /// byte vector, using the full compress + write pipeline.
    fn mk_encoded_car(
        zstd_frame_size_tripwire: usize,
        zstd_compression_level: u16,
        roots: NonEmpty<Cid>,
        blocks: NonEmpty<CarBlock>,
    ) -> Vec<u8> {
        block_on(async {
            let frame_stream = Encoder::compress_stream(
                zstd_frame_size_tripwire,
                zstd_compression_level,
                futures::stream::iter(blocks.into_iter().map(Ok)),
            );
            let mut encoded = vec![];
            Encoder::write(&mut encoded, roots, frame_stream)
                .await
                .unwrap();
            encoded
        })
    }

    // Round-trip property: every encoded block must be readable back by CID.
    #[quickcheck]
    fn forest_car_create_basic(blocks: nunny::Vec<CarBlock>) {
        let roots = nonempty!(blocks.first().cid);
        let forest_car =
            ForestCar::new(mk_encoded_car(1024 * 4, 3, roots.clone(), blocks.clone())).unwrap();
        assert_eq!(forest_car.roots(), &roots);
        for block in blocks {
            assert_eq!(forest_car.get(&block.cid).unwrap(), Some(block.data));
        }
    }

    // Same round-trip property under arbitrary frame sizes and levels.
    #[quickcheck]
    fn forest_car_create_options(
        blocks: nunny::Vec<CarBlock>,
        frame_size: usize,
        mut compression_level: u16,
    ) {
        // Clamp the level into a range zstd accepts (1..=14 here).
        compression_level %= 15;
        let roots = nonempty!(blocks.first().cid);
        let forest_car = ForestCar::new(mk_encoded_car(
            frame_size,
            compression_level.max(1),
            roots.clone(),
            blocks.clone(),
        ))
        .unwrap();
        assert_eq!(forest_car.roots(), &roots);
        for block in blocks {
            assert_eq!(forest_car.get(&block.cid).unwrap(), Some(block.data));
        }
    }

    // Arbitrary bytes must be rejected, not mis-parsed.
    #[quickcheck]
    fn forest_car_open_invalid(junk: Vec<u8>) {
        assert!(ForestCar::new(junk).is_err());
    }

    // Footer serialization must round-trip exactly.
    #[quickcheck]
    fn forest_footer_roundtrip(footer: ForestCarFooter) {
        let footer_recoded = ForestCarFooter::try_from_le_bytes(footer.to_le_bytes());
        assert_eq!(footer_recoded, Some(footer));
    }

    // Two distinct CIDs with colliding index hash summaries must both remain
    // retrievable with their own data.
    #[test]
    fn encode_hash_collisions() {
        use cid::multihash::{Code::Identity, MultihashDigest};
        let cid_a = Cid::new_v1(0, Identity.digest(&[10]));
        let cid_b = Cid::new_v1(0, Identity.digest(&[0]));
        assert_ne!(cid_a, cid_b);
        assert_eq!(index::hash::summary(&cid_a), index::hash::summary(&cid_b));
        let blocks = nonempty![
            CarBlock {
                cid: cid_a,
                data: Vec::from_iter(*b"bill and ben"),
            },
            CarBlock {
                cid: cid_b,
                data: Vec::from_iter(*b"the flowerpot men"),
            },
        ];
        // Frame size 0 forces one frame per block.
        let forest_car = ForestCar::new(mk_encoded_car(
            0,
            3,
            nonempty![blocks.first().cid],
            blocks.clone(),
        ))
        .unwrap();
        assert_eq!(forest_car.get(&cid_a).unwrap().unwrap(), blocks[0].data);
        assert_eq!(forest_car.get(&cid_b).unwrap().unwrap(), blocks[1].data);
    }
}