use std::convert::TryInto;
use std::ops::{self, Neg};
use super::{
power_for_sectors, select_sectors, validate_partition_contains_sectors, BitFieldQueue,
ExpirationQueue, ExpirationSet, SectorOnChainInfo, Sectors, TerminationResult,
};
use anyhow::{anyhow, Context};
use cid::Cid;
use fil_actors_shared::actor_error_v13;
use fil_actors_shared::v13::runtime::Policy;
use fil_actors_shared::v13::{ActorDowncast, Array};
use fvm_ipld_bitfield::BitField;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::tuple::*;
use fvm_shared4::bigint::bigint_ser;
use fvm_shared4::clock::{ChainEpoch, QuantSpec, NO_QUANTIZATION};
use fvm_shared4::econ::TokenAmount;
use fvm_shared4::error::ExitCode;
use fvm_shared4::sector::{SectorSize, StoragePower};
use num_traits::{Signed, Zero};
/// Bit width passed to `Array::new_with_bit_width` when `Partition::new`
/// creates the empty expiration array.
const PARTITION_EXPIRATION_AMT_BITWIDTH: u32 = 4;
/// Bit width passed to `Array::new_with_bit_width` when `Partition::new`
/// creates the empty early-termination array.
const PARTITION_EARLY_TERMINATION_ARRAY_AMT_BITWIDTH: u32 = 3;
/// Bookkeeping for one partition of sectors: membership bitfields, the roots
/// of two persisted queues, and running power totals.
///
/// The struct is (de)serialized as a tuple, so the order of the fields below
/// is part of the encoding and must not be changed.
#[derive(Serialize_tuple, Deserialize_tuple, Clone)]
pub struct Partition {
    /// Sector numbers assigned to this partition, including terminated ones.
    pub sectors: BitField,
    /// Sectors added with `proven == false` that have not been activated yet.
    pub unproven: BitField,
    /// Sectors currently marked faulty.
    pub faults: BitField,
    /// Faulty sectors declared as recovering (always a subset of `faults`).
    pub recoveries: BitField,
    /// Sectors that have been terminated or have expired.
    pub terminated: BitField,
    /// Root of the array that is loaded as the sector `ExpirationQueue`.
    pub expirations_epochs: Cid,
    /// Root of the array that is loaded as the early-termination `BitFieldQueue`.
    pub early_terminated: Cid,
    /// Power of all sectors that are not terminated.
    pub live_power: PowerPair,
    /// Power of the sectors in `unproven`.
    pub unproven_power: PowerPair,
    /// Power of the sectors in `faults`.
    pub faulty_power: PowerPair,
    /// Power of the sectors in `recoveries`.
    pub recovering_power: PowerPair,
}
impl Partition {
/// Creates an empty partition.
///
/// Two empty arrays (expirations and early terminations) are flushed to
/// `store` so that the partition can reference them by CID; every bitfield
/// starts empty and every power total starts at zero.
///
/// # Errors
/// Fails if either empty array cannot be flushed to the store.
pub fn new<BS: Blockstore>(store: &BS) -> anyhow::Result<Self> {
    let expirations_epochs =
        Array::<Cid, BS>::new_with_bit_width(store, PARTITION_EXPIRATION_AMT_BITWIDTH)
            .flush()?;
    let early_terminated = Array::<Cid, BS>::new_with_bit_width(
        store,
        PARTITION_EARLY_TERMINATION_ARRAY_AMT_BITWIDTH,
    )
    .flush()?;

    let partition = Self {
        sectors: BitField::new(),
        unproven: BitField::new(),
        faults: BitField::new(),
        recoveries: BitField::new(),
        terminated: BitField::new(),
        expirations_epochs,
        early_terminated,
        live_power: PowerPair::zero(),
        unproven_power: PowerPair::zero(),
        faulty_power: PowerPair::zero(),
        recovering_power: PowerPair::zero(),
    };
    Ok(partition)
}
/// Returns the sectors of this partition that have not been terminated.
pub fn live_sectors(&self) -> BitField {
    let assigned = &self.sectors;
    let terminated = &self.terminated;
    assigned - terminated
}
/// Returns the live sectors that are neither faulty nor unproven.
pub fn active_sectors(&self) -> BitField {
    &(&self.live_sectors() - &self.faults) - &self.unproven
}
/// Returns the live power minus the faulty power and the unproven power.
pub fn active_power(&self) -> PowerPair {
    let non_faulty_power = &self.live_power - &self.faulty_power;
    &non_faulty_power - &self.unproven_power
}
/// Adds new sectors to the partition and schedules their expirations.
///
/// The sectors are recorded in the expiration queue, added to `sectors`, and
/// their power is added to `live_power`. When `proven` is `false` they are
/// also tracked in `unproven` / `unproven_power`.
///
/// Returns the power reported by the expiration queue for the added sectors.
///
/// # Errors
/// Fails if the expiration queue cannot be loaded, updated or stored, if any
/// of the sectors is already part of the partition, or if the resulting state
/// violates the partition invariants.
pub fn add_sectors<BS: Blockstore>(
    &mut self,
    store: &BS,
    proven: bool,
    sectors: &[SectorOnChainInfo],
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<PowerPair> {
    let mut expirations = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|e| e.downcast_wrap("failed to load sector expirations"))?;
    let (sector_numbers, power, _) = expirations
        .add_active_sectors(sectors, sector_size)
        .map_err(|e| e.downcast_wrap("failed to record new sector expirations"))?;
    self.expirations_epochs = expirations
        .amt
        .flush()
        .map_err(|e| e.downcast_wrap("failed to store sector expirations"))?;

    // Every added sector must be new to this partition.
    if self.sectors.contains_any(&sector_numbers) {
        return Err(anyhow!("not all added sectors are new"));
    }

    self.sectors |= &sector_numbers;
    self.live_power += &power;
    if !proven {
        self.unproven_power += &power;
        self.unproven |= &sector_numbers;
    }

    // Faults, recoveries and terminations are not touched here.
    self.validate_state()?;
    Ok(power)
}
/// Marks `sector_numbers` as faulty and reschedules them in the expiration
/// queue at `fault_expiration`.
///
/// `sectors` must hold the on-chain infos for `sector_numbers`. Sectors that
/// were still unproven are removed from the unproven set.
///
/// Returns `(power_delta, new_faulty_power)`, where `power_delta` is the
/// negated new faulty power plus the power of the faulted sectors that were
/// unproven.
///
/// # Errors
/// Fails if the expiration queue cannot be loaded, updated or stored, if the
/// unproven sectors cannot be selected from `sectors`, or if the resulting
/// state violates the partition invariants.
pub fn add_faults<BS: Blockstore>(
    &mut self,
    store: &BS,
    sector_numbers: &BitField,
    sectors: &[SectorOnChainInfo],
    fault_expiration: ChainEpoch,
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<(PowerPair, PowerPair)> {
    // Reschedule the sectors as faults and persist the queue.
    let mut expiration_queue = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|err| err.downcast_wrap("failed to load partition queue"))?;
    let new_faulty_power = expiration_queue
        .reschedule_as_faults(fault_expiration, sectors, sector_size)
        .map_err(|err| err.downcast_wrap("failed to add faults to partition queue"))?;
    self.expirations_epochs = expiration_queue.amt.flush()?;

    self.faults |= sector_numbers;
    self.faulty_power += &new_faulty_power;

    // Faulted sectors leave the unproven set.
    let faulted_unproven = sector_numbers & &self.unproven;
    self.unproven -= &faulted_unproven;

    let mut power_delta = (&new_faulty_power).neg();
    let faulted_unproven_infos = select_sectors(sectors, &faulted_unproven)
        .map_err(|err| err.downcast_wrap("failed to select unproven sectors"))?;
    if !faulted_unproven_infos.is_empty() {
        // Unproven power is given back to the delta because it is tracked separately.
        let unproven_power_lost = power_for_sectors(sector_size, &faulted_unproven_infos);
        self.unproven_power -= &unproven_power_lost;
        power_delta += &unproven_power_lost;
    }

    self.validate_state()?;
    Ok((power_delta, new_faulty_power))
}
/// Records a fault declaration for `sector_numbers`.
///
/// Declared sectors that are currently recovering have their recovery
/// retracted; the remaining ones that are neither terminated nor already
/// faulty become new faults.
///
/// Returns `(new_faults, power_delta, new_faulty_power)`.
///
/// # Errors
/// Returns an `illegal_argument` actor error if the partition does not
/// contain all declared sectors. Also fails if sector infos cannot be loaded,
/// if adding the faults fails, or if the resulting state is invalid.
pub fn record_faults<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    sector_numbers: &BitField,
    fault_expiration_epoch: ChainEpoch,
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<(BitField, PowerPair, PowerPair)> {
    validate_partition_contains_sectors(self, sector_numbers)
        .map_err(|err| actor_error_v13!(illegal_argument; "failed fault declaration: {}", err))?;

    // Split the declaration into retracted recoveries and genuinely new faults.
    let retracted_recoveries = &self.recoveries & sector_numbers;
    let new_faults =
        &(&(sector_numbers - &retracted_recoveries) - &self.terminated) - &self.faults;

    let new_fault_infos = sectors
        .load_sector(&new_faults)
        .map_err(|err| err.wrap("failed to load fault sectors"))?;
    let (power_delta, new_faulty_power) = if new_fault_infos.is_empty() {
        Default::default()
    } else {
        self.add_faults(
            store,
            &new_faults,
            &new_fault_infos,
            fault_expiration_epoch,
            sector_size,
            quant,
        )
        .map_err(|err| err.downcast_wrap("failed to add faults"))?
    };

    // Drop the retracted recoveries and their power from the recovering totals.
    let retracted_infos = sectors
        .load_sector(&retracted_recoveries)
        .map_err(|err| err.wrap("failed to load recovery sectors"))?;
    if !retracted_infos.is_empty() {
        let retracted_power = power_for_sectors(sector_size, &retracted_infos);
        self.remove_recoveries(&retracted_recoveries, &retracted_power);
    }

    self.validate_state()?;
    Ok((new_faults, power_delta, new_faulty_power))
}
/// Treats every sector in `recoveries` as recovered.
///
/// The recovered sectors are rescheduled in the expiration queue, removed
/// from `faults`, and their power is subtracted from both the faulty and the
/// recovering totals. `recoveries` is emptied.
///
/// Returns the power reported by the queue for the recovered sectors.
///
/// # Errors
/// Fails if the sectors or the expiration queue cannot be loaded, if the
/// queue cannot be updated or stored, or if the resulting state is invalid.
pub fn recover_faults<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<PowerPair> {
    let recovered_infos = sectors
        .load_sector(&self.recoveries)
        .map_err(|err| err.wrap("failed to load recovered sectors"))?;

    let mut expiration_queue = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|err| anyhow!("failed to load partition queue: {:?}", err))?;
    let recovered_power = expiration_queue
        .reschedule_recovered(recovered_infos, sector_size)
        .map_err(|err| err.downcast_wrap("failed to reschedule faults in partition queue"))?;
    self.expirations_epochs = expiration_queue.amt.flush()?;

