use std::fmt;
use std::fs::{remove_file, File, OpenOptions};
use std::io::{copy, Read, Seek, SeekFrom};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::ops;
use std::path::Path;
use std::sync::{Arc, RwLock};
use anyhow::{Context, Result};
use log::warn;
use memmap2::MmapOptions;
use positioned_io::{ReadAt, WriteAt};
use rayon::iter::*;
use rayon::prelude::*;
use tempfile::tempfile;
use typenum::marker_traits::Unsigned;
use crate::hash::Algorithm;
use crate::merkle::{
get_merkle_tree_cache_size, get_merkle_tree_leafs, get_merkle_tree_len, log2_pow2, next_pow2,
Element,
};
use crate::store::{ExternalReader, Store, StoreConfig, BUILD_CHUNK_NODES};
/// A file-backed merkle tree `Store` that retains only the base (leaf) data
/// and the top "cached" rows of the tree, discarding intermediate rows to
/// save disk space. Reads of the base data may optionally be served by an
/// `ExternalReader` instead of the backing file.
pub struct LevelCacheStore<E: Element, R: Read + Send + Sync> {
    // Number of elements currently present in the (logical) store.
    len: usize,
    // Size of a single element in bytes (E::byte_len()).
    elem_len: usize,
    // Backing file; holds data + cache (v1) or only the cache when an
    // external reader supplies the base data.
    file: File,

    // Number of leaf elements in the base layer of the tree.
    data_width: usize,

    // Logical byte offset at which the cached top rows begin
    // (i.e. total store bytes minus cache bytes).
    cache_index_start: usize,

    // True when this store was instantiated from an existing on-disk file.
    loaded_from_disk: bool,

    // Total logical store size in bytes.
    store_size: usize,

    // Optional reader used to serve reads of the base (leaf) region when
    // that data is not kept in `file`.
    reader: Option<ExternalReader<R>>,

