use std::fs::{remove_file, File, OpenOptions};
use std::io::{copy, Seek, SeekFrom};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::ops;
use std::path::Path;
use std::sync::{Arc, RwLock};
use anyhow::{Context, Result};
use log::warn;
use memmap2::MmapOptions;
use positioned_io::{ReadAt, WriteAt};
use rayon::iter::*;
use rayon::prelude::*;
use tempfile::tempfile;
use typenum::marker_traits::Unsigned;
use crate::hash::Algorithm;
use crate::merkle::{
get_merkle_tree_cache_size, get_merkle_tree_leafs, get_merkle_tree_len, log2_pow2, next_pow2,
Element,
};
use crate::store::{Store, StoreConfig, StoreConfigDataVersion, BUILD_CHUNK_NODES};
/// A merkle-tree element store backed by a single on-disk file.
///
/// Elements are stored contiguously as fixed-size byte strings of
/// `E::byte_len()` bytes each; all reads and writes go through
/// positioned I/O (and, during layer building, a memory map) on `file`.
#[derive(Debug)]
pub struct DiskStore<E: Element> {
    /// Number of elements currently stored (logical length, not capacity).
    len: usize,
    /// Size in bytes of a single element (`E::byte_len()`).
    elem_len: usize,
    _e: PhantomData<E>,
    /// Backing file; may have been opened read-only as a fallback
    /// (see `new_from_disk_with_path`).
    file: File,
    /// True when the store was instantiated from a pre-existing file.
    loaded_from_disk: bool,
    /// Total capacity of the backing file, in bytes.
    store_size: usize,
}
impl<E: Element> Store<E> for DiskStore<E> {
    /// Creates a file-backed store sized for `size` elements at the path
    /// described by `config`. If the data file already exists, the store
    /// is re-opened from disk instead (and `loaded_from_disk` is true).
    fn new_with_config(size: usize, branches: usize, config: StoreConfig) -> Result<Self> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);

        // An existing file means this is a re-open, not a fresh store.
        if Path::new(&data_path).exists() {
            return Self::new_from_disk(size, branches, &config);
        }

        // `create_new` fails if the file appeared after the existence
        // check above, so we never truncate a concurrently created store.
        let file = OpenOptions::new()
            .write(true)
            .read(true)
            .create_new(true)
            .open(data_path)?;

        // Pre-allocate the full capacity up front.
        let store_size = E::byte_len() * size;
        file.set_len(store_size as u64)?;

        Ok(DiskStore {
            len: 0,
            elem_len: E::byte_len(),
            _e: Default::default(),
            file,
            loaded_from_disk: false,
            store_size,
        })
    }

    /// Creates an anonymous store backed by an unnamed temporary file,
    /// pre-sized for `size` elements.
    fn new(size: usize) -> Result<Self> {
        let store_size = E::byte_len() * size;
        let file = tempfile()?;
        file.set_len(store_size as u64)?;

        Ok(DiskStore {
            len: 0,
            elem_len: E::byte_len(),
            _e: Default::default(),
            file,
            loaded_from_disk: false,
            store_size,
        })
    }

    /// Creates a store from `data`, which must be a whole number of
    /// elements. If the store already existed on disk, `data` is ignored
    /// and the on-disk contents are kept as-is.
    fn new_from_slice_with_config(
        size: usize,
        branches: usize,
        data: &[u8],
        config: StoreConfig,
    ) -> Result<Self> {
        ensure!(
            data.len() % E::byte_len() == 0,
            "data size must be a multiple of {}",
            E::byte_len()
        );

        let mut store = Self::new_with_config(size, branches, config)?;

        // If the store was loaded from disk, we know it has contents
        // already, so don't overwrite them with the provided slice.
        if !store.loaded_from_disk {
            store.store_copy_from_slice(0, data)?;
            store.len = data.len() / store.elem_len;
        }

        Ok(store)
    }

    /// Creates an anonymous (temp-file backed) store pre-filled with
    /// `data`, which must be a whole number of elements.
    fn new_from_slice(size: usize, data: &[u8]) -> Result<Self> {
        ensure!(
            data.len() % E::byte_len() == 0,
            "data size must be a multiple of {}",
            E::byte_len()
        );

        let mut store = Self::new(size)?;
        store.store_copy_from_slice(0, data)?;
        store.len = data.len() / store.elem_len;

        Ok(store)
    }

    /// Re-opens an existing store file described by `config`; see
    /// `new_from_disk_with_path` for the actual open logic.
    fn new_from_disk(size: usize, _branches: usize, config: &StoreConfig) -> Result<Self> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);
        Self::new_from_disk_with_path(size, &data_path)
    }

    /// Writes element `el` at element `index`, growing the logical
    /// length if the write lands past the current end.
    fn write_at(&mut self, el: E, index: usize) -> Result<()> {
        self.store_copy_from_slice(index * self.elem_len, el.as_ref())?;
        self.len = std::cmp::max(self.len, index + 1);
        Ok(())
    }

    /// Writes a whole-number-of-elements byte buffer starting at element
    /// index `start`, growing the logical length as needed.
    fn copy_from_slice(&mut self, buf: &[u8], start: usize) -> Result<()> {
        ensure!(
            buf.len() % self.elem_len == 0,
            "buf size must be a multiple of {}",
            self.elem_len
        );
        self.store_copy_from_slice(start * self.elem_len, buf)?;
        self.len = std::cmp::max(self.len, start + buf.len() / self.elem_len);

        Ok(())
    }

    /// Reads the element at `index`; bounds-checked against the logical
    /// length (not the file capacity).
    fn read_at(&self, index: usize) -> Result<E> {
        let start = index * self.elem_len;
        let end = start + self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);

        Ok(E::from_slice(&self.store_read_range(start, end)?))
    }

    /// Reads the raw bytes of the element at `index` into `buf`.
    fn read_into(&self, index: usize, buf: &mut [u8]) -> Result<()> {
        let start = index * self.elem_len;
        let end = start + self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);

        self.store_read_into(start, end, buf)
    }

    /// Reads the raw bytes of elements `start..end` into `buf`.
    fn read_range_into(&self, start: usize, end: usize, buf: &mut [u8]) -> Result<()> {
        let start = start * self.elem_len;
        let end = end * self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);

        self.store_read_into(start, end, buf)
    }

    /// Reads and decodes the elements in `r` (element indices).
    fn read_range(&self, r: ops::Range<usize>) -> Result<Vec<E>> {
        let start = r.start * self.elem_len;
        let end = r.end * self.elem_len;

        let len = self.len * self.elem_len;
        ensure!(start < len, "start out of range {} >= {}", start, len);
        ensure!(end <= len, "end out of range {} > {}", end, len);

        Ok(self
            .store_read_range(start, end)?
            .chunks(self.elem_len)
            .map(E::from_slice)
            .collect())
    }

    /// Logical number of elements currently stored.
    fn len(&self) -> usize {
        self.len
    }

    /// True when the store was instantiated from a pre-existing file.
    fn loaded_from_disk(&self) -> bool {
        self.loaded_from_disk
    }

    /// Compacts the store in place: keeps only the cached top rows of
    /// the tree (plus, for `StoreConfigDataVersion::One`, the original
    /// leaf data region), moves them to the front of the file, and
    /// truncates the file to the retained size.
    fn compact(
        &mut self,
        branches: usize,
        config: StoreConfig,
        store_version: u32,
    ) -> Result<bool> {
        // Number of leaves and the byte width of the leaf data region.
        let leafs = get_merkle_tree_leafs(self.len, branches)?;
        let data_width = leafs * self.elem_len;

        // Byte size of the cached region kept at the tail of the file.
        let cache_size =
            get_merkle_tree_cache_size(leafs, branches, config.rows_to_discard)? * self.elem_len;
        ensure!(
            cache_size < self.len * self.elem_len && cache_size != 0,
            "Cannot compact with this configuration"
        );

        let v1 = store_version == StoreConfigDataVersion::One as u32;
        // V1 keeps the leaf data, so the cache is copied to just after
        // it; V2 keeps only the cache, copied to the start of the file.
        let start: u64 = if v1 { data_width as u64 } else { 0 };

        // The cache currently lives at the very end of the file.
        let cache_start = self.store_size - cache_size;

        // The same on-disk file is opened twice: `reader` positioned at
        // the cache, `self.file` positioned at the destination.
        // NOTE(review): `copy` streams from `cache_start` to EOF into the
        // destination; this assumes the destination region never overlaps
        // not-yet-read cache bytes (i.e. `start <= cache_start`) — TODO
        // confirm for all configurations.
        let mut reader = OpenOptions::new()
            .read(true)
            .open(StoreConfig::data_path(&config.path, &config.id))?;
        reader.seek(SeekFrom::Start(cache_start as u64))?;

        // Make sure our writer is positioned correctly.
        self.file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(StoreConfig::data_path(&config.path, &config.id))?;
        self.file.seek(SeekFrom::Start(start))?;

        // Copy the cache to its new (compacted) position.
        let written = copy(&mut reader, &mut self.file)?;
        ensure!(written == cache_size as u64, "Failed to copy all data");

        // Truncate to the retained size and update the logical length.
        if v1 {
            self.file.set_len((data_width + cache_size) as u64)?;
            self.len = (data_width + cache_size) / self.elem_len;
        } else {
            self.file.set_len(cache_size as u64)?;
            self.len = cache_size / self.elem_len;
        }

        self.sync()?;

        // Verify the on-disk size matches the new logical length.
        let metadata = self.file.metadata()?;
        let store_size = metadata.len() as usize;
        ensure!(
            self.len * self.elem_len == store_size,
            "Inconsistent metadata detected"
        );

        Ok(true)
    }

    /// Deletes the store's data file from disk.
    fn delete(config: StoreConfig) -> Result<()> {
        let path = StoreConfig::data_path(&config.path, &config.id);
        remove_file(&path).with_context(|| format!("Failed to delete {:?}", &path))
    }

    fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Appends `el` at the current logical end, failing if the backing
    /// file has no capacity left for one more element.
    fn push(&mut self, el: E) -> Result<()> {
        let len = self.len;
        ensure!(
            (len + 1) * self.elem_len <= self.store_size(),
            "not enough space, len: {}, E size {}, store len {}",
            len,
            self.elem_len,
            self.store_size()
        );

        self.write_at(el, len)
    }

    /// Flushes file data and metadata to disk.
    fn sync(&self) -> Result<()> {
        self.file.sync_all().context("failed to sync file")
    }

    /// Hashes one tree layer in parallel: reads `width` nodes starting
    /// at node offset `read_start`, hashes them `branches` at a time,
    /// and writes the digests starting at node offset `write_start`
    /// through a memory map of the backing file.
    #[allow(unsafe_code)]
    fn process_layer<A: Algorithm<E>, U: Unsigned>(
        &mut self,
        width: usize,
        level: usize,
        read_start: usize,
        write_start: usize,
    ) -> Result<()> {
        // SAFETY: maps a region of our own backing file for writing.
        // Assumes no other process truncates or mutates the file while
        // the map is live — TODO confirm callers uphold this.
        let mut mmap = unsafe {
            let mut mmap_options = MmapOptions::new();
            mmap_options
                .offset((write_start * E::byte_len()) as u64)
                .len(width * E::byte_len())
                .map_mut(&self.file)
        }?;

        // Wrap `self` so rayon worker threads can issue concurrent reads.
        let data_lock = Arc::new(RwLock::new(self));
        let branches = U::to_usize();
        let shift = log2_pow2(branches);

        // Each read chunk of BUILD_CHUNK_NODES input nodes produces
        // BUILD_CHUNK_NODES / branches output nodes (this many bytes).
        let write_chunk_width = (BUILD_CHUNK_NODES >> shift) * E::byte_len();

        // Required so read chunks map to whole write chunks.
        ensure!(BUILD_CHUNK_NODES % branches == 0, "Invalid chunk size");

        // Pair each input-chunk start offset with its output slice; `zip`
        // stops at the shorter side, so the generously sized mmap (and
        // its extra trailing chunks) is harmless.
        Vec::from_iter((read_start..read_start + width).step_by(BUILD_CHUNK_NODES))
            .into_par_iter()
            .zip(mmap.par_chunks_mut(write_chunk_width))
            .try_for_each(|(chunk_index, write_mmap)| -> Result<()> {
                // The final chunk may be shorter than BUILD_CHUNK_NODES.
                let chunk_size = std::cmp::min(BUILD_CHUNK_NODES, read_start + width - chunk_index);

                // Read this chunk's input nodes under the read lock.
                let chunk_nodes = {
                    data_lock
                        .read()
                        .expect("[process_layer] error occurred while thread blocking")
                        .read_range(chunk_index..chunk_index + chunk_size)?
                };

                // Hash `branches` nodes at a time, accumulating the
                // resulting digests as raw bytes.
                let nodes_size = (chunk_nodes.len() / branches) * E::byte_len();
                let hashed_nodes_as_bytes = chunk_nodes.chunks(branches).fold(
                    Vec::with_capacity(nodes_size),
                    |mut acc, nodes| {
                        let h = A::default().multi_node(nodes, level);
                        acc.extend_from_slice(h.as_ref());
                        acc
                    },
                );

                // Sanity check: a whole number of output nodes was produced.
                let hashed_nodes_as_bytes_len = hashed_nodes_as_bytes.len();
                ensure!(
                    hashed_nodes_as_bytes.len() == chunk_size / branches * E::byte_len(),
                    "Invalid hashed node length"
                );

                write_mmap[0..hashed_nodes_as_bytes_len].copy_from_slice(&hashed_nodes_as_bytes);

                Ok(())
            })
    }

    /// Builds the tree above the already-written leaves layer by layer
    /// and returns the root element.
    fn build<A: Algorithm<E>, U: Unsigned>(
        &mut self,
        leafs: usize,
        row_count: usize,
        _config: Option<StoreConfig>,
    ) -> Result<E> {
        let branches = U::to_usize();
        ensure!(
            next_pow2(branches) == branches,
            "branches MUST be a power of 2"
        );
        ensure!(Store::len(self) == leafs, "Inconsistent data");
        // NOTE(review): this only enforces evenness even though the
        // message claims a power of two; behavior preserved as-is.
        ensure!(leafs % 2 == 0, "Leafs must be a power of two");

        // Current layer index, its width in nodes, and the node offset
        // at which the current (input) layer starts.
        let mut level: usize = 0;
        let mut width = leafs;
        let mut level_node_index = 0;

        let shift = log2_pow2(branches);

        while width > 1 {
            // Level 0 reads the leaves at offset 0 and writes at the
            // current end of the store; later levels read the previous
            // layer and write immediately after it.
            let (read_start, write_start) = if level == 0 {
                (0, Store::len(self))
            } else {
                (level_node_index, level_node_index + width)
            };

            self.process_layer::<A, U>(width, level, read_start, write_start)?;

            level_node_index += width;
            level += 1;
            // Each layer is `branches` times narrower; account for the
            // nodes just written.
            width >>= shift;
            self.set_len(Store::len(self) + width);
        }

        ensure!(
            Store::len(self) == get_merkle_tree_len(leafs, branches)?,
            "Invalid merkle tree length"
        );
        ensure!(row_count == level + 1, "Invalid tree row_count");

        // The last element written is the root.
        self.last()
    }
}
impl<E: Element> DiskStore<E> {
    /// Re-opens an existing store file at `data_path`, expected to hold
    /// exactly `size` elements.
    ///
    /// Tries to open read-write first and falls back to read-only.
    ///
    /// # Errors
    ///
    /// Fails when the file does not exist, cannot be opened at all, or
    /// its length does not equal `size * E::byte_len()`.
    pub fn new_from_disk_with_path<P: AsRef<Path>>(size: usize, data_path: P) -> Result<Self> {
        ensure!(data_path.as_ref().exists(), "[DiskStore] new_from_disk constructor can be used only for instantiating already existing storages");

        let file = match OpenOptions::new().write(true).read(true).open(&data_path) {
            Ok(file) => file,
            Err(e) => {
                // NOTE(review): the read-only fallback is attempted for
                // *any* open error, not only PermissionDenied — only the
                // warning is conditional. Behavior preserved as-is.
                if e.kind() == std::io::ErrorKind::PermissionDenied {
                    warn!(
                        "[DiskStore] Permission denied occurred. Try to open storage as read-only"
                    );
                }
                OpenOptions::new()
                    .write(false)
                    .read(true)
                    .open(&data_path)?
            }
        };

        // The on-disk length must match the expected element count exactly.
        let metadata = file.metadata()?;
        let store_size = metadata.len() as usize;
        ensure!(
            store_size == size * E::byte_len(),
            "Invalid formatted file provided. Expected {} bytes, found {} bytes",
            size * E::byte_len(),
            store_size
        );

        Ok(DiskStore {
            len: size,
            elem_len: E::byte_len(),
            _e: Default::default(),
            file,
            loaded_from_disk: true,
            store_size,
        })
    }

