//! The validator is responsible for validating DeletionLists for execution,
//! based on whether the generation in the DeletionList is still the latest
//! generation for a tenant.
//!
//! The purpose of validation is to ensure split-brain safety in the cluster
//! of pageservers: a deletion may only be executed if the tenant generation
//! that originated it is still current. See docs/rfcs/025-generation-numbers.md.
//! The purpose of accumulating lists before validating them is to reduce load
//! on the control plane API by issuing fewer, larger requests.
//!
//! In addition to validating DeletionLists, the validator validates updates to remote_consistent_lsn
//! for timelines: these are logically deletions because the safekeepers use remote_consistent_lsn
//! to decide when old WAL may be removed.
//!
//! Deletions are passed onward to the Deleter.
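//!
//! As a rough orientation, the per-tenant decision can be sketched as below. This is a
//! hypothetical, simplified illustration; `Generation`, `list_entry_valid` and the
//! parameter names are stand-ins, not the real types or APIs of this module:
//!
//! ```rust,ignore
//! type Generation = u32; // stand-in for the crate's generation type
//!
//! fn list_entry_valid(
//!     stamped: Generation,                      // generation this deletion list was enqueued under
//!     latest_known: Generation,                 // latest generation we asked the control plane about
//!     control_plane_says_current: Option<bool>, // None if the tenant no longer exists
//! ) -> bool {
//!     // A tenant missing from the validation response has been deleted: its pending
//!     // deletions are redundant, so they are dropped rather than executed.
//!     control_plane_says_current.unwrap_or(false) && stamped == latest_known
//! }
//! ```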

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use camino::Utf8PathBuf;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;

use super::deleter::DeleterMessage;
use super::DeletionHeader;
use super::DeletionList;
use super::DeletionQueueError;
use super::FlushOp;
use super::VisibleLsnUpdates;

// After this length of time, do any validation work that is pending,
// even if we haven't accumulated many keys to delete.
//
// This also causes updates to remote_consistent_lsn to be validated, even
// if there were no deletions enqueued.
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

// If we have received this number of keys, proceed with validating and executing them
// without waiting for the autoflush interval.
const AUTOFLUSH_KEY_COUNT: usize = 16384;

#[derive(Debug)]
pub(super) enum ValidatorQueueMessage {
    Delete(DeletionList),
    Flush(FlushOp),
}

pub(super) struct Validator<C>
where
    C: ControlPlaneGenerationsApi,
{
    conf: &'static PageServerConf,
    rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
    tx: tokio::sync::mpsc::Sender<DeleterMessage>,

    // Client for calling into control plane API for validation of deletes
    control_plane_client: Option<C>,

    // DeletionLists which are awaiting generation validation. Not safe to
    // execute until [`validate`] has processed them.
    pending_lists: Vec<DeletionList>,

    // DeletionLists which have passed validation and are ready to execute.
    validated_lists: Vec<DeletionList>,

    // Sum of all the lengths of lists in pending_lists
    pending_key_count: usize,

    // LSN validation state: we read projected LSNs and write back visible LSNs
    // after validation. This is the LSN equivalent of `pending_lists`:
    // it is drained in [`validate`].
    lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,

    // If we failed to rewrite a deletion list due to local filesystem I/O failure,
    // we must remember that and refuse to advance our persistent validated sequence
    // number past the failure.
    list_write_failed: Option<u64>,

    cancel: CancellationToken,
}

impl<C> Validator<C>
where
    C: ControlPlaneGenerationsApi,
{
    pub(super) fn new(
        conf: &'static PageServerConf,
        rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
        tx: tokio::sync::mpsc::Sender<DeleterMessage>,
        control_plane_client: Option<C>,
        lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            conf,
            rx,
            tx,
            control_plane_client,
            lsn_table,
            pending_lists: Vec::new(),
            validated_lists: Vec::new(),
            pending_key_count: 0,
            list_write_failed: None,
            cancel,
        }
    }

    /// Validate the generations of any pending LSN updates and pending DeletionLists.
    ///
    /// Valid LSN updates propagate back to Timelines immediately; valid DeletionLists
    /// go into the queue of ready-to-execute lists.
    async fn validate(&mut self) -> Result<(), DeletionQueueError> {
        let mut tenant_generations = HashMap::new();
        for list in &self.pending_lists {
            for (tenant_id, tenant_list) in &list.tenants {
                // Note: DeletionLists are in logical time order, so generation always
                // goes up. By doing a simple insert() we will always end up with
                // the latest generation seen for a tenant.
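                // For example (hypothetical values): if an earlier pending list holds
                // tenant T at generation 3 and a later one holds T at generation 5, the
                // map ends up with {T: 5}, the most recent generation this node stamped.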
                tenant_generations.insert(*tenant_id, tenant_list.generation);
            }
        }

        let pending_lsn_updates = {
            let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
            std::mem::take(&mut *lsn_table)
        };
        for (tenant_id, update) in &pending_lsn_updates.tenants {
            let entry = tenant_generations
                .entry(*tenant_id)
                .or_insert(update.generation);
            if update.generation > *entry {
                *entry = update.generation;
            }
        }

        if tenant_generations.is_empty() {
            // No work to do
            return Ok(());
        }

        let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client {
            match control_plane_client
                .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
                .await
            {
                Ok(tenants) => tenants,
                Err(RetryForeverError::ShuttingDown) => {
                    // The only way a validation call returns an error is when the cancellation token fires
                    return Err(DeletionQueueError::ShuttingDown);
                }
            }
        } else {
            // Control plane API disabled. In legacy mode we consider everything valid.
            tenant_generations.keys().map(|k| (*k, true)).collect()
        };

        let mut validated_sequence: Option<u64> = None;

        // Apply the validation results to the pending LSN updates
        for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
            let validated_generation = tenant_generations
                .get(&tenant_id)
                .expect("Map was built from the same keys we're reading");

            let valid = tenants_valid
                .get(&tenant_id)
                .copied()
                // If the tenant was missing from the validation response, it has been deleted.
                // The Timeline that requested the LSN update is probably already torn down,
                // or will be torn down soon. In this case, drop the update by setting valid=false.
                .unwrap_or(false);

            if valid && *validated_generation == tenant_lsn_state.generation {
                for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines {
                    pending_lsn.result_slot.store(pending_lsn.projected);
                }
            } else {
                // If we failed validation, then do not apply any of the projected updates
                warn!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}", tenant_lsn_state.generation);
                metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
            }
        }

        // Apply the validation results to the pending deletion lists
        for list in &mut self.pending_lists {
            // Filter the list based on whether the server responded valid: true.
            // If a tenant is omitted from the response, it has been deleted; its
            // pending deletions are dropped below as redundant.
            let mut mutated = false;
            list.tenants.retain(|tenant_id, tenant| {
                let validated_generation = tenant_generations
                    .get(tenant_id)
                    .expect("Map was built from the same keys we're reading");

                // If the tenant was missing from the validation response, it has been deleted.
                // This means that a deletion is valid, but also redundant since the tenant's
                // objects should have already been deleted. Treat it as invalid to drop the
                // redundant deletion.
                let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);

                // A list is valid if it comes from the current _or previous_ generation.
                // - The previous generation case is permitted due to how we store deletion lists locally:
                //   if we see the immediately previous generation in a locally stored deletion list,
                //   it proves that this node's disk was used for both current & previous generations,
                //   and therefore no other node was involved in between: the two generations may be
                //   logically treated as the same.
                // - In that previous generation case, we rewrote it to the current generation
                //   in recover(), so the comparison here is simply an equality.

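                // For example (hypothetical numbers): a list stamped with generation 7 and
                // found on this node's disk while it now runs generation 8 was rewritten to
                // generation 8 by recover(), so it passes the equality check below; a list
                // still stamped with an older generation such as 5 would be dropped instead.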
                let this_list_valid = valid
                    && (tenant.generation == *validated_generation);

                if !this_list_valid {
                    warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
                    metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
                    mutated = true;
                } else {
                    metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
                }
                this_list_valid
            });
            list.validated = true;

            if mutated {
                // Save the deletion list if we had to make changes due to stale generations. The
                // saved list is valid for execution.
                if let Err(e) = list.save(self.conf).await {
                    // Highly unexpected. Could happen if e.g. disk full.
                    // If we didn't save the trimmed list, it is _not_ valid to execute.
                    warn!("Failed to save modified deletion list {list}: {e:#}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();

                    // Rather than have a complex retry process, just drop it and leak the objects;
                    // the scrubber will clean up eventually.
                    list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.

                    // We must remember this failure, to prevent later writing out a header that
                    // would imply the unwritable list was valid on disk.
                    if self.list_write_failed.is_none() {
                        self.list_write_failed = Some(list.sequence);
                    }
                }
            }

            validated_sequence = Some(list.sequence);
        }

        if let Some(validated_sequence) = validated_sequence {
            if let Some(list_write_failed) = self.list_write_failed {
                // Rare error case: we failed to write out a deletion list to excise invalid
                // entries, so we cannot advance the header's valid sequence number past that point.
                //
                // In this state we will continue to validate, execute and delete deletion lists;
                // we just cannot update the header. It should be noticed and fixed by a human due to
                // the nonzero value of our unexpected_errors metric.
                warn!(
                    sequence_number = list_write_failed,
                    "Cannot write header because writing a deletion list failed earlier",
                );
            } else {
                // Write the queue header to record how far validation progressed. This avoids having
                // to rewrite each DeletionList to set validated=true in it.
                let header = DeletionHeader::new(validated_sequence);

                // Drop the result, because the validated_sequence is an optimization. If we fail to
                // save it and then restart, we will drop some deletion lists, creating work for the
                // scrubber. The save() function logs a warning on error.
                if let Err(e) = header.save(self.conf).await {
                    warn!("Failed to write deletion queue header: {e:#}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                }
            }
        }

        // Transfer the validated lists to the validated queue, for eventual execution
        self.validated_lists.append(&mut self.pending_lists);

        Ok(())
    }

    async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
        for list_path in list_paths {
            debug!("Removing deletion list {list_path}");

            if let Err(e) = tokio::fs::remove_file(&list_path).await {
                // Unexpected: we should have permissions and nothing else should
                // be touching these files. We will leave the file behind. Subsequent
                // pageservers will try and load it again: hopefully whatever storage
                // issue (probably permissions) has been fixed by then.
                tracing::error!("Failed to delete {list_path}: {e:#}");
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                break;
            }
        }
    }

    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());

        // Issue any required generation validation calls to the control plane
        self.validate().await?;

        // After successful validation, nothing is pending: any lists that
        // made it through validation will be in validated_lists.
        assert!(self.pending_lists.is_empty());
        self.pending_key_count = 0;

        tracing::debug!(
            "Validation complete, have {} validated lists",
            self.validated_lists.len()
        );

        // Return quickly if we have no validated lists to execute. This avoids flushing the
        // executor when an idle backend hits its autoflush interval
        if self.validated_lists.is_empty() {
            return Ok(());
        }

        // Drain `validated_lists` into the executor
        let mut executing_lists = Vec::new();
        for list in self.validated_lists.drain(..) {
            let list_path = self.conf.deletion_list_path(list.sequence);
            let objects = list.into_remote_paths();
            self.tx
                .send(DeleterMessage::Delete(objects))
                .await
                .map_err(|_| DeletionQueueError::ShuttingDown)?;
            executing_lists.push(list_path);
        }

        self.flush_executor().await?;

        // Erase the deletion lists whose keys have all been deleted from remote storage
        self.cleanup_lists(executing_lists).await;

        Ok(())
    }

    async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
        // Flush the executor, so that all the keys referenced by these deletion lists
        // are actually removed from remote storage. This is a precondition to deleting
        // the deletion lists themselves.
        let (flush_op, rx) = FlushOp::new();
        self.tx
            .send(DeleterMessage::Flush(flush_op))
            .await
            .map_err(|_| DeletionQueueError::ShuttingDown)?;

        rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
    }

    pub(super) async fn background(&mut self) {
        tracing::info!("Started deletion backend worker");

        while !self.cancel.is_cancelled() {
            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    break;
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in hand. These
                    // functions will return immediately if no work is pending.
                    match self.flush().await {
                        Ok(()) => {}
                        Err(DeletionQueueError::ShuttingDown) => {
                            // If we are shutting down, then auto-flush can safely be skipped
                        }
                    }

                    continue;
                }
            };

            match msg {
                ValidatorQueueMessage::Delete(list) => {
                    if list.validated {
                        // A pre-validated list may only be seen during recovery, if we are recovering
                        // a DeletionList whose on-disk state has validated=true
                        self.validated_lists.push(list)
                    } else {
                        self.pending_key_count += list.len();
                        self.pending_lists.push(list);
                    }

                    if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
                        match self.flush().await {
                            Ok(()) => {}
                            Err(DeletionQueueError::ShuttingDown) => {
                                // If we are shutting down, then auto-flush can safely be skipped
                            }
                        }
                    }
                }
                ValidatorQueueMessage::Flush(op) => {
                    match self.flush().await {
                        Ok(()) => {
                            op.notify();
                        }
                        Err(DeletionQueueError::ShuttingDown) => {
                            // If we fail due to shutting down, we will just drop `op` to propagate that status.
                        }
                    }
                }
            }
        }
    }
}