    _e: PhantomData<E>,
}
impl<E: Element, R: Read + Send + Sync> fmt::Debug for LevelCacheStore<E, R> {
    /// Formats a summary of the store's bookkeeping fields. The file handle,
    /// external reader and `PhantomData` are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LevelCacheStore")
            .field("len", &self.len)
            // BUG FIX: this previously printed `self.len` under the
            // "elem_len" key; report the actual element byte length.
            .field("elem_len", &self.elem_len)
            .field("data_width", &self.data_width)
            .field("loaded_from_disk", &self.loaded_from_disk)
            .field("cache_index_start", &self.cache_index_start)
            .field("store_size", &self.store_size)
            .finish()
    }
}
impl<E: Element, R: Read + Send + Sync> LevelCacheStore<E, R> {
    /// Instantiates a `LevelCacheStore` from an already existing on-disk
    /// file, attaching an `ExternalReader` that will serve reads of the base
    /// (leaf) data — the file itself is expected to contain only the cached
    /// top rows.
    ///
    /// `store_range` is the full tree length in elements; `branches` is the
    /// tree arity. Fails if the data file does not exist, if the derived
    /// leaf count is not a power of two, or if the file size does not match
    /// the expected cache size.
    pub fn new_from_disk_with_reader(
        store_range: usize,
        branches: usize,
        config: &StoreConfig,
        reader: ExternalReader<R>,
    ) -> Result<Self> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);
        ensure!(Path::new(&data_path).exists(), "[LevelCacheStore - new_from_disk_with_reader] new_from_disk_with_reader constructor can be used only for instantiating already existing storages");

        // Prefer read-write access; fall back to read-only when the file
        // permissions deny writing (with a warning on PermissionDenied).
        let file = match OpenOptions::new().write(true).read(true).open(&data_path) {
            Ok(file) => file,
            Err(e) => {
                if e.kind() == std::io::ErrorKind::PermissionDenied {
                    warn!("[LevelCacheStore - new_from_disk_with_reader] Permission denied occurred. Try to open storage as read-only");
                }
                OpenOptions::new()
                    .write(false)
                    .read(true)
                    .open(&data_path)?
            }
        };

        let metadata = file.metadata()?;
        let store_size = metadata.len() as usize;

        // Derive the leaf count from the full tree length; it must be a
        // power of two for the cache-size arithmetic below to hold.
        let size = get_merkle_tree_leafs(store_range, branches)?;
        ensure!(
            size == next_pow2(size),
            "Inconsistent merkle tree row_count detected"
        );

        // Switch from element counts to byte offsets.
        let store_range = store_range * E::byte_len();
        let cache_size =
            get_merkle_tree_cache_size(size, branches, config.rows_to_discard)? * E::byte_len();
        // Logical byte offset at which the cached rows begin.
        let cache_index_start = store_range - cache_size;

        // With an external reader supplying the base data, the on-disk file
        // must contain exactly the cached region.
        ensure!(
            store_size == cache_size,
            "Inconsistent store size detected with external reader ({} != {})",
            store_size,
            cache_size,
        );

        Ok(LevelCacheStore {
            len: store_range / E::byte_len(),
            elem_len: E::byte_len(),
            file,
            data_width: size,
            cache_index_start,
            store_size,
            loaded_from_disk: false,
            reader: Some(reader),
            _e: Default::default(),
        })
    }

    /// Replaces (or sets) the external reader used for base-data reads.
    pub fn set_external_reader(&mut self, reader: ExternalReader<R>) -> Result<()> {
        self.reader = Some(reader);

        Ok(())
    }
}
impl<E: Element, R: Read + Send + Sync> Store<E> for LevelCacheStore<E, R> {
    /// Creates a new store backed by the file described in `config`,
    /// pre-sized to hold the full tree. If the data file already exists,
    /// delegates to `new_from_disk` (the returned store then reports
    /// `loaded_from_disk() == true`).
    fn new_with_config(size: usize, branches: usize, config: StoreConfig) -> Result<Self> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);
        if Path::new(&data_path).exists() {
            return Self::new_from_disk(size, branches, &config);
        }

        // `create_new` guarantees we are not clobbering a concurrently
        // created file.
        let file = OpenOptions::new()
            .write(true)
            .read(true)
            .create_new(true)
            .open(data_path)?;

        let store_size = E::byte_len() * size;
        let leafs = get_merkle_tree_leafs(size, branches)?;
        ensure!(
            leafs == next_pow2(leafs),
            "Inconsistent merkle tree row_count detected"
        );

        // Byte offset (within the full logical store) where the cached top
        // rows begin.
        let cache_size =
            get_merkle_tree_cache_size(leafs, branches, config.rows_to_discard)? * E::byte_len();
        let cache_index_start = store_size - cache_size;

        // Reserve space for the entire (untruncated) tree up front.
        file.set_len(store_size as u64)?;

        Ok(LevelCacheStore {
            len: 0,
            elem_len: E::byte_len(),
            file,
            data_width: leafs,
            cache_index_start,
            store_size,
            loaded_from_disk: false,
            reader: None,
            _e: Default::default(),
        })
    }

    /// Creates an anonymous, temp-file-backed store sized for `size`
    /// elements. No caching offsets apply (`cache_index_start` is 0).
    fn new(size: usize) -> Result<Self> {
        let store_size = E::byte_len() * size;
        let file = tempfile()?;
        file.set_len(store_size as u64)?;

        Ok(LevelCacheStore {
            len: 0,
            elem_len: E::byte_len(),
            file,
            data_width: size,
            cache_index_start: 0,
            store_size,
            loaded_from_disk: false,
            reader: None,
            _e: Default::default(),
        })
    }

    /// Creates a store with `new_with_config` and initializes it from
    /// `data` (raw element bytes), unless the store was loaded from an
    /// existing file — in that case the on-disk contents win.
    fn new_from_slice_with_config(
        size: usize,
        branches: usize,
        data: &[u8],
        config: StoreConfig,
    ) -> Result<Self> {
        ensure!(
            data.len() % E::byte_len() == 0,
            "data size must be a multiple of {}",
            E::byte_len()
        );

        let mut store = Self::new_with_config(size, branches, config)?;

        // If the store was loaded from disk, we know we have all the data.
        if !store.loaded_from_disk {
            store.store_copy_from_slice(0, data)?;
            store.len = data.len() / store.elem_len;
        }

        Ok(store)
    }

    /// Creates a temp-file-backed store initialized from `data`
    /// (raw element bytes).
    fn new_from_slice(size: usize, data: &[u8]) -> Result<Self> {
        ensure!(
            data.len() % E::byte_len() == 0,
            "data size must be a multiple of {}",
            E::byte_len()
        );

        let mut store = Self::new(size)?;
        store.store_copy_from_slice(0, data)?;
        store.len = data.len() / store.elem_len;

        Ok(store)
    }

    /// Instantiates a store from an existing on-disk file (no external
    /// reader). Unlike `new_from_disk_with_reader`, this does not validate
    /// the file size against the expected cache size.
    fn new_from_disk(store_range: usize, branches: usize, config: &StoreConfig) -> Result<Self> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);
        ensure!(Path::new(&data_path).exists(), "[LevelCacheStore] new_from_disk constructor can be used only for instantiating already existing storages");

        // Prefer read-write access; fall back to read-only when the file
        // permissions deny writing.
        let file = match OpenOptions::new().write(true).read(true).open(&data_path) {
            Ok(file) => file,
            Err(e) => {
                if e.kind() == std::io::ErrorKind::PermissionDenied {
                    warn!("[LevelCacheStore - new_from_disk] Permission denied occurred. Try to open storage as read-only");
                }
                OpenOptions::new()
                    .write(false)
                    .read(true)
                    .open(&data_path)?
            }
        };

        let metadata = file.metadata()?;
        let store_size = metadata.len() as usize;

        let size = get_merkle_tree_leafs(store_range, branches)?;
        ensure!(
            size == next_pow2(size),
            "Inconsistent merkle tree row_count detected"
        );

        // Switch from element counts to byte offsets.
        let store_range = store_range * E::byte_len();
        let cache_size =
            get_merkle_tree_cache_size(size, branches, config.rows_to_discard)? * E::byte_len();
        let cache_index_start = store_range - cache_size;

        Ok(LevelCacheStore {
            len: store_range / E::byte_len(),
            elem_len: E::byte_len(),
            file,
            data_width: size,
            cache_index_start,
            loaded_from_disk: true,
            store_size,
            reader: None,
            _e: Default::default(),
        })
    }

    /// Writes element `el` at element index `index`, growing `len` if the
    /// write extends past the current end.
    fn write_at(&mut self, el: E, index: usize) -> Result<()> {
        self.store_copy_from_slice(index * self.elem_len, el.as_ref())?;
        self.len = std::cmp::max(self.len, index + 1);

        Ok(())
    }

    /// Writes raw element bytes `buf` starting at element index `start`.
    /// `buf` must be a whole number of elements.
    fn copy_from_slice(&mut self, buf: &[u8], start: usize) -> Result<()> {
        ensure!(
            buf.len() % self.elem_len == 0,
            "buf size must be a multiple of {}",
            self.elem_len
        );
        self.store_copy_from_slice(start * self.elem_len, buf)?;
        self.len = std::cmp::max(self.len, start + buf.len() / self.elem_len);

        Ok(())
    }

    /// Reads the element at `index`. The index must fall in either the base
    /// data region or the cached region (intermediate rows are not stored).
    fn read_at(&self, index: usize) -> Result<E> {
        let start = index * self.elem_len;
        let end = start + self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);
        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        Ok(E::from_slice(&self.store_read_range(start, end)?))
    }

    /// Reads the element at `index` into `buf` without allocating.
    /// Same region constraints as `read_at`.
    fn read_into(&self, index: usize, buf: &mut [u8]) -> Result<()> {
        let start = index * self.elem_len;
        let end = start + self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);
        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        self.store_read_into(start, end, buf)
    }

    /// Reads elements `[start, end)` (element indices) into `buf`.
    /// Same region constraints as `read_at`.
    fn read_range_into(&self, start: usize, end: usize, buf: &mut [u8]) -> Result<()> {
        let start = start * self.elem_len;
        let end = end * self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);
        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        self.store_read_into(start, end, buf)
    }

    /// Reads the element range `r` and decodes each element.
    /// Same region constraints as `read_at`.
    fn read_range(&self, r: ops::Range<usize>) -> Result<Vec<E>> {
        let start = r.start * self.elem_len;
        let end = r.end * self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);
        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        Ok(self
            .store_read_range(start, end)?
            .chunks(self.elem_len)
            .map(E::from_slice)
            .collect())
    }

    /// Number of elements currently in the store.
    fn len(&self) -> usize {
        self.len
    }

    /// Whether this store was instantiated from an existing on-disk file.
    fn loaded_from_disk(&self) -> bool {
        self.loaded_from_disk
    }

    /// Compaction is unsupported: this store type is already compacted by
    /// construction (intermediate rows are truncated away during `build`).
    fn compact(
        &mut self,
        _branches: usize,
        _config: StoreConfig,
        _store_version: u32,
    ) -> Result<bool> {
        bail!("Cannot compact this type of Store");
    }

    /// Deletes the on-disk data file described by `config`.
    fn delete(config: StoreConfig) -> Result<()> {
        let path = StoreConfig::data_path(&config.path, &config.id);
        remove_file(&path).with_context(|| format!("Failed to delete {:?}", &path))
    }

    fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Appends an element, failing if the store's fixed capacity would be
    /// exceeded.
    fn push(&mut self, el: E) -> Result<()> {
        let len = self.len;
        ensure!(
            (len + 1) * self.elem_len <= self.store_size(),
            "not enough space, len: {}, E size {}, store len {}",
            len,
            self.elem_len,
            self.store_size()
        );

        self.write_at(el, len)
    }

    /// Flushes all file data and metadata to disk.
    fn sync(&self) -> Result<()> {
        self.file.sync_all().context("failed to sync file")
    }

    /// Hashes one tree layer in parallel: reads `width` nodes starting at
    /// element index `read_start`, hashes each group of `branches` nodes,
    /// and writes the resulting parent nodes through a writable mmap
    /// starting at element index `write_start`.
    #[allow(unsafe_code)]
    fn process_layer<A: Algorithm<E>, U: Unsigned>(
        &mut self,
        width: usize,
        level: usize,
        read_start: usize,
        write_start: usize,
    ) -> Result<()> {
        // Memory-map the destination region of the file for writing.
        // NOTE(review): soundness presumably relies on no other process or
        // thread mutating this file region while the map is live — confirm
        // against callers.
        let mut mmap = unsafe {
            let mut mmap_options = MmapOptions::new();
            mmap_options
                .offset((write_start * E::byte_len()) as u64)
                .len(width * E::byte_len())
                .map_mut(&self.file)
        }?;

        let data_lock = Arc::new(RwLock::new(self));
        let branches = U::to_usize();
        let shift = log2_pow2(branches);
        // Each read chunk of BUILD_CHUNK_NODES source nodes yields
        // BUILD_CHUNK_NODES >> shift parent nodes, i.e. this many bytes.
        let write_chunk_width = (BUILD_CHUNK_NODES >> shift) * E::byte_len();

        ensure!(BUILD_CHUNK_NODES % branches == 0, "Invalid chunk size");

        // Pair each source chunk start index with the corresponding
        // destination slice of the mmap and process chunks in parallel.
        Vec::from_iter((read_start..read_start + width).step_by(BUILD_CHUNK_NODES))
            .into_par_iter()
            .zip(mmap.par_chunks_mut(write_chunk_width))
            .try_for_each(|(chunk_index, write_mmap)| -> Result<()> {
                // The final chunk may be shorter than BUILD_CHUNK_NODES.
                let chunk_size = std::cmp::min(BUILD_CHUNK_NODES, read_start + width - chunk_index);

                let chunk_nodes = {
                    // Read everything taking the lock once.
                    data_lock
                        .read()
                        .expect("[process_layer] couldn't block current thread")
                        .read_range_internal(chunk_index..chunk_index + chunk_size)?
                };

                // Hash each group of `branches` nodes into one parent node
                // and accumulate the serialized results.
                let nodes_size = (chunk_nodes.len() / branches) * E::byte_len();
                let hashed_nodes_as_bytes = chunk_nodes.chunks(branches).fold(
                    Vec::with_capacity(nodes_size),
                    |mut acc, nodes| {
                        let h = A::default().multi_node(nodes, level);
                        acc.extend_from_slice(h.as_ref());
                        acc
                    },
                );

                // Check that we correctly pre-allocated the space.
                let hashed_nodes_as_bytes_len = hashed_nodes_as_bytes.len();
                ensure!(
                    hashed_nodes_as_bytes.len() == chunk_size / branches * E::byte_len(),
                    "Invalid hashed node length"
                );

                write_mmap[0..hashed_nodes_as_bytes_len].copy_from_slice(&hashed_nodes_as_bytes);

                Ok(())
            })
    }

    /// Builds the tree above the leaves level by level, front-truncating
    /// the backing file after each non-cached layer so that, at the end,
    /// only the cached top rows remain on disk. Returns the root element.
    fn build<A: Algorithm<E>, U: Unsigned>(
        &mut self,
        leafs: usize,
        row_count: usize,
        config: Option<StoreConfig>,
    ) -> Result<E> {
        let branches = U::to_usize();
        ensure!(
            next_pow2(branches) == branches,
            "branches MUST be a power of 2"
        );
        ensure!(Store::len(self) == leafs, "Inconsistent data");
        // NOTE(review): the message claims "power of two" but the check only
        // enforces evenness — leaf count power-of-two-ness is validated in
        // the constructors; confirm whether this weaker check is intended.
        ensure!(leafs % 2 == 0, "Leafs must be a power of two");

        // Process one `level` at a time of `width` nodes. Each level has
        // half the nodes as the previous level. The first level _is_ the
        // leaves (hashed).
        let mut level: usize = 0;
        let mut width = leafs;
        let mut level_node_index = 0;

        let config = config.context("LevelCacheStore build requires a valid config")?;
        let shift = log2_pow2(branches);

        // Both in tree element counts (not bytes): the cached region size
        // and the tree index at which it starts.
        let cache_size = get_merkle_tree_cache_size(leafs, branches, config.rows_to_discard)?;
        let cache_index_start = (get_merkle_tree_len(leafs, branches)?) - cache_size;

        while width > 1 {
            // Figure out indexes for reading the current layer and writing
            // the next one. Once we pass `cache_index_start`, the file has
            // been front-truncated, so indices are remapped relative to it.
            let (read_start, write_start) = if level == 0 {
                // Note that we previously asserted that data.len() == leafs.
                (0, Store::len(self))
            } else if level_node_index < cache_index_start {
                // After the front-truncate below, the just-written layer
                // sits at the start of the file.
                (0, width)
            } else {
                (
                    level_node_index - cache_index_start,
                    (level_node_index + width) - cache_index_start,
                )
            };

            self.process_layer::<A, U>(width, level, read_start, write_start)?;

            // Discard the layer we just consumed if it is below the cache.
            if level_node_index < cache_index_start {
                self.front_truncate(&config, width)?;
            }

            level_node_index += width;
            level += 1;
            width >>= shift;

            // When the layer is complete, update the store length
            // since we know the backing file was updated outside of the
            // built-in store interface.
            self.set_len(level_node_index);
        }

        // Account for the root node.
        self.set_len(Store::len(self) + 1);
        // Ensure every element is accounted for.
        ensure!(
            Store::len(self) == get_merkle_tree_len(leafs, branches)?,
            "Invalid merkle tree length"
        );

        ensure!(row_count == level + 1, "Invalid tree row_count");
        // The root isn't part of the previous loop so `row_count` is
        // missing one level.

        // The file now holds only the cached rows, so the root's physical
        // index is its tree index minus `cache_index_start`.
        self.read_at_internal(self.len() - cache_index_start - 1)
    }
}
impl<E: Element, R: Read + Send + Sync> LevelCacheStore<E, R> {
    /// Directly sets the logical element count (used by `build`, which
    /// writes through an mmap outside the normal store interface).
    pub fn set_len(&mut self, len: usize) {
        self.len = len;
    }

