Line data Source code
1 : use std::ops::Range;
2 :
3 : // NOTE the `im` crate has 20x more downloads and also has
4 : // persistent/immutable BTree. But it's bugged so rpds is a
5 : // better choice <https://github.com/neondatabase/neon/issues/3395>
6 : use rpds::RedBlackTreeMapSync;
7 :
8 : /// Data structure that can efficiently:
9 : /// - find the latest layer by lsn.end at a given key
10 : /// - iterate the latest layers in a key range
11 : /// - insert layers in non-decreasing lsn.start order
12 : ///
13 : /// For a detailed explanation and justification of this approach, see:
14 : /// <https://neon.tech/blog/persistent-structures-in-neons-wal-indexing>
15 : ///
16 : /// NOTE The struct is parameterized over Value for easier
17 : /// testing, but in practice it's some sort of layer.
18 : pub struct LayerCoverage<Value> {
19 : /// For every change in coverage (as we sweep the key space)
20 : /// we store (lsn.end, value).
21 : ///
22 : /// NOTE We use an immutable/persistent tree so that we can keep historic
23 : /// versions of this coverage without cloning the whole thing and
24 : /// incurring quadratic memory cost. See HistoricLayerCoverage.
25 : ///
26 : /// NOTE We use the Sync version of the map because we want Self to
27 : /// be Sync. Using nonsync might be faster, if we can work with
28 : /// that.
29 : nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
30 : }
31 :
32 : impl<T: Clone> Default for LayerCoverage<T> {
33 4908 : fn default() -> Self {
34 4908 : Self::new()
35 4908 : }
36 : }
37 :
38 : impl<Value: Clone> LayerCoverage<Value> {
39 4908 : pub fn new() -> Self {
40 4908 : Self {
41 4908 : nodes: RedBlackTreeMapSync::default(),
42 4908 : }
43 4908 : }
44 :
45 : /// Helper function to subdivide the key range without changing any values
46 : ///
47 : /// This operation has no semantic effect by itself. It only helps us pin in
48 : /// place the part of the coverage we don't want to change when inserting.
49 : ///
50 : /// As an analogy, think of a polygon. If you add a vertex along one of the
51 : /// segments, the polygon is still the same, but it behaves differently when
52 : /// we move or delete one of the other points.
53 : ///
54 : /// Complexity: O(log N)
55 30000 : fn add_node(&mut self, key: i128) {
56 30000 : let value = match self.nodes.range(..=key).last() {
57 20448 : Some((_, Some(v))) => Some(v.clone()),
58 7848 : Some((_, None)) => None,
59 1704 : None => None,
60 : };
61 30000 : self.nodes.insert_mut(key, value);
62 30000 : }
63 :
64 : /// Insert a layer.
65 : ///
66 : /// Complexity: worst case O(N), in practice O(log N). See NOTE in implementation.
67 15000 : pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) {
68 15000 : // Add nodes at endpoints
69 15000 : //
70 15000 : // NOTE The order of lines is important. We add nodes at the start
71 15000 : // and end of the key range **before updating any nodes** in order
72 15000 : // to pin down the current coverage outside of the relevant key range.
73 15000 : // Only the coverage inside the layer's key range should change.
74 15000 : self.add_node(key.start);
75 15000 : self.add_node(key.end);
76 15000 :
77 15000 : // Raise the height where necessary
78 15000 : //
79 15000 : // NOTE This loop is worst case O(N), but amortized O(log N) in the special
80 15000 : // case when rectangles have no height. In practice I don't think we'll see
81 15000 : // the kind of layer intersections needed to trigger O(N) behavior. The worst
82 15000 : // case is N/2 horizontal layers overlapped with N/2 vertical layers in a
83 15000 : // grid pattern.
84 15000 : let mut to_update = Vec::new();
85 15000 : let mut to_remove = Vec::new();
86 15000 : let mut prev_covered = false;
87 20286 : for (k, node) in self.nodes.range(key) {
88 20286 : let needs_cover = match node {
89 3180 : None => true,
90 17106 : Some((h, _)) => h < &lsn.end,
91 : };
92 20286 : if needs_cover {
93 20244 : match prev_covered {
94 5226 : true => to_remove.push(*k),
95 15018 : false => to_update.push(*k),
96 : }
97 42 : }
98 20286 : prev_covered = needs_cover;
99 : }
100 : // TODO check if the nodes inserted at key.start and key.end are safe
101 : // to remove. It's fine to keep them but they could be redundant.
102 30018 : for k in to_update {
103 15018 : self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
104 15018 : }
105 20226 : for k in to_remove {
106 5226 : self.nodes.remove_mut(&k);
107 5226 : }
108 15000 : }
109 :
110 : /// Get the latest (by lsn.end) layer at a given key
111 : ///
112 : /// Complexity: O(log N)
113 2191418 : pub fn query(&self, key: i128) -> Option<Value> {
114 2191418 : self.nodes
115 2191418 : .range(..=key)
116 2191418 : .next_back()?
117 : .1
118 1893190 : .as_ref()
119 1893190 : .map(|(_, v)| v.clone())
120 2191418 : }
121 :
122 : /// Iterate the changes in layer coverage in a given range. You will likely
123 : /// want to start with self.query(key.start), and then follow up with self.range
124 : ///
125 : /// Complexity: O(log N + result_size)
126 1724366 : pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
127 1724366 : self.nodes
128 1724366 : .range(key)
129 1724366 : .map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
130 1724366 : }
131 :
132 : /// Returns an iterator which includes all coverage changes for layers that intersect
133 : /// with the provided range.
134 1724168 : pub fn range_overlaps(
135 1724168 : &self,
136 1724168 : key_range: &Range<i128>,
137 1724168 : ) -> impl Iterator<Item = (i128, Option<Value>)> + '_
138 1724168 : where
139 1724168 : Value: Eq,
140 1724168 : {
141 1724168 : let first_change = self.query(key_range.start);
142 1724168 : match first_change {
143 1409493 : Some(change) => {
144 1409493 : // If the start of the range is covered, we have to deal with two cases:
145 1409493 : // 1. Start of the range is aligned with the start of a layer.
146 1409493 : // In this case the return of `self.range` will contain the layer which aligns with the start of the key range.
147 1409493 : // We advance said iterator to avoid duplicating the first change.
148 1409493 : // 2. Start of the range is not aligned with the start of a layer.
149 1409493 : let range = key_range.start..key_range.end;
150 1409493 : let mut range_coverage = self.range(range).peekable();
151 1409493 : if range_coverage
152 1409493 : .peek()
153 1409493 : .is_some_and(|c| c.1.as_ref() == Some(&change))
154 11301 : {
155 11301 : range_coverage.next();
156 1398192 : }
157 1409493 : itertools::Either::Left(
158 1409493 : std::iter::once((key_range.start, Some(change))).chain(range_coverage),
159 1409493 : )
160 : }
161 : None => {
162 314675 : let range = key_range.start..key_range.end;
163 314675 : let coverage = self.range(range);
164 314675 : itertools::Either::Right(coverage)
165 : }
166 : }
167 1724168 : }
168 : /// O(1) clone
169 35760 : pub fn clone(&self) -> Self {
170 35760 : Self {
171 35760 : nodes: self.nodes.clone(),
172 35760 : }
173 35760 : }
174 : }
175 :
176 : /// Image and delta coverage at a specific LSN.
177 : pub struct LayerCoverageTuple<Value> {
178 : pub image_coverage: LayerCoverage<Value>,
179 : pub delta_coverage: LayerCoverage<Value>,
180 : }
181 :
182 : impl<T: Clone> Default for LayerCoverageTuple<T> {
183 2454 : fn default() -> Self {
184 2454 : Self {
185 2454 : image_coverage: LayerCoverage::default(),
186 2454 : delta_coverage: LayerCoverage::default(),
187 2454 : }
188 2454 : }
189 : }
190 :
191 : impl<Value: Clone> LayerCoverageTuple<Value> {
192 17880 : pub fn clone(&self) -> Self {
193 17880 : Self {
194 17880 : image_coverage: self.image_coverage.clone(),
195 17880 : delta_coverage: self.delta_coverage.clone(),
196 17880 : }
197 17880 : }
198 : }
|