1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::convert::TryInto;

use cid::Cid;
use fil_actors_shared::v12::{ActorDowncast, Array};
use fvm_ipld_amt::Error as AmtError;
use fvm_ipld_bitfield::BitField;
use fvm_ipld_blockstore::Blockstore;
use fvm_shared4::clock::{ChainEpoch, QuantSpec};
use itertools::Itertools;

/// Wrapper for working with an AMT[ChainEpoch]*Bitfield functioning as a queue, bucketed by epoch.
/// Keys in the queue are quantized (upwards), modulo some offset, to reduce the cardinality of keys.
pub struct BitFieldQueue<'db, BS> {
    /// Backing array: each key is a quantized epoch (stored as `u64`) and each value is the
    /// bitfield queued at that epoch.
    pub amt: Array<'db, BitField, BS>,
    /// Quantization spec used to round raw epochs up before they are used as keys.
    quant: QuantSpec,
}

impl<'db, BS: Blockstore> BitFieldQueue<'db, BS> {
    /// Opens the queue whose backing array is rooted at `root` in `store`.
    ///
    /// `quant` is the quantization applied to every epoch passed to the `add_*` methods.
    ///
    /// # Errors
    ///
    /// Returns the underlying AMT error if the array cannot be loaded.
    pub fn new(store: &'db BS, root: &Cid, quant: QuantSpec) -> Result<Self, AmtError> {
        let amt = Array::load(root, store)?;
        Ok(Self { amt, quant })
    }

    /// Merges `values` into the queue entry for `raw_epoch`.
    ///
    /// The epoch is quantized upwards first, and the bits are OR-ed into whatever is already
    /// stored under the resulting key. An empty `values` leaves the queue untouched.
    ///
    /// # Errors
    ///
    /// Fails if the quantized epoch does not fit in a `u64`, or if reading or writing the
    /// backing array fails.
    pub fn add_to_queue(&mut self, raw_epoch: ChainEpoch, values: &BitField) -> anyhow::Result<()> {
        if values.is_empty() {
            // Nothing to merge, so skip the lookup and the write entirely.
            return Ok(());
        }

        let slot: u64 = self.quant.quantize_up(raw_epoch).try_into()?;

        // Start from the bits already queued at this key, or an empty bitfield if none.
        let lookup = self.amt.get(slot);
        let current = lookup
            .map_err(|e| e.downcast_wrap(format!("failed to lookup queue epoch {}", slot)))?
            .cloned()
            .unwrap_or_default();
        let merged = &current | values;

        let stored = self.amt.set(slot, merged);
        stored.map_err(|e| e.downcast_wrap(format!("failed to set queue epoch {}", slot)))?;

        Ok(())
    }

    /// Convenience form of [`Self::add_to_queue`] that takes the bits as an iterator of indices.
    ///
    /// # Errors
    ///
    /// Fails if the indices cannot be turned into a bitfield, or for any reason
    /// [`Self::add_to_queue`] fails.
    pub fn add_to_queue_values(
        &mut self,
        epoch: ChainEpoch,
        values: impl IntoIterator<Item = u64>,
    ) -> anyhow::Result<()> {
        let bits = BitField::try_from_bits(values)?;
        self.add_to_queue(epoch, &bits)
    }

    /// Cuts the bits set in `to_cut` out of every entry in the queue, shifting the
    /// remaining bits down, and drops entries that end up empty.
    ///
    /// See the docs on `BitField::cut` to better understand what it does.
    ///
    /// # Errors
    ///
    /// Fails if walking the backing array or deleting the emptied entries fails.
    pub fn cut(&mut self, to_cut: &BitField) -> anyhow::Result<()> {
        // Keys whose bitfield became empty; deleted after the walk completes.
        let mut emptied: Vec<u64> = Vec::new();

        let walk = self.amt.for_each_mut(|epoch, bitfield| {
            let remainder = bitfield.cut(to_cut);

            if remainder.is_empty() {
                emptied.push(epoch);
            } else {
                **bitfield = remainder;
            }

            Ok(())
        });
        walk.map_err(|e| e.downcast_wrap("failed to cut from bitfield queue"))?;

        let removal = self.amt.batch_delete(emptied, true);
        removal.map_err(|e| e.downcast_wrap("failed to remove empty epochs from bitfield queue"))?;

        Ok(())
    }

    /// Adds many `(epoch, bit)` pairs to the queue in one call.
    ///
    /// Epochs are quantized up front, so that all bits landing on the same key are
    /// written with a single [`Self::add_to_queue_values`] call.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by [`Self::add_to_queue_values`].
    pub fn add_many_to_queue_values(
        &mut self,
        values: impl IntoIterator<Item = (ChainEpoch, u64)>,
    ) -> anyhow::Result<()> {
        // Quantize first: distinct raw epochs may collapse onto the same key.
        let mut pairs = values
            .into_iter()
            .map(|(raw_epoch, bit)| (self.quant.quantize_up(raw_epoch), bit))
            .collect::<Vec<_>>();

        // Sorting groups equal keys into contiguous runs; dedup drops repeated pairs.
        pairs.sort_unstable();
        pairs.dedup();

        // Consume one run of equal keys per iteration.
        let mut pending = pairs.into_iter().peekable();
        while let Some(&(epoch, _)) = pending.peek() {
            let run = pending
                .peeking_take_while(|&(key, _)| key == epoch)
                .map(|(_, bit)| bit);
            self.add_to_queue_values(epoch, run)?;
        }

        Ok(())
    }

    /// Removes and returns the union of all values whose key is less than or equal to `until`.
    ///
    /// The returned flag is `true` when at least one entry was removed, i.e. when the
    /// queue was modified by this call.
    ///
    /// # Errors
    ///
    /// Fails if walking the backing array or deleting the popped entries fails.
    pub fn pop_until(&mut self, until: ChainEpoch) -> anyhow::Result<(BitField, bool)> {
        let mut expired_keys: Vec<u64> = Vec::new();
        let mut expired_bits = BitField::new();

        self.amt.for_each_while(|epoch, bitfield| {
            // NOTE(review): plain `as` cast from the `u64` key; assumes stored keys always
            // fit in a `ChainEpoch` -- confirm.
            let key_epoch = epoch as ChainEpoch;
            if key_epoch > until {
                // First key past the cutoff: stop the walk.
                return Ok(false);
            }

            expired_keys.push(epoch);
            expired_bits |= bitfield;
            Ok(true)
        })?;

        // With no expired keys the accumulator is still empty, so both outcomes can
        // share the same return expression.
        let modified = !expired_keys.is_empty();
        if modified {
            self.amt.batch_delete(expired_keys, true)?;
        }

        Ok((expired_bits, modified))
    }
}