    // Remove 'len' elements from the front of the file.
    pub fn front_truncate(&mut self, config: &StoreConfig, len: usize) -> Result<()> {
        let metadata = self.file.metadata()?;
        let store_size = metadata.len();
        let len = (len * E::byte_len()) as u64;

        ensure!(store_size >= len, "Invalid truncation length");

        // Seek the reader past the length we want removed.
        let mut reader = OpenOptions::new()
            .read(true)
            .open(StoreConfig::data_path(&config.path, &config.id))?;
        reader.seek(SeekFrom::Start(len))?;

        // Make sure the store file is opened for read/write.
        self.file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(StoreConfig::data_path(&config.path, &config.id))?;

        // Seek the writer to the start, then copy the tail over the head
        // and shrink the file to the copied size.
        self.file.seek(SeekFrom::Start(0))?;
        let written = copy(&mut reader, &mut self.file)?;
        ensure!(written == store_size - len, "Failed to copy all data");

        self.file.set_len(written)?;

        Ok(())
    }

    /// Total logical store size in bytes.
    pub fn store_size(&self) -> usize {
        self.store_size
    }

    // 'store_range' must be the total number of elements in the store
    // (e.g. tree.len()). Arity/branches is ignored since a
    // power of 2 file size is assumed.
    /// Checks a v1-layout file: the file is expected to hold the base data
    /// plus the cached rows.
    pub fn is_consistent_v1(
        store_range: usize,
        branches: usize,
        config: &StoreConfig,
    ) -> Result<bool> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);

        let file = File::open(data_path)?;
        let metadata = file.metadata()?;
        let store_size = metadata.len() as usize;

        let size = get_merkle_tree_leafs(store_range, branches)?;
        ensure!(
            size == next_pow2(size),
            "Inconsistent merkle tree row_count detected"
        );

        let cache_size =
            get_merkle_tree_cache_size(size, branches, config.rows_to_discard)? * E::byte_len();

        // Data plus cache must fit exactly.
        Ok(store_size == size * E::byte_len() + cache_size)
    }

    /// Checks the external-reader layout: the file is expected to hold
    /// only the cached rows (base data lives behind the reader).
    pub fn is_consistent(
        store_range: usize,
        branches: usize,
        config: &StoreConfig,
    ) -> Result<bool> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);

        let file = File::open(data_path)?;
        let metadata = file.metadata()?;
        let store_size = metadata.len() as usize;

        let size = get_merkle_tree_leafs(store_range, branches)?;
        ensure!(
            size == next_pow2(size),
            "Inconsistent merkle tree row_count detected"
        );

        let cache_size =
            get_merkle_tree_cache_size(size, branches, config.rows_to_discard)? * E::byte_len();

        Ok(store_size == cache_size)
    }

    /// Reads logical byte range `[start, end)`, routing base-region reads
    /// through the external reader (when present) and remapping cache-region
    /// offsets to their physical location in the file.
    pub fn store_read_range(&self, start: usize, end: usize) -> Result<Vec<u8>> {
        let read_len = end - start;
        let mut read_data = vec![0; read_len];
        let mut adjusted_start = start;

        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        // If an external reader is present and the request falls in the
        // base (leaf) region, delegate the read to it.
        if start < self.data_width * self.elem_len && self.reader.is_some() {
            self.reader
                .as_ref()
                .unwrap() .read(start, end, &mut read_data)
                .with_context(|| {
                    format!(
                        "failed to read {} bytes from file at offset {}",
                        end - start,
                        start
                    )
                })?;

            return Ok(read_data);
        }

        // Adjust a cache-region offset to its physical file offset:
        // without a reader (v1 layout) the cache sits after the base data;
        // with a reader the file contains only the cache.
        if start >= self.cache_index_start {
            let v1 = self.reader.is_none();
            adjusted_start = if v1 {
                start - self.cache_index_start + (self.data_width * self.elem_len)
            } else {
                start - self.cache_index_start
            };
        }

        self.file
            .read_exact_at(adjusted_start as u64, &mut read_data)
            .with_context(|| {
                format!(
                    "failed to read {} bytes from file at offset {}",
                    read_len, start
                )
            })?;

        Ok(read_data)
    }

    // This read is for internal use only during the 'build' process.
    // Offsets are used verbatim (no cache remapping, no external reader),
    // since `build` addresses the file's physical layout directly.
    fn store_read_range_internal(&self, start: usize, end: usize) -> Result<Vec<u8>> {
        let read_len = end - start;
        let mut read_data = vec![0; read_len];

        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        self.file
            .read_exact_at(start as u64, &mut read_data)
            .with_context(|| {
                format!(
                    "failed to read {} bytes from file at offset {}",
                    read_len, start
                )
            })?;

        Ok(read_data)
    }

    /// Element-range variant of `store_read_range_internal`, decoding each
    /// element. Internal use by `process_layer`.
    fn read_range_internal(&self, r: ops::Range<usize>) -> Result<Vec<E>> {
        let start = r.start * self.elem_len;
        let end = r.end * self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);
        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        Ok(self
            .store_read_range_internal(start, end)?
            .chunks(self.elem_len)
            .map(E::from_slice)
            .collect())
    }

    /// Single-element variant of `store_read_range_internal` (used by
    /// `build` to fetch the root from its physical location).
    fn read_at_internal(&self, index: usize) -> Result<E> {
        let start = index * self.elem_len;
        let end = start + self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);
        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "out of bounds"
        );

        Ok(E::from_slice(&self.store_read_range_internal(start, end)?))
    }

    /// Allocation-free variant of `store_read_range`: reads logical byte
    /// range `[start, end)` into `buf`, with the same reader delegation and
    /// cache-offset remapping.
    pub fn store_read_into(&self, start: usize, end: usize, buf: &mut [u8]) -> Result<()> {
        ensure!(
            start <= self.data_width * self.elem_len || start >= self.cache_index_start,
            "Invalid read start"
        );

        // Base-region reads go through the external reader when present.
        if start < self.data_width * self.elem_len && self.reader.is_some() {
            self.reader
                .as_ref()
                .unwrap() .read(start, end, buf)
                .with_context(|| {
                    format!(
                        "failed to read {} bytes from file at offset {}",
                        end - start,
                        start
                    )
                })?;
        } else {
            // Map cache-region offsets to their physical file offsets
            // (v1: after the base data; with reader: from file start).
            let adjusted_start = if start >= self.cache_index_start {
                if self.reader.is_none() {
                    // Original v1 format requires the data and cache to
                    // be contiguous in the same file.
                    start - self.cache_index_start + (self.data_width * self.elem_len)
                } else {
                    // v2 format keeps only the cache on disk.
                    start - self.cache_index_start
                }
            } else {
                start
            };

            self.file
                .read_exact_at(adjusted_start as u64, buf)
                .with_context(|| {
                    format!(
                        "failed to read {} bytes from file at offset {}",
                        end - start,
                        start
                    )
                })?;
        }

        Ok(())
    }

    /// Writes `slice` at logical byte offset `start`, bounds-checked
    /// against the logical store size. Note that offsets are used verbatim
    /// (no cache remapping) — callers write the untruncated layout.
    pub fn store_copy_from_slice(&mut self, start: usize, slice: &[u8]) -> Result<()> {
        ensure!(
            start + slice.len() <= self.store_size,
            "Requested slice too large (max: {})",
            self.store_size
        );
        self.file.write_all_at(start as u64, slice)?;

        Ok(())
    }
}