|             Line data    Source code 
       1              : //! This is what the compaction implementation needs to know about
       2              : //! layers, keyspace etc.
       3              : //!
       4              : //! All the heavy lifting is done by the create_image and create_delta
       5              : //! functions that the implementor provides.
       6              : use futures::Future;
       7              : use pageserver_api::{key::Key, keyspace::ShardedRange, shard::ShardIdentity};
       8              : use std::ops::Range;
       9              : use utils::lsn::Lsn;
      10              : 
      11              : /// Public interface. This is the main thing that the implementor needs to provide
      12              : pub trait CompactionJobExecutor {
      13              :     // Type system.
      14              :     //
      15              :     // We assume that there are two kinds of layers, deltas and images. The
      16              :     // compaction doesn't distinguish whether they are stored locally or
      17              :     // remotely.
      18              :     //
      19              :     // The keyspace is defined by the CompactionKey trait.
      20              :     type Key: CompactionKey;
      21              : 
      22              :     type Layer: CompactionLayer<Self::Key> + Clone;
      23              :     type DeltaLayer: CompactionDeltaLayer<Self> + Clone;
      24              :     type ImageLayer: CompactionImageLayer<Self> + Clone;
      25              : 
      26              :     // This is passed through to all the interface functions. The compaction
      27              :     // implementation doesn't do anything with it, but it might be useful for
      28              :     // the interface implementation.
      29              :     type RequestContext: CompactionRequestContext;
      30              : 
      31              :     // ----
      32              :     // Functions that the planner uses to support its decisions
      33              :     // ----
      34              : 
      35              :     fn get_shard_identity(&self) -> &ShardIdentity;
      36              : 
      37              :     /// Return all layers that overlap the given bounding box.
      38              :     fn get_layers(
      39              :         &mut self,
      40              :         key_range: &Range<Self::Key>,
      41              :         lsn_range: &Range<Lsn>,
      42              :         ctx: &Self::RequestContext,
      43              :     ) -> impl Future<Output = anyhow::Result<Vec<Self::Layer>>> + Send;
      44              : 
      45              :     fn get_keyspace(
      46              :         &mut self,
      47              :         key_range: &Range<Self::Key>,
      48              :         lsn: Lsn,
      49              :         ctx: &Self::RequestContext,
      50              :     ) -> impl Future<Output = anyhow::Result<CompactionKeySpace<Self::Key>>> + Send;
      51              : 
      52              :     /// NB: This is a pretty expensive operation. In the real pageserver
      53              :     /// implementation, it downloads the layer, and keeps it resident
      54              :     /// until the DeltaLayer is dropped.
      55              :     fn downcast_delta_layer(
      56              :         &self,
      57              :         layer: &Self::Layer,
      58              :     ) -> impl Future<Output = anyhow::Result<Option<Self::DeltaLayer>>> + Send;
      59              : 
      60              :     // ----
      61              :     // Functions to execute the plan
      62              :     // ----
      63              : 
      64              :     /// Create a new image layer, materializing all the values in the key range,
      65              :     /// at given 'lsn'.
      66              :     fn create_image(
      67              :         &mut self,
      68              :         lsn: Lsn,
      69              :         key_range: &Range<Self::Key>,
      70              :         ctx: &Self::RequestContext,
      71              :     ) -> impl Future<Output = anyhow::Result<()>> + Send;
      72              : 
      73              :     /// Create a new delta layer, containing all the values from 'input_layers'
      74              :     /// in the given key and LSN range.
      75              :     fn create_delta(
      76              :         &mut self,
      77              :         lsn_range: &Range<Lsn>,
      78              :         key_range: &Range<Self::Key>,
      79              :         input_layers: &[Self::DeltaLayer],
      80              :         ctx: &Self::RequestContext,
      81              :     ) -> impl Future<Output = anyhow::Result<()>> + Send;
      82              : 
      83              :     /// Delete a layer. The compaction implementation will call this only after
      84              :     /// all the create_image() or create_delta() calls that deletion of this
      85              :     /// layer depends on have finished. But if the implementor has extra lazy
      86              :     /// background tasks, like uploading the index json file to remote storage.
      87              :     /// it is the implementation's responsibility to track those.
      88              :     fn delete_layer(
      89              :         &mut self,
      90              :         layer: &Self::Layer,
      91              :         ctx: &Self::RequestContext,
      92              :     ) -> impl Future<Output = anyhow::Result<()>> + Send;
      93              : }
      94              : 
      95              : pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
      96              :     const MIN: Self;
      97              :     const MAX: Self;
      98              : 
      99              :     /// Calculate distance between key_range.start and key_range.end.
     100              :     ///
     101              :     /// This returns u32, for compatibility with Repository::key. If the
     102              :     /// distance is larger, return u32::MAX.
     103              :     fn key_range_size(key_range: &Range<Self>, shard_identity: &ShardIdentity) -> u32;
     104              : 
     105              :     // return "self + 1"
     106              :     fn next(&self) -> Self;
     107              : 
     108              :     // return "self + <some decent amount to skip>". The amount to skip
     109              :     // is left to the implementation.
     110              :     // FIXME: why not just "add(u32)" ?  This is hard to use
     111              :     fn skip_some(&self) -> Self;
     112              : }
     113              : 
     114              : impl CompactionKey for Key {
     115              :     const MIN: Self = Self::MIN;
     116              :     const MAX: Self = Self::MAX;
     117              : 
     118            0 :     fn key_range_size(r: &std::ops::Range<Self>, shard_identity: &ShardIdentity) -> u32 {
     119            0 :         ShardedRange::new(r.clone(), shard_identity).page_count()
     120            0 :     }
     121            0 :     fn next(&self) -> Key {
     122            0 :         (self as &Key).next()
     123            0 :     }
     124            0 :     fn skip_some(&self) -> Key {
     125            0 :         self.add(128)
     126            0 :     }
     127              : }
     128              : 
     129              : /// Contiguous ranges of keys that belong to the key space. In key order, and
     130              : /// with no overlap.
     131              : pub type CompactionKeySpace<K> = Vec<Range<K>>;
     132              : 
     133              : /// Functions needed from all layers.
     134              : pub trait CompactionLayer<K: CompactionKey + ?Sized> {
     135              :     fn key_range(&self) -> &Range<K>;
     136              :     fn lsn_range(&self) -> &Range<Lsn>;
     137              : 
     138              :     fn file_size(&self) -> u64;
     139              : 
     140              :     /// For debugging, short human-readable representation of the layer. E.g. filename.
     141              :     fn short_id(&self) -> String;
     142              : 
     143              :     fn is_delta(&self) -> bool;
     144              : }
     145              : pub trait CompactionDeltaLayer<E: CompactionJobExecutor + ?Sized>: CompactionLayer<E::Key> {
     146              :     type DeltaEntry<'a>: CompactionDeltaEntry<'a, E::Key>
     147              :     where
     148              :         Self: 'a;
     149              : 
     150              :     /// Return all keys in this delta layer.
     151              :     fn load_keys<'a>(
     152              :         &self,
     153              :         ctx: &E::RequestContext,
     154              :     ) -> impl Future<Output = anyhow::Result<Vec<Self::DeltaEntry<'_>>>> + Send;
     155              : }
     156              : 
     157              : pub trait CompactionImageLayer<E: CompactionJobExecutor + ?Sized>: CompactionLayer<E::Key> {}
     158              : 
     159              : pub trait CompactionDeltaEntry<'a, K> {
     160              :     fn key(&self) -> K;
     161              :     fn lsn(&self) -> Lsn;
     162              :     fn size(&self) -> u64;
     163              : }
     164              : 
     165              : pub trait CompactionRequestContext {}
         |