Line data Source code
//! The validator is responsible for validating DeletionLists for execution,
//! based on whether the generation in the DeletionList is still the latest
//! generation for a tenant.
//!
//! The purpose of validation is to ensure split-brain safety in the cluster
//! of pageservers: a deletion may only be executed if the tenant generation
//! that originated it is still current. See docs/rfcs/025-generation-numbers.md.
//!
//! The purpose of accumulating lists before validating them is to reduce load
//! on the control plane API by issuing fewer, larger requests.
//!
//! In addition to validating DeletionLists, the validator validates updates to remote_consistent_lsn
//! for timelines: these are logically deletions because the safekeepers use remote_consistent_lsn
//! to decide when old WAL may be removed.
//!
//! Deletions are passed onward to the Deleter.
15 : //! Deletions are passed onward to the Deleter.
16 :
17 : use std::collections::HashMap;
18 : use std::sync::Arc;
19 : use std::time::Duration;
20 :
21 : use camino::Utf8PathBuf;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::debug;
24 : use tracing::info;
25 : use tracing::warn;
26 :
27 : use crate::config::PageServerConf;
28 : use crate::control_plane_client::ControlPlaneGenerationsApi;
29 : use crate::control_plane_client::RetryForeverError;
30 : use crate::metrics;
31 : use crate::virtual_file::MaybeFatalIo;
32 :
33 : use super::deleter::DeleterMessage;
34 : use super::DeletionHeader;
35 : use super::DeletionList;
36 : use super::DeletionQueueError;
37 : use super::FlushOp;
38 : use super::VisibleLsnUpdates;
39 :
40 : // After this length of time, do any validation work that is pending,
41 : // even if we haven't accumulated many keys to delete.
42 : //
43 : // This also causes updates to remote_consistent_lsn to be validated, even
44 : // if there were no deletions enqueued.
45 : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
46 :
47 : // If we have received this number of keys, proceed with attempting to execute
48 : const AUTOFLUSH_KEY_COUNT: usize = 16384;
49 :
50 0 : #[derive(Debug)]
51 : pub(super) enum ValidatorQueueMessage {
52 : Delete(DeletionList),
53 : Flush(FlushOp),
54 : }
55 : pub(super) struct Validator<C>
56 : where
57 : C: ControlPlaneGenerationsApi,
58 : {
59 : conf: &'static PageServerConf,
60 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
61 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
62 :
63 : // Client for calling into control plane API for validation of deletes
64 : control_plane_client: Option<C>,
65 :
66 : // DeletionLists which are waiting generation validation. Not safe to
67 : // execute until [`validate`] has processed them.
68 : pending_lists: Vec<DeletionList>,
69 :
70 : // DeletionLists which have passed validation and are ready to execute.
71 : validated_lists: Vec<DeletionList>,
72 :
73 : // Sum of all the lengths of lists in pending_lists
74 : pending_key_count: usize,
75 :
76 : // Lsn validation state: we read projected LSNs and write back visible LSNs
77 : // after validation. This is the LSN equivalent of `pending_validation_lists`:
78 : // it is drained in [`validate`]
79 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
80 :
81 : // If we failed to rewrite a deletion list due to local filesystem I/O failure,
82 : // we must remember that and refuse to advance our persistent validated sequence
83 : // number past the failure.
84 : list_write_failed: Option<u64>,
85 :
86 : cancel: CancellationToken,
87 : }
88 :
89 : impl<C> Validator<C>
90 : where
91 : C: ControlPlaneGenerationsApi,
92 : {
93 633 : pub(super) fn new(
94 633 : conf: &'static PageServerConf,
95 633 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
96 633 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
97 633 : control_plane_client: Option<C>,
98 633 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
99 633 : cancel: CancellationToken,
100 633 : ) -> Self {
101 633 : Self {
102 633 : conf,
103 633 : rx,
104 633 : tx,
105 633 : control_plane_client,
106 633 : lsn_table,
107 633 : pending_lists: Vec::new(),
108 633 : validated_lists: Vec::new(),
109 633 : pending_key_count: 0,
110 633 : list_write_failed: None,
111 633 : cancel,
112 633 : }
113 633 : }
114 : /// Process any outstanding validations of generations of pending LSN updates or pending
115 : /// DeletionLists.
116 : ///
117 : /// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
118 : /// go into the queue of ready-to-execute lists.
119 812 : async fn validate(&mut self) -> Result<(), DeletionQueueError> {
120 812 : let mut tenant_generations = HashMap::new();
121 852 : for list in &self.pending_lists {
122 80 : for (tenant_id, tenant_list) in &list.tenants {
123 40 : // Note: DeletionLists are in logical time order, so generation always
124 40 : // goes up. By doing a simple insert() we will always end up with
125 40 : // the latest generation seen for a tenant.
126 40 : tenant_generations.insert(*tenant_id, tenant_list.generation);
127 40 : }
128 : }
129 :
130 812 : let pending_lsn_updates = {
131 812 : let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
132 812 : std::mem::take(&mut *lsn_table)
133 : };
134 1336 : for (tenant_id, update) in &pending_lsn_updates.tenants {
135 524 : let entry = tenant_generations
136 524 : .entry(*tenant_id)
137 524 : .or_insert(update.generation);
138 524 : if update.generation > *entry {
139 7 : *entry = update.generation;
140 517 : }
141 : }
142 :
143 812 : if tenant_generations.is_empty() {
144 : // No work to do
145 361 : return Ok(());
146 451 : }
147 :
148 451 : let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client {
149 451 : match control_plane_client
150 539 : .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
151 1013 : .await
152 : {
153 448 : Ok(tenants) => tenants,
154 : Err(RetryForeverError::ShuttingDown) => {
155 : // The only way a validation call returns an error is when the cancellation token fires
156 1 : return Err(DeletionQueueError::ShuttingDown);
157 : }
158 : }
159 : } else {
160 : // Control plane API disabled. In legacy mode we consider everything valid.
161 0 : tenant_generations.keys().map(|k| (*k, true)).collect()
162 : };
163 :
164 448 : let mut validated_sequence: Option<u64> = None;
165 :
166 : // Apply the validation results to the pending LSN updates
167 969 : for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
168 521 : let validated_generation = tenant_generations
169 521 : .get(&tenant_id)
170 521 : .expect("Map was built from the same keys we're reading");
171 521 :
172 521 : let valid = tenants_valid
173 521 : .get(&tenant_id)
174 521 : .copied()
175 521 : // If the tenant was missing from the validation response, it has been deleted.
176 521 : // The Timeline that requested the LSN update is probably already torn down,
177 521 : // or will be torn down soon. In this case, drop the update by setting valid=false.
178 521 : .unwrap_or(false);
179 521 :
180 521 : if valid && *validated_generation == tenant_lsn_state.generation {
181 1203 : for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
182 0 : tracing::debug!(
183 0 : %tenant_id,
184 0 : %timeline_id,
185 0 : current = %pending_lsn.result_slot.load(),
186 0 : projected = %pending_lsn.projected,
187 0 : "advancing validated remote_consistent_lsn",
188 0 : );
189 714 : pending_lsn.result_slot.store(pending_lsn.projected);
190 : }
191 : } else {
192 : // If we failed validation, then do not apply any of the projected updates
193 32 : warn!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}", tenant_lsn_state.generation);
194 32 : metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
195 : }
196 : }
197 :
198 : // Apply the validation results to the pending deletion lists
199 488 : for list in &mut self.pending_lists {
200 : // Filter the list based on whether the server responded valid: true.
201 : // If a tenant is omitted in the response, it has been deleted, and we should
202 : // proceed with deletion.
203 40 : let mut mutated = false;
204 40 : list.tenants.retain(|tenant_id, tenant| {
205 40 : let validated_generation = tenant_generations
206 40 : .get(tenant_id)
207 40 : .expect("Map was built from the same keys we're reading");
208 40 :
209 40 : // If the tenant was missing from the validation response, it has been deleted.
210 40 : // This means that a deletion is valid, but also redundant since the tenant's
211 40 : // objects should have already been deleted. Treat it as invalid to drop the
212 40 : // redundant deletion.
213 40 : let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);
214 :
215 : // A list is valid if it comes from the current _or previous_ generation.
216 : // - The previous generation case is permitted due to how we store deletion lists locally:
217 : // if we see the immediately previous generation in a locally stored deletion list,
218 : // it proves that this node's disk was used for both current & previous generations,
219 : // and therefore no other node was involved in between: the two generations may be
220 : // logically treated as the same.
221 : // - In that previous generation case, we rewrote it to the current generation
222 : // in recover(), so the comparison here is simply an equality.
223 :
224 40 : let this_list_valid = valid
225 31 : && (tenant.generation == *validated_generation);
226 :
227 40 : if !this_list_valid {
228 14 : warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
229 14 : metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
230 14 : mutated = true;
231 26 : } else {
232 26 : metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
233 26 : }
234 40 : this_list_valid
235 40 : });
236 40 : list.validated = true;
237 40 :
238 40 : if mutated {
239 : // Save the deletion list if we had to make changes due to stale generations. The
240 : // saved list is valid for execution.
241 14 : if let Err(e) = list.save(self.conf).await {
242 : // Highly unexpected. Could happen if e.g. disk full.
243 : // If we didn't save the trimmed list, it is _not_ valid to execute.
244 0 : warn!("Failed to save modified deletion list {list}: {e:#}");
245 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
246 0 :
247 0 : // Rather than have a complex retry process, just drop it and leak the objects,
248 0 : // scrubber will clean up eventually.
249 0 : list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.
250 0 :
251 0 : // We must remember this failure, to prevent later writing out a header that
252 0 : // would imply the unwritable list was valid on disk.
253 0 : if self.list_write_failed.is_none() {
254 0 : self.list_write_failed = Some(list.sequence);
255 0 : }
256 14 : }
257 26 : }
258 :
259 40 : validated_sequence = Some(list.sequence);
260 : }
261 :
262 448 : if let Some(validated_sequence) = validated_sequence {
263 38 : if let Some(list_write_failed) = self.list_write_failed {
264 : // Rare error case: we failed to write out a deletion list to excise invalid
265 : // entries, so we cannot advance the header's valid sequence number past that point.
266 : //
267 : // In this state we will continue to validate, execute and delete deletion lists,
268 : // we just cannot update the header. It should be noticed and fixed by a human due to
269 : // the nonzero value of our unexpected_errors metric.
270 0 : warn!(
271 0 : sequence_number = list_write_failed,
272 0 : "Cannot write header because writing a deletion list failed earlier",
273 0 : );
274 : } else {
275 : // Write the queue header to record how far validation progressed. This avoids having
276 : // to rewrite each DeletionList to set validated=true in it.
277 38 : let header = DeletionHeader::new(validated_sequence);
278 :
279 : // Drop result because the validated_sequence is an optimization. If we fail to save it,
280 : // then restart, we will drop some deletion lists, creating work for scrubber.
281 : // The save() function logs a warning on error.
282 38 : if let Err(e) = header.save(self.conf).await {
283 0 : warn!("Failed to write deletion queue header: {e:#}");
284 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
285 38 : }
286 : }
287 410 : }
288 :
289 : // Transfer the validated lists to the validated queue, for eventual execution
290 448 : self.validated_lists.append(&mut self.pending_lists);
291 448 :
292 448 : Ok(())
293 810 : }
294 :
295 38 : async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
296 78 : for list_path in list_paths {
297 0 : debug!("Removing deletion list {list_path}");
298 40 : tokio::fs::remove_file(&list_path)
299 40 : .await
300 40 : .fatal_err("remove deletion list");
301 : }
302 38 : }
303 :
304 812 : async fn flush(&mut self) -> Result<(), DeletionQueueError> {
305 0 : tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
306 :
307 : // Issue any required generation validation calls to the control plane
308 1065 : self.validate().await?;
309 :
310 : // After successful validation, nothing is pending: any lists that
311 : // made it through validation will be in validated_lists.
312 809 : assert!(self.pending_lists.is_empty());
313 809 : self.pending_key_count = 0;
314 :
315 0 : tracing::debug!(
316 0 : "Validation complete, have {} validated lists",
317 0 : self.validated_lists.len()
318 0 : );
319 :
320 : // Return quickly if we have no validated lists to execute. This avoids flushing the
321 : // executor when an idle backend hits its autoflush interval
322 809 : if self.validated_lists.is_empty() {
323 769 : return Ok(());
324 40 : }
325 40 :
326 40 : // Drain `validated_lists` into the executor
327 40 : let mut executing_lists = Vec::new();
328 42 : for list in self.validated_lists.drain(..) {
329 42 : let list_path = self.conf.deletion_list_path(list.sequence);
330 42 : let objects = list.into_remote_paths();
331 42 : self.tx
332 42 : .send(DeleterMessage::Delete(objects))
333 0 : .await
334 42 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
335 42 : executing_lists.push(list_path);
336 : }
337 :
338 40 : self.flush_executor().await?;
339 :
340 : // Erase the deletion lists whose keys have all be deleted from remote storage
341 40 : self.cleanup_lists(executing_lists).await;
342 :
343 38 : Ok(())
344 808 : }
345 :
346 40 : async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
347 40 : // Flush the executor, so that all the keys referenced by these deletion lists
348 40 : // are actually removed from remote storage. This is a precondition to deleting
349 40 : // the deletion lists themselves.
350 40 : let (flush_op, rx) = FlushOp::new();
351 40 : self.tx
352 40 : .send(DeleterMessage::Flush(flush_op))
353 0 : .await
354 40 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
355 :
356 40 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
357 38 : }
358 :
359 633 : pub(super) async fn background(&mut self) {
360 633 : tracing::info!("Started deletion backend worker");
361 :
362 1525 : while !self.cancel.is_cancelled() {
363 1520 : let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
364 274 : Ok(Some(m)) => m,
365 : Ok(None) => {
366 : // All queue senders closed
367 3 : info!("Shutting down");
368 3 : break;
369 : }
370 : Err(_) => {
371 : // Timeout, we hit deadline to execute whatever we have in hand. These functions will
372 : // return immediately if no work is pending.
373 862 : match self.flush().await {
374 617 : Ok(()) => {}
375 1 : Err(DeletionQueueError::ShuttingDown) => {
376 1 : // If we are shutting down, then auto-flush can safely be skipped
377 1 : }
378 : }
379 :
380 618 : continue;
381 : }
382 : };
383 :
384 274 : match msg {
385 84 : ValidatorQueueMessage::Delete(list) => {
386 84 : if list.validated {
387 : // A pre-validated list may only be seen during recovery, if we are recovering
388 : // a DeletionList whose on-disk state has validated=true
389 2 : self.validated_lists.push(list)
390 82 : } else {
391 82 : self.pending_key_count += list.len();
392 82 : self.pending_lists.push(list);
393 82 : }
394 :
395 84 : if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
396 0 : match self.flush().await {
397 0 : Ok(()) => {}
398 0 : Err(DeletionQueueError::ShuttingDown) => {
399 0 : // If we are shutting down, then auto-flush can safely be skipped
400 0 : }
401 : }
402 84 : }
403 : }
404 190 : ValidatorQueueMessage::Flush(op) => {
405 281 : match self.flush().await {
406 190 : Ok(()) => {
407 190 : op.notify();
408 190 : }
409 0 : Err(DeletionQueueError::ShuttingDown) => {
410 0 : // If we fail due to shutting down, we will just drop `op` to propagate that status.
411 0 : }
412 : }
413 : }
414 : }
415 : }
416 8 : }
417 : }
|