TLA Line data Source code
1 : mod deleter;
2 : mod list_writer;
3 : mod validator;
4 :
5 : use std::collections::HashMap;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use crate::control_plane_client::ControlPlaneGenerationsApi;
10 : use crate::metrics;
11 : use crate::tenant::remote_timeline_client::remote_layer_path;
12 : use crate::tenant::remote_timeline_client::remote_timeline_path;
13 : use crate::tenant::remote_timeline_client::LayerFileMetadata;
14 : use crate::virtual_file::MaybeFatalIo;
15 : use crate::virtual_file::VirtualFile;
16 : use anyhow::Context;
17 : use camino::Utf8PathBuf;
18 : use pageserver_api::shard::TenantShardId;
19 : use remote_storage::{GenericRemoteStorage, RemotePath};
20 : use serde::Deserialize;
21 : use serde::Serialize;
22 : use thiserror::Error;
23 : use tokio;
24 : use tokio_util::sync::CancellationToken;
25 : use tracing::Instrument;
26 : use tracing::{self, debug, error};
27 : use utils::crashsafe::path_with_suffix_extension;
28 : use utils::generation::Generation;
29 : use utils::id::TimelineId;
30 : use utils::lsn::AtomicLsn;
31 : use utils::lsn::Lsn;
32 :
33 : use self::deleter::Deleter;
34 : use self::list_writer::DeletionOp;
35 : use self::list_writer::ListWriter;
36 : use self::list_writer::RecoverOp;
37 : use self::validator::Validator;
38 : use deleter::DeleterMessage;
39 : use list_writer::ListWriterQueueMessage;
40 : use validator::ValidatorQueueMessage;
41 :
42 : use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
43 :
44 : // TODO: make the delay before executing deletions configurable
45 :
46 : /// We aggregate object deletions from many tenants in one place, for several reasons:
47 : /// - Coalesce deletions into fewer DeleteObjects calls
48 : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
49 : /// to flush any outstanding deletions.
50 : /// - Globally control throughput of deletions, as these are a low priority task: do
51 : /// not compete with the same S3 clients/connections used for higher priority uploads.
52 : /// - Enable gating deletions on validation of a tenant's generation number, to make
53 : /// it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
54 : ///
55 : /// There are two kinds of deletion: deferred and immediate. A deferred deletion
56 : /// may be intentionally delayed to protect passive readers of S3 data, and is
57 : /// subject to a generation number validation step. An immediate deletion is
58 : /// ready to execute immediately, and is only queued up so that it can be coalesced
59 : /// with other deletions in flight.
60 : ///
61 : /// Deferred deletions pass through three steps:
62 : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
63 : /// DeletionLists, which are persisted to disk.
64 : /// - Validator: accumulate deletion lists, and validate them en masse prior to passing
65 : /// the keys in the list onward for actual deletion. Also validate remote_consistent_lsn
66 : /// updates for running timelines.
67 : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
68 : /// batches of 1000 keys via DeleteObjects.
69 : ///
70 : /// Non-deferred deletions, such as during timeline deletion, bypass the first
71 : /// two stages and are passed straight into the Deleter.
72 : ///
73 : /// Internally, each stage is joined by a channel to the next. On disk, there is only
74 : /// one queue (of DeletionLists), which is written by the frontend and consumed
75 : /// by the backend.
76 CBC 159 : #[derive(Clone)]
77 : pub struct DeletionQueue {
78 : client: DeletionQueueClient,
79 :
80 : // Parent cancellation token for the tokens passed into background workers
81 : cancel: CancellationToken,
82 : }
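
The pipeline described above is driven entirely through `DeletionQueueClient` handles. Below is a minimal caller-side sketch of the deferred path, assuming it runs inside the pageserver crate and that `conf`, `storage`, `control_plane`, `attached_tenants`, and the tenant/timeline/generation/layer bindings already exist; it mirrors the test harness later in this file.

```rust
// Sketch only: error handling and shutdown are elided, and the bindings named
// above are assumed to be in scope.
let (queue, workers) = DeletionQueue::new(Some(storage), Some(control_plane), conf);

// Spawn the ListWriter -> Validator -> Deleter workers on the current runtime.
let _worker_join = workers.unwrap().spawn_with(&tokio::runtime::Handle::current());

let client = queue.new_client();
// Tell the queue which tenants are attached, and in which generation.
client.recover(attached_tenants)?;

// Deferred deletion: batched into a DeletionList, validated against the
// control plane, then executed via DeleteObjects.
client
    .push_layers(tenant_shard_id, timeline_id, current_generation, layers)
    .await?;

// Wait until everything pushed so far has actually been deleted.
client.flush_execute().await?;
```
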
83 :
84 : /// Opaque wrapper around individual worker tasks, to avoid making the
85 : /// worker objects themselves public
86 : pub struct DeletionQueueWorkers<C>
87 : where
88 : C: ControlPlaneGenerationsApi + Send + Sync,
89 : {
90 : frontend: ListWriter,
91 : backend: Validator<C>,
92 : executor: Deleter,
93 : }
94 :
95 : impl<C> DeletionQueueWorkers<C>
96 : where
97 : C: ControlPlaneGenerationsApi + Send + Sync + 'static,
98 : {
99 561 : pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
100 561 : let jh_frontend = runtime.spawn(async move {
101 561 : self.frontend
102 561 : .background()
103 561 : .instrument(tracing::info_span!(parent:None, "deletion frontend"))
104 3452 : .await
105 561 : });
106 561 : let jh_backend = runtime.spawn(async move {
107 561 : self.backend
108 561 : .background()
109 561 : .instrument(tracing::info_span!(parent:None, "deletion backend"))
110 1902 : .await
111 561 : });
112 561 : let jh_executor = runtime.spawn(async move {
113 561 : self.executor
114 561 : .background()
115 561 : .instrument(tracing::info_span!(parent:None, "deletion executor"))
116 10532 : .await
117 561 : });
118 561 :
119 561 : runtime.spawn({
120 561 : async move {
121 561 : jh_frontend.await.expect("error joining frontend worker");
122 160 : jh_backend.await.expect("error joining backend worker");
123 160 : drop(jh_executor.await.expect("error joining executor worker"));
124 561 : }
125 561 : })
126 561 : }
127 : }
128 :
129 : /// A FlushOp is just a oneshot channel, where we send the transmit side down
130 : /// another channel, and the receive side will receive a message when the channel
131 : /// we're flushing has reached the FlushOp we sent into it.
132 : ///
133 : /// The only extra behavior beyond the channel is that the notify() method does not
134 : /// return an error when the receive side has been dropped, because in this use case
135 : /// it is harmless (the code that initiated the flush no longer cares about the result).
136 UBC 0 : #[derive(Debug)]
137 : struct FlushOp {
138 : tx: tokio::sync::oneshot::Sender<()>,
139 : }
140 :
141 : impl FlushOp {
142 CBC 721 : fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
143 721 : let (tx, rx) = tokio::sync::oneshot::channel::<()>();
144 721 : (Self { tx }, rx)
145 721 : }
146 :
147 719 : fn notify(self) {
148 719 : if self.tx.send(()).is_err() {
149 : // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
150 104 : debug!("deletion queue flush from dropped client");
151 615 : };
152 719 : }
153 : }
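
The same pattern can be shown in isolation. The following self-contained sketch uses plain tokio channels; the `Msg` enum and the worker loop are illustrative and not part of this module (assumes tokio with the `sync`, `macros`, and `rt-multi-thread` features).

```rust
use tokio::sync::{mpsc, oneshot};

// Work items and flush requests travel down the same queue, so receiving the
// flush implies every message enqueued before it has already been processed.
enum Msg {
    Work(u32),
    Flush(oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    let worker = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {
                Msg::Work(n) => println!("processed {n}"),
                // Ignore a send error: the flusher may have stopped waiting.
                Msg::Flush(done) => {
                    let _ = done.send(());
                }
            }
        }
    });

    tx.send(Msg::Work(1)).unwrap();
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(Msg::Flush(done_tx)).unwrap();
    done_rx.await.unwrap(); // all work enqueued before the flush is now done

    drop(tx);
    worker.await.unwrap();
}
```
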
154 :
155 4635 : #[derive(Clone, Debug)]
156 : pub struct DeletionQueueClient {
157 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
158 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
159 :
160 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
161 : }
162 :
163 155 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
164 : struct TenantDeletionList {
165 : /// For each Timeline, a list of key fragments to append to the timeline remote path
166 : /// when reconstructing a full key
167 : timelines: HashMap<TimelineId, Vec<String>>,
168 :
169 : /// The generation in which this deletion was emitted: note that this may not be the
170 : /// same as the generation of any layers being deleted. The generation of the layer
171 : /// has already been absorbed into the keys in `objects`
172 : generation: Generation,
173 : }
174 :
175 : impl TenantDeletionList {
176 38 : pub(crate) fn len(&self) -> usize {
177 51 : self.timelines.values().map(|v| v.len()).sum()
178 38 : }
179 : }
180 :
181 : /// Files ending with this suffix will be ignored and erased
182 : /// during recovery at startup.
183 : const TEMP_SUFFIX: &str = "tmp";
184 :
185 252 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
186 : struct DeletionList {
187 : /// Serialization version, for future use
188 : version: u8,
189 :
190 : /// Used for constructing a unique key for each deletion list we write out.
191 : sequence: u64,
192 :
193 : /// To avoid repeating tenant/timeline IDs in every key, we store keys in
194 : /// nested HashMaps by TenantTimelineID. Each Tenant only appears once
195 : /// with one unique generation ID: if someone tries to push a second generation
196 : /// ID for the same tenant, we will start a new DeletionList.
197 : tenants: HashMap<TenantShardId, TenantDeletionList>,
198 :
199 : /// Avoid having to walk `tenants` to calculate the number of keys in
200 : /// the nested deletion lists
201 : size: usize,
202 :
203 : /// Set to true when the list has undergone validation with the control
204 : /// plane and the remaining contents of `tenants` are valid. A list may
205 : /// also be implicitly marked valid by DeletionHeader.validated_sequence
206 : /// advancing to >= DeletionList.sequence
207 : #[serde(default)]
208 : #[serde(skip_serializing_if = "std::ops::Not::not")]
209 : validated: bool,
210 : }
211 :
212 115 : #[derive(Debug, Serialize, Deserialize)]
213 : struct DeletionHeader {
214 : /// Serialization version, for future use
215 : version: u8,
216 :
217 : /// The highest sequence number (inclusive) that has been validated. All deletion
218 : /// lists on disk with a sequence <= this value are safe to execute.
219 : validated_sequence: u64,
220 : }
221 :
222 : impl DeletionHeader {
223 : const VERSION_LATEST: u8 = 1;
224 :
225 37 : fn new(validated_sequence: u64) -> Self {
226 37 : Self {
227 37 : version: Self::VERSION_LATEST,
228 37 : validated_sequence,
229 37 : }
230 37 : }
231 :
232 37 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
233 UBC 0 : debug!("Saving deletion list header {:?}", self);
234 CBC 37 : let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
235 37 : let header_path = conf.deletion_header_path();
236 37 : let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
237 37 : VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
238 UBC 0 : .await
239 CBC 37 : .maybe_fatal_err("save deletion header")?;
240 :
241 37 : Ok(())
242 37 : }
243 : }
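
Both the header and the deletion lists are persisted with a temp-file-plus-rename pattern (`path_with_suffix_extension` plus `VirtualFile::crashsafe_overwrite`). As a rough standalone sketch of what that pattern generally looks like using plain std APIs (the real implementation lives in the `virtual_file` module and differs in detail):

```rust
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};

// Write `content` so that a crash leaves either the old file or the new file,
// never a torn one: write a temp file, fsync it, then rename it into place.
// (The real code appends a ".tmp" suffix via path_with_suffix_extension rather
// than replacing the extension, and goes through the VirtualFile layer.)
fn crashsafe_overwrite_sketch(final_path: &Path, content: &[u8]) -> std::io::Result<()> {
    let mut tmp_path: PathBuf = final_path.to_path_buf();
    tmp_path.set_extension("tmp");

    let mut tmp = fs::File::create(&tmp_path)?;
    tmp.write_all(content)?;
    tmp.sync_all()?; // ensure the data is durable before the rename exposes it

    fs::rename(&tmp_path, final_path)?;
    // On POSIX the parent directory should also be fsynced so that the rename
    // itself survives a crash; omitted here for brevity.
    Ok(())
}
```
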
244 :
245 : impl DeletionList {
246 : const VERSION_LATEST: u8 = 1;
247 611 : fn new(sequence: u64) -> Self {
248 611 : Self {
249 611 : version: Self::VERSION_LATEST,
250 611 : sequence,
251 611 : tenants: HashMap::new(),
252 611 : size: 0,
253 611 : validated: false,
254 611 : }
255 611 : }
256 :
257 707 : fn is_empty(&self) -> bool {
258 707 : self.tenants.is_empty()
259 707 : }
260 :
261 4115 : fn len(&self) -> usize {
262 4115 : self.size
263 4115 : }
264 :
265 : /// Returns true if the push was accepted, false if the caller must start a new
266 : /// deletion list.
267 3152 : fn push(
268 3152 : &mut self,
269 3152 : tenant: &TenantShardId,
270 3152 : timeline: &TimelineId,
271 3152 : generation: Generation,
272 3152 : objects: &mut Vec<RemotePath>,
273 3152 : ) -> bool {
274 3152 : if objects.is_empty() {
275 : // Avoid inserting an empty TimelineDeletionList: this preserves the property
276 : // that if we have no keys, then self.tenants is empty (used in Self::is_empty)
277 353 : return true;
278 2799 : }
279 2799 :
280 2799 : let tenant_entry = self
281 2799 : .tenants
282 2799 : .entry(*tenant)
283 2799 : .or_insert_with(|| TenantDeletionList {
284 92 : timelines: HashMap::new(),
285 92 : generation,
286 2799 : });
287 2799 :
288 2799 : if tenant_entry.generation != generation {
289 : // Only one generation per tenant per list: signal to
290 : // caller to start a new list.
291 2 : return false;
292 2797 : }
293 2797 :
294 2797 : let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
295 2797 :
296 2797 : let timeline_remote_path = remote_timeline_path(tenant, timeline);
297 2797 :
298 2797 : self.size += objects.len();
299 2797 : timeline_entry.extend(objects.drain(..).map(|p| {
300 2797 : p.strip_prefix(&timeline_remote_path)
301 2797 : .expect("Timeline paths always start with the timeline prefix")
302 2797 : .to_string()
303 2797 : }));
304 2797 : true
305 3152 : }
306 :
307 40 : fn into_remote_paths(self) -> Vec<RemotePath> {
308 40 : let mut result = Vec::new();
309 40 : for (tenant, tenant_deletions) in self.tenants.into_iter() {
310 41 : for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
311 41 : let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
312 41 : result.extend(
313 41 : timeline_layers
314 41 : .into_iter()
315 819 : .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
316 41 : );
317 41 : }
318 : }
319 :
320 40 : result
321 40 : }
322 :
323 61 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
324 61 : let path = conf.deletion_list_path(self.sequence);
325 61 : let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
326 61 :
327 61 : let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
328 61 : VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
329 UBC 0 : .await
330 CBC 61 : .maybe_fatal_err("save deletion list")
331 61 : .map_err(Into::into)
332 61 : }
333 : }
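
To make the push/rotate contract concrete, here is a hedged, in-module sketch of a round trip through a `DeletionList`; the tenant/timeline IDs and the key follow the serialization test at the bottom of this file, and the helper itself is illustrative.

```rust
// Sketch: push() drains the keys it accepts, refuses a second generation for
// the same tenant (the caller must then seal this list and open a new one),
// and into_remote_paths() re-joins stored fragments with the timeline prefix.
fn deletion_list_round_trip() -> anyhow::Result<()> {
    let tenant: TenantShardId = "ad6c1a56f5680419d3a16ff55d97ec3c".parse()?;
    let timeline: TimelineId = "be322c834ed9e709e63b5c9698691910".parse()?;
    let generation = Generation::new(1);

    let path =
        RemotePath::from_string(&format!("tenants/{tenant}/timelines/{timeline}/foo"))?;

    let mut list = DeletionList::new(1);
    let mut objects = vec![path.clone()];
    assert!(list.push(&tenant, &timeline, generation, &mut objects));
    assert!(objects.is_empty()); // accepted keys are drained into the list

    // Same tenant, different generation: refused, keys are left untouched.
    let mut more = vec![path.clone()];
    assert!(!list.push(&tenant, &timeline, generation.next(), &mut more));
    assert_eq!(more.len(), 1);

    let restored = list.into_remote_paths();
    assert_eq!(restored.len(), 1);
    assert_eq!(restored[0].get_path(), path.get_path());
    Ok(())
}
```
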
334 :
335 : impl std::fmt::Display for DeletionList {
336 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 0 : write!(
338 0 : f,
339 0 : "DeletionList<seq={}, tenants={}, keys={}>",
340 0 : self.sequence,
341 0 : self.tenants.len(),
342 0 : self.size
343 0 : )
344 0 : }
345 : }
346 :
347 : struct PendingLsn {
348 : projected: Lsn,
349 : result_slot: Arc<AtomicLsn>,
350 : }
351 :
352 : struct TenantLsnState {
353 : timelines: HashMap<TimelineId, PendingLsn>,
354 :
355 : // In what generation was the most recent update proposed?
356 : generation: Generation,
357 : }
358 :
359 CBC 659 : #[derive(Default)]
360 : struct VisibleLsnUpdates {
361 : tenants: HashMap<TenantShardId, TenantLsnState>,
362 : }
363 :
364 : impl VisibleLsnUpdates {
365 602 : fn new() -> Self {
366 602 : Self {
367 602 : tenants: HashMap::new(),
368 602 : }
369 602 : }
370 : }
371 :
372 : impl std::fmt::Debug for VisibleLsnUpdates {
373 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
374 0 : write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
375 0 : }
376 : }
377 :
378 0 : #[derive(Error, Debug)]
379 : pub enum DeletionQueueError {
380 : #[error("Deletion queue unavailable during shutdown")]
381 : ShuttingDown,
382 : }
383 :
384 : impl DeletionQueueClient {
385 0 : pub(crate) fn broken() -> Self {
386 0 : // Channels whose receivers are immediately dropped.
387 0 : let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
388 0 : let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
389 0 : Self {
390 0 : tx,
391 0 : executor_tx,
392 0 : lsn_table: Arc::default(),
393 0 : }
394 0 : }
395 :
396 : /// This is cancel-safe. If you drop the future before it completes, the message
397 : /// is not pushed, although in the context of the deletion queue it doesn't matter: once
398 : /// we decide to do a deletion the decision is always final.
399 CBC 3994 : fn do_push<T>(
400 3994 : &self,
401 3994 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
402 3994 : msg: T,
403 3994 : ) -> Result<(), DeletionQueueError> {
404 3994 : match queue.send(msg) {
405 3994 : Ok(_) => Ok(()),
406 UBC 0 : Err(e) => {
407 0 : // This shouldn't happen, we should shut down all tenants before
408 0 : // we shut down the global delete queue. If we encounter a bug like this,
409 0 : // we may leak objects as deletions won't be processed.
410 0 : error!("Deletion queue closed while pushing, shutting down? ({e})");
411 0 : Err(DeletionQueueError::ShuttingDown)
412 : }
413 : }
414 CBC 3994 : }
415 :
416 560 : pub(crate) fn recover(
417 560 : &self,
418 560 : attached_tenants: HashMap<TenantShardId, Generation>,
419 560 : ) -> Result<(), DeletionQueueError> {
420 560 : self.do_push(
421 560 : &self.tx,
422 560 : ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
423 560 : )
424 560 : }
425 :
426 : /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
427 : /// world, it must validate its generation number before doing so. Rather than do this synchronously,
428 : /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
429 : /// recently validated separately.
430 : ///
431 : /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The
432 : /// backend will later wake up and notice that the tenant's generation requires validation.
433 5095 : pub(crate) async fn update_remote_consistent_lsn(
434 5095 : &self,
435 5095 : tenant_shard_id: TenantShardId,
436 5095 : timeline_id: TimelineId,
437 5095 : current_generation: Generation,
438 5095 : lsn: Lsn,
439 5095 : result_slot: Arc<AtomicLsn>,
440 5095 : ) {
441 5095 : let mut locked = self
442 5095 : .lsn_table
443 5095 : .write()
444 5095 : .expect("Lock should never be poisoned");
445 5095 :
446 5095 : let tenant_entry = locked
447 5095 : .tenants
448 5095 : .entry(tenant_shard_id)
449 5095 : .or_insert(TenantLsnState {
450 5095 : timelines: HashMap::new(),
451 5095 : generation: current_generation,
452 5095 : });
453 5095 :
454 5095 : if tenant_entry.generation != current_generation {
455 2 : // Generation might have changed if we were detached and then re-attached: in this case,
456 2 : // state from the previous generation cannot be trusted.
457 2 : tenant_entry.timelines.clear();
458 2 : tenant_entry.generation = current_generation;
459 5093 : }
460 :
461 5095 : tenant_entry.timelines.insert(
462 5095 : timeline_id,
463 5095 : PendingLsn {
464 5095 : projected: lsn,
465 5095 : result_slot,
466 5095 : },
467 5095 : );
468 5095 : }
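
A hedged sketch of the calling pattern implied above: the timeline publishes a projected LSN together with a shared result slot, and later reads the slot back once the Validator has confirmed the generation with the control plane. The `client`, ID, and generation bindings are assumed, and the `AtomicLsn` constructor is shown only for illustration.

```rust
// Sketch: `client` is a DeletionQueueClient; the other bindings are assumed.
let result_slot = Arc::new(AtomicLsn::new(0));
client
    .update_remote_consistent_lsn(
        tenant_shard_id,
        timeline_id,
        current_generation,
        Lsn(0x4000_0000),
        result_slot.clone(),
    )
    .await;

// Once the Validator has checked the generation, it copies the projected value
// into the slot, making it the externally visible remote_consistent_lsn.
let visible: Lsn = result_slot.load();
```
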
469 :
470 : /// Submit a list of layers for deletion: this function will return before the deletion is
471 : /// persistent, but it may be executed at any time after this function enters: do not push
472 : /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
473 : /// references them).
474 : ///
475 : /// The `current_generation` is the generation of this pageserver's current attachment. The
476 : /// generations in `layers` are the generations in which those layers were written.
477 3255 : pub(crate) async fn push_layers(
478 3255 : &self,
479 3255 : tenant_shard_id: TenantShardId,
480 3255 : timeline_id: TimelineId,
481 3255 : current_generation: Generation,
482 3255 : layers: Vec<(LayerFileName, LayerFileMetadata)>,
483 3255 : ) -> Result<(), DeletionQueueError> {
484 3255 : if current_generation.is_none() {
485 UBC 0 : debug!("Enqueuing deletions in legacy mode, skipping queue");
486 :
487 CBC 24 : let mut layer_paths = Vec::new();
488 48 : for (layer, meta) in layers {
489 24 : layer_paths.push(remote_layer_path(
490 24 : &tenant_shard_id.tenant_id,
491 24 : &timeline_id,
492 24 : meta.shard,
493 24 : &layer,
494 24 : meta.generation,
495 24 : ));
496 24 : }
497 24 : self.push_immediate(layer_paths).await?;
498 33 : return self.flush_immediate().await;
499 3231 : }
500 3231 :
501 3231 : self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
502 3255 : }
503 :
504 : /// When a Tenant has a generation, push_layers is always synchronous because
505 : /// the ListWriter channel is an unbounded channel.
506 : ///
507 : /// This can be merged into push_layers when we remove the Generation-less mode
508 : /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
509 3231 : pub(crate) fn push_layers_sync(
510 3231 : &self,
511 3231 : tenant_shard_id: TenantShardId,
512 3231 : timeline_id: TimelineId,
513 3231 : current_generation: Generation,
514 3231 : layers: Vec<(LayerFileName, LayerFileMetadata)>,
515 3231 : ) -> Result<(), DeletionQueueError> {
516 3231 : metrics::DELETION_QUEUE
517 3231 : .keys_submitted
518 3231 : .inc_by(layers.len() as u64);
519 3231 : self.do_push(
520 3231 : &self.tx,
521 3231 : ListWriterQueueMessage::Delete(DeletionOp {
522 3231 : tenant_shard_id,
523 3231 : timeline_id,
524 3231 : layers,
525 3231 : generation: current_generation,
526 3231 : objects: Vec::new(),
527 3231 : }),
528 3231 : )
529 3231 : }
530 :
531 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
532 203 : async fn do_flush<T>(
533 203 : &self,
534 203 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
535 203 : msg: T,
536 203 : rx: tokio::sync::oneshot::Receiver<()>,
537 203 : ) -> Result<(), DeletionQueueError> {
538 203 : self.do_push(queue, msg)?;
539 205 : if rx.await.is_err() {
540 : // This shouldn't happen if tenants are shut down before deletion queue. If we
541 : // encounter a bug like this, then a flusher will incorrectly believe it has flushed
542 : // when it hasn't, possibly leading to leaking objects.
543 UBC 0 : error!("Deletion queue dropped flush op while client was still waiting");
544 0 : Err(DeletionQueueError::ShuttingDown)
545 : } else {
546 CBC 203 : Ok(())
547 : }
548 203 : }
549 :
550 : /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
551 : ///
552 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
553 186 : pub async fn flush(&self) -> Result<(), DeletionQueueError> {
554 186 : let (flush_op, rx) = FlushOp::new();
555 186 : self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
556 188 : .await
557 186 : }
558 :
559 : /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
560 : /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
561 : /// detach where flushing is nice but not necessary.
562 : ///
563 : /// This function provides no guarantees of work being done.
564 104 : pub fn flush_advisory(&self) {
565 104 : let (flush_op, _) = FlushOp::new();
566 104 :
567 104 : // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
568 104 : drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
569 104 : }
570 :
571 : // Wait until all previous deletions are executed
572 17 : pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
573 UBC 0 : debug!("flush_execute: flushing to deletion lists...");
574 : // Flush any buffered work to deletion lists
575 CBC 17 : self.flush().await?;
576 :
577 : // Flush the backend into the executor of deletion lists
578 17 : let (flush_op, rx) = FlushOp::new();
579 UBC 0 : debug!("flush_execute: flushing backend...");
580 CBC 17 : self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
581 17 : .await?;
582 UBC 0 : debug!("flush_execute: finished flushing backend...");
583 :
584 : // Flush any immediate-mode deletions (the above backend flush will only flush
585 : // the executor if deletions had flowed through the backend)
586 0 : debug!("flush_execute: flushing execution...");
587 CBC 17 : self.flush_immediate().await?;
588 UBC 0 : debug!("flush_execute: finished flushing execution...");
589 CBC 17 : Ok(())
590 17 : }
591 :
592 : /// This interface bypasses the persistent deletion queue, and any validation
593 : /// that this pageserver is still eligible to execute the deletions. It is for
594 : /// use in timeline deletions, where the control plane is telling us we may
595 : /// delete everything in the timeline.
596 : ///
597 : /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
598 456 : pub(crate) async fn push_immediate(
599 456 : &self,
600 456 : objects: Vec<RemotePath>,
601 456 : ) -> Result<(), DeletionQueueError> {
602 456 : metrics::DELETION_QUEUE
603 456 : .keys_submitted
604 456 : .inc_by(objects.len() as u64);
605 456 : self.executor_tx
606 456 : .send(DeleterMessage::Delete(objects))
607 9 : .await
608 456 : .map_err(|_| DeletionQueueError::ShuttingDown)
609 456 : }
610 :
611 : /// Companion to push_immediate. When this returns Ok, all prior objects sent
612 : /// into push_immediate have been deleted from remote storage.
613 375 : pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
614 375 : let (flush_op, rx) = FlushOp::new();
615 375 : self.executor_tx
616 375 : .send(DeleterMessage::Flush(flush_op))
617 9 : .await
618 375 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
619 :
620 375 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
621 375 : }
622 : }
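
For contrast with the deferred path, here is a short caller-side sketch of the immediate path documented on `push_immediate`/`flush_immediate`, as used during timeline deletion; the `client` and `objects` bindings are assumed.

```rust
// Sketch: bypasses DeletionLists and generation validation entirely.
// `objects` is assumed to hold the keys listed under the timeline's prefix.
client.push_immediate(objects).await?;
// When this returns Ok, every object pushed above has been deleted remotely.
client.flush_immediate().await?;
```
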
623 :
624 : impl DeletionQueue {
625 1118 : pub fn new_client(&self) -> DeletionQueueClient {
626 1118 : self.client.clone()
627 1118 : }
628 :
629 : /// Caller may use the returned object to construct clients with new_client.
630 : /// Caller should tokio::spawn the background() members of the returned DeletionQueueWorkers (or use its spawn_with helper):
631 : /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
632 : ///
633 : /// If remote_storage is None, then the returned workers will also be None.
634 561 : pub fn new<C>(
635 561 : remote_storage: Option<GenericRemoteStorage>,
636 561 : control_plane_client: Option<C>,
637 561 : conf: &'static PageServerConf,
638 561 : ) -> (Self, Option<DeletionQueueWorkers<C>>)
639 561 : where
640 561 : C: ControlPlaneGenerationsApi + Send + Sync,
641 561 : {
642 561 : // Unbounded channel: enables non-async functions to submit deletions. The actual length is
643 561 : // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
644 561 : // enough to avoid this taking a pathologically large amount of memory.
645 561 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
646 561 :
647 561 : // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
648 561 : let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
649 561 :
650 561 : // Shallow channel: it carries lists of paths, and we expect the main queueing to
651 561 : // happen in the backend (persistent), not in this queue.
652 561 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
653 561 :
654 561 : let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
655 561 :
656 561 : // The deletion queue has its own cancellation token, independent of
657 561 : // the general pageserver shutdown token, because it stays alive a bit
658 561 : // longer to flush after Tenants have all been torn down.
659 561 : let cancel = CancellationToken::new();
660 :
661 561 : let remote_storage = match remote_storage {
662 : None => {
663 UBC 0 : return (
664 0 : Self {
665 0 : client: DeletionQueueClient {
666 0 : tx,
667 0 : executor_tx,
668 0 : lsn_table: lsn_table.clone(),
669 0 : },
670 0 : cancel,
671 0 : },
672 0 : None,
673 0 : )
674 : }
675 CBC 561 : Some(r) => r,
676 561 : };
677 561 :
678 561 : (
679 561 : Self {
680 561 : client: DeletionQueueClient {
681 561 : tx,
682 561 : executor_tx: executor_tx.clone(),
683 561 : lsn_table: lsn_table.clone(),
684 561 : },
685 561 : cancel: cancel.clone(),
686 561 : },
687 561 : Some(DeletionQueueWorkers {
688 561 : frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
689 561 : backend: Validator::new(
690 561 : conf,
691 561 : backend_rx,
692 561 : executor_tx,
693 561 : control_plane_client,
694 561 : lsn_table.clone(),
695 561 : cancel.clone(),
696 561 : ),
697 561 : executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
698 561 : }),
699 561 : )
700 561 : }
701 :
702 159 : pub async fn shutdown(&mut self, timeout: Duration) {
703 159 : self.cancel.cancel();
704 159 :
705 161 : match tokio::time::timeout(timeout, self.client.flush()).await {
706 : Ok(Ok(())) => {
707 159 : tracing::info!("Deletion queue flushed successfully on shutdown")
708 : }
709 : Ok(Err(DeletionQueueError::ShuttingDown)) => {
710 : // This is not harmful for correctness, but is unexpected: the deletion
711 : // queue's workers should stay alive as long as there are any client handles instantiated.
712 UBC 0 : tracing::warn!("Deletion queue stopped prematurely");
713 : }
714 0 : Err(_timeout) => {
715 0 : tracing::warn!("Timed out flushing deletion queue on shutdown")
716 : }
717 : }
718 CBC 159 : }
719 : }
720 :
721 : #[cfg(test)]
722 : mod test {
723 : use camino::Utf8Path;
724 : use hex_literal::hex;
725 : use pageserver_api::shard::ShardIndex;
726 : use std::{io::ErrorKind, time::Duration};
727 : use tracing::info;
728 :
729 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
730 : use tokio::task::JoinHandle;
731 :
732 : use crate::{
733 : control_plane_client::RetryForeverError,
734 : repository::Key,
735 : tenant::{
736 : harness::TenantHarness, remote_timeline_client::remote_timeline_path,
737 : storage_layer::DeltaFileName,
738 : },
739 : };
740 :
741 : use super::*;
742 : pub const TIMELINE_ID: TimelineId =
743 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
744 :
745 : pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName {
746 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
747 : lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
748 : });
749 :
750 : // When you need a second layer in a test.
751 : pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName {
752 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
753 : lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
754 : });
755 :
756 : struct TestSetup {
757 : harness: TenantHarness,
758 : remote_fs_dir: Utf8PathBuf,
759 : storage: GenericRemoteStorage,
760 : mock_control_plane: MockControlPlane,
761 : deletion_queue: DeletionQueue,
762 : worker_join: JoinHandle<()>,
763 : }
764 :
765 : impl TestSetup {
766 : /// Simulate a pageserver restart by destroying and recreating the deletion queue
767 1 : async fn restart(&mut self) {
768 1 : let (deletion_queue, workers) = DeletionQueue::new(
769 1 : Some(self.storage.clone()),
770 1 : Some(self.mock_control_plane.clone()),
771 1 : self.harness.conf,
772 1 : );
773 :
774 UBC 0 : tracing::debug!("Spawning worker for new deletion queue");
775 CBC 1 : let worker_join = workers
776 1 : .unwrap()
777 1 : .spawn_with(&tokio::runtime::Handle::current());
778 1 :
779 1 : let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
780 1 : let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
781 :
782 UBC 0 : tracing::debug!("Joining worker from previous queue");
783 CBC 1 : old_deletion_queue.cancel.cancel();
784 1 : old_worker_join
785 1 : .await
786 1 : .expect("Failed to join workers for previous deletion queue");
787 1 : }
788 :
789 3 : fn set_latest_generation(&self, gen: Generation) {
790 3 : let tenant_shard_id = self.harness.tenant_shard_id;
791 3 : self.mock_control_plane
792 3 : .latest_generation
793 3 : .lock()
794 3 : .unwrap()
795 3 : .insert(tenant_shard_id, gen);
796 3 : }
797 :
798 : /// Returns remote layer file name, suitable for use in assert_remote_files
799 3 : fn write_remote_layer(
800 3 : &self,
801 3 : file_name: LayerFileName,
802 3 : gen: Generation,
803 3 : ) -> anyhow::Result<String> {
804 3 : let tenant_shard_id = self.harness.tenant_shard_id;
805 3 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
806 3 : let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
807 3 : std::fs::create_dir_all(&remote_timeline_path)?;
808 3 : let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
809 3 :
810 3 : let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
811 3 :
812 3 : std::fs::write(
813 3 : remote_timeline_path.join(remote_layer_file_name.clone()),
814 3 : content,
815 3 : )?;
816 :
817 3 : Ok(remote_layer_file_name)
818 3 : }
819 : }
820 :
821 4 : #[derive(Debug, Clone)]
822 : struct MockControlPlane {
823 : pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
824 : }
825 :
826 : impl MockControlPlane {
827 3 : fn new() -> Self {
828 3 : Self {
829 3 : latest_generation: Arc::default(),
830 3 : }
831 3 : }
832 : }
833 :
834 : #[async_trait::async_trait]
835 : impl ControlPlaneGenerationsApi for MockControlPlane {
836 : #[allow(clippy::diverging_sub_expression)] // False positive via async_trait
837 UBC 0 : async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
838 0 : unimplemented!()
839 0 : }
840 CBC 4 : async fn validate(
841 4 : &self,
842 4 : tenants: Vec<(TenantShardId, Generation)>,
843 4 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
844 4 : let mut result = HashMap::new();
845 4 :
846 4 : let latest_generation = self.latest_generation.lock().unwrap();
847 :
848 8 : for (tenant_shard_id, generation) in tenants {
849 4 : if let Some(latest) = latest_generation.get(&tenant_shard_id) {
850 4 : result.insert(tenant_shard_id, *latest == generation);
851 4 : }
852 : }
853 :
854 4 : Ok(result)
855 8 : }
856 : }
857 :
858 3 : fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
859 3 : let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
860 3 : let harness = TenantHarness::create(test_name)?;
861 :
862 : // We do not load() the harness: we only need its config and remote_storage
863 :
864 : // Set up a GenericRemoteStorage targeting a directory
865 3 : let remote_fs_dir = harness.conf.workdir.join("remote_fs");
866 3 : std::fs::create_dir_all(remote_fs_dir)?;
867 3 : let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
868 3 : let storage_config = RemoteStorageConfig {
869 3 : storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
870 3 : };
871 3 : let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
872 3 :
873 3 : let mock_control_plane = MockControlPlane::new();
874 3 :
875 3 : let (deletion_queue, worker) = DeletionQueue::new(
876 3 : Some(storage.clone()),
877 3 : Some(mock_control_plane.clone()),
878 3 : harness.conf,
879 3 : );
880 3 :
881 3 : let worker = worker.unwrap();
882 3 : let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
883 3 :
884 3 : Ok(TestSetup {
885 3 : harness,
886 3 : remote_fs_dir,
887 3 : storage,
888 3 : mock_control_plane,
889 3 : deletion_queue,
890 3 : worker_join,
891 3 : })
892 3 : }
893 :
894 : // TODO: put this in a common location so that we can share with remote_timeline_client's tests
895 9 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
896 9 : let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
897 9 : expected.sort();
898 9 :
899 9 : let mut found: Vec<String> = Vec::new();
900 9 : let dir = match std::fs::read_dir(remote_path) {
901 9 : Ok(d) => d,
902 UBC 0 : Err(e) => {
903 0 : if e.kind() == ErrorKind::NotFound {
904 0 : if expected.is_empty() {
905 : // We are asserting prefix is empty: it is expected that the dir is missing
906 0 : return;
907 : } else {
908 0 : assert_eq!(expected, Vec::<String>::new());
909 0 : unreachable!();
910 : }
911 : } else {
912 0 : panic!("Unexpected error listing {remote_path}: {e}");
913 : }
914 : }
915 : };
916 :
917 CBC 9 : for entry in dir.flatten() {
918 8 : let entry_name = entry.file_name();
919 8 : let fname = entry_name.to_str().unwrap();
920 8 : found.push(String::from(fname));
921 8 : }
922 9 : found.sort();
923 9 :
924 9 : assert_eq!(expected, found);
925 9 : }
926 :
927 5 : fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
928 5 : let dir = match std::fs::read_dir(directory) {
929 4 : Ok(d) => d,
930 : Err(_) => {
931 1 : assert_eq!(expected, &Vec::<String>::new());
932 1 : return;
933 : }
934 : };
935 4 : let mut found = Vec::new();
936 9 : for dentry in dir {
937 5 : let dentry = dentry.unwrap();
938 5 : let file_name = dentry.file_name();
939 5 : let file_name_str = file_name.to_string_lossy();
940 5 : found.push(file_name_str.to_string());
941 5 : }
942 4 : found.sort();
943 4 : assert_eq!(expected, found);
944 5 : }
945 :
946 1 : #[tokio::test]
947 1 : async fn deletion_queue_smoke() -> anyhow::Result<()> {
948 1 : // Basic test that the deletion queue processes the deletions we pass into it
949 1 : let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
950 1 : let client = ctx.deletion_queue.new_client();
951 1 : client.recover(HashMap::new())?;
952 :
953 1 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
954 1 : let tenant_shard_id = ctx.harness.tenant_shard_id;
955 1 :
956 1 : let content: Vec<u8> = "victim1 contents".into();
957 1 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
958 1 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
959 1 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
960 1 :
961 1 : // Exercise the distinction between the generation of the layers
962 1 : // we delete, and the generation of the running Tenant.
963 1 : let layer_generation = Generation::new(0xdeadbeef);
964 1 : let now_generation = Generation::new(0xfeedbeef);
965 1 : let layer_metadata =
966 1 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
967 1 :
968 1 : let remote_layer_file_name_1 =
969 1 : format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
970 1 :
971 1 : // Set mock control plane state to valid for our generation
972 1 : ctx.set_latest_generation(now_generation);
973 1 :
974 1 : // Inject a victim file to remote storage
975 1 : info!("Writing");
976 1 : std::fs::create_dir_all(&remote_timeline_path)?;
977 1 : std::fs::write(
978 1 : remote_timeline_path.join(remote_layer_file_name_1.clone()),
979 1 : content,
980 1 : )?;
981 1 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
982 1 :
983 1 : // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
984 1 : info!("Pushing");
985 1 : client
986 1 : .push_layers(
987 1 : tenant_shard_id,
988 1 : TIMELINE_ID,
989 1 : now_generation,
990 1 : [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
991 1 : )
992 UBC 0 : .await?;
993 CBC 1 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
994 1 :
995 1 : assert_local_files(&[], &deletion_prefix);
996 1 :
997 1 : // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
998 1 : info!("Flushing");
999 1 : client.flush().await?;
1000 1 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
1001 1 : assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
1002 1 :
1003 1 : // File should go away when we execute
1004 1 : info!("Flush-executing");
1005 3 : client.flush_execute().await?;
1006 1 : assert_remote_files(&[], &remote_timeline_path);
1007 1 : assert_local_files(&["header-01"], &deletion_prefix);
1008 1 :
1009 1 : // Flushing on an empty queue should succeed immediately, and not write any lists
1010 1 : info!("Flush-executing on empty");
1011 3 : client.flush_execute().await?;
1012 1 : assert_local_files(&["header-01"], &deletion_prefix);
1013 1 :
1014 1 : Ok(())
1015 : }
1016 :
1017 1 : #[tokio::test]
1018 1 : async fn deletion_queue_validation() -> anyhow::Result<()> {
1019 1 : let ctx = setup("deletion_queue_validation").expect("Failed test setup");
1020 1 : let client = ctx.deletion_queue.new_client();
1021 1 : client.recover(HashMap::new())?;
1022 :
1023 : // Generation that the control plane thinks is current
1024 1 : let latest_generation = Generation::new(0xdeadbeef);
1025 1 : // Generation that our DeletionQueue thinks the tenant is running with
1026 1 : let stale_generation = latest_generation.previous();
1027 1 : // Generation that our example layer file was written with
1028 1 : let layer_generation = stale_generation.previous();
1029 1 : let layer_metadata =
1030 1 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1031 1 :
1032 1 : ctx.set_latest_generation(latest_generation);
1033 1 :
1034 1 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1035 1 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1036 1 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1037 :
1038 : // Initial state: a remote layer exists
1039 1 : let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1040 1 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1041 1 :
1042 1 : tracing::debug!("Pushing...");
1043 1 : client
1044 1 : .push_layers(
1045 1 : tenant_shard_id,
1046 1 : TIMELINE_ID,
1047 1 : stale_generation,
1048 1 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1049 1 : )
1050 UBC 0 : .await?;
1051 :
1052 : // We enqueued the operation in a stale generation: it should have failed validation
1053 CBC 1 : tracing::debug!("Flushing...");
1054 3 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1055 1 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1056 1 :
1057 1 : tracing::debug!("Pushing...");
1058 1 : client
1059 1 : .push_layers(
1060 1 : tenant_shard_id,
1061 1 : TIMELINE_ID,
1062 1 : latest_generation,
1063 1 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1064 1 : )
1065 UBC 0 : .await?;
1066 :
1067 : // We enqueued the operation in a fresh generation: it should have passed validation
1068 CBC 1 : tracing::debug!("Flushing...");
1069 3 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1070 1 : assert_remote_files(&[], &remote_timeline_path);
1071 1 :
1072 1 : Ok(())
1073 : }
1074 :
1075 1 : #[tokio::test]
1076 1 : async fn deletion_queue_recovery() -> anyhow::Result<()> {
1077 1 : // Test that deletion lists are persisted, recovered after a restart, and filtered by generation
1078 1 : let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
1079 1 : let client = ctx.deletion_queue.new_client();
1080 1 : client.recover(HashMap::new())?;
1081 :
1082 1 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1083 1 :
1084 1 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1085 1 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1086 1 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
1087 1 :
1088 1 : let layer_generation = Generation::new(0xdeadbeef);
1089 1 : let now_generation = Generation::new(0xfeedbeef);
1090 1 : let layer_metadata =
1091 1 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1092 :
1093 : // Inject a deletion in the generation before generation_now: after restart,
1094 : // this deletion should _not_ get executed (only the immediately previous
1095 : // generation gets that treatment)
1096 1 : let remote_layer_file_name_historical =
1097 1 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1098 1 : client
1099 1 : .push_layers(
1100 1 : tenant_shard_id,
1101 1 : TIMELINE_ID,
1102 1 : now_generation.previous(),
1103 1 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1104 1 : )
1105 UBC 0 : .await?;
1106 :
1107 : // Inject a deletion in generation_now itself: after restart,
1108 : // this deletion should get executed, because we execute deletions in the
1109 : // immediately previous generation on the same node.
1110 CBC 1 : let remote_layer_file_name_previous =
1111 1 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
1112 1 : client
1113 1 : .push_layers(
1114 1 : tenant_shard_id,
1115 1 : TIMELINE_ID,
1116 1 : now_generation,
1117 1 : [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
1118 1 : )
1119 UBC 0 : .await?;
1120 :
1121 CBC 1 : client.flush().await?;
1122 1 : assert_remote_files(
1123 1 : &[
1124 1 : &remote_layer_file_name_historical,
1125 1 : &remote_layer_file_name_previous,
1126 1 : ],
1127 1 : &remote_timeline_path,
1128 1 : );
1129 1 :
1130 1 : // Different generations for the same tenant will cause two separate
1131 1 : // deletion lists to be emitted.
1132 1 : assert_local_files(
1133 1 : &["0000000000000001-01.list", "0000000000000002-01.list"],
1134 1 : &deletion_prefix,
1135 1 : );
1136 1 :
1137 1 : // Simulate a node restart: the latest generation advances
1138 1 : let now_generation = now_generation.next();
1139 1 : ctx.set_latest_generation(now_generation);
1140 1 :
1141 1 : // Restart the deletion queue
1142 1 : drop(client);
1143 1 : ctx.restart().await;
1144 1 : let client = ctx.deletion_queue.new_client();
1145 1 : client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
1146 :
1147 1 : info!("Flush-executing");
1148 3 : client.flush_execute().await?;
1149 : // The deletion from immediately prior generation was executed, the one from
1150 : // an older generation was not.
1151 1 : assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
1152 1 : Ok(())
1153 : }
1154 : }
1155 :
1156 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
1157 : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
1158 : #[cfg(test)]
1159 : pub(crate) mod mock {
1160 : use tracing::info;
1161 :
1162 : use crate::tenant::remote_timeline_client::remote_layer_path;
1163 :
1164 : use super::*;
1165 : use std::sync::{
1166 : atomic::{AtomicUsize, Ordering},
1167 : Arc,
1168 : };
1169 :
1170 : pub struct ConsumerState {
1171 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
1172 : executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
1173 : }
1174 :
1175 : impl ConsumerState {
1176 1 : async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
1177 1 : let mut executed = 0;
1178 :
1179 1 : info!("Executing all pending deletions");
1180 :
1181 : // Execute any deletions sent directly to the executor (immediate mode)
1182 1 : while let Ok(msg) = self.executor_rx.try_recv() {
1183 UBC 0 : match msg {
1184 0 : DeleterMessage::Delete(objects) => {
1185 0 : for path in objects {
1186 0 : match remote_storage.delete(&path).await {
1187 : Ok(_) => {
1188 0 : debug!("Deleted {path}");
1189 : }
1190 0 : Err(e) => {
1191 0 : error!("Failed to delete {path}, leaking object! ({e})");
1192 : }
1193 : }
1194 0 : executed += 1;
1195 : }
1196 : }
1197 0 : DeleterMessage::Flush(flush_op) => {
1198 0 : flush_op.notify();
1199 0 : }
1200 : }
1201 : }
1202 :
1203 CBC 2 : while let Ok(msg) = self.rx.try_recv() {
1204 1 : match msg {
1205 1 : ListWriterQueueMessage::Delete(op) => {
1206 1 : let mut objects = op.objects;
1207 2 : for (layer, meta) in op.layers {
1208 1 : objects.push(remote_layer_path(
1209 1 : &op.tenant_shard_id.tenant_id,
1210 1 : &op.timeline_id,
1211 1 : meta.shard,
1212 1 : &layer,
1213 1 : meta.generation,
1214 1 : ));
1215 1 : }
1216 :
1217 2 : for path in objects {
1218 1 : info!("Executing deletion {path}");
1219 1 : match remote_storage.delete(&path).await {
1220 : Ok(_) => {
1221 UBC 0 : debug!("Deleted {path}");
1222 : }
1223 0 : Err(e) => {
1224 0 : error!("Failed to delete {path}, leaking object! ({e})");
1225 : }
1226 : }
1227 CBC 1 : executed += 1;
1228 : }
1229 : }
1230 UBC 0 : ListWriterQueueMessage::Flush(op) => {
1231 0 : op.notify();
1232 0 : }
1233 0 : ListWriterQueueMessage::FlushExecute(op) => {
1234 0 : // We have already executed all prior deletions because mock does them inline
1235 0 : op.notify();
1236 0 : }
1237 0 : ListWriterQueueMessage::Recover(_) => {
1238 0 : // no-op in mock
1239 0 : }
1240 : }
1241 CBC 1 : info!("All pending deletions have been executed");
1242 : }
1243 :
1244 1 : executed
1245 1 : }
1246 : }
1247 :
1248 : pub struct MockDeletionQueue {
1249 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
1250 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
1251 : executed: Arc<AtomicUsize>,
1252 : remote_storage: Option<GenericRemoteStorage>,
1253 : consumer: std::sync::Mutex<ConsumerState>,
1254 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
1255 : }
1256 :
1257 : impl MockDeletionQueue {
1258 41 : pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
1259 41 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1260 41 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
1261 41 :
1262 41 : let executed = Arc::new(AtomicUsize::new(0));
1263 41 :
1264 41 : Self {
1265 41 : tx,
1266 41 : executor_tx,
1267 41 : executed,
1268 41 : remote_storage,
1269 41 : consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
1270 41 : lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
1271 41 : }
1272 41 : }
1273 :
1274 : #[allow(clippy::await_holding_lock)]
1275 1 : pub async fn pump(&self) {
1276 1 : if let Some(remote_storage) = &self.remote_storage {
1277 : // Permit holding mutex across await, because this is only ever
1278 : // called once at a time in tests.
1279 1 : let mut locked = self.consumer.lock().unwrap();
1280 1 : let count = locked.consume(remote_storage).await;
1281 1 : self.executed.fetch_add(count, Ordering::Relaxed);
1282 UBC 0 : }
1283 CBC 1 : }
1284 :
1285 47 : pub(crate) fn new_client(&self) -> DeletionQueueClient {
1286 47 : DeletionQueueClient {
1287 47 : tx: self.tx.clone(),
1288 47 : executor_tx: self.executor_tx.clone(),
1289 47 : lsn_table: self.lsn_table.clone(),
1290 47 : }
1291 47 : }
1292 : }
1293 :
1294 : /// Test round-trip serialization/deserialization, and test stability of the format
1295 : /// vs. a static expected string for the serialized version.
1296 1 : #[test]
1297 1 : fn deletion_list_serialization() -> anyhow::Result<()> {
1298 1 : let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
1299 1 : .to_string()
1300 1 : .parse::<TenantShardId>()?;
1301 1 : let timeline_id = "be322c834ed9e709e63b5c9698691910"
1302 1 : .to_string()
1303 1 : .parse::<TimelineId>()?;
1304 1 : let generation = Generation::new(123);
1305 :
1306 1 : let object =
1307 1 : RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
1308 1 : let mut objects = [object].to_vec();
1309 1 :
1310 1 : let mut example = DeletionList::new(1);
1311 1 : example.push(&tenant_id, &timeline_id, generation, &mut objects);
1312 :
1313 1 : let encoded = serde_json::to_string(&example)?;
1314 :
1315 1 : let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
1316 1 : assert_eq!(encoded, expected);
1317 :
1318 1 : let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
1319 1 : assert_eq!(example, decoded);
1320 :
1321 1 : Ok(())
1322 1 : }
1323 : }
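
A short sketch of how a test might drive the mock queue above; the `storage` and `paths` bindings are assumed. Deletions pushed through the client simply sit in the channels until `pump()` drains and executes them inline.

```rust
// Sketch: mock usage inside a test, assuming `storage: GenericRemoteStorage`
// and `paths: Vec<RemotePath>` already exist.
let mock = MockDeletionQueue::new(Some(storage));
let client = mock.new_client();

client.push_immediate(paths).await?;
// Nothing has been deleted yet: the mock only acts when pumped.
mock.pump().await;
```
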