use std::cmp;
use std::collections::BTreeSet;
use anyhow::anyhow;
use cid::multihash::Code;
use cid::Cid;
use fvm_ipld_bitfield::BitField;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::tuple::*;
use fvm_ipld_encoding::CborStore;
use fvm_shared4::clock::ChainEpoch;
use fvm_shared4::econ::TokenAmount;
use fvm_shared4::error::ExitCode;
use fvm_shared4::sector::{PoStProof, SectorSize};
use num_traits::{Signed, Zero};
use fil_actors_shared::actor_error_v14;
use fil_actors_shared::v14::runtime::Policy;
use fil_actors_shared::v14::{ActorDowncast, ActorError, Array, AsActorError};
use crate::v14::SECTORS_AMT_BITWIDTH;
use super::{
BitFieldQueue, ExpirationSet, Partition, PartitionSectorMap, PoStPartition, PowerPair,
QuantSpec, SectorOnChainInfo, Sectors, TerminationResult,
};
/// AMT bitwidth for a deadline's partitions array (see `partitions_amt`).
pub const DEADLINE_PARTITIONS_AMT_BITWIDTH: u32 = 3;
/// AMT bitwidth for the deadline's expiration-epochs queue
/// (backing the `BitFieldQueue` used in `add_expiration_partitions`).
pub const DEADLINE_EXPIRATIONS_AMT_BITWIDTH: u32 = 5;
/// AMT bitwidth for the optimistic PoSt submissions array
/// (see `optimistic_proofs_amt` / `record_post_proofs`).
pub const DEADLINE_OPTIMISTIC_POST_SUBMISSIONS_AMT_BITWIDTH: u32 = 2;
/// The set of all deadlines in a proving period: one `Deadline` CID per
/// deadline index.
#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct Deadlines {
    // One CID per deadline; length is fixed to `policy.wpost_period_deadlines`
    // at construction (see `Deadlines::new`).
    pub due: Vec<Cid>,
}
impl Deadlines {
    /// Constructs a `Deadlines` whose every entry points at the supplied
    /// empty-deadline CID — one entry per proving deadline in the policy.
    pub fn new(policy: &Policy, empty_deadline_cid: Cid) -> Self {
        Self {
            due: vec![empty_deadline_cid; policy.wpost_period_deadlines as usize],
        }
    }

    /// Loads deadline `idx` from `store`.
    ///
    /// Errors with `USR_ILLEGAL_ARGUMENT` when the index is out of range or
    /// the CID resolves to nothing, and `USR_ILLEGAL_STATE` when the store
    /// read itself fails.
    pub fn load_deadline<BS: Blockstore>(
        &self,
        store: &BS,
        idx: u64,
    ) -> Result<Deadline, ActorError> {
        let idx = idx as usize;
        if idx >= self.due.len() {
            return Err(actor_error_v14!(
                illegal_argument,
                "invalid deadline index {} of {}",
                idx,
                self.due.len()
            ));
        }
        store
            .get_cbor(&self.due[idx])
            .with_context_code(ExitCode::USR_ILLEGAL_STATE, || {
                format!("failed to load deadline {}", idx)
            })?
            .ok_or_else(|| actor_error_v14!(illegal_argument, "no deadline {}", idx))
    }

    /// Invokes `f` with each deadline index and its loaded `Deadline`, in
    /// index order, stopping at the first error.
    pub fn for_each<BS: Blockstore>(
        &self,
        store: &BS,
        mut f: impl FnMut(u64, Deadline) -> anyhow::Result<()>,
    ) -> anyhow::Result<()> {
        // Iterate the indexes directly; the previous `let index = i;`
        // rebinding was redundant.
        for i in 0..(self.due.len() as u64) {
            let deadline = self.load_deadline(store, i)?;
            f(i, deadline)?;
        }
        Ok(())
    }