    // Live power and unproven sectors are unaffected by a recovery.
    self.faults -= &self.recoveries;
    self.recoveries = BitField::new();
    self.recovering_power -= &recovered_power;
    self.faulty_power -= &recovered_power;

    self.validate_state()?;
    Ok(recovered_power)
}
/// Clears the unproven set and returns the unproven power that was tracked,
/// leaving `unproven_power` at its default (zero) value.
pub fn activate_unproven(&mut self) -> PowerPair {
    let activated_power = std::mem::take(&mut self.unproven_power);
    self.unproven = BitField::default();
    activated_power
}
/// Declares faulty sectors as recovering.
///
/// Of the declared `sector_numbers`, only those that are currently faulty and
/// not already recovering are recorded; their power is added to
/// `recovering_power`.
///
/// # Errors
/// Returns an `illegal_argument` actor error if the partition does not
/// contain all declared sectors. Also fails if the sector infos cannot be
/// loaded or if the resulting state is invalid.
pub fn declare_faults_recovered<BS: Blockstore>(
    &mut self,
    sectors: &Sectors<'_, BS>,
    sector_size: SectorSize,
    sector_numbers: &BitField,
) -> anyhow::Result<()> {
    validate_partition_contains_sectors(self, sector_numbers)
        .map_err(|err| actor_error_v13!(illegal_argument; "failed fault declaration: {}", err))?;

    // Skip sectors that are not faulty or that are already recovering.
    let new_recoveries = &(sector_numbers & &self.faults) - &self.recoveries;
    let new_recovery_infos = sectors
        .load_sector(&new_recoveries)
        .map_err(|err| err.wrap("failed to load recovery sectors"))?;

    self.recoveries |= &new_recoveries;
    self.recovering_power += &power_for_sectors(sector_size, &new_recovery_infos);

    self.validate_state()
}
/// Removes `sector_numbers` from the recovering set and subtracts `power`
/// from the recovering power. Does nothing when `sector_numbers` is empty.
pub fn remove_recoveries(&mut self, sector_numbers: &BitField, power: &PowerPair) {
    if !sector_numbers.is_empty() {
        self.recoveries -= sector_numbers;
        self.recovering_power -= power;
    }
}
/// Moves the expiration of the given sectors to `new_expiration`.
///
/// Only sectors that belong to this partition and are neither terminated nor
/// faulty are rescheduled; all others in `sector_numbers` are silently
/// ignored.
///
/// Returns the on-chain infos of the sectors that were rescheduled.
///
/// # Errors
/// Fails if the sector infos or the expiration queue cannot be loaded, if the
/// queue cannot be updated or stored, or if the resulting state is invalid.
pub fn reschedule_expirations<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    new_expiration: ChainEpoch,
    sector_numbers: &BitField,
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<Vec<SectorOnChainInfo>> {
    // Narrow the request down: in this partition, not terminated, not faulty.
    let present = sector_numbers & &self.sectors;
    let live = &present - &self.terminated;
    let active = &live - &self.faults;
    let sector_infos = sectors.load_sector(&active)?;

    let mut expirations = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|e| e.downcast_wrap("failed to load sector expirations"))?;
    expirations.reschedule_expirations(new_expiration, &sector_infos, sector_size)?;
    self.expirations_epochs = expirations.amt.flush()?;

    self.validate_state()?;
    Ok(sector_infos)
}
/// Replaces `old_sectors` with `new_sectors` in the partition.
///
/// The expiration queue is updated first; afterwards the old sector numbers
/// are removed from `sectors`, the new ones are added, and `live_power` is
/// adjusted by the power delta reported by the queue.
///
/// Returns `(power_delta, pledge_delta)` as reported by the queue.
///
/// # Errors
/// Fails if the expiration queue cannot be loaded, updated or stored, if any
/// of the old sectors is not active, or if the resulting state is invalid.
pub fn replace_sectors<BS: Blockstore>(
    &mut self,
    store: &BS,
    old_sectors: &[SectorOnChainInfo],
    new_sectors: &[SectorOnChainInfo],
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<(PowerPair, TokenAmount)> {
    let mut expiration_queue = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|err| err.downcast_wrap("failed to load sector expirations"))?;
    let (old_sector_numbers, new_sector_numbers, power_delta, pledge_delta) = expiration_queue
        .replace_sectors(old_sectors, new_sectors, sector_size)
        .map_err(|err| err.downcast_wrap("failed to replace sector expirations"))?;
    self.expirations_epochs = expiration_queue
        .amt
        .flush()
        .map_err(|err| err.downcast_wrap("failed to save sector expirations"))?;

    // Only active sectors may be replaced.
    let active = self.active_sectors();
    if !active.contains_all(&old_sector_numbers) {
        return Err(anyhow!(
            "refusing to replace inactive sectors in {:?} (active: {:?})",
            old_sector_numbers,
            active
        ));
    }

    self.sectors -= &old_sector_numbers;
    self.sectors |= &new_sector_numbers;
    self.live_power += &power_delta;

    self.validate_state()?;
    Ok((power_delta, pledge_delta))
}
/// Adds `sectors` to the early-termination queue under `epoch` and stores the
/// updated queue root in `early_terminated`.
///
/// # Errors
/// Fails if the queue cannot be loaded, updated or saved.
pub fn record_early_termination<BS: Blockstore>(
    &mut self,
    store: &BS,
    epoch: ChainEpoch,
    sectors: &BitField,
) -> anyhow::Result<()> {
    let mut queue = BitFieldQueue::new(store, &self.early_terminated, NO_QUANTIZATION)
        .map_err(|err| err.downcast_wrap("failed to load early termination queue"))?;

    queue
        .add_to_queue(epoch, sectors)
        .map_err(|err| err.downcast_wrap("failed to add to early termination queue"))?;

    let new_root = queue
        .amt
        .flush()
        .map_err(|err| err.downcast_wrap("failed to save early termination queue"))?;
    self.early_terminated = new_root;
    Ok(())
}
/// Terminates the given live sectors at `epoch`.
///
/// The sectors are removed from the expiration queue, recorded in the
/// early-termination queue, moved into `terminated`, and removed from the
/// fault, recovery and unproven sets. Power totals are reduced accordingly.
///
/// Returns the set of removed sectors as reported by the expiration queue,
/// with the power of removed unproven sectors subtracted from its
/// `active_power`.
///
/// # Errors
/// Returns an `illegal_argument` actor error if any of `sector_numbers` is
/// not live. Also fails if sector infos or queues cannot be loaded, updated
/// or saved, or if the resulting state is invalid.
#[allow(clippy::too_many_arguments)]
pub fn terminate_sectors<BS: Blockstore>(
    &mut self,
    policy: &Policy,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    epoch: ChainEpoch,
    sector_numbers: &BitField,
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<ExpirationSet> {
    let live_sectors = self.live_sectors();
    if !live_sectors.contains_all(sector_numbers) {
        return Err(
            actor_error_v13!(illegal_argument, "can only terminate live sectors").into(),
        );
    }

    let sector_infos = sectors.load_sector(sector_numbers)?;
    let mut expirations = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|e| e.downcast_wrap("failed to load sector expirations"))?;
    let (mut removed, removed_recovering) = expirations
        .remove_sectors(
            policy,
            &sector_infos,
            &self.faults,
            &self.recoveries,
            sector_size,
        )
        .map_err(|e| e.downcast_wrap("failed to remove sector expirations"))?;
    self.expirations_epochs = expirations
        .amt
        .flush()
        .map_err(|e| e.downcast_wrap("failed to save sector expirations"))?;

    let removed_sectors = &removed.on_time_sectors | &removed.early_sectors;
    self.record_early_termination(store, epoch, &removed_sectors)
        .map_err(|e| e.downcast_wrap("failed to record early sector termination"))?;

    // Capture the unproven subset before the unproven set is modified below.
    let unproven_nos = &removed_sectors & &self.unproven;

    self.faults -= &removed_sectors;
    self.recoveries -= &removed_sectors;
    self.terminated |= &removed_sectors;
    self.live_power -= &removed.active_power;
    self.live_power -= &removed.faulty_power;
    self.faulty_power -= &removed.faulty_power;
    self.recovering_power -= &removed_recovering;
    self.unproven -= &unproven_nos;

    // Unproven power is tracked separately, so take it out of the unproven
    // total and out of the active power that is reported to the caller.
    let unproven_infos = select_sectors(&sector_infos, &unproven_nos)?;
    let removed_unproven_power = power_for_sectors(sector_size, &unproven_infos);
    self.unproven_power -= &removed_unproven_power;
    removed.active_power -= &removed_unproven_power;

    self.validate_state()?;
    Ok(removed)
}
/// Pops every expiration-queue entry up to `until` and marks the popped
/// sectors as terminated.
///
/// Power of the popped sectors is subtracted from the live and faulty totals,
/// and sectors that expired early are added to the early-termination queue
/// under `until`.
///
/// Returns the popped expiration set.
///
/// # Errors
/// Fails if the partition still has unproven sectors, recovering sectors or
/// recovering power, if an expiring sector is already terminated, if a queue
/// cannot be loaded, updated or saved, or if the resulting state is invalid.
pub fn pop_expired_sectors<BS: Blockstore>(
    &mut self,
    store: &BS,
    until: ChainEpoch,
    quant: QuantSpec,
) -> anyhow::Result<ExpirationSet> {
    if !self.unproven.is_empty() {
        return Err(anyhow!(
            "cannot pop expired sectors from a partition with unproven sectors"
        ));
    }

    let mut expiration_queue = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|err| err.downcast_wrap("failed to load expiration queue"))?;
    let popped = expiration_queue.pop_until(until).map_err(|err| {
        err.downcast_wrap(format!("failed to pop expiration queue until {}", until))
    })?;
    self.expirations_epochs = expiration_queue.amt.flush()?;

    let expired = &popped.on_time_sectors | &popped.early_sectors;

    // No recoveries may be pending when expirations are processed.
    if !self.recoveries.is_empty() {
        return Err(anyhow!(
            "unexpected recoveries while processing expirations"
        ));
    }
    if !self.recovering_power.is_zero() {
        return Err(anyhow!(
            "unexpected recovering power while processing expirations"
        ));
    }
    if self.terminated.contains_any(&expired) {
        return Err(anyhow!("expiring sectors already terminated"));
    }

    self.terminated |= &expired;
    self.faults -= &expired;
    self.live_power -= &popped.active_power;
    self.live_power -= &popped.faulty_power;
    self.faulty_power -= &popped.faulty_power;

    self.record_early_termination(store, until, &popped.early_sectors)
        .map_err(|err| err.downcast_wrap("failed to record early terminations"))?;

    self.validate_state()?;
    Ok(popped)
}
/// Marks every live sector of the partition as faulty.
///
/// All queue entries are rescheduled as faults at `fault_expiration`; the
/// recovering and unproven sets are cleared along with their power totals.
///
/// Returns `(power_delta, penalized_power, new_faulty_power)` where
/// * `new_faulty_power` is the live power that was not already faulty,
/// * `penalized_power` is the previous recovering power plus
///   `new_faulty_power`,
/// * `power_delta` is the previous unproven power minus `new_faulty_power`.
///
/// # Errors
/// Fails if the expiration queue cannot be loaded, updated or stored, or if
/// the resulting state is invalid.
pub fn record_missed_post<BS: Blockstore>(
    &mut self,
    store: &BS,
    fault_expiration: ChainEpoch,
    quant: QuantSpec,
) -> anyhow::Result<(PowerPair, PowerPair, PowerPair)> {
    let mut expiration_queue = ExpirationQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|err| err.downcast_wrap("failed to load partition queue"))?;
    expiration_queue
        .reschedule_all_as_faults(fault_expiration)
        .map_err(|err| err.downcast_wrap("failed to reschedule all as faults"))?;
    self.expirations_epochs = expiration_queue.amt.flush()?;

    // All three results must be derived before the totals are overwritten below.
    let new_faulty_power = &self.live_power - &self.faulty_power;
    let penalized_power = &self.recovering_power + &new_faulty_power;
    let power_delta = &self.unproven_power - &new_faulty_power;

    // Every live sector is now faulty; nothing is recovering or unproven.
    self.faults = self.live_sectors();
    self.faulty_power = self.live_power.clone();
    self.recoveries = BitField::new();
    self.recovering_power = PowerPair::zero();
    self.unproven = BitField::new();
    self.unproven_power = PowerPair::zero();

    self.validate_state()?;
    Ok((power_delta, penalized_power, new_faulty_power))
}
/// Pops up to `max_sectors` sectors from the early-termination queue.
///
/// Queue entries are visited in iteration order. An entry that fits within
/// the remaining budget is consumed whole and deleted; an entry that does not
/// fit is split, and the unprocessed remainder is written back under the same
/// epoch.
///
/// Returns the termination result together with a flag that is `true` while
/// the queue still holds entries.
///
/// # Errors
/// Fails if the queue cannot be loaded, walked, updated or stored, or if the
/// partition state is invalid afterwards.
pub fn pop_early_terminations<BS: Blockstore>(
&mut self,
store: &BS,
max_sectors: u64,
) -> anyhow::Result<(TerminationResult, bool)> {
let mut early_terminated_queue =
BitFieldQueue::new(store, &self.early_terminated, NO_QUANTIZATION)?;
// Keys of entries that were consumed completely and can be deleted.
let mut processed = Vec::<u64>::new();
// Leftover of a partially consumed entry, with the epoch it belongs to.
let mut remaining: Option<(BitField, ChainEpoch)> = None;
let mut result = TerminationResult::new();
result.partitions_processed = 1;
early_terminated_queue
.amt
.for_each_while(|i, sectors| {
// The array key is the epoch at which the sectors were recorded.
let epoch: ChainEpoch = i.try_into()?;
let count = sectors.len();
// Budget left for this call; the walk stops once it reaches zero,
// so this subtraction is not reached with a larger `sectors_processed`.
let limit = max_sectors - result.sectors_processed;
let to_process = if limit < count {
// Entry exceeds the budget: take the first `limit` sectors and
// remember the rest so it can be written back after the walk.
let to_process = sectors
.slice(0, limit)
.context("expected more sectors in bitfield")?;
let rest = sectors - &to_process;
remaining = Some((rest, epoch));
result.sectors_processed += limit;
to_process
} else {
// Entry fits entirely: consume it and schedule it for deletion.
processed.push(i);
result.sectors_processed += count;
sectors.clone()
};
result.sectors.insert(epoch, to_process);
// Returning `false` ends the walk once the budget is used up.
let keep_going = result.sectors_processed < max_sectors;
Ok(keep_going)
})
.map_err(|e| e.downcast_wrap("failed to walk early terminations queue"))?;
// NOTE(review): the `true` argument presumably makes deletion strict
// (error on a missing key) — confirm against the array API.
early_terminated_queue
.amt
.batch_delete(processed, true)
.map_err(|e| {
e.downcast_wrap("failed to remove entries from early terminations queue")
})?;
// Write the unprocessed part of a split entry back under its epoch.
if let Some((remaining_sectors, remaining_epoch)) = remaining.take() {
early_terminated_queue
.amt
.set(remaining_epoch as u64, remaining_sectors)
.map_err(|e| {
e.downcast_wrap("failed to update remaining entry early terminations queue")
})?;
}
self.early_terminated = early_terminated_queue
.amt
.flush()
.map_err(|e| e.downcast_wrap("failed to store early terminations queue"))?;
self.validate_state()?;
let has_more = early_terminated_queue.amt.count() > 0;
Ok((result, has_more))
}
/// Records the `skipped` sectors as faults.
///
/// Skipped sectors that were recovering have their recovery retracted; those
/// that are neither terminated nor already faulty become new faults.
///
/// Returns `(power_delta, new_fault_power, retracted_recovery_power,
/// has_new_faults)`. When `skipped` is empty the result is three zero powers
/// and `false`, and the partition is left untouched.
///
/// # Errors
/// Returns an `illegal_argument` actor error if `skipped` contains sectors
/// outside this partition. Also fails if sector infos cannot be loaded, if
/// adding the faults fails (defaulting to `USR_ILLEGAL_STATE`), or if the
/// resulting state is invalid.
pub fn record_skipped_faults<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    sector_size: SectorSize,
    quant: QuantSpec,
    fault_expiration: ChainEpoch,
    skipped: &BitField,
) -> anyhow::Result<(PowerPair, PowerPair, PowerPair, bool)> {
    if skipped.is_empty() {
        let zero = PowerPair::zero;
        return Ok((zero(), zero(), zero(), false));
    }

    if !self.sectors.contains_all(skipped) {
        return Err(actor_error_v13!(
            illegal_argument,
            "skipped faults contains sectors outside partition"
        )
        .into());
    }

    // Skipped sectors that had been declared recovering.
    let retracted_recoveries = &self.recoveries & skipped;
    let retracted_infos = sectors
        .load_sector(&retracted_recoveries)
        .map_err(|err| err.wrap("failed to load sectors"))?;
    let retracted_recovery_power = power_for_sectors(sector_size, &retracted_infos);

    // Skipped sectors that are neither terminated nor already faulty.
    let new_faults = &(skipped - &self.terminated) - &self.faults;
    let new_fault_infos = sectors
        .load_sector(&new_faults)
        .map_err(|err| err.wrap("failed to load sectors"))?;
    let has_new_faults = !new_fault_infos.is_empty();

    let (power_delta, new_fault_power) = self
        .add_faults(
            store,
            &new_faults,
            &new_fault_infos,
            fault_expiration,
            sector_size,
            quant,
        )
        .map_err(|err| {
            err.downcast_default(ExitCode::USR_ILLEGAL_STATE, "failed to add skipped faults")
        })?;

    self.remove_recoveries(&retracted_recoveries, &retracted_recovery_power);

    self.validate_state()?;
    Ok((
        power_delta,
        new_fault_power,
        retracted_recovery_power,
        has_new_faults,
    ))
}
pub fn validate_power_state(&self) -> anyhow::Result<()> {
if self.live_power.raw.is_negative() || self.live_power.qa.is_negative() {
return Err(anyhow!("Partition left with negative live power"));
}
if self.unproven_power.raw.is_negative() || self.unproven_power.qa.is_negative() {
return Err(anyhow!("Partition left with negative unproven power"));
}
if self.faulty_power.raw.is_negative() || self.faulty_power.qa.is_negative() {
return Err(anyhow!("Partition left with negative faulty power"));
}
if self.recovering_power.raw.is_negative() || self.recovering_power.qa.is_negative() {
return Err(anyhow!("Partition left with negative recovering power"));
}
if self.unproven_power.raw > self.live_power.raw {
return Err(anyhow!("Partition left with invalid unproven power"));
}
if self.faulty_power.raw > self.live_power.raw {
return Err(anyhow!("Partition left with invalid faulty power"));
}
if self.recovering_power.raw > self.live_power.raw
|| self.recovering_power.raw > self.faulty_power.raw
{
return Err(anyhow!("Partition left with invalid recovering power"));
}
Ok(())
}
/// Checks the sector bitfields for consistency.
///
/// Terminated sectors must not be unproven or faulty; unproven, faulty and
/// terminated sectors must all belong to `sectors`; and every recovering
/// sector must be faulty.
///
/// # Errors
/// Returns an error that names the first violated condition.
pub fn validate_bf_state(&self) -> anyhow::Result<()> {
    let unproven_or_faulty = &self.unproven | &self.faults;
    if self.terminated.contains_any(&unproven_or_faulty) {
        return Err(anyhow!(
            "Partition left with terminated sectors in multiple states"
        ));
    }

    let tracked = &unproven_or_faulty | &self.terminated;
    if !self.sectors.contains_all(&tracked) {
        return Err(anyhow!("Partition left with invalid sector state"));
    }

    if !self.faults.contains_all(&self.recoveries) {
        return Err(anyhow!("Partition left with invalid recovery state"));
    }
    Ok(())
}
/// Runs the power checks followed by the bitfield checks and returns the
/// first error encountered, if any.
pub fn validate_state(&self) -> anyhow::Result<()> {
    self.validate_power_state()
        .and_then(|()| self.validate_bf_state())
}
}
/// A pair of power values that are always added, subtracted and negated
/// component-wise. Serialized as a tuple of `(raw, qa)`; the default value
/// is used as zero.
// NOTE(review): `qa` presumably stands for quality-adjusted power — confirm.
#[derive(Serialize_tuple, Deserialize_tuple, Eq, PartialEq, Clone, Debug, Default)]
pub struct PowerPair {
/// Raw power component, (de)serialized through `bigint_ser`.
#[serde(with = "bigint_ser")]
pub raw: StoragePower,
/// QA power component, (de)serialized through `bigint_ser`.
#[serde(with = "bigint_ser")]
pub qa: StoragePower,
}
impl PowerPair {
pub fn new(raw: StoragePower, qa: StoragePower) -> Self {
Self { raw, qa }
}
pub fn zero() -> Self {
Default::default()
}
pub fn is_zero(&self) -> bool {
self.raw.is_zero() && self.qa.is_zero()
}
}
/// Component-wise addition of two borrowed pairs.
impl ops::Add for &PowerPair {
    type Output = PowerPair;

