Line data Source code
1 : //! The validator is responsible for validating DeletionLists for execution,
2 : //! based on whether the generation in the DeletionList is still the latest
3 : //! generation for a tenant.
4 : //!
5 : //! The purpose of validation is to ensure split-brain safety in the cluster
6 : //! of pageservers: a deletion may only be executed if the tenant generation
7 : //! that originated it is still current. See docs/rfcs/025-generation-numbers.md
8 : //! The purpose of accumulating lists before validating them is to reduce load
9 : //! on the control plane API by issuing fewer, larger requests.
10 : //!
11 : //! In addition to validating DeletionLists, the validator validates updates to remote_consistent_lsn
12 : //! for timelines: these are logically deletions because the safekeepers use remote_consistent_lsn
13 : //! to decide when old WAL can be removed.
14 : //!
15 : //! Deletions are passed onward to the Deleter.
16 :
17 : use std::collections::HashMap;
18 : use std::sync::Arc;
19 : use std::time::Duration;
20 :
21 : use camino::Utf8PathBuf;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::{debug, info, warn};
24 :
25 : use super::deleter::DeleterMessage;
26 : use super::{DeletionHeader, DeletionList, DeletionQueueError, FlushOp, VisibleLsnUpdates};
27 : use crate::config::PageServerConf;
28 : use crate::controller_upcall_client::{RetryForeverError, StorageControllerUpcallApi};
29 : use crate::metrics;
30 : use crate::virtual_file::MaybeFatalIo;
31 :
/// Maximum time the validator waits for a new message before flushing
/// whatever is pending, even if only a few keys have accumulated.
///
/// The periodic flush also validates pending remote_consistent_lsn updates
/// when no deletions were enqueued.
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

