Line data | Source code
1 : //! The validator is responsible for validating DeletionLists for execution,
2 : //! based on whether the generation in the DeletionList is still the latest
3 : //! generation for a tenant.
4 : //!
5 : //! The purpose of validation is to ensure split-brain safety in the cluster
6 : //! of pageservers: a deletion may only be executed if the tenant generation
7 : //! that originated it is still current. See docs/rfcs/025-generation-numbers.md
8 : //! The purpose of accumulating lists before validating them is to reduce load
9 : //! on the control plane API by issuing fewer, larger requests.
10 : //!
11 : //! In addition to validating DeletionLists, the validator validates updates to remote_consistent_lsn
12 : //! for timelines: these are logically deletions because the safekeepers use remote_consistent_lsn
13 : //! to decide when old WAL can safely be removed; an update from a stale generation could cause WAL that is still needed to be deleted.
14 : //!
15 : //! Deletions are passed onward to the Deleter.
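//!
//! A minimal sketch of the validity rule that `validate` applies below (the free
//! function and its signature are illustrative assumptions, not the real API):
//!
//! ```ignore
//! // A tenant's enqueued deletions may execute only if the generation that
//! // enqueued them is still that tenant's current generation according to
//! // the control plane. A tenant missing from the validation response has
//! // been deleted, so its enqueued deletions are redundant and are dropped.
//! fn may_execute(list_generation: Generation, current: Option<Generation>) -> bool {
//!     match current {
//!         Some(current) => list_generation == current,
//!         None => false,
//!     }
//! }
//! ```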
16 :
17 : use std::collections::HashMap;
18 : use std::sync::Arc;
19 : use std::time::Duration;
20 :
21 : use camino::Utf8PathBuf;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::{debug, info, warn};
24 :
25 : use super::deleter::DeleterMessage;
26 : use super::{DeletionHeader, DeletionList, DeletionQueueError, FlushOp, VisibleLsnUpdates};
27 : use crate::config::PageServerConf;
28 : use crate::controller_upcall_client::{RetryForeverError, StorageControllerUpcallApi};
29 : use crate::metrics;
30 : use crate::virtual_file::MaybeFatalIo;
31 :
32 : // After this length of time, do any validation work that is pending,
33 : // even if we haven't accumulated many keys to delete.
34 : //
35 : // This also causes updates to remote_consistent_lsn to be validated, even
36 : // if there were no deletions enqueued.
37 : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
38 :
39 : // If we have accumulated this many keys, proceed with validation and execution.
40 : const AUTOFLUSH_KEY_COUNT: usize = 16384;
41 :
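/// Input to the validator: either a DeletionList to accumulate for validation, or a
/// request to flush everything that is currently pending.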
42 : #[derive(Debug)]
43 : pub(super) enum ValidatorQueueMessage {
44 : Delete(DeletionList),
45 : Flush(FlushOp),
46 : }
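/// Accumulates DeletionLists and pending remote_consistent_lsn updates, validates
/// their generations against the control plane, and forwards validated deletions
/// to the Deleter.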
47 : pub(super) struct Validator<C>
48 : where
49 : C: StorageControllerUpcallApi,
50 : {
51 : conf: &'static PageServerConf,
52 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
53 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
54 :
55 : // Client for calling into control plane API for validation of deletes
56 : controller_upcall_client: C,
57 :
58 : // DeletionLists which are awaiting generation validation. Not safe to
59 : // execute until [`validate`] has processed them.
60 : pending_lists: Vec<DeletionList>,
61 :
62 : // DeletionLists which have passed validation and are ready to execute.
63 : validated_lists: Vec<DeletionList>,
64 :
65 : // Sum of all the lengths of lists in pending_lists
66 : pending_key_count: usize,
67 :
68 : // LSN validation state: we read projected LSNs and write back visible LSNs
69 : // after validation. This is the LSN equivalent of `pending_lists`:
70 : // it is drained in [`validate`].
71 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
72 :
73 : // If we failed to rewrite a deletion list due to local filesystem I/O failure,
74 : // we must remember that and refuse to advance our persistent validated sequence
75 : // number past the failure.
76 : list_write_failed: Option<u64>,
77 :
78 : cancel: CancellationToken,
79 : }
80 :
81 : impl<C> Validator<C>
82 : where
83 : C: StorageControllerUpcallApi,
84 : {
85 4 : pub(super) fn new(
86 4 : conf: &'static PageServerConf,
87 4 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
88 4 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
89 4 : controller_upcall_client: C,
90 4 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
91 4 : cancel: CancellationToken,
92 4 : ) -> Self {
93 4 : Self {
94 4 : conf,
95 4 : rx,
96 4 : tx,
97 4 : controller_upcall_client,
98 4 : lsn_table,
99 4 : pending_lists: Vec::new(),
100 4 : validated_lists: Vec::new(),
101 4 : pending_key_count: 0,
102 4 : list_write_failed: None,
103 4 : cancel,
104 4 : }
105 4 : }
106 : /// Validate the generations of any pending LSN updates and pending DeletionLists,
107 : /// using a single validation call to the control plane.
108 : ///
109 : /// Valid LSN updates propagate back to Timelines immediately, while valid
110 : /// DeletionLists go into the queue of ready-to-execute lists.
111 6 : async fn validate(&mut self) -> Result<(), DeletionQueueError> {
112 6 : let mut tenant_generations = HashMap::new();
113 11 : for list in &self.pending_lists {
114 10 : for (tenant_id, tenant_list) in &list.tenants {
115 5 : // Note: DeletionLists are in logical time order, so generation always
116 5 : // goes up. By doing a simple insert() we will always end up with
117 5 : // the latest generation seen for a tenant.
118 5 : tenant_generations.insert(*tenant_id, tenant_list.generation);
119 5 : }
120 : }
121 :
122 6 : let pending_lsn_updates = {
123 6 : let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
124 6 : std::mem::take(&mut *lsn_table)
125 : };
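// Merge the generations of pending LSN updates into the validation request,
// keeping the highest generation seen for each tenant.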
126 6 : for (tenant_id, update) in &pending_lsn_updates.tenants {
127 0 : let entry = tenant_generations
128 0 : .entry(*tenant_id)
129 0 : .or_insert(update.generation);
130 0 : if update.generation > *entry {
131 0 : *entry = update.generation;
132 0 : }
133 : }
134 :
135 6 : if tenant_generations.is_empty() {
136 : // No work to do
137 2 : return Ok(());
138 4 : }
139 :
140 4 : let tenants_valid = match self
141 4 : .controller_upcall_client
142 4 : .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
143 4 : .await
144 : {
145 4 : Ok(tenants) => tenants,
146 : Err(RetryForeverError::ShuttingDown) => {
147 : // The only way a validation call returns an error is when the cancellation token fires
148 0 : return Err(DeletionQueueError::ShuttingDown);
149 : }
150 : };
151 :
152 4 : let mut validated_sequence: Option<u64> = None;
153 :
154 : // Apply the validation results to the pending LSN updates
155 4 : for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
156 0 : let validated_generation = tenant_generations
157 0 : .get(&tenant_id)
158 0 : .expect("Map was built from the same keys we're reading");
159 :
160 0 : let valid = tenants_valid
161 0 : .get(&tenant_id)
162 0 : .copied()
163 : // If the tenant was missing from the validation response, it has been deleted.
164 : // The Timeline that requested the LSN update is probably already torn down,
165 : // or will be torn down soon. In this case, drop the update by setting valid=false.
166 0 : .unwrap_or(false);
167 :
168 0 : if valid && *validated_generation == tenant_lsn_state.generation {
169 0 : for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
170 0 : tracing::debug!(
171 : %tenant_id,
172 : %timeline_id,
173 0 : current = %pending_lsn.result_slot.load(),
174 : projected = %pending_lsn.projected,
175 0 : "advancing validated remote_consistent_lsn",
176 : );
177 0 : pending_lsn.result_slot.store(pending_lsn.projected);
178 : }
179 : } else {
180 : // If we failed validation, then do not apply any of the projected updates
181 0 : info!(
182 0 : "Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}",
183 : tenant_lsn_state.generation
184 : );
185 0 : metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
186 : }
187 : }
188 :
189 : // Apply the validation results to the pending deletion lists
190 9 : for list in &mut self.pending_lists {
191 : // Filter the list based on whether the server responded valid: true.
192 : // If a tenant is omitted in the response, it has been deleted; its entries are
193 : // treated as invalid and dropped, as explained below.
194 5 : let mut mutated = false;
195 5 : list.tenants.retain(|tenant_id, tenant| {
196 5 : let validated_generation = tenant_generations
197 5 : .get(tenant_id)
198 5 : .expect("Map was built from the same keys we're reading");
199 :
200 : // If the tenant was missing from the validation response, it has been deleted.
201 : // This means that a deletion is valid, but also redundant since the tenant's
202 : // objects should have already been deleted. Treat it as invalid to drop the
203 : // redundant deletion.
204 5 : let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);
205 :
206 : // A list is valid if it comes from the current _or previous_ generation.
207 : // - The previous generation case is permitted due to how we store deletion lists locally:
208 : // if we see the immediately previous generation in a locally stored deletion list,
209 : // it proves that this node's disk was used for both current & previous generations,
210 : // and therefore no other node was involved in between: the two generations may be
211 : // logically treated as the same.
212 : // - In that previous generation case, we rewrote it to the current generation
213 : // in recover(), so the comparison here is simply an equality.
214 :
215 5 : let this_list_valid = valid
216 4 : && (tenant.generation == *validated_generation);
217 :
218 5 : if !this_list_valid {
219 2 : info!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
220 2 : metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
221 2 : mutated = true;
222 3 : } else {
223 3 : metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
224 3 : }
225 5 : this_list_valid
226 5 : });
227 5 : list.validated = true;
228 :
229 5 : if mutated {
230 : // Save the deletion list if we had to make changes due to stale generations. The
231 : // saved list is valid for execution.
232 2 : if let Err(e) = list.save(self.conf).await {
233 : // Highly unexpected. Could happen if e.g. disk full.
234 : // If we didn't save the trimmed list, it is _not_ valid to execute.
235 0 : warn!("Failed to save modified deletion list {list}: {e:#}");
236 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
237 :
238 : // Rather than have a complex retry process, just drop it and leak the objects;
239 : // the scrubber will clean up eventually.
240 0 : list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.
241 :
242 : // We must remember this failure, to prevent later writing out a header that
243 : // would imply the unwritable list was valid on disk.
244 0 : if self.list_write_failed.is_none() {
245 0 : self.list_write_failed = Some(list.sequence);
246 0 : }
247 2 : }
248 3 : }
249 :
250 5 : validated_sequence = Some(list.sequence);
251 : }
252 :
253 4 : if let Some(validated_sequence) = validated_sequence {
254 4 : if let Some(list_write_failed) = self.list_write_failed {
255 : // Rare error case: we failed to write out a deletion list to excise invalid
256 : // entries, so we cannot advance the header's valid sequence number past that point.
257 : //
258 : // In this state we will continue to validate, execute and delete deletion lists,
259 : // we just cannot update the header. It should be noticed and fixed by a human due to
260 : // the nonzero value of our unexpected_errors metric.
261 0 : warn!(
262 : sequence_number = list_write_failed,
263 0 : "Cannot write header because writing a deletion list failed earlier",
264 : );
265 : } else {
266 : // Write the queue header to record how far validation progressed. This avoids having
267 : // to rewrite each DeletionList to set validated=true in it.
268 4 : let header = DeletionHeader::new(validated_sequence);
269 :
270 : // Drop result because the validated_sequence is an optimization. If we fail to save it,
271 : // then restart, we will drop some deletion lists, creating work for scrubber.
272 : // The save() function logs a warning on error.
273 4 : if let Err(e) = header.save(self.conf).await {
274 0 : warn!("Failed to write deletion queue header: {e:#}");
275 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
276 4 : }
277 : }
278 0 : }
279 :
280 : // Transfer the validated lists to the validated queue, for eventual execution
281 4 : self.validated_lists.append(&mut self.pending_lists);
282 :
283 4 : Ok(())
284 6 : }
285 :
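/// Remove the on-disk deletion list files whose keys have all been deleted from
/// remote storage; failure to remove a file is treated as fatal I/O.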
286 4 : async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
287 9 : for list_path in list_paths {
288 5 : debug!("Removing deletion list {list_path}");
289 5 : tokio::fs::remove_file(&list_path)
290 5 : .await
291 5 : .fatal_err("remove deletion list");
292 : }
293 4 : }
294 :
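/// Validate everything that is pending, hand validated lists to the Deleter, wait
/// for those deletions to complete, and then erase the local deletion list files.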
295 6 : async fn flush(&mut self) -> Result<(), DeletionQueueError> {
296 6 : tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
297 :
298 : // Issue any required generation validation calls to the control plane
299 6 : self.validate().await?;
300 :
301 : // After successful validation, nothing is pending: any lists that
302 : // made it through validation will be in validated_lists.
303 6 : assert!(self.pending_lists.is_empty());
304 6 : self.pending_key_count = 0;
305 :
306 6 : tracing::debug!(
307 0 : "Validation complete, have {} validated lists",
308 0 : self.validated_lists.len()
309 : );
310 :
311 : // Return quickly if we have no validated lists to execute. This avoids flushing the
312 : // executor when an idle backend hits its autoflush interval
313 6 : if self.validated_lists.is_empty() {
314 2 : return Ok(());
315 4 : }
316 :
317 : // Drain `validated_lists` into the executor
318 4 : let mut executing_lists = Vec::new();
319 5 : for list in self.validated_lists.drain(..) {
320 5 : let list_path = self.conf.deletion_list_path(list.sequence);
321 5 : let objects = list.into_remote_paths();
322 5 : self.tx
323 5 : .send(DeleterMessage::Delete(objects))
324 5 : .await
325 5 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
326 5 : executing_lists.push(list_path);
327 : }
328 :
329 4 : self.flush_executor().await?;
330 :
331 : // Erase the deletion lists whose keys have all been deleted from remote storage
332 4 : self.cleanup_lists(executing_lists).await;
333 :
334 4 : Ok(())
335 6 : }
336 :
337 4 : async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
338 : // Flush the executor, so that all the keys referenced by these deletion lists
339 : // are actually removed from remote storage. This is a precondition to deleting
340 : // the deletion lists themselves.
341 4 : let (flush_op, rx) = FlushOp::new();
342 4 : self.tx
343 4 : .send(DeleterMessage::Flush(flush_op))
344 4 : .await
345 4 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
346 :
347 4 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
348 4 : }
349 :
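/// Main loop of the validator: accumulate incoming DeletionLists, and flush
/// (validate, then hand off to the Deleter) when explicitly requested, when enough
/// keys have accumulated, or when the autoflush interval expires.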
350 4 : pub(super) async fn background(&mut self) {
351 4 : tracing::info!("Started deletion backend worker");
352 :
353 17 : while !self.cancel.is_cancelled() {
354 17 : let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
355 12 : Ok(Some(m)) => m,
356 : Ok(None) => {
357 : // All queue senders closed
358 1 : info!("Shutting down");
359 1 : break;
360 : }
361 : Err(_) => {
362 : // Timeout: we hit the deadline to flush whatever we have in hand. flush() will
363 : // return immediately if no work is pending.
364 1 : match self.flush().await {
365 1 : Ok(()) => {}
366 0 : Err(DeletionQueueError::ShuttingDown) => {
367 0 : // If we are shutting down, then auto-flush can safely be skipped
368 0 : }
369 : }
370 :
371 1 : continue;
372 : }
373 : };
374 :
375 12 : match msg {
376 7 : ValidatorQueueMessage::Delete(list) => {
377 7 : if list.validated {
378 : // A pre-validated list may only be seen during recovery, if we are recovering
379 : // a DeletionList whose on-disk state has validated=true
380 0 : self.validated_lists.push(list)
381 7 : } else {
382 7 : self.pending_key_count += list.len();
383 7 : self.pending_lists.push(list);
384 7 : }
385 :
386 7 : if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
387 0 : match self.flush().await {
388 0 : Ok(()) => {}
389 0 : Err(DeletionQueueError::ShuttingDown) => {
390 0 : // If we are shutting down, then auto-flush can safely be skipped
391 0 : }
392 : }
393 7 : }
394 : }
395 5 : ValidatorQueueMessage::Flush(op) => {
396 5 : match self.flush().await {
397 5 : Ok(()) => {
398 5 : op.notify();
399 5 : }
400 0 : Err(DeletionQueueError::ShuttingDown) => {
401 0 : // If we fail due to shutting down, we will just drop `op` to propagate that status.
402 0 : }
403 : }
404 : }
405 : }
406 : }
407 1 : }
408 : }