    /// Overrides the logical element count (used by `build` while layers
    /// are appended).
    fn set_len(&mut self, len: usize) {
        self.len = len;
    }

    /// Returns true when the on-disk file size matches what a store of
    /// `store_range` elements would occupy.
    pub fn is_consistent(
        store_range: usize,
        _branches: usize,
        config: &StoreConfig,
    ) -> Result<bool> {
        let data_path = StoreConfig::data_path(&config.path, &config.id);

        let file = File::open(&data_path)?;
        let metadata = file.metadata()?;
        let store_size = metadata.len() as usize;

        Ok(store_size == store_range * E::byte_len())
    }

    /// Total capacity of the backing file, in bytes.
    pub fn store_size(&self) -> usize {
        self.store_size
    }

    /// Reads the byte range `start..end` from the backing file into a
    /// freshly allocated buffer.
    pub fn store_read_range(&self, start: usize, end: usize) -> Result<Vec<u8>> {
        let read_len = end - start;
        let mut read_data = vec![0; read_len];

        // `read_exact_at` either fills the buffer completely or returns
        // an error, and the Vec's length cannot change underneath it, so
        // no post-read length check is needed.
        self.file
            .read_exact_at(start as u64, &mut read_data)
            .with_context(|| {
                format!(
                    "failed to read {} bytes from file at offset {}",
                    read_len, start
                )
            })?;

        Ok(read_data)
    }

    /// Reads the byte range `start..end` into the caller-provided `buf`
    /// (which must be exactly `end - start` bytes long for a full read).
    pub fn store_read_into(&self, start: usize, end: usize, buf: &mut [u8]) -> Result<()> {
        self.file
            .read_exact_at(start as u64, buf)
            .with_context(|| {
                format!(
                    "failed to read {} bytes from file at offset {}",
                    end - start,
                    start
                )
            })?;

        Ok(())
    }

    /// Writes `slice` at byte offset `start`, bounds-checked against the
    /// backing file's capacity.
    pub fn store_copy_from_slice(&mut self, start: usize, slice: &[u8]) -> Result<()> {
        ensure!(
            start + slice.len() <= self.store_size,
            "Requested slice too large (max: {})",
            self.store_size
        );
        // Attach offset/length context on failure, matching the read paths.
        self.file
            .write_all_at(start as u64, slice)
            .with_context(|| {
                format!(
                    "failed to write {} bytes to file at offset {}",
                    slice.len(),
                    start
                )
            })?;
        Ok(())
    }
}