Line data Source code
1 : //! The validator is responsible for validating DeletionLists for execution,
2 : //! based on whether the generation in the DeletionList is still the latest
3 : //! generation for a tenant.
4 : //!
5 : //! The purpose of validation is to ensure split-brain safety in the cluster
6 : //! of pageservers: a deletion may only be executed if the tenant generation
7 : //! that originated it is still current. See docs/rfcs/025-generation-numbers.md
8 : //! The purpose of accumulating lists before validating them is to reduce load
9 : //! on the control plane API by issuing fewer, larger requests.
10 : //!
11 : //! In addition to validating DeletionLists, the validator validates updates to remote_consistent_lsn
12 : //! for timelines: these are logically deletions because the safekeepers use remote_consistent_lsn
13 : //! to decide when old WAL can be deleted.
14 : //!
15 : //! Deletions are passed onward to the Deleter.
16 :
17 : use std::collections::HashMap;
18 : use std::sync::Arc;
19 : use std::time::Duration;
20 :
21 : use camino::Utf8PathBuf;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::debug;
24 : use tracing::info;
25 : use tracing::warn;
26 :
27 : use crate::config::PageServerConf;
28 : use crate::controller_upcall_client::ControlPlaneGenerationsApi;
29 : use crate::controller_upcall_client::RetryForeverError;
30 : use crate::metrics;
31 : use crate::virtual_file::MaybeFatalIo;
32 :
33 : use super::deleter::DeleterMessage;
34 : use super::DeletionHeader;
35 : use super::DeletionList;
36 : use super::DeletionQueueError;
37 : use super::FlushOp;
38 : use super::VisibleLsnUpdates;
39 :
/// Upper bound on how long pending validation work may sit before it is processed,
/// regardless of how few keys have accumulated.
///
/// Hitting this interval also gets pending remote_consistent_lsn updates validated
/// when no deletions were enqueued at all.
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