    /// Validates `deadline`, serializes it to `store`, and records the new
    /// CID at `deadline_idx`.
    pub fn update_deadline<BS: Blockstore>(
        &mut self,
        policy: &Policy,
        store: &BS,
        deadline_idx: u64,
        deadline: &Deadline,
    ) -> anyhow::Result<()> {
        if deadline_idx >= policy.wpost_period_deadlines {
            return Err(anyhow!("invalid deadline {}", deadline_idx));
        }
        // Refuse to persist an internally inconsistent deadline.
        deadline.validate_state()?;
        self.due[deadline_idx as usize] = store.put_cbor(deadline, Code::Blake2b256)?;
        Ok(())
    }
}
/// State of all sectors due at a single proving deadline.
#[derive(Debug, Default, Serialize_tuple, Deserialize_tuple)]
pub struct Deadline {
    // AMT of `Partition`s, keyed by partition index (see `partitions_amt`).
    pub partitions: Cid,
    // AMT backing a `BitFieldQueue`: maps (quantized) epochs to the
    // partitions that may have sectors expiring then
    // (see `add_expiration_partitions` / `pop_expired_partitions`).
    pub expirations_epochs: Cid,
    // Partitions that have submitted a PoSt in the current window
    // (set in `record_proven_sectors`, cleared in `process_deadline_end`).
    pub partitions_posted: BitField,
    // Partitions with early terminations still awaiting processing
    // (consumed by `pop_early_terminations`).
    pub early_terminations: BitField,
    // Number of live (not yet expired/terminated/removed) sectors.
    pub live_sectors: u64,
    // Total sectors ever added to this deadline, live or dead.
    pub total_sectors: u64,
    // Aggregate power of all currently-faulty sectors in this deadline.
    pub faulty_power: PowerPair,
    // AMT of `WindowedPoSt` records accepted optimistically this window.
    pub optimistic_post_submissions: Cid,
    // Snapshots rolled over at deadline end (`process_deadline_end`) so that
    // optimistically-accepted proofs can later be disputed
    // (see `load_partitions_for_dispute` / `take_post_proofs`).
    pub sectors_snapshot: Cid,
    pub partitions_snapshot: Cid,
    pub optimistic_post_submissions_snapshot: Cid,
}
/// A windowed PoSt submission recorded optimistically (without on-chain
/// verification) for potential later dispute; stored in the
/// `optimistic_post_submissions` AMT (see `record_post_proofs`).
#[derive(Serialize_tuple, Deserialize_tuple, Clone)]
pub struct WindowedPoSt {
    // Partitions covered by this submission.
    pub partitions: BitField,
    // The proofs themselves.
    pub proofs: Vec<PoStProof>,
}
/// Data gathered from the partition snapshots for disputing a windowed PoSt
/// (built by `load_partitions_for_dispute`).
#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct DisputeInfo {
    // Every sector number in the disputed partitions.
    pub all_sector_nos: BitField,
    // Sectors excluded from verification: faults, terminations and unproven
    // sectors from the snapshot.
    pub ignored_sector_nos: BitField,
    // Active sectors, grouped by partition.
    pub disputed_sectors: PartitionSectorMap,
    // Sum of the active power of the disputed partitions.
    pub disputed_power: PowerPair,
}
impl Deadline {
pub fn new<BS: Blockstore>(store: &BS) -> Result<Self, ActorError> {
let empty_partitions_array =
Array::<(), BS>::new_with_bit_width(store, DEADLINE_PARTITIONS_AMT_BITWIDTH)
.flush()
.map_err(|e| {
e.downcast_default(
ExitCode::USR_ILLEGAL_STATE,
"Failed to create empty states array",
)
})?;
let empty_deadline_expiration_array =
Array::<(), BS>::new_with_bit_width(store, DEADLINE_EXPIRATIONS_AMT_BITWIDTH)
.flush()
.map_err(|e| {
e.downcast_default(
ExitCode::USR_ILLEGAL_STATE,
"Failed to create empty states array",
)
})?;
let empty_post_submissions_array = Array::<(), BS>::new_with_bit_width(
store,
DEADLINE_OPTIMISTIC_POST_SUBMISSIONS_AMT_BITWIDTH,
)
.flush()
.map_err(|e| {
e.downcast_default(
ExitCode::USR_ILLEGAL_STATE,
"Failed to create empty states array",
)
})?;
let empty_sectors_array = Array::<(), BS>::new_with_bit_width(store, SECTORS_AMT_BITWIDTH)
.flush()
.map_err(|e| {
e.downcast_default(
ExitCode::USR_ILLEGAL_STATE,
"Failed to construct empty sectors snapshot array",
)
})?;
Ok(Self {
partitions: empty_partitions_array,
expirations_epochs: empty_deadline_expiration_array,
early_terminations: BitField::new(),
live_sectors: 0,
total_sectors: 0,
faulty_power: PowerPair::zero(),
partitions_posted: BitField::new(),
optimistic_post_submissions: empty_post_submissions_array,
partitions_snapshot: empty_partitions_array,
sectors_snapshot: empty_sectors_array,
optimistic_post_submissions_snapshot: empty_post_submissions_array,
})
}
/// Loads this deadline's live partitions AMT from `store`.
pub fn partitions_amt<'db, BS: Blockstore>(
    &self,
    store: &'db BS,
) -> anyhow::Result<Array<'db, Partition, BS>> {
    let amt = Array::load(&self.partitions, store)?;
    Ok(amt)
}
/// Loads the AMT of optimistically-accepted PoSt submissions for the
/// current window.
pub fn optimistic_proofs_amt<'db, BS: Blockstore>(
    &self,
    store: &'db BS,
) -> anyhow::Result<Array<'db, WindowedPoSt, BS>> {
    let amt = Array::load(&self.optimistic_post_submissions, store)?;
    Ok(amt)
}
/// Loads the partitions AMT snapshot taken at the last deadline end.
pub fn partitions_snapshot_amt<'db, BS: Blockstore>(
    &self,
    store: &'db BS,
) -> anyhow::Result<Array<'db, Partition, BS>> {
    let amt = Array::load(&self.partitions_snapshot, store)?;
    Ok(amt)
}
/// Loads the snapshot of optimistic PoSt submissions from the previous
/// window (the set that may still be disputed).
pub fn optimistic_proofs_snapshot_amt<'db, BS: Blockstore>(
    &self,
    store: &'db BS,
) -> anyhow::Result<Array<'db, WindowedPoSt, BS>> {
    let amt = Array::load(&self.optimistic_post_submissions_snapshot, store)?;
    Ok(amt)
}
/// Loads a single partition by index from the live partitions AMT.
///
/// Returns `USR_NOT_FOUND` when the index has no partition and
/// `USR_ILLEGAL_STATE` when the AMT cannot be read.
pub fn load_partition<BS: Blockstore>(
    &self,
    store: &BS,
    partition_idx: u64,
) -> Result<Partition, ActorError> {
    let partitions = Array::<Partition, _>::load(&self.partitions, store)
        .context_code(ExitCode::USR_ILLEGAL_STATE, "loading partitions array")?;
    let lookup = partitions
        .get(partition_idx)
        .with_context_code(ExitCode::USR_ILLEGAL_STATE, || {
            format!("failed to lookup partition {}", partition_idx)
        })?;
    match lookup {
        Some(partition) => Ok(partition.clone()),
        None => Err(actor_error_v14!(not_found, "no partition {}", partition_idx)),
    }
}
/// Loads a single partition by index from the partitions snapshot.
///
/// Returns a not-found error when the index is absent from the snapshot.
pub fn load_partition_snapshot<BS: Blockstore>(
    &self,
    store: &BS,
    partition_idx: u64,
) -> anyhow::Result<Partition> {
    let snapshot = Array::<Partition, _>::load(&self.partitions_snapshot, store)?;
    let lookup = snapshot.get(partition_idx).map_err(|e| {
        e.downcast_default(
            ExitCode::USR_ILLEGAL_STATE,
            format!("failed to lookup partition snapshot {}", partition_idx),
        )
    })?;
    match lookup {
        Some(partition) => Ok(partition.clone()),
        None => {
            Err(actor_error_v14!(not_found, "no partition snapshot {}", partition_idx).into())
        }
    }
}
/// Schedules the given partitions for expiration processing at
/// `expiration_epoch` in the deadline's expiration queue. A no-op when
/// `partitions` is empty.
pub fn add_expiration_partitions<BS: Blockstore>(
    &mut self,
    store: &BS,
    expiration_epoch: ChainEpoch,
    partitions: &[u64],
    quant: QuantSpec,
) -> anyhow::Result<()> {
    // Avoid doing any work if we don't have to.
    if partitions.is_empty() {
        return Ok(());
    }
    let mut expiration_queue = BitFieldQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|e| e.downcast_wrap("failed to load expiration queue"))?;
    expiration_queue
        .add_to_queue_values(expiration_epoch, partitions.iter().copied())
        .map_err(|e| e.downcast_wrap("failed to mutate expiration queue"))?;
    let new_root = expiration_queue
        .amt
        .flush()
        .map_err(|e| e.downcast_wrap("failed to save expiration queue"))?;
    self.expirations_epochs = new_root;
    Ok(())
}
/// Pops all expired sectors (on-time and early) due at or before `until`
/// from every partition scheduled in the deadline's expiration queue.
///
/// Aggregates the per-partition expiration sets into a single
/// `ExpirationSet`, updates the deadline's live-sector count and faulty
/// power, and records which partitions now hold early terminations.
pub fn pop_expired_sectors<BS: Blockstore>(
    &mut self,
    store: &BS,
    until: ChainEpoch,
    quant: QuantSpec,
) -> anyhow::Result<ExpirationSet> {
    let (expired_partitions, modified) = self.pop_expired_partitions(store, until, quant)?;
    if !modified {
        // Nothing was scheduled at or before `until`; nothing to do.
        return Ok(ExpirationSet::empty());
    }
    let mut partitions = self.partitions_amt(store)?;
    // Per-partition accumulators; merged after the loop.
    let mut on_time_sectors = Vec::<BitField>::new();
    let mut early_sectors = Vec::<BitField>::new();
    let mut all_on_time_pledge = TokenAmount::zero();
    let mut all_active_power = PowerPair::zero();
    let mut all_faulty_power = PowerPair::zero();
    let mut partitions_with_early_terminations = Vec::<u64>::new();
    for i in expired_partitions.iter() {
        let partition_idx = i;
        let mut partition = partitions
            .get(partition_idx)?
            .cloned()
            .ok_or_else(|| anyhow!("missing expected partition {}", partition_idx))?;
        let partition_expiration =
            partition
                .pop_expired_sectors(store, until, quant)
                .map_err(|e| {
                    e.downcast_wrap(format!(
                        "failed to pop expired sectors from partition {}",
                        partition_idx
                    ))
                })?;
        // Track partitions that now carry early terminations so they can be
        // folded into `self.early_terminations` below.
        if !partition_expiration.early_sectors.is_empty() {
            partitions_with_early_terminations.push(partition_idx);
        }
        on_time_sectors.push(partition_expiration.on_time_sectors);
        early_sectors.push(partition_expiration.early_sectors);
        all_active_power += &partition_expiration.active_power;
        all_faulty_power += &partition_expiration.faulty_power;
        all_on_time_pledge += &partition_expiration.on_time_pledge;
        // Write back the mutated partition.
        partitions.set(partition_idx, partition)?;
    }
    self.partitions = partitions.flush()?;
    let new_early_terminations = BitField::try_from_bits(partitions_with_early_terminations)
        .map_err(
            |_| actor_error_v14!(illegal_state; "partition index out of bitfield range"),
        )?;
    self.early_terminations |= &new_early_terminations;
    let all_on_time_sectors = BitField::union(&on_time_sectors);
    let all_early_sectors = BitField::union(&early_sectors);
    // Expired sectors are no longer live; the faulty-power aggregate drops
    // by the power of the faulty sectors that just expired.
    let on_time_count = all_on_time_sectors.len();
    let early_count = all_early_sectors.len();
    self.live_sectors -= on_time_count + early_count;
    self.faulty_power -= &all_faulty_power;
    Ok(ExpirationSet {
        on_time_sectors: all_on_time_sectors,
        early_sectors: all_early_sectors,
        on_time_pledge: all_on_time_pledge,
        active_power: all_active_power,
        faulty_power: all_faulty_power,
    })
}
/// Adds `sectors` to this deadline, filling partitions up to
/// `partition_size` sectors each — starting from the last existing
/// partition and creating new partitions as needed.
///
/// Updates live/total sector counts and schedules the new sectors'
/// expirations in the deadline's expiration queue. When `proven` is false
/// the sectors are added to the partitions as unproven. Returns the total
/// power added.
pub fn add_sectors<BS: Blockstore>(
    &mut self,
    store: &BS,
    partition_size: u64,
    proven: bool,
    mut sectors: &[SectorOnChainInfo],
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<PowerPair> {
    let mut total_power = PowerPair::zero();
    if sectors.is_empty() {
        return Ok(total_power);
    }
    // (expiration epoch, partition index) pairs destined for the queue.
    let mut partition_deadline_updates = Vec::<(ChainEpoch, u64)>::with_capacity(sectors.len());
    self.live_sectors += sectors.len() as u64;
    self.total_sectors += sectors.len() as u64;
    let mut partitions = self.partitions_amt(store)?;
    // Start at the last existing partition (it may still have room); the
    // open-ended range keeps yielding fresh indexes until everything fits.
    for partition_idx in partitions.count().saturating_sub(1).. {
        // Get or create the partition at this index.
        let mut partition = match partitions.get(partition_idx)? {
            Some(partition) => partition.clone(),
            None => Partition::new(store)?,
        };
        // Skip partitions that are already full.
        let sector_count = partition.sectors.len();
        if sector_count >= partition_size {
            continue;
        }
        let size = cmp::min(partition_size - sector_count, sectors.len() as u64) as usize;
        // Fixed: these two slices previously read `§ors[..size]` /
        // `§ors[size..]` — HTML-entity corruption of `&sectors`.
        let partition_new_sectors = &sectors[..size];
        sectors = &sectors[size..];
        // Add the sectors and accumulate the power they contribute.
        let partition_power =
            partition.add_sectors(store, proven, partition_new_sectors, sector_size, quant)?;
        total_power += &partition_power;
        partitions.set(partition_idx, partition)?;
        // Record each new sector's expiration for the deadline queue.
        partition_deadline_updates.extend(
            partition_new_sectors
                .iter()
                .map(|s| (s.expiration, partition_idx)),
        );
        if sectors.is_empty() {
            break;
        }
    }
    self.partitions = partitions.flush()?;
    // Next, update the expiration queue.
    let mut deadline_expirations =
        BitFieldQueue::new(store, &self.expirations_epochs, quant)
            .map_err(|e| e.downcast_wrap("failed to load expiration epochs"))?;
    deadline_expirations
        .add_many_to_queue_values(partition_deadline_updates.iter().copied())
        .map_err(|e| e.downcast_wrap("failed to add expirations for new deadlines"))?;
    self.expirations_epochs = deadline_expirations.amt.flush()?;
    Ok(total_power)
}
/// Processes pending early terminations partition by partition until the
/// `max_partitions` / `max_sectors` budget is exhausted.
///
/// Fully-processed (or missing) partitions are removed from
/// `self.early_terminations`. Returns the accumulated `TerminationResult`
/// and whether more early terminations remain.
pub fn pop_early_terminations<BS: Blockstore>(
    &mut self,
    store: &BS,
    max_partitions: u64,
    max_sectors: u64,
) -> anyhow::Result<(TerminationResult, bool)> {
    let mut partitions = self.partitions_amt(store)?;
    let mut partitions_finished = Vec::<u64>::new();
    let mut result = TerminationResult::new();
    for i in self.early_terminations.iter() {
        let partition_idx = i;
        let mut partition = match partitions.get(partition_idx).map_err(|e| {
            e.downcast_wrap(format!("failed to load partition {}", partition_idx))
        })? {
            Some(partition) => partition.clone(),
            None => {
                // Missing partition — nothing to terminate; mark it done.
                partitions_finished.push(partition_idx);
                continue;
            }
        };
        // Pop as many terminations as the remaining sector budget allows.
        let (partition_result, more) = partition
            .pop_early_terminations(store, max_sectors - result.sectors_processed)
            .map_err(|e| e.downcast_wrap("failed to pop terminations from partition"))?;
        result += partition_result;
        // No more early terminations in this partition — mark it done.
        if !more {
            partitions_finished.push(partition_idx);
        }
        partitions.set(partition_idx, partition).map_err(|e| {
            e.downcast_wrap(format!("failed to store partition {}", partition_idx))
        })?;
        // Stop as soon as the partition/sector budget is reached.
        if !result.below_limit(max_partitions, max_sectors) {
            break;
        }
    }
    // Drop the fully-processed partitions from the pending set.
    for finished in partitions_finished {
        self.early_terminations.unset(finished);
    }
    self.partitions = partitions
        .flush()
        .map_err(|e| e.downcast_wrap("failed to update partitions"))?;
    let no_early_terminations = self.early_terminations.is_empty();
    Ok((result, !no_early_terminations))
}
/// Pops every partition-expiration entry due at or before `until` from the
/// deadline's expiration queue.
///
/// Returns the popped partitions and whether the queue changed; the queue
/// root is only re-flushed when something was actually popped.
pub fn pop_expired_partitions<BS: Blockstore>(
    &mut self,
    store: &BS,
    until: ChainEpoch,
    quant: QuantSpec,
) -> anyhow::Result<(BitField, bool)> {
    let mut queue = BitFieldQueue::new(store, &self.expirations_epochs, quant)?;
    let (popped, modified) = queue
        .pop_until(until)
        .map_err(|e| e.downcast_wrap("failed to pop expiring partitions"))?;
    if modified {
        self.expirations_epochs = queue.amt.flush()?;
    }
    Ok((popped, modified))
}
/// Terminates the given sectors (grouped by partition) at `epoch`.
///
/// Records affected partitions in `early_terminations` for later
/// processing, updates live-sector and faulty-power accounting, and returns
/// the active power removed.
#[allow(clippy::too_many_arguments)]
pub fn terminate_sectors<BS: Blockstore>(
    &mut self,
    policy: &Policy,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    epoch: ChainEpoch,
    partition_sectors: &mut PartitionSectorMap,
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<PowerPair> {
    let mut partitions = self.partitions_amt(store)?;
    let mut power_lost = PowerPair::zero();
    for (partition_idx, sector_numbers) in partition_sectors.iter() {
        let mut partition = partitions
            .get(partition_idx)
            .map_err(|e| {
                e.downcast_wrap(format!("failed to load partition {}", partition_idx))
            })?
            .ok_or_else(
                || actor_error_v14!(not_found; "failed to find partition {}", partition_idx),
            )?
            .clone();
        let removed = partition
            .terminate_sectors(
                policy,
                store,
                sectors,
                epoch,
                sector_numbers,
                sector_size,
                quant,
            )
            .map_err(|e| {
                e.downcast_wrap(format!(
                    "failed to terminate sectors in partition {}",
                    partition_idx
                ))
            })?;
        partitions.set(partition_idx, partition).map_err(|e| {
            e.downcast_wrap(format!(
                "failed to store updated partition {}",
                partition_idx
            ))
        })?;
        if !removed.is_empty() {
            // This partition now has pending early terminations; the removed
            // sectors leave the live count.
            self.early_terminations.set(partition_idx);
            self.live_sectors -= removed.len();
        }
        // NOTE: intentionally outside the `if` above — subtracting an empty
        // set's faulty power is a no-op, so this is safe unconditionally.
        self.faulty_power -= &removed.faulty_power;
        // Aggregate the active power lost, returned to the caller.
        power_lost += &removed.active_power;
    }
    self.partitions = partitions
        .flush()
        .map_err(|e| e.downcast_wrap("failed to persist partitions"))?;
    Ok(power_lost)
}
/// Removes the partitions in `to_remove` from this deadline, compacting the
/// survivors into a fresh AMT (re-indexed from zero).
///
/// Removal is refused for partitions with faults or unproven sectors, and
/// for any deadline with pending early terminations. Returns the
/// `(live, dead, removed_power)` of the removed partitions.
pub fn remove_partitions<BS: Blockstore>(
    &mut self,
    store: &BS,
    to_remove: &BitField,
    quant: QuantSpec,
) -> Result<
    (
        BitField,  // live sectors in the removed partitions
        BitField,  // dead (terminated) sectors in the removed partitions
        PowerPair, // live power removed
    ),
    anyhow::Error,
> {
    let old_partitions = self
        .partitions_amt(store)
        .map_err(|e| e.downcast_wrap("failed to load partitions"))?;
    let partition_count = old_partitions.count();
    let to_remove_set: BTreeSet<_> = to_remove
        .bounded_iter(partition_count)
        .ok_or_else(
            || actor_error_v14!(illegal_argument; "partitions to remove exceeds total"),
        )?
        .collect();
    if let Some(&max_partition) = to_remove_set.iter().max() {
        if max_partition >= partition_count {
            return Err(
                actor_error_v14!(illegal_argument; "partition index {} out of range [0, {})", max_partition, partition_count).into()
            );
        }
    } else {
        // Nothing to remove.
        return Ok((BitField::new(), BitField::new(), PowerPair::zero()));
    }
    // Removing partitions while early terminations are pending would strand
    // that bookkeeping, so refuse.
    if !self.early_terminations.is_empty() {
        return Err(
            actor_error_v14!(illegal_argument; "cannot remove partitions from deadline with early terminations").into(),
        );
    }
    let mut new_partitions =
        Array::<Partition, BS>::new_with_bit_width(store, DEADLINE_PARTITIONS_AMT_BITWIDTH);
    let mut all_dead_sectors = Vec::<BitField>::with_capacity(to_remove_set.len());
    let mut all_live_sectors = Vec::<BitField>::with_capacity(to_remove_set.len());
    let mut removed_power = PowerPair::zero();
    // Rebuild the AMT: keep survivors (appended compactly) and collect
    // statistics for the removed partitions.
    old_partitions
        .for_each(|partition_idx, partition| {
            if !to_remove_set.contains(&partition_idx) {
                // Keep this partition; append it at the next free index.
                new_partitions.set(new_partitions.count(), partition.clone())?;
                return Ok(());
            }
            let has_no_faults = partition.faults.is_empty();
            if !has_no_faults {
                return Err(actor_error_v14!(
                    illegal_argument,
                    "cannot remove partition {}: has faults",
                    partition_idx
                )
                .into());
            }
            let all_proven = partition.unproven.is_empty();
            if !all_proven {
                return Err(actor_error_v14!(
                    illegal_argument,
                    "cannot remove partition {}: has unproven sectors",
                    partition_idx
                )
                .into());
            }
            // Accumulate this partition's sectors and power for the result.
            let live_sectors = partition.live_sectors();
            all_dead_sectors.push(partition.terminated.clone());
            all_live_sectors.push(live_sectors);
            removed_power += &partition.live_power;
            Ok(())
        })
        .map_err(|e| e.downcast_wrap("while removing partitions"))?;
    self.partitions = new_partitions
        .flush()
        .map_err(|e| e.downcast_wrap("failed to persist new partition table"))?;
    let dead = BitField::union(&all_dead_sectors);
    let live = BitField::union(&all_live_sectors);
    // Update sector accounting: dead sectors only count toward the total.
    let removed_dead_sectors = dead.len();
    let removed_live_sectors = live.len();
    self.live_sectors -= removed_live_sectors;
    self.total_sectors -= removed_live_sectors + removed_dead_sectors;
    // Drop the removed partitions from the expiration queue as well.
    let mut expiration_epochs = BitFieldQueue::new(store, &self.expirations_epochs, quant)
        .map_err(|e| e.downcast_wrap("failed to load expiration queue"))?;
    expiration_epochs.cut(to_remove).map_err(|e| {
        e.downcast_wrap("failed cut removed partitions from deadline expiration queue")
    })?;
    self.expirations_epochs = expiration_epochs
        .amt
        .flush()
        .map_err(|e| e.downcast_wrap("failed persist deadline expiration queue"))?;
    Ok((live, dead, removed_power))
}
/// Records the given sectors (grouped by partition) as faulty, with the
/// faults expiring at `fault_expiration_epoch`.
///
/// Updates the deadline's faulty power, schedules partitions with new
/// faults in the expiration queue, and returns the resulting power delta.
pub fn record_faults<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    sector_size: SectorSize,
    quant: QuantSpec,
    fault_expiration_epoch: ChainEpoch,
    partition_sectors: &mut PartitionSectorMap,
) -> anyhow::Result<PowerPair> {
    let mut partitions = self.partitions_amt(store)?;
    // Partitions that gained new faults; they must be (re)scheduled for
    // expiration handling at `fault_expiration_epoch`.
    let mut partitions_with_fault = Vec::<u64>::with_capacity(partition_sectors.len());
    let mut power_delta = PowerPair::zero();
    for (partition_idx, sector_numbers) in partition_sectors.iter() {
        let mut partition = partitions
            .get(partition_idx)
            .map_err(|e| {
                e.downcast_default(
                    ExitCode::USR_ILLEGAL_STATE,
                    format!("failed to load partition {}", partition_idx),
                )
            })?
            .ok_or_else(|| actor_error_v14!(not_found; "no such partition {}", partition_idx))?
            .clone();
        let (new_faults, partition_power_delta, partition_new_faulty_power) = partition
            .record_faults(
                store,
                sectors,
                sector_numbers,
                fault_expiration_epoch,
                sector_size,
                quant,
            )
            .map_err(|e| {
                e.downcast_wrap(format!(
                    "failed to declare faults in partition {}",
                    partition_idx
                ))
            })?;
        // Fold the partition's new faulty power into the deadline total.
        self.faulty_power += &partition_new_faulty_power;
        power_delta += &partition_power_delta;
        if !new_faults.is_empty() {
            partitions_with_fault.push(partition_idx);
        }
        partitions.set(partition_idx, partition).map_err(|e| {
            e.downcast_default(
                ExitCode::USR_ILLEGAL_STATE,
                format!("failed to store partition {}", partition_idx),
            )
        })?;
    }
    self.partitions = partitions.flush().map_err(|e| {
        e.downcast_default(
            ExitCode::USR_ILLEGAL_STATE,
            "failed to store partitions root",
        )
    })?;
    // Schedule the faulting partitions for expiration processing.
    self.add_expiration_partitions(
        store,
        fault_expiration_epoch,
        &partitions_with_fault,
        quant,
    )
    .map_err(|e| {
        e.downcast_default(
            ExitCode::USR_ILLEGAL_STATE,
            "failed to update expirations for partitions with faults",
        )
    })?;
    Ok(power_delta)
}
/// Declares the given faulty sectors (grouped by partition) as recovering.
///
/// Only the partitions root changes here — no power or sector-count
/// accounting is touched by this method.
pub fn declare_faults_recovered<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    sector_size: SectorSize,
    partition_sectors: &mut PartitionSectorMap,
) -> anyhow::Result<()> {
    let mut partitions = self.partitions_amt(store)?;
    for (partition_idx, sector_numbers) in partition_sectors.iter() {
        let loaded = partitions.get(partition_idx).map_err(|e| {
            e.downcast_default(
                ExitCode::USR_ILLEGAL_STATE,
                format!("failed to load partition {}", partition_idx),
            )
        })?;
        let mut partition = loaded
            .ok_or_else(|| actor_error_v14!(not_found; "no such partition {}", partition_idx))?
            .clone();
        partition
            .declare_faults_recovered(sectors, sector_size, sector_numbers)
            .map_err(|e| e.downcast_wrap("failed to add recoveries"))?;
        partitions.set(partition_idx, partition).map_err(|e| {
            e.downcast_default(
                ExitCode::USR_ILLEGAL_STATE,
                format!("failed to update partition {}", partition_idx),
            )
        })?;
    }
    let new_root = partitions.flush().map_err(|e| {
        e.downcast_default(
            ExitCode::USR_ILLEGAL_STATE,
            "failed to store partitions root",
        )
    })?;
    self.partitions = new_root;
    Ok(())
}
/// Processes the end of this deadline's proving window.
///
/// Records a missed PoSt for every unproven partition (marking its power
/// faulty), resets the posted-partitions bitfield, and rolls the current
/// partitions / sectors / optimistic-proofs state into the snapshots used
/// for later dispute. Returns `(power_delta, penalized_power)`.
pub fn process_deadline_end<BS: Blockstore>(
    &mut self,
    store: &BS,
    quant: QuantSpec,
    fault_expiration_epoch: ChainEpoch,
    sectors: Cid,
) -> Result<(PowerPair, PowerPair), ActorError> {
    let mut partitions = self.partitions_amt(store).map_err(|e| {
        e.downcast_default(ExitCode::USR_ILLEGAL_STATE, "failed to load partitions")
    })?;
    let mut detected_any = false;
    let mut rescheduled_partitions = Vec::<u64>::new();
    let mut power_delta = PowerPair::zero();
    let mut penalized_power = PowerPair::zero();
    for partition_idx in 0..partitions.count() {
        // Partitions that submitted a PoSt this window need no processing.
        let proven = self.partitions_posted.get(partition_idx);
        if proven {
            continue;
        }
        let mut partition = partitions
            .get(partition_idx)
            .map_err(|e| {
                e.downcast_default(
                    ExitCode::USR_ILLEGAL_STATE,
                    format!("failed to load partition {}", partition_idx),
                )
            })?
            .ok_or_else(|| actor_error_v14!(illegal_state; "no partition {}", partition_idx))?
            .clone();
        // If the partition is already entirely faulty with nothing
        // recovering, a missed PoSt changes nothing — skip the write.
        if partition.recovering_power.is_zero()
            && partition.faulty_power == partition.live_power
        {
            continue;
        }
        detected_any = true;
        let (part_power_delta, part_penalized_power, part_new_faulty_power) = partition
            .record_missed_post(store, fault_expiration_epoch, quant)
            .map_err(|e| {
                e.downcast_default(
                    ExitCode::USR_ILLEGAL_STATE,
                    format!(
                        "failed to record missed PoSt for partition {}",
                        partition_idx
                    ),
                )
            })?;
        // Newly-faulty partitions must be rescheduled in the queue.
        if !part_new_faulty_power.is_zero() {
            rescheduled_partitions.push(partition_idx);
        }
        partitions.set(partition_idx, partition).map_err(|e| {
            e.downcast_default(
                ExitCode::USR_ILLEGAL_STATE,
                format!("failed to update partition {}", partition_idx),
            )
        })?;
        self.faulty_power += &part_new_faulty_power;
        power_delta += &part_power_delta;
        penalized_power += &part_penalized_power;
    }
    // Only flush when partitions were actually mutated.
    if detected_any {
        self.partitions = partitions.flush().map_err(|e| {
            e.downcast_default(ExitCode::USR_ILLEGAL_STATE, "failed to store partitions")
        })?;
    }
    self.add_expiration_partitions(
        store,
        fault_expiration_epoch,
        &rescheduled_partitions,
        quant,
    )
    .map_err(|e| {
        e.downcast_default(
            ExitCode::USR_ILLEGAL_STATE,
            "failed to update deadline expiration queue",
        )
    })?;
    // Reset PoSt tracking and roll current state into the snapshots.
    self.partitions_posted = BitField::new();
    self.partitions_snapshot = self.partitions;
    self.optimistic_post_submissions_snapshot = self.optimistic_post_submissions;
    self.optimistic_post_submissions = Array::<(), BS>::new_with_bit_width(
        store,
        DEADLINE_OPTIMISTIC_POST_SUBMISSIONS_AMT_BITWIDTH,
    )
    .flush()
    .map_err(|e| {
        e.downcast_default(
            ExitCode::USR_ILLEGAL_STATE,
            "failed to clear pending proofs array",
        )
    })?;
    // Only snapshot sectors if there is a proof that might be disputed:
    // after the clear above, the live array differs from the snapshot
    // exactly when the old submissions array was non-empty.
    if self.optimistic_post_submissions != self.optimistic_post_submissions_snapshot {
        self.sectors_snapshot = sectors;
    } else {
        self.sectors_snapshot =
            Array::<(), BS>::new_with_bit_width(store, SECTORS_AMT_BITWIDTH)
                .flush()
                .map_err(|e| {
                    e.downcast_default(
                        ExitCode::USR_ILLEGAL_STATE,
                        "failed to clear sectors snapshot array",
                    )
                })?;
    }
    Ok((power_delta, penalized_power))
}
/// Applies `f` to every `(index, partition)` pair in this deadline's
/// partitions AMT.
pub fn for_each<BS: Blockstore>(
    &self,
    store: &BS,
    f: impl FnMut(u64, &Partition) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
    self.partitions_amt(store)?.for_each(f)?;
    Ok(())
}
/// Checks basic deadline invariants: live sectors never exceed total
/// sectors, and faulty power is never negative.
pub fn validate_state(&self) -> anyhow::Result<()> {
    if self.live_sectors > self.total_sectors {
        return Err(anyhow!("deadline left with more live sectors than total"));
    }
    let faulty_power_negative =
        self.faulty_power.raw.is_negative() || self.faulty_power.qa.is_negative();
    if faulty_power_negative {
        return Err(anyhow!("deadline left with negative faulty power"));
    }
    Ok(())
}
/// Gathers the information needed to dispute a windowed PoSt covering the
/// given partitions, reading from the snapshots taken at the last deadline
/// end.
///
/// Collects every sector number, the sectors to ignore during verification
/// (faults, terminations, unproven), the active sectors per partition, and
/// the total disputed (active) power.
pub fn load_partitions_for_dispute<BS: Blockstore>(
    &self,
    store: &BS,
    partitions: BitField,
) -> anyhow::Result<DisputeInfo> {
    let partitions_snapshot = self
        .partitions_snapshot_amt(store)
        // Message fixed: it previously read "failed to load partitions {}"
        // with a stray, never-interpolated "{}" placeholder.
        .map_err(|e| e.downcast_wrap("failed to load partitions snapshot"))?;
    let mut all_sectors = Vec::new();
    let mut all_ignored = Vec::new();
    let mut disputed_sectors = PartitionSectorMap::default();
    let mut disputed_power = PowerPair::zero();
    for part_idx in partitions.iter() {
        let partition_snapshot = partitions_snapshot
            .get(part_idx)?
            .ok_or_else(|| anyhow!("failed to find partition {}", part_idx))?;
        all_sectors.push(partition_snapshot.sectors.clone());
        // Faulty, terminated and unproven sectors are excluded from
        // verification.
        all_ignored.push(partition_snapshot.faults.clone());
        all_ignored.push(partition_snapshot.terminated.clone());
        all_ignored.push(partition_snapshot.unproven.clone());
        let active = partition_snapshot.active_sectors();
        disputed_sectors.add(part_idx, active)?;
        disputed_power += &partition_snapshot.active_power();
    }
    let all_sector_nos = BitField::union(&all_sectors);
    let all_ignored_nos = BitField::union(&all_ignored);
    Ok(DisputeInfo {
        all_sector_nos,
        disputed_sectors,
        disputed_power,
        ignored_sector_nos: all_ignored_nos,
    })
}
/// Reports whether this deadline still carries any state of interest:
/// live sectors, proofs posted this window, a partitions table differing
/// from its snapshot, or pending optimistic submissions.
pub fn is_live(&self) -> bool {
    self.live_sectors > 0
        || !self.partitions_posted.is_empty()
        || self.partitions != self.partitions_snapshot
        || self.optimistic_post_submissions != self.optimistic_post_submissions_snapshot
}
}
/// Outcome of `Deadline::record_proven_sectors`.
pub struct PoStResult {
    // Net change in active power: activated unproven plus recovered power.
    pub power_delta: PowerPair,
    // Power of sectors newly marked faulty (skipped) by this PoSt.
    pub new_faulty_power: PowerPair,
    // Power of declared recoveries retracted because the sectors were
    // skipped again.
    pub retracted_recovery_power: PowerPair,
    // Power of previously-faulty sectors recovered by this PoSt.
    pub recovered_power: PowerPair,
    // All sectors in the proven partitions.
    pub sectors: BitField,
    // Sectors ignored when proving (faulty/terminated).
    pub ignored_sectors: BitField,
    // The set of partitions proven.
    pub partitions: BitField,
}
impl Deadline {
/// Records the given partitions as proven for the current window,
/// processing skipped faults and fault recoveries along the way.
///
/// Rejects duplicate partition indexes within the request and partitions
/// already proven this window. Returns a `PoStResult` summarizing the
/// power and sector changes.
pub fn record_proven_sectors<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    sector_size: SectorSize,
    quant: QuantSpec,
    fault_expiration: ChainEpoch,
    post_partitions: &mut [PoStPartition],
) -> anyhow::Result<PoStResult> {
    let partition_indexes = BitField::try_from_bits(post_partitions.iter().map(|p| p.index))
        .map_err(
            |_| actor_error_v14!(illegal_argument; "partition index out of bitfield range"),
        )?;
    // A bitfield deduplicates, so a shorter length means repeats were given.
    let num_partitions = partition_indexes.len();
    if num_partitions != post_partitions.len() as u64 {
        return Err(anyhow!(actor_error_v14!(
            illegal_argument,
            "duplicate partitions proven"
        )));
    }
    // Reject partitions already proven in this window.
    let already_proven = &self.partitions_posted & &partition_indexes;
    if !already_proven.is_empty() {
        return Err(anyhow!(actor_error_v14!(
            illegal_argument,
            "partition already proven: {:?}",
            already_proven
        )));
    }
    let mut partitions = self.partitions_amt(store)?;
    let mut all_sectors = Vec::<BitField>::with_capacity(post_partitions.len());
    let mut all_ignored = Vec::<BitField>::with_capacity(post_partitions.len());
    let mut new_faulty_power_total = PowerPair::zero();
    let mut retracted_recovery_power_total = PowerPair::zero();
    let mut recovered_power_total = PowerPair::zero();
    let mut rescheduled_partitions = Vec::<u64>::new();
    let mut power_delta = PowerPair::zero();
    for post in post_partitions {
        let mut partition = partitions
            .get(post.index)
            .map_err(|e| e.downcast_wrap(format!("failed to load partition {}", post.index)))?
            .ok_or_else(|| actor_error_v14!(not_found; "no such partition {}", post.index))?
            .clone();
        // Mark the skipped sectors faulty before doing power accounting.
        let (mut new_power_delta, new_fault_power, retracted_recovery_power, has_new_faults) =
            partition
                .record_skipped_faults(
                    store,
                    sectors,
                    sector_size,
                    quant,
                    fault_expiration,
                    &post.skipped,
                )
                .map_err(|e| {
                    e.downcast_wrap(format!(
                        "failed to add skipped faults to partition {}",
                        post.index
                    ))
                })?;
        // Partitions that gained faults must be (re)scheduled in the queue.
        if has_new_faults {
            rescheduled_partitions.push(post.index);
        }
        // The partition proved, so its declared recoveries are processed.
        let recovered_power = partition
            .recover_faults(store, sectors, sector_size, quant)
            .map_err(|e| {
                e.downcast_wrap(format!(
                    "failed to recover faulty sectors for partition {}",
                    post.index
                ))
            })?;
        // Newly-proven (previously unproven) sectors become active.
        new_power_delta += &partition.activate_unproven();
        all_sectors.push(partition.sectors.clone());
        all_ignored.push(partition.faults.clone());
        all_ignored.push(partition.terminated.clone());
        partitions.set(post.index, partition).map_err(|e| {
            e.downcast_default(
                ExitCode::USR_ILLEGAL_STATE,
                format!("failed to update partition {}", post.index),
            )
        })?;
        new_faulty_power_total += &new_fault_power;
        retracted_recovery_power_total += &retracted_recovery_power;
        recovered_power_total += &recovered_power;
        power_delta += &new_power_delta;
        power_delta += &recovered_power;
        // Record this partition as proven for the current window.
        self.partitions_posted.set(post.index);
    }
    self.add_expiration_partitions(store, fault_expiration, &rescheduled_partitions, quant)
        .map_err(|e| {
            e.downcast_default(
                ExitCode::USR_ILLEGAL_STATE,
                "failed to update expirations for partitions with faults",
            )
        })?;
    // Net out the deadline's faulty power.
    self.faulty_power -= &recovered_power_total;
    self.faulty_power += &new_faulty_power_total;
    self.partitions = partitions.flush().map_err(|e| {
        e.downcast_default(ExitCode::USR_ILLEGAL_STATE, "failed to persist partitions")
    })?;
    let all_sector_numbers = BitField::union(&all_sectors);
    let all_ignored_sector_numbers = BitField::union(&all_ignored);
    Ok(PoStResult {
        new_faulty_power: new_faulty_power_total,
        retracted_recovery_power: retracted_recovery_power_total,
        recovered_power: recovered_power_total,
        sectors: all_sector_numbers,
        power_delta,
        ignored_sectors: all_ignored_sector_numbers,
        partitions: partition_indexes,
    })
}
/// Appends an optimistically-accepted PoSt (partitions + proofs) to this
/// window's submissions array and persists the new root.
pub fn record_post_proofs<BS: Blockstore>(
    &mut self,
    store: &BS,
    partitions: &BitField,
    proofs: &[PoStProof],
) -> anyhow::Result<()> {
    let mut proofs_amt = self
        .optimistic_proofs_amt(store)
        .map_err(|e| e.downcast_wrap("failed to load post proofs"))?;
    let record = WindowedPoSt {
        partitions: partitions.clone(),
        proofs: proofs.to_vec(),
    };
    // Append at the end of the array.
    let next_idx = proofs_amt.count();
    proofs_amt
        .set(next_idx, record)
        .map_err(|e| e.downcast_wrap("failed to store proof"))?;
    self.optimistic_post_submissions = proofs_amt
        .flush()
        .map_err(|e| e.downcast_wrap("failed to save proofs"))?;
    Ok(())
}
/// Removes and returns the PoSt submission at `idx` from the previous
/// window's snapshot (used when a recorded proof is challenged), persisting
/// the updated snapshot root.
pub fn take_post_proofs<BS: Blockstore>(
    &mut self,
    store: &BS,
    idx: u64,
) -> anyhow::Result<(BitField, Vec<PoStProof>)> {
    let mut snapshot_amt = self
        .optimistic_proofs_snapshot_amt(store)
        .map_err(|e| e.downcast_wrap("failed to load post proofs snapshot amt"))?;
    let deleted = snapshot_amt
        .delete(idx)
        .map_err(|e| e.downcast_wrap(format!("failed to retrieve proof {}", idx)))?;
    let post = match deleted {
        Some(post) => post,
        None => {
            return Err(actor_error_v14!(illegal_argument, "proof {} not found", idx).into())
        }
    };
    self.optimistic_post_submissions_snapshot = snapshot_amt
        .flush()
        .map_err(|e| e.downcast_wrap("failed to save proofs"))?;
    Ok((post.partitions, post.proofs))
}
/// Moves the expiration of the given sectors (grouped by partition) to the
/// (quantized) `expiration` epoch.
///
/// Partitions listed in `partition_sectors` but absent from the AMT are
/// skipped silently. Returns the on-chain info of every sector whose
/// expiration was actually replaced.
pub fn reschedule_sector_expirations<BS: Blockstore>(
    &mut self,
    store: &BS,
    sectors: &Sectors<'_, BS>,
    expiration: ChainEpoch,
    partition_sectors: &mut PartitionSectorMap,
    sector_size: SectorSize,
    quant: QuantSpec,
) -> anyhow::Result<Vec<SectorOnChainInfo>> {
    let mut partitions = self.partitions_amt(store)?;
    // Partitions whose expirations changed; they need rescheduling in the
    // deadline-level expiration queue.
    let mut rescheduled_partitions = Vec::<u64>::new();
    let mut all_replaced = Vec::new();
    for (partition_idx, sector_numbers) in partition_sectors.iter() {
        let mut partition = match partitions.get(partition_idx).map_err(|e| {
            e.downcast_wrap(format!("failed to load partition {}", partition_idx))
        })? {
            Some(partition) => partition.clone(),
            None => {
                // Missing partition — nothing to reschedule here.
                continue;
            }
        };
        let replaced = partition
            .reschedule_expirations(
                store,
                sectors,
                expiration,
                sector_numbers,
                sector_size,
                quant,
            )
            .map_err(|e| {
                e.downcast_wrap(format!(
                    "failed to reschedule expirations in partition {}",
                    partition_idx
                ))
            })?;
        // Nothing moved in this partition; skip the write-back.
        if replaced.is_empty() {
            continue;
        }
        all_replaced.extend(replaced);
        rescheduled_partitions.push(partition_idx);
        partitions.set(partition_idx, partition).map_err(|e| {
            e.downcast_wrap(format!("failed to store partition {}", partition_idx))
        })?;
    }
    // Persist and update the deadline queue only when something changed.
    if !rescheduled_partitions.is_empty() {
        self.partitions = partitions
            .flush()
            .map_err(|e| e.downcast_wrap("failed to save partitions"))?;
        self.add_expiration_partitions(store, expiration, &rescheduled_partitions, quant)
            .map_err(|e| e.downcast_wrap("failed to reschedule partition expirations"))?;
    }
    Ok(all_replaced)
}
}