    fn add(self, rhs: Self) -> Self::Output {
        let raw = &self.raw + &rhs.raw;
        let qa = &self.qa + &rhs.qa;
        PowerPair { raw, qa }
    }
}
impl ops::Add for PowerPair {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
&self + &rhs
}
}
/// In-place component-wise addition of a borrowed pair.
impl ops::AddAssign<&Self> for PowerPair {
    fn add_assign(&mut self, rhs: &Self) {
        self.raw += &rhs.raw;
        self.qa += &rhs.qa;
    }
}
/// Component-wise subtraction of two borrowed pairs.
impl ops::Sub for &PowerPair {
    type Output = PowerPair;

    fn sub(self, rhs: Self) -> Self::Output {
        let raw = &self.raw - &rhs.raw;
        let qa = &self.qa - &rhs.qa;
        PowerPair { raw, qa }
    }
}
impl ops::Sub for PowerPair {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
&self - &rhs
}
}
/// In-place component-wise subtraction of a borrowed pair.
impl ops::SubAssign<&Self> for PowerPair {
    fn sub_assign(&mut self, rhs: &Self) {
        self.raw -= &rhs.raw;
        self.qa -= &rhs.qa;
    }
}
/// Negates both components of an owned pair.
impl ops::Neg for PowerPair {
    type Output = PowerPair;

    fn neg(self) -> Self::Output {
        let PowerPair { raw, qa } = self;
        PowerPair { raw: -raw, qa: -qa }
    }
}
impl ops::Neg for &PowerPair {
type Output = PowerPair;
fn neg(self) -> Self::Output {
-self.clone()
}
}