/// Once more than this many keys are waiting for validation, flush
/// immediately instead of waiting for the autoflush interval.
const AUTOFLUSH_KEY_COUNT: usize = 16384;
41 :
42 : #[derive(Debug)]
43 : pub(super) enum ValidatorQueueMessage {
44 : Delete(DeletionList),
45 : Flush(FlushOp),
46 : }
47 : pub(super) struct Validator<C>
48 : where
49 : C: StorageControllerUpcallApi,
50 : {
51 : conf: &'static PageServerConf,
52 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
53 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
54 :
55 : // Client for calling into control plane API for validation of deletes
56 : controller_upcall_client: C,
57 :
58 : // DeletionLists which are waiting generation validation. Not safe to
59 : // execute until [`validate`] has processed them.
60 : pending_lists: Vec<DeletionList>,
61 :
62 : // DeletionLists which have passed validation and are ready to execute.
63 : validated_lists: Vec<DeletionList>,
64 :
65 : // Sum of all the lengths of lists in pending_lists
66 : pending_key_count: usize,
67 :
68 : // Lsn validation state: we read projected LSNs and write back visible LSNs
69 : // after validation. This is the LSN equivalent of `pending_validation_lists`:
70 : // it is drained in [`validate`]
71 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
72 :
73 : // If we failed to rewrite a deletion list due to local filesystem I/O failure,
74 : // we must remember that and refuse to advance our persistent validated sequence
75 : // number past the failure.
76 : list_write_failed: Option<u64>,
77 :
78 : cancel: CancellationToken,
79 : }
80 :
81 : impl<C> Validator<C>
82 : where
83 : C: StorageControllerUpcallApi,
84 : {
85 48 : pub(super) fn new(
86 48 : conf: &'static PageServerConf,
87 48 : rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
88 48 : tx: tokio::sync::mpsc::Sender<DeleterMessage>,
89 48 : controller_upcall_client: C,
90 48 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
91 48 : cancel: CancellationToken,
92 48 : ) -> Self {
93 48 : Self {
94 48 : conf,
95 48 : rx,
96 48 : tx,
97 48 : controller_upcall_client,
98 48 : lsn_table,
99 48 : pending_lists: Vec::new(),
100 48 : validated_lists: Vec::new(),
101 48 : pending_key_count: 0,
102 48 : list_write_failed: None,
103 48 : cancel,
104 48 : }
105 48 : }
106 : /// Process any outstanding validations of generations of pending LSN updates or pending
107 : /// DeletionLists.
108 : ///
109 : /// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
110 : /// go into the queue of ready-to-execute lists.
111 72 : async fn validate(&mut self) -> Result<(), DeletionQueueError> {
112 72 : let mut tenant_generations = HashMap::new();
113 132 : for list in &self.pending_lists {
114 120 : for (tenant_id, tenant_list) in &list.tenants {
115 60 : // Note: DeletionLists are in logical time order, so generation always
116 60 : // goes up. By doing a simple insert() we will always end up with
117 60 : // the latest generation seen for a tenant.
118 60 : tenant_generations.insert(*tenant_id, tenant_list.generation);
119 60 : }
120 : }
121 :
122 72 : let pending_lsn_updates = {
123 72 : let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
124 72 : std::mem::take(&mut *lsn_table)
125 : };
126 72 : for (tenant_id, update) in &pending_lsn_updates.tenants {
127 0 : let entry = tenant_generations
128 0 : .entry(*tenant_id)
129 0 : .or_insert(update.generation);
130 0 : if update.generation > *entry {
131 0 : *entry = update.generation;
132 0 : }
133 : }
134 :
135 72 : if tenant_generations.is_empty() {
136 : // No work to do
137 24 : return Ok(());
138 48 : }
139 :
140 48 : let tenants_valid = match self
141 48 : .controller_upcall_client
142 48 : .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
143 48 : .await
144 : {
145 48 : Ok(tenants) => tenants,
146 : Err(RetryForeverError::ShuttingDown) => {
147 : // The only way a validation call returns an error is when the cancellation token fires
148 0 : return Err(DeletionQueueError::ShuttingDown);
149 : }
150 : };
151 :
152 48 : let mut validated_sequence: Option<u64> = None;
153 :
154 : // Apply the validation results to the pending LSN updates
155 48 : for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
156 0 : let validated_generation = tenant_generations
157 0 : .get(&tenant_id)
158 0 : .expect("Map was built from the same keys we're reading");
159 0 :
160 0 : let valid = tenants_valid
161 0 : .get(&tenant_id)
162 0 : .copied()
163 0 : // If the tenant was missing from the validation response, it has been deleted.
164 0 : // The Timeline that requested the LSN update is probably already torn down,
165 0 : // or will be torn down soon. In this case, drop the update by setting valid=false.
166 0 : .unwrap_or(false);
167 0 :
168 0 : if valid && *validated_generation == tenant_lsn_state.generation {
169 0 : for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
170 0 : tracing::debug!(
171 : %tenant_id,
172 : %timeline_id,
173 0 : current = %pending_lsn.result_slot.load(),
174 0 : projected = %pending_lsn.projected,
175 0 : "advancing validated remote_consistent_lsn",
176 : );
177 0 : pending_lsn.result_slot.store(pending_lsn.projected);
178 : }
179 : } else {
180 : // If we failed validation, then do not apply any of the projected updates
181 0 : info!(
182 0 : "Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}",
183 : tenant_lsn_state.generation
184 : );
185 0 : metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
186 : }
187 : }
188 :
189 : // Apply the validation results to the pending deletion lists
190 108 : for list in &mut self.pending_lists {
191 : // Filter the list based on whether the server responded valid: true.
192 : // If a tenant is omitted in the response, it has been deleted, and we should
193 : // proceed with deletion.
194 60 : let mut mutated = false;
195 60 : list.tenants.retain(|tenant_id, tenant| {
196 60 : let validated_generation = tenant_generations
197 60 : .get(tenant_id)
198 60 : .expect("Map was built from the same keys we're reading");
199 60 :
200 60 : // If the tenant was missing from the validation response, it has been deleted.
201 60 : // This means that a deletion is valid, but also redundant since the tenant's
202 60 : // objects should have already been deleted. Treat it as invalid to drop the
203 60 : // redundant deletion.
204 60 : let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);
205 :
206 : // A list is valid if it comes from the current _or previous_ generation.
207 : // - The previous generation case is permitted due to how we store deletion lists locally:
208 : // if we see the immediately previous generation in a locally stored deletion list,
209 : // it proves that this node's disk was used for both current & previous generations,
210 : // and therefore no other node was involved in between: the two generations may be
211 : // logically treated as the same.
212 : // - In that previous generation case, we rewrote it to the current generation
213 : // in recover(), so the comparison here is simply an equality.
214 :
215 60 : let this_list_valid = valid
216 48 : && (tenant.generation == *validated_generation);
217 :
218 60 : if !this_list_valid {
219 24 : info!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
220 24 : metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
221 24 : mutated = true;
222 36 : } else {
223 36 : metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
224 36 : }
225 60 : this_list_valid
226 60 : });
227 60 : list.validated = true;
228 60 :
229 60 : if mutated {
230 : // Save the deletion list if we had to make changes due to stale generations. The
231 : // saved list is valid for execution.
232 24 : if let Err(e) = list.save(self.conf).await {
233 : // Highly unexpected. Could happen if e.g. disk full.
234 : // If we didn't save the trimmed list, it is _not_ valid to execute.
235 0 : warn!("Failed to save modified deletion list {list}: {e:#}");
236 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
237 0 :
238 0 : // Rather than have a complex retry process, just drop it and leak the objects,
239 0 : // scrubber will clean up eventually.
240 0 : list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.
241 0 :
242 0 : // We must remember this failure, to prevent later writing out a header that
243 0 : // would imply the unwritable list was valid on disk.
244 0 : if self.list_write_failed.is_none() {
245 0 : self.list_write_failed = Some(list.sequence);
246 0 : }
247 24 : }
248 36 : }
249 :
250 60 : validated_sequence = Some(list.sequence);
251 : }
252 :
253 48 : if let Some(validated_sequence) = validated_sequence {
254 48 : if let Some(list_write_failed) = self.list_write_failed {
255 : // Rare error case: we failed to write out a deletion list to excise invalid
256 : // entries, so we cannot advance the header's valid sequence number past that point.
257 : //
258 : // In this state we will continue to validate, execute and delete deletion lists,
259 : // we just cannot update the header. It should be noticed and fixed by a human due to
260 : // the nonzero value of our unexpected_errors metric.
261 0 : warn!(
262 : sequence_number = list_write_failed,
263 0 : "Cannot write header because writing a deletion list failed earlier",
264 : );
265 : } else {
266 : // Write the queue header to record how far validation progressed. This avoids having
267 : // to rewrite each DeletionList to set validated=true in it.
268 48 : let header = DeletionHeader::new(validated_sequence);
269 :
270 : // Drop result because the validated_sequence is an optimization. If we fail to save it,
271 : // then restart, we will drop some deletion lists, creating work for scrubber.
272 : // The save() function logs a warning on error.
273 48 : if let Err(e) = header.save(self.conf).await {
274 0 : warn!("Failed to write deletion queue header: {e:#}");
275 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
276 48 : }
277 : }
278 0 : }
279 :
280 : // Transfer the validated lists to the validated queue, for eventual execution
281 48 : self.validated_lists.append(&mut self.pending_lists);
282 48 :
283 48 : Ok(())
284 72 : }
285 :
286 48 : async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
287 108 : for list_path in list_paths {
288 60 : debug!("Removing deletion list {list_path}");
289 60 : tokio::fs::remove_file(&list_path)
290 60 : .await
291 60 : .fatal_err("remove deletion list");
292 : }
293 48 : }
294 :
295 72 : async fn flush(&mut self) -> Result<(), DeletionQueueError> {
296 72 : tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
297 :
298 : // Issue any required generation validation calls to the control plane
299 72 : self.validate().await?;
300 :
301 : // After successful validation, nothing is pending: any lists that
302 : // made it through validation will be in validated_lists.
303 72 : assert!(self.pending_lists.is_empty());
304 72 : self.pending_key_count = 0;
305 72 :
306 72 : tracing::debug!(
307 0 : "Validation complete, have {} validated lists",
308 0 : self.validated_lists.len()
309 : );
310 :
311 : // Return quickly if we have no validated lists to execute. This avoids flushing the
312 : // executor when an idle backend hits its autoflush interval
313 72 : if self.validated_lists.is_empty() {
314 24 : return Ok(());
315 48 : }
316 48 :
317 48 : // Drain `validated_lists` into the executor
318 48 : let mut executing_lists = Vec::new();
319 60 : for list in self.validated_lists.drain(..) {
320 60 : let list_path = self.conf.deletion_list_path(list.sequence);
321 60 : let objects = list.into_remote_paths();
322 60 : self.tx
323 60 : .send(DeleterMessage::Delete(objects))
324 60 : .await
325 60 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
326 60 : executing_lists.push(list_path);
327 : }
328 :
329 48 : self.flush_executor().await?;
330 :
331 : // Erase the deletion lists whose keys have all be deleted from remote storage
332 48 : self.cleanup_lists(executing_lists).await;
333 :
334 48 : Ok(())
335 72 : }
336 :
337 48 : async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
338 48 : // Flush the executor, so that all the keys referenced by these deletion lists
339 48 : // are actually removed from remote storage. This is a precondition to deleting
340 48 : // the deletion lists themselves.
341 48 : let (flush_op, rx) = FlushOp::new();
342 48 : self.tx
343 48 : .send(DeleterMessage::Flush(flush_op))
344 48 : .await
345 48 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
346 :
347 48 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
348 48 : }
349 :
350 48 : pub(super) async fn background(&mut self) {
351 48 : tracing::info!("Started deletion backend worker");
352 :
353 204 : while !self.cancel.is_cancelled() {
354 204 : let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
355 144 : Ok(Some(m)) => m,
356 : Ok(None) => {
357 : // All queue senders closed
358 12 : info!("Shutting down");
359 12 : break;
360 : }
361 : Err(_) => {
362 : // Timeout, we hit deadline to execute whatever we have in hand. These functions will
363 : // return immediately if no work is pending.
364 12 : match self.flush().await {
365 12 : Ok(()) => {}
366 0 : Err(DeletionQueueError::ShuttingDown) => {
367 0 : // If we are shutting down, then auto-flush can safely be skipped
368 0 : }
369 : }
370 :
371 12 : continue;
372 : }
373 : };
374 :
375 144 : match msg {
376 84 : ValidatorQueueMessage::Delete(list) => {
377 84 : if list.validated {
378 : // A pre-validated list may only be seen during recovery, if we are recovering
379 : // a DeletionList whose on-disk state has validated=true
380 0 : self.validated_lists.push(list)
381 84 : } else {
382 84 : self.pending_key_count += list.len();
383 84 : self.pending_lists.push(list);
384 84 : }
385 :
386 84 : if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
387 0 : match self.flush().await {
388 0 : Ok(()) => {}
389 0 : Err(DeletionQueueError::ShuttingDown) => {
390 0 : // If we are shutting down, then auto-flush can safely be skipped
391 0 : }
392 : }
393 84 : }
394 : }
395 60 : ValidatorQueueMessage::Flush(op) => {
396 60 : match self.flush().await {
397 60 : Ok(()) => {
398 60 : op.notify();
399 60 : }
400 0 : Err(DeletionQueueError::ShuttingDown) => {
401 0 : // If we fail due to shutting down, we will just drop `op` to propagate that status.
402 0 : }
403 : }
404 : }
405 : }
406 : }
407 12 : }
408 : }
|