/// Pending key count beyond which we stop accumulating and attempt execution right away.
const AUTOFLUSH_KEY_COUNT: usize = 16 * 1024;
49 :
50 : #[derive(Debug)]
51 : pub(super) enum ValidatorQueueMessage {
52 : Delete(DeletionList),
53 : Flush(FlushOp),
54 : }
55 : pub(super) struct Validator<C>
56 : where
57 : C: ControlPlaneGenerationsApi,
58 : {
59 : conf: &'static PageServerConf,
60 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
61 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
62 :
63 : // Client for calling into control plane API for validation of deletes
64 : controller_upcall_client: Option<C>,
65 :
66 : // DeletionLists which are waiting generation validation. Not safe to
67 : // execute until [`validate`] has processed them.
68 : pending_lists: Vec<DeletionList>,
69 :
70 : // DeletionLists which have passed validation and are ready to execute.
71 : validated_lists: Vec<DeletionList>,
72 :
73 : // Sum of all the lengths of lists in pending_lists
74 : pending_key_count: usize,
75 :
76 : // Lsn validation state: we read projected LSNs and write back visible LSNs
77 : // after validation. This is the LSN equivalent of `pending_validation_lists`:
78 : // it is drained in [`validate`]
79 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
80 :
81 : // If we failed to rewrite a deletion list due to local filesystem I/O failure,
82 : // we must remember that and refuse to advance our persistent validated sequence
83 : // number past the failure.
84 : list_write_failed: Option<u64>,
85 :
86 : cancel: CancellationToken,
87 : }
88 :
89 : impl<C> Validator<C>
90 : where
91 : C: ControlPlaneGenerationsApi,
92 : {
93 8 : pub(super) fn new(
94 8 : conf: &'static PageServerConf,
95 8 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
96 8 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
97 8 : controller_upcall_client: Option<C>,
98 8 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
99 8 : cancel: CancellationToken,
100 8 : ) -> Self {
101 8 : Self {
102 8 : conf,
103 8 : rx,
104 8 : tx,
105 8 : controller_upcall_client,
106 8 : lsn_table,
107 8 : pending_lists: Vec::new(),
108 8 : validated_lists: Vec::new(),
109 8 : pending_key_count: 0,
110 8 : list_write_failed: None,
111 8 : cancel,
112 8 : }
113 8 : }
114 : /// Process any outstanding validations of generations of pending LSN updates or pending
115 : /// DeletionLists.
116 : ///
117 : /// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
118 : /// go into the queue of ready-to-execute lists.
119 10 : async fn validate(&mut self) -> Result<(), DeletionQueueError> {
120 10 : let mut tenant_generations = HashMap::new();
121 20 : for list in &self.pending_lists {
122 20 : for (tenant_id, tenant_list) in &list.tenants {
123 10 : // Note: DeletionLists are in logical time order, so generation always
124 10 : // goes up. By doing a simple insert() we will always end up with
125 10 : // the latest generation seen for a tenant.
126 10 : tenant_generations.insert(*tenant_id, tenant_list.generation);
127 10 : }
128 : }
129 :
130 10 : let pending_lsn_updates = {
131 10 : let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
132 10 : std::mem::take(&mut *lsn_table)
133 : };
134 10 : for (tenant_id, update) in &pending_lsn_updates.tenants {
135 0 : let entry = tenant_generations
136 0 : .entry(*tenant_id)
137 0 : .or_insert(update.generation);
138 0 : if update.generation > *entry {
139 0 : *entry = update.generation;
140 0 : }
141 : }
142 :
143 10 : if tenant_generations.is_empty() {
144 : // No work to do
145 2 : return Ok(());
146 8 : }
147 :
148 8 : let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
149 8 : match controller_upcall_client
150 8 : .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
151 0 : .await
152 : {
153 8 : Ok(tenants) => tenants,
154 : Err(RetryForeverError::ShuttingDown) => {
155 : // The only way a validation call returns an error is when the cancellation token fires
156 0 : return Err(DeletionQueueError::ShuttingDown);
157 : }
158 : }
159 : } else {
160 : // Control plane API disabled. In legacy mode we consider everything valid.
161 0 : tenant_generations.keys().map(|k| (*k, true)).collect()
162 : };
163 :
164 8 : let mut validated_sequence: Option<u64> = None;
165 :
166 : // Apply the validation results to the pending LSN updates
167 8 : for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
168 0 : let validated_generation = tenant_generations
169 0 : .get(&tenant_id)
170 0 : .expect("Map was built from the same keys we're reading");
171 0 :
172 0 : let valid = tenants_valid
173 0 : .get(&tenant_id)
174 0 : .copied()
175 0 : // If the tenant was missing from the validation response, it has been deleted.
176 0 : // The Timeline that requested the LSN update is probably already torn down,
177 0 : // or will be torn down soon. In this case, drop the update by setting valid=false.
178 0 : .unwrap_or(false);
179 0 :
180 0 : if valid && *validated_generation == tenant_lsn_state.generation {
181 0 : for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
182 0 : tracing::debug!(
183 : %tenant_id,
184 : %timeline_id,
185 0 : current = %pending_lsn.result_slot.load(),
186 0 : projected = %pending_lsn.projected,
187 0 : "advancing validated remote_consistent_lsn",
188 : );
189 0 : pending_lsn.result_slot.store(pending_lsn.projected);
190 : }
191 : } else {
192 : // If we failed validation, then do not apply any of the projected updates
193 0 : info!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}", tenant_lsn_state.generation);
194 0 : metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
195 : }
196 : }
197 :
198 : // Apply the validation results to the pending deletion lists
199 18 : for list in &mut self.pending_lists {
200 : // Filter the list based on whether the server responded valid: true.
201 : // If a tenant is omitted in the response, it has been deleted, and we should
202 : // proceed with deletion.
203 10 : let mut mutated = false;
204 10 : list.tenants.retain(|tenant_id, tenant| {
205 10 : let validated_generation = tenant_generations
206 10 : .get(tenant_id)
207 10 : .expect("Map was built from the same keys we're reading");
208 10 :
209 10 : // If the tenant was missing from the validation response, it has been deleted.
210 10 : // This means that a deletion is valid, but also redundant since the tenant's
211 10 : // objects should have already been deleted. Treat it as invalid to drop the
212 10 : // redundant deletion.
213 10 : let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);
214 :
215 : // A list is valid if it comes from the current _or previous_ generation.
216 : // - The previous generation case is permitted due to how we store deletion lists locally:
217 : // if we see the immediately previous generation in a locally stored deletion list,
218 : // it proves that this node's disk was used for both current & previous generations,
219 : // and therefore no other node was involved in between: the two generations may be
220 : // logically treated as the same.
221 : // - In that previous generation case, we rewrote it to the current generation
222 : // in recover(), so the comparison here is simply an equality.
223 :
224 10 : let this_list_valid = valid
225 8 : && (tenant.generation == *validated_generation);
226 :
227 10 : if !this_list_valid {
228 4 : info!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
229 4 : metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
230 4 : mutated = true;
231 6 : } else {
232 6 : metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
233 6 : }
234 10 : this_list_valid
235 10 : });
236 10 : list.validated = true;
237 10 :
238 10 : if mutated {
239 : // Save the deletion list if we had to make changes due to stale generations. The
240 : // saved list is valid for execution.
241 4 : if let Err(e) = list.save(self.conf).await {
242 : // Highly unexpected. Could happen if e.g. disk full.
243 : // If we didn't save the trimmed list, it is _not_ valid to execute.
244 0 : warn!("Failed to save modified deletion list {list}: {e:#}");
245 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
246 0 :
247 0 : // Rather than have a complex retry process, just drop it and leak the objects,
248 0 : // scrubber will clean up eventually.
249 0 : list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.
250 0 :
251 0 : // We must remember this failure, to prevent later writing out a header that
252 0 : // would imply the unwritable list was valid on disk.
253 0 : if self.list_write_failed.is_none() {
254 0 : self.list_write_failed = Some(list.sequence);
255 0 : }
256 4 : }
257 6 : }
258 :
259 10 : validated_sequence = Some(list.sequence);
260 : }
261 :
262 8 : if let Some(validated_sequence) = validated_sequence {
263 8 : if let Some(list_write_failed) = self.list_write_failed {
264 : // Rare error case: we failed to write out a deletion list to excise invalid
265 : // entries, so we cannot advance the header's valid sequence number past that point.
266 : //
267 : // In this state we will continue to validate, execute and delete deletion lists,
268 : // we just cannot update the header. It should be noticed and fixed by a human due to
269 : // the nonzero value of our unexpected_errors metric.
270 0 : warn!(
271 : sequence_number = list_write_failed,
272 0 : "Cannot write header because writing a deletion list failed earlier",
273 : );
274 : } else {
275 : // Write the queue header to record how far validation progressed. This avoids having
276 : // to rewrite each DeletionList to set validated=true in it.
277 8 : let header = DeletionHeader::new(validated_sequence);
278 :
279 : // Drop result because the validated_sequence is an optimization. If we fail to save it,
280 : // then restart, we will drop some deletion lists, creating work for scrubber.
281 : // The save() function logs a warning on error.
282 8 : if let Err(e) = header.save(self.conf).await {
283 0 : warn!("Failed to write deletion queue header: {e:#}");
284 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
285 8 : }
286 : }
287 0 : }
288 :
289 : // Transfer the validated lists to the validated queue, for eventual execution
290 8 : self.validated_lists.append(&mut self.pending_lists);
291 8 :
292 8 : Ok(())
293 10 : }
294 :
295 8 : async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
296 18 : for list_path in list_paths {
297 10 : debug!("Removing deletion list {list_path}");
298 10 : tokio::fs::remove_file(&list_path)
299 10 : .await
300 10 : .fatal_err("remove deletion list");
301 : }
302 8 : }
303 :
304 10 : async fn flush(&mut self) -> Result<(), DeletionQueueError> {
305 10 : tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
306 :
307 : // Issue any required generation validation calls to the control plane
308 12 : self.validate().await?;
309 :
310 : // After successful validation, nothing is pending: any lists that
311 : // made it through validation will be in validated_lists.
312 10 : assert!(self.pending_lists.is_empty());
313 10 : self.pending_key_count = 0;
314 10 :
315 10 : tracing::debug!(
316 0 : "Validation complete, have {} validated lists",
317 0 : self.validated_lists.len()
318 : );
319 :
320 : // Return quickly if we have no validated lists to execute. This avoids flushing the
321 : // executor when an idle backend hits its autoflush interval
322 10 : if self.validated_lists.is_empty() {
323 2 : return Ok(());
324 8 : }
325 8 :
326 8 : // Drain `validated_lists` into the executor
327 8 : let mut executing_lists = Vec::new();
328 10 : for list in self.validated_lists.drain(..) {
329 10 : let list_path = self.conf.deletion_list_path(list.sequence);
330 10 : let objects = list.into_remote_paths();
331 10 : self.tx
332 10 : .send(DeleterMessage::Delete(objects))
333 0 : .await
334 10 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
335 10 : executing_lists.push(list_path);
336 : }
337 :
338 8 : self.flush_executor().await?;
339 :
340 : // Erase the deletion lists whose keys have all be deleted from remote storage
341 10 : self.cleanup_lists(executing_lists).await;
342 :
343 8 : Ok(())
344 10 : }
345 :
346 8 : async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
347 8 : // Flush the executor, so that all the keys referenced by these deletion lists
348 8 : // are actually removed from remote storage. This is a precondition to deleting
349 8 : // the deletion lists themselves.
350 8 : let (flush_op, rx) = FlushOp::new();
351 8 : self.tx
352 8 : .send(DeleterMessage::Flush(flush_op))
353 0 : .await
354 8 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
355 :
356 8 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
357 8 : }
358 :
359 8 : pub(super) async fn background(&mut self) {
360 8 : tracing::info!("Started deletion backend worker");
361 :
362 32 : while !self.cancel.is_cancelled() {
363 32 : let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
364 24 : Ok(Some(m)) => m,
365 : Ok(None) => {
366 : // All queue senders closed
367 2 : info!("Shutting down");
368 2 : break;
369 : }
370 : Err(_) => {
371 : // Timeout, we hit deadline to execute whatever we have in hand. These functions will
372 : // return immediately if no work is pending.
373 0 : match self.flush().await {
374 0 : Ok(()) => {}
375 0 : Err(DeletionQueueError::ShuttingDown) => {
376 0 : // If we are shutting down, then auto-flush can safely be skipped
377 0 : }
378 : }
379 :
380 0 : continue;
381 : }
382 : };
383 :
384 24 : match msg {
385 14 : ValidatorQueueMessage::Delete(list) => {
386 14 : if list.validated {
387 : // A pre-validated list may only be seen during recovery, if we are recovering
388 : // a DeletionList whose on-disk state has validated=true
389 0 : self.validated_lists.push(list)
390 14 : } else {
391 14 : self.pending_key_count += list.len();
392 14 : self.pending_lists.push(list);
393 14 : }
394 :
395 14 : if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
396 0 : match self.flush().await {
397 0 : Ok(()) => {}
398 0 : Err(DeletionQueueError::ShuttingDown) => {
399 0 : // If we are shutting down, then auto-flush can safely be skipped
400 0 : }
401 : }
402 14 : }
403 : }
404 10 : ValidatorQueueMessage::Flush(op) => {
405 30 : match self.flush().await {
406 10 : Ok(()) => {
407 10 : op.notify();
408 10 : }
409 0 : Err(DeletionQueueError::ShuttingDown) => {
410 0 : // If we fail due to shutting down, we will just drop `op` to propagate that status.
411 0 : }
412 : }
413 : }
414 : }
415 : }
416 2 : }
417 : }
|