//! The validator is responsible for validating DeletionLists for execution,
//! based on whether the generation in the DeletionList is still the latest
//! generation for a tenant.
//!
//! The purpose of validation is to ensure split-brain safety in the cluster
//! of pageservers: a deletion may only be executed if the tenant generation
//! that originated it is still current. See docs/rfcs/025-generation-numbers.md.
//! The purpose of accumulating lists before validating them is to reduce load
//! on the control plane API by issuing fewer, larger requests.
//!
//! In addition to validating DeletionLists, the validator validates updates to
//! remote_consistent_lsn for timelines: these are logically deletions, because
//! the safekeepers use remote_consistent_lsn to decide when old WAL may be
//! discarded.
//!
//! Deletions are passed onward to the Deleter.
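//!
//! A rough sketch of this worker's place in the deletion queue pipeline, as
//! implied by the channels and API used in this module (the upstream producer
//! is labelled generically):
//!
//! ```text
//!                 ValidatorQueueMessage            DeleterMessage
//! list producer ------------------------> Validator --------------> Deleter
//!                                             |
//!                                             | validate(tenant generations)
//!                                             v
//!                                     control plane API
//! ```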

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use camino::Utf8PathBuf;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use super::deleter::DeleterMessage;
use super::{DeletionHeader, DeletionList, DeletionQueueError, FlushOp, VisibleLsnUpdates};
use crate::config::PageServerConf;
use crate::controller_upcall_client::{ControlPlaneGenerationsApi, RetryForeverError};
use crate::metrics;
use crate::virtual_file::MaybeFatalIo;

// After this length of time, do any validation work that is pending,
// even if we haven't accumulated many keys to delete.
//
// This also causes updates to remote_consistent_lsn to be validated, even
// if there were no deletions enqueued.
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

// If we have received this number of keys, proceed with attempting to execute them
const AUTOFLUSH_KEY_COUNT: usize = 16384;
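
// Taken together (see `background` below): an idle validator flushes any
// pending work every AUTOFLUSH_INTERVAL, while a busy one flushes as soon as
// more than AUTOFLUSH_KEY_COUNT pending keys have accumulated.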

#[derive(Debug)]
pub(super) enum ValidatorQueueMessage {
    Delete(DeletionList),
    Flush(FlushOp),
}

pub(super) struct Validator<C>
where
    C: ControlPlaneGenerationsApi,
{
    conf: &'static PageServerConf,
    rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
    tx: tokio::sync::mpsc::Sender<DeleterMessage>,

    // Client for calling into control plane API for validation of deletes
    controller_upcall_client: Option<C>,

    // DeletionLists which are awaiting generation validation. Not safe to
    // execute until [`validate`] has processed them.
    pending_lists: Vec<DeletionList>,

    // DeletionLists which have passed validation and are ready to execute.
    validated_lists: Vec<DeletionList>,

    // Sum of all the lengths of lists in pending_lists
    pending_key_count: usize,

    // Lsn validation state: we read projected LSNs and write back visible LSNs
    // after validation. This is the LSN equivalent of `pending_lists`: it is
    // drained in [`validate`].
    lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,

    // If we failed to rewrite a deletion list due to local filesystem I/O failure,
    // we must remember that and refuse to advance our persistent validated sequence
    // number past the failure.
    list_write_failed: Option<u64>,

    cancel: CancellationToken,
}

impl<C> Validator<C>
where
    C: ControlPlaneGenerationsApi,
{
    pub(super) fn new(
        conf: &'static PageServerConf,
        rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
        tx: tokio::sync::mpsc::Sender<DeleterMessage>,
        controller_upcall_client: Option<C>,
        lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            conf,
            rx,
            tx,
            controller_upcall_client,
            lsn_table,
            pending_lists: Vec::new(),
            validated_lists: Vec::new(),
            pending_key_count: 0,
            list_write_failed: None,
            cancel,
        }
    }

    /// Process any outstanding validations of generations of pending LSN updates or pending
    /// DeletionLists.
    ///
    /// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
    /// go into the queue of ready-to-execute lists.
    async fn validate(&mut self) -> Result<(), DeletionQueueError> {
        let mut tenant_generations = HashMap::new();
        for list in &self.pending_lists {
            for (tenant_id, tenant_list) in &list.tenants {
                // Note: DeletionLists are in logical time order, so generation always
                // goes up. By doing a simple insert() we will always end up with
                // the latest generation seen for a tenant.
                tenant_generations.insert(*tenant_id, tenant_list.generation);
            }
        }
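        // Illustrative example (hypothetical numbers): if pending_lists holds
        // lists carrying generations 3 and then 4 for the same tenant, the
        // insert() above leaves 4 in the map, so this pass validates against
        // the most recent generation we have enqueued for that tenant.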

        let pending_lsn_updates = {
            let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
            std::mem::take(&mut *lsn_table)
        };
        for (tenant_id, update) in &pending_lsn_updates.tenants {
            let entry = tenant_generations
                .entry(*tenant_id)
                .or_insert(update.generation);
            if update.generation > *entry {
                *entry = update.generation;
            }
        }

        if tenant_generations.is_empty() {
            // No work to do
            return Ok(());
        }

        let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
            match controller_upcall_client
                .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
                .await
            {
                Ok(tenants) => tenants,
                Err(RetryForeverError::ShuttingDown) => {
                    // The only way a validation call returns an error is when the cancellation token fires
                    return Err(DeletionQueueError::ShuttingDown);
                }
            }
        } else {
            // Control plane API disabled. In legacy mode we consider everything valid.
            tenant_generations.keys().map(|k| (*k, true)).collect()
        };

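        // Track the highest DeletionList sequence number validated in this
        // pass: on success it is written to the queue header below, recording
        // how far validation has progressed.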
        let mut validated_sequence: Option<u64> = None;

        // Apply the validation results to the pending LSN updates
        for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
            let validated_generation = tenant_generations
                .get(&tenant_id)
                .expect("Map was built from the same keys we're reading");

            let valid = tenants_valid
                .get(&tenant_id)
                .copied()
                // If the tenant was missing from the validation response, it has been deleted.
                // The Timeline that requested the LSN update is probably already torn down,
                // or will be torn down soon. In this case, drop the update by setting valid=false.
                .unwrap_or(false);

            if valid && *validated_generation == tenant_lsn_state.generation {
                for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
                    tracing::debug!(
                        %tenant_id,
                        %timeline_id,
                        current = %pending_lsn.result_slot.load(),
                        projected = %pending_lsn.projected,
                        "advancing validated remote_consistent_lsn",
                    );
                    pending_lsn.result_slot.store(pending_lsn.projected);
                }
            } else {
                // If we failed validation, then do not apply any of the projected updates
                info!(
                    "Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}",
                    tenant_lsn_state.generation
                );
                metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
            }
        }

        // Apply the validation results to the pending deletion lists
        for list in &mut self.pending_lists {
            // Filter the list based on whether the server responded valid: true.
            // If a tenant is omitted from the response, it has been deleted: its
            // deletions are redundant and are dropped below.
            let mut mutated = false;
            list.tenants.retain(|tenant_id, tenant| {
                let validated_generation = tenant_generations
                    .get(tenant_id)
                    .expect("Map was built from the same keys we're reading");

                // If the tenant was missing from the validation response, it has been deleted.
                // This means that a deletion is valid, but also redundant since the tenant's
                // objects should have already been deleted. Treat it as invalid to drop the
                // redundant deletion.
                let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);

                // A list is valid if it comes from the current _or previous_ generation.
                // - The previous generation case is permitted due to how we store deletion lists locally:
                //   if we see the immediately previous generation in a locally stored deletion list,
                //   it proves that this node's disk was used for both current & previous generations,
                //   and therefore no other node was involved in between: the two generations may be
                //   logically treated as the same.
                // - In that previous generation case, we rewrote it to the current generation
                //   in recover(), so the comparison here is simply an equality.
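                //
                // Illustrative example (hypothetical generation numbers): a list
                // written in generation 4 and recovered by this node now running
                // generation 5 was rewritten to 5 in recover(); if the control
                // plane still reports generation 5 as current, the equality below
                // holds and the deletions may execute.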

                let this_list_valid =
                    valid && (tenant.generation == *validated_generation);

                if !this_list_valid {
                    info!(
                        "Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked",
                        tenant.generation
                    );
                    metrics::DELETION_QUEUE
                        .keys_dropped
                        .inc_by(tenant.len() as u64);
                    mutated = true;
                } else {
                    metrics::DELETION_QUEUE
                        .keys_validated
                        .inc_by(tenant.len() as u64);
                }
                this_list_valid
            });
            list.validated = true;

            if mutated {
                // Save the deletion list if we had to make changes due to stale generations. The
                // saved list is valid for execution.
                if let Err(e) = list.save(self.conf).await {
                    // Highly unexpected. Could happen if e.g. disk full.
                    // If we didn't save the trimmed list, it is _not_ valid to execute.
                    warn!("Failed to save modified deletion list {list}: {e:#}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();

                    // Rather than have a complex retry process, just drop it and leak the objects;
                    // the scrubber will clean up eventually.
                    list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.

                    // We must remember this failure, to prevent later writing out a header that
                    // would imply the unwritable list was valid on disk.
                    if self.list_write_failed.is_none() {
                        self.list_write_failed = Some(list.sequence);
                    }
                }
            }

            validated_sequence = Some(list.sequence);
        }

        if let Some(validated_sequence) = validated_sequence {
            if let Some(list_write_failed) = self.list_write_failed {
                // Rare error case: we failed to write out a deletion list to excise invalid
                // entries, so we cannot advance the header's valid sequence number past that point.
                //
                // In this state we will continue to validate, execute and delete deletion lists;
                // we just cannot update the header. It should be noticed and fixed by a human due to
                // the nonzero value of our unexpected_errors metric.
                warn!(
                    sequence_number = list_write_failed,
                    "Cannot write header because writing a deletion list failed earlier",
                );
            } else {
                // Write the queue header to record how far validation progressed. This avoids having
                // to rewrite each DeletionList to set validated=true in it.
                let header = DeletionHeader::new(validated_sequence);

                // Drop the result, because the validated_sequence is an optimization. If we fail to
                // save it and then restart, we will drop some deletion lists, creating work for the
                // scrubber. The save() function logs a warning on error.
                if let Err(e) = header.save(self.conf).await {
                    warn!("Failed to write deletion queue header: {e:#}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                }
            }
        }

        // Transfer the validated lists to the validated queue, for eventual execution
        self.validated_lists.append(&mut self.pending_lists);

        Ok(())
    }

    async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
        for list_path in list_paths {
            debug!("Removing deletion list {list_path}");
            tokio::fs::remove_file(&list_path)
                .await
                .fatal_err("remove deletion list");
        }
    }

    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());

        // Issue any required generation validation calls to the control plane
        self.validate().await?;

        // After successful validation, nothing is pending: any lists that
        // made it through validation will be in validated_lists.
        assert!(self.pending_lists.is_empty());
        self.pending_key_count = 0;

        tracing::debug!(
            "Validation complete, have {} validated lists",
            self.validated_lists.len()
        );

        // Return quickly if we have no validated lists to execute. This avoids flushing the
        // executor when an idle backend hits its autoflush interval.
        if self.validated_lists.is_empty() {
            return Ok(());
        }

        // Drain `validated_lists` into the executor
        let mut executing_lists = Vec::new();
        for list in self.validated_lists.drain(..) {
            let list_path = self.conf.deletion_list_path(list.sequence);
            let objects = list.into_remote_paths();
            self.tx
                .send(DeleterMessage::Delete(objects))
                .await
                .map_err(|_| DeletionQueueError::ShuttingDown)?;
            executing_lists.push(list_path);
        }

        self.flush_executor().await?;

        // Erase the deletion lists whose keys have all been deleted from remote storage
        self.cleanup_lists(executing_lists).await;

        Ok(())
    }

    async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
        // Flush the executor, so that all the keys referenced by these deletion lists
        // are actually removed from remote storage. This is a precondition to deleting
        // the deletion lists themselves.
        let (flush_op, rx) = FlushOp::new();
        self.tx
            .send(DeleterMessage::Flush(flush_op))
            .await
            .map_err(|_| DeletionQueueError::ShuttingDown)?;

        rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
    }

    pub(super) async fn background(&mut self) {
        tracing::info!("Started deletion backend worker");

        while !self.cancel.is_cancelled() {
            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    break;
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in hand.
                    // These functions will return immediately if no work is pending.
                    match self.flush().await {
                        Ok(()) => {}
                        Err(DeletionQueueError::ShuttingDown) => {
                            // If we are shutting down, then auto-flush can safely be skipped
                        }
                    }

                    continue;
                }
            };

            match msg {
                ValidatorQueueMessage::Delete(list) => {
                    if list.validated {
                        // A pre-validated list may only be seen during recovery, if we are
                        // recovering a DeletionList whose on-disk state has validated=true
                        self.validated_lists.push(list)
                    } else {
                        self.pending_key_count += list.len();
                        self.pending_lists.push(list);
                    }

                    if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
                        match self.flush().await {
                            Ok(()) => {}
                            Err(DeletionQueueError::ShuttingDown) => {
                                // If we are shutting down, then auto-flush can safely be skipped
                            }
                        }
                    }
                }
                ValidatorQueueMessage::Flush(op) => {
                    match self.flush().await {
                        Ok(()) => {
                            op.notify();
                        }
                        Err(DeletionQueueError::ShuttingDown) => {
                            // If we fail due to shutting down, we will just drop `op` to propagate that status.
                        }
                    }
                }
            }
        }
    }
}