Line data Source code
1 : mod deleter;
2 : mod list_writer;
3 : mod validator;
4 :
5 : use std::collections::HashMap;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use crate::control_plane_client::ControlPlaneGenerationsApi;
10 : use crate::metrics;
11 : use crate::tenant::remote_timeline_client::remote_layer_path;
12 : use crate::tenant::remote_timeline_client::remote_timeline_path;
13 : use crate::tenant::remote_timeline_client::LayerFileMetadata;
14 : use crate::virtual_file::MaybeFatalIo;
15 : use crate::virtual_file::VirtualFile;
16 : use anyhow::Context;
17 : use camino::Utf8PathBuf;
18 : use pageserver_api::shard::TenantShardId;
19 : use remote_storage::{GenericRemoteStorage, RemotePath};
20 : use serde::Deserialize;
21 : use serde::Serialize;
22 : use thiserror::Error;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::Instrument;
25 : use tracing::{debug, error};
26 : use utils::crashsafe::path_with_suffix_extension;
27 : use utils::generation::Generation;
28 : use utils::id::TimelineId;
29 : use utils::lsn::AtomicLsn;
30 : use utils::lsn::Lsn;
31 :
32 : use self::deleter::Deleter;
33 : use self::list_writer::DeletionOp;
34 : use self::list_writer::ListWriter;
35 : use self::list_writer::RecoverOp;
36 : use self::validator::Validator;
37 : use deleter::DeleterMessage;
38 : use list_writer::ListWriterQueueMessage;
39 : use validator::ValidatorQueueMessage;
40 :
41 : use crate::{config::PageServerConf, tenant::storage_layer::LayerName};
42 :
43 : // TODO: configurable for how long to wait before executing deletions
44 :
/// We aggregate object deletions from many tenants in one place, for several reasons:
/// - Coalesce deletions into fewer DeleteObjects calls
/// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
///   to flush any outstanding deletions.
/// - Globally control throughput of deletions, as these are a low priority task: do
///   not compete with the same S3 clients/connections used for higher priority uploads.
/// - Enable gating deletions on validation of a tenant's generation number, to make
///   it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
///
/// There are two kinds of deletion: deferred and immediate.  A deferred deletion
/// may be intentionally delayed to protect passive readers of S3 data, and is
/// subject to a generation number validation step.  An immediate deletion is
/// ready to execute immediately, and is only queued up so that it can be coalesced
/// with other deletions in flight.
///
/// Deferred deletions pass through three steps:
/// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
///   DeletionLists, which are persisted to disk.
/// - Validator: accumulate deletion lists, and validate them en-masse prior to passing
///   the keys in the list onward for actual deletion.  Also validate remote_consistent_lsn
///   updates for running timelines.
/// - Deleter: accumulate object keys that the validator has validated, and execute them in
///   batches of 1000 keys via DeleteObjects.
///
/// Non-deferred deletions, such as during timeline deletion, bypass the first
/// two stages and are passed straight into the Deleter.
///
/// Internally, each stage is joined by a channel to the next. On disk, there is only
/// one queue (of DeletionLists), which is written by the frontend and consumed
/// by the backend.
#[derive(Clone)]
pub struct DeletionQueue {
    // Handle used to submit work to the queue; `new_client` hands out clones of it.
    client: DeletionQueueClient,

    // Parent cancellation token for the tokens passed into background workers
    cancel: CancellationToken,
}
82 :
/// Opaque wrapper around individual worker tasks, to avoid making the
/// worker objects themselves public.
///
/// Holds one worker per pipeline stage; `spawn_with` runs each of them on
/// its own task.
pub struct DeletionQueueWorkers<C>
where
    C: ControlPlaneGenerationsApi + Send + Sync,
{
    // First stage: the ListWriter.
    frontend: ListWriter,
    // Second stage: the Validator, generic over the control plane API.
    backend: Validator<C>,
    // Final stage: the Deleter.
    executor: Deleter,
}
93 :
94 : impl<C> DeletionQueueWorkers<C>
95 : where
96 : C: ControlPlaneGenerationsApi + Send + Sync + 'static,
97 : {
98 24 : pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
99 24 : let jh_frontend = runtime.spawn(async move {
100 24 : self.frontend
101 24 : .background()
102 24 : .instrument(tracing::info_span!(parent:None, "deletion frontend"))
103 152 : .await
104 24 : });
105 24 : let jh_backend = runtime.spawn(async move {
106 24 : self.backend
107 24 : .background()
108 24 : .instrument(tracing::info_span!(parent:None, "deletion backend"))
109 173 : .await
110 24 : });
111 24 : let jh_executor = runtime.spawn(async move {
112 24 : self.executor
113 24 : .background()
114 24 : .instrument(tracing::info_span!(parent:None, "deletion executor"))
115 84 : .await
116 24 : });
117 24 :
118 24 : runtime.spawn({
119 24 : async move {
120 24 : jh_frontend.await.expect("error joining frontend worker");
121 6 : jh_backend.await.expect("error joining backend worker");
122 6 : drop(jh_executor.await.expect("error joining executor worker"));
123 24 : }
124 24 : })
125 24 : }
126 : }
127 :
/// A FlushOp is just a oneshot channel, where we send the transmit side down
/// another channel, and the receive side will receive a message when the channel
/// we're flushing has reached the FlushOp we sent into it.
///
/// The only extra behavior beyond the channel is that the notify() method does not
/// return an error when the receive side has been dropped, because in this use case
/// it is harmless (the code that initiated the flush no longer cares about the result).
#[derive(Debug)]
struct FlushOp {
    // Transmit side of the oneshot; consumed by `notify()`.
    tx: tokio::sync::oneshot::Sender<()>,
}
139 :
140 : impl FlushOp {
141 126 : fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
142 126 : let (tx, rx) = tokio::sync::oneshot::channel::<()>();
143 126 : (Self { tx }, rx)
144 126 : }
145 :
146 126 : fn notify(self) {
147 126 : if self.tx.send(()).is_err() {
148 : // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
149 0 : debug!("deletion queue flush from dropped client");
150 126 : };
151 126 : }
152 : }
153 :
/// Cloneable handle for submitting work to the deletion queue.
#[derive(Clone, Debug)]
pub struct DeletionQueueClient {
    // Unbounded channel carrying messages to the ListWriter (the frontend).
    tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
    // Bounded channel straight to the Deleter, used by the immediate-mode API.
    executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,

    // Table of pending remote_consistent_lsn updates; the same table is handed
    // to the Validator when the queue is constructed.
    lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
}
161 :
/// The deletions recorded for a single tenant shard within a `DeletionList`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct TenantDeletionList {
    /// For each Timeline, a list of key fragments to append to the timeline remote path
    /// when reconstructing a full key
    timelines: HashMap<TimelineId, Vec<String>>,

    /// The generation in which this deletion was emitted: note that this may not be the
    /// same as the generation of any layers being deleted.  The generation of the layer
    /// has already been absorbed into the key fragments stored in `timelines`
    generation: Generation,
}
173 :
174 : impl TenantDeletionList {
175 30 : pub(crate) fn len(&self) -> usize {
176 30 : self.timelines.values().map(|v| v.len()).sum()
177 30 : }
178 : }
179 :
/// Files ending with this suffix will be ignored and erased
/// during recovery at startup.
const TEMP_SUFFIX: &str = "tmp";
183 :
/// A batch of deletions, persisted as one JSON file per sequence number
/// (see `DeletionList::save`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct DeletionList {
    /// Serialization version, for future use
    version: u8,

    /// Used for constructing a unique key for each deletion list we write out.
    sequence: u64,

    /// To avoid repeating tenant/timeline IDs in every key, we store keys in
    /// nested HashMaps by TenantTimelineID.  Each Tenant only appears once
    /// with one unique generation ID: if someone tries to push a second generation
    /// ID for the same tenant, we will start a new DeletionList.
    tenants: HashMap<TenantShardId, TenantDeletionList>,

    /// Avoid having to walk `tenants` to calculate the number of keys in
    /// the nested deletion lists
    size: usize,

    /// Set to true when the list has undergone validation with the control
    /// plane and the remaining contents of `tenants` are valid.  A list may
    /// also be implicitly marked valid by DeletionHeader.validated_sequence
    /// advancing to >= DeletionList.sequence
    // A missing field deserializes as `false`, and `false` is omitted on
    // serialization, so only validated lists carry the field on disk.
    #[serde(default)]
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    validated: bool,
}
210 :
/// Header for the on-disk deletion queue, written as JSON to
/// `conf.deletion_header_path()` by `DeletionHeader::save`.
#[derive(Debug, Serialize, Deserialize)]
struct DeletionHeader {
    /// Serialization version, for future use
    version: u8,

    /// The highest sequence number (inclusive) that has been validated.  All deletion
    /// lists on disk with a sequence <= this value are safe to execute.
    validated_sequence: u64,
}
220 :
221 : impl DeletionHeader {
222 : const VERSION_LATEST: u8 = 1;
223 :
224 24 : fn new(validated_sequence: u64) -> Self {
225 24 : Self {
226 24 : version: Self::VERSION_LATEST,
227 24 : validated_sequence,
228 24 : }
229 24 : }
230 :
231 24 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
232 24 : debug!("Saving deletion list header {:?}", self);
233 24 : let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
234 24 : let header_path = conf.deletion_header_path();
235 24 : let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
236 24 : VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
237 24 : .await
238 24 : .maybe_fatal_err("save deletion header")?;
239 :
240 24 : Ok(())
241 24 : }
242 : }
243 :
244 : impl DeletionList {
245 : const VERSION_LATEST: u8 = 1;
246 60 : fn new(sequence: u64) -> Self {
247 60 : Self {
248 60 : version: Self::VERSION_LATEST,
249 60 : sequence,
250 60 : tenants: HashMap::new(),
251 60 : size: 0,
252 60 : validated: false,
253 60 : }
254 60 : }
255 :
256 80 : fn is_empty(&self) -> bool {
257 80 : self.tenants.is_empty()
258 80 : }
259 :
260 180 : fn len(&self) -> usize {
261 180 : self.size
262 180 : }
263 :
264 : /// Returns true if the push was accepted, false if the caller must start a new
265 : /// deletion list.
266 42 : fn push(
267 42 : &mut self,
268 42 : tenant: &TenantShardId,
269 42 : timeline: &TimelineId,
270 42 : generation: Generation,
271 42 : objects: &mut Vec<RemotePath>,
272 42 : ) -> bool {
273 42 : if objects.is_empty() {
274 : // Avoid inserting an empty TimelineDeletionList: this preserves the property
275 : // that if we have no keys, then self.objects is empty (used in Self::is_empty)
276 0 : return true;
277 42 : }
278 42 :
279 42 : let tenant_entry = self
280 42 : .tenants
281 42 : .entry(*tenant)
282 42 : .or_insert_with(|| TenantDeletionList {
283 36 : timelines: HashMap::new(),
284 36 : generation,
285 42 : });
286 42 :
287 42 : if tenant_entry.generation != generation {
288 : // Only one generation per tenant per list: signal to
289 : // caller to start a new list.
290 6 : return false;
291 36 : }
292 36 :
293 36 : let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
294 36 :
295 36 : let timeline_remote_path = remote_timeline_path(tenant, timeline);
296 36 :
297 36 : self.size += objects.len();
298 36 : timeline_entry.extend(objects.drain(..).map(|p| {
299 36 : p.strip_prefix(&timeline_remote_path)
300 36 : .expect("Timeline paths always start with the timeline prefix")
301 36 : .to_string()
302 36 : }));
303 36 : true
304 42 : }
305 :
306 30 : fn into_remote_paths(self) -> Vec<RemotePath> {
307 30 : let mut result = Vec::new();
308 30 : for (tenant, tenant_deletions) in self.tenants.into_iter() {
309 18 : for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
310 18 : let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
311 18 : result.extend(
312 18 : timeline_layers
313 18 : .into_iter()
314 18 : .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
315 18 : );
316 18 : }
317 : }
318 :
319 30 : result
320 30 : }
321 :
322 42 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
323 42 : let path = conf.deletion_list_path(self.sequence);
324 42 : let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
325 42 :
326 42 : let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
327 42 :
328 42 : VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
329 42 : .await
330 42 : .maybe_fatal_err("save deletion list")
331 42 : .map_err(Into::into)
332 42 : }
333 : }
334 :
335 : impl std::fmt::Display for DeletionList {
336 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 0 : write!(
338 0 : f,
339 0 : "DeletionList<seq={}, tenants={}, keys={}>",
340 0 : self.sequence,
341 0 : self.tenants.len(),
342 0 : self.size
343 0 : )
344 0 : }
345 : }
346 :
/// A remote_consistent_lsn update that has been published by a timeline.
struct PendingLsn {
    // The LSN passed to `update_remote_consistent_lsn`.
    projected: Lsn,
    // Slot supplied by the caller of `update_remote_consistent_lsn`.
    // NOTE(review): presumably the Validator stores the validated LSN here — confirm.
    result_slot: Arc<AtomicLsn>,
}
351 :
/// Pending LSN updates for one tenant shard, all proposed in the same generation.
struct TenantLsnState {
    // Latest pending update per timeline; a newer update replaces the older one.
    timelines: HashMap<TimelineId, PendingLsn>,

    // In what generation was the most recent update proposed?
    generation: Generation,
}
358 :
/// Table of pending remote_consistent_lsn updates, keyed by tenant shard.
#[derive(Default)]
struct VisibleLsnUpdates {
    tenants: HashMap<TenantShardId, TenantLsnState>,
}
363 :
364 : impl VisibleLsnUpdates {
365 594 : fn new() -> Self {
366 594 : Self {
367 594 : tenants: HashMap::new(),
368 594 : }
369 594 : }
370 : }
371 :
372 : impl std::fmt::Debug for VisibleLsnUpdates {
373 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
374 0 : write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
375 0 : }
376 : }
377 :
/// Errors returned by the `DeletionQueueClient` API.
#[derive(Error, Debug)]
pub enum DeletionQueueError {
    /// A channel to the queue's workers was closed, or a flush operation was
    /// dropped before it was acknowledged.
    #[error("Deletion queue unavailable during shutdown")]
    ShuttingDown,
}
383 :
384 : impl DeletionQueueClient {
385 : /// This is cancel-safe. If you drop the future before it completes, the message
386 : /// is not pushed, although in the context of the deletion queue it doesn't matter: once
387 : /// we decide to do a deletion the decision is always final.
388 844 : fn do_push<T>(
389 844 : &self,
390 844 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
391 844 : msg: T,
392 844 : ) -> Result<(), DeletionQueueError> {
393 844 : match queue.send(msg) {
394 844 : Ok(_) => Ok(()),
395 0 : Err(e) => {
396 0 : // This shouldn't happen, we should shut down all tenants before
397 0 : // we shut down the global delete queue. If we encounter a bug like this,
398 0 : // we may leak objects as deletions won't be processed.
399 0 : error!("Deletion queue closed while pushing, shutting down? ({e})");
400 0 : Err(DeletionQueueError::ShuttingDown)
401 : }
402 : }
403 844 : }
404 :
405 24 : pub(crate) fn recover(
406 24 : &self,
407 24 : attached_tenants: HashMap<TenantShardId, Generation>,
408 24 : ) -> Result<(), DeletionQueueError> {
409 24 : self.do_push(
410 24 : &self.tx,
411 24 : ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
412 24 : )
413 24 : }
414 :
415 : /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
416 : /// world, it must validate its generation number before doing so. Rather than do this synchronously,
417 : /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
418 : /// recently validated separately.
419 : ///
420 : /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The
421 : /// backend will later wake up and notice that the tenant's generation requires validation.
422 4217 : pub(crate) async fn update_remote_consistent_lsn(
423 4217 : &self,
424 4217 : tenant_shard_id: TenantShardId,
425 4217 : timeline_id: TimelineId,
426 4217 : current_generation: Generation,
427 4217 : lsn: Lsn,
428 4217 : result_slot: Arc<AtomicLsn>,
429 4217 : ) {
430 4217 : let mut locked = self
431 4217 : .lsn_table
432 4217 : .write()
433 4217 : .expect("Lock should never be poisoned");
434 4217 :
435 4217 : let tenant_entry = locked
436 4217 : .tenants
437 4217 : .entry(tenant_shard_id)
438 4217 : .or_insert(TenantLsnState {
439 4217 : timelines: HashMap::new(),
440 4217 : generation: current_generation,
441 4217 : });
442 4217 :
443 4217 : if tenant_entry.generation != current_generation {
444 0 : // Generation might have changed if we were detached and then re-attached: in this case,
445 0 : // state from the previous generation cannot be trusted.
446 0 : tenant_entry.timelines.clear();
447 0 : tenant_entry.generation = current_generation;
448 4217 : }
449 :
450 4217 : tenant_entry.timelines.insert(
451 4217 : timeline_id,
452 4217 : PendingLsn {
453 4217 : projected: lsn,
454 4217 : result_slot,
455 4217 : },
456 4217 : );
457 4217 : }
458 :
459 : /// Submit a list of layers for deletion: this function will return before the deletion is
460 : /// persistent, but it may be executed at any time after this function enters: do not push
461 : /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
462 : /// references them).
463 : ///
464 : /// The `current_generation` is the generation of this pageserver's current attachment. The
465 : /// generations in `layers` are the generations in which those layers were written.
466 748 : pub(crate) async fn push_layers(
467 748 : &self,
468 748 : tenant_shard_id: TenantShardId,
469 748 : timeline_id: TimelineId,
470 748 : current_generation: Generation,
471 748 : layers: Vec<(LayerName, LayerFileMetadata)>,
472 748 : ) -> Result<(), DeletionQueueError> {
473 748 : if current_generation.is_none() {
474 0 : debug!("Enqueuing deletions in legacy mode, skipping queue");
475 :
476 0 : let mut layer_paths = Vec::new();
477 0 : for (layer, meta) in layers {
478 0 : layer_paths.push(remote_layer_path(
479 0 : &tenant_shard_id.tenant_id,
480 0 : &timeline_id,
481 0 : meta.shard,
482 0 : &layer,
483 0 : meta.generation,
484 0 : ));
485 0 : }
486 0 : self.push_immediate(layer_paths).await?;
487 0 : return self.flush_immediate().await;
488 748 : }
489 748 :
490 748 : self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
491 748 : }
492 :
493 : /// When a Tenant has a generation, push_layers is always synchronous because
494 : /// the ListValidator channel is an unbounded channel.
495 : ///
496 : /// This can be merged into push_layers when we remove the Generation-less mode
497 : /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
498 748 : pub(crate) fn push_layers_sync(
499 748 : &self,
500 748 : tenant_shard_id: TenantShardId,
501 748 : timeline_id: TimelineId,
502 748 : current_generation: Generation,
503 748 : layers: Vec<(LayerName, LayerFileMetadata)>,
504 748 : ) -> Result<(), DeletionQueueError> {
505 748 : metrics::DELETION_QUEUE
506 748 : .keys_submitted
507 748 : .inc_by(layers.len() as u64);
508 748 : self.do_push(
509 748 : &self.tx,
510 748 : ListWriterQueueMessage::Delete(DeletionOp {
511 748 : tenant_shard_id,
512 748 : timeline_id,
513 748 : layers,
514 748 : generation: current_generation,
515 748 : objects: Vec::new(),
516 748 : }),
517 748 : )
518 748 : }
519 :
520 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
521 72 : async fn do_flush<T>(
522 72 : &self,
523 72 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
524 72 : msg: T,
525 72 : rx: tokio::sync::oneshot::Receiver<()>,
526 72 : ) -> Result<(), DeletionQueueError> {
527 72 : self.do_push(queue, msg)?;
528 72 : if rx.await.is_err() {
529 : // This shouldn't happen if tenants are shut down before deletion queue. If we
530 : // encounter a bug like this, then a flusher will incorrectly believe it has flushed
531 : // when it hasn't, possibly leading to leaking objects.
532 0 : error!("Deletion queue dropped flush op while client was still waiting");
533 0 : Err(DeletionQueueError::ShuttingDown)
534 : } else {
535 72 : Ok(())
536 : }
537 72 : }
538 :
539 : /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
540 : ///
541 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
542 42 : pub async fn flush(&self) -> Result<(), DeletionQueueError> {
543 42 : let (flush_op, rx) = FlushOp::new();
544 42 : self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
545 42 : .await
546 42 : }
547 :
548 : /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
549 : /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
550 : /// detach where flushing is nice but not necessary.
551 : ///
552 : /// This function provides no guarantees of work being done.
553 0 : pub fn flush_advisory(&self) {
554 0 : let (flush_op, _) = FlushOp::new();
555 0 :
556 0 : // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
557 0 : drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
558 0 : }
559 :
560 : // Wait until all previous deletions are executed
561 30 : pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
562 30 : debug!("flush_execute: flushing to deletion lists...");
563 : // Flush any buffered work to deletion lists
564 30 : self.flush().await?;
565 :
566 : // Flush the backend into the executor of deletion lists
567 30 : let (flush_op, rx) = FlushOp::new();
568 30 : debug!("flush_execute: flushing backend...");
569 30 : self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
570 30 : .await?;
571 30 : debug!("flush_execute: finished flushing backend...");
572 :
573 : // Flush any immediate-mode deletions (the above backend flush will only flush
574 : // the executor if deletions had flowed through the backend)
575 30 : debug!("flush_execute: flushing execution...");
576 30 : self.flush_immediate().await?;
577 30 : debug!("flush_execute: finished flushing execution...");
578 30 : Ok(())
579 30 : }
580 :
581 : /// This interface bypasses the persistent deletion queue, and any validation
582 : /// that this pageserver is still elegible to execute the deletions. It is for
583 : /// use in timeline deletions, where the control plane is telling us we may
584 : /// delete everything in the timeline.
585 : ///
586 : /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
587 0 : pub(crate) async fn push_immediate(
588 0 : &self,
589 0 : objects: Vec<RemotePath>,
590 0 : ) -> Result<(), DeletionQueueError> {
591 0 : metrics::DELETION_QUEUE
592 0 : .keys_submitted
593 0 : .inc_by(objects.len() as u64);
594 0 : self.executor_tx
595 0 : .send(DeleterMessage::Delete(objects))
596 0 : .await
597 0 : .map_err(|_| DeletionQueueError::ShuttingDown)
598 0 : }
599 :
600 : /// Companion to push_immediate. When this returns Ok, all prior objects sent
601 : /// into push_immediate have been deleted from remote storage.
602 30 : pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
603 30 : let (flush_op, rx) = FlushOp::new();
604 30 : self.executor_tx
605 30 : .send(DeleterMessage::Flush(flush_op))
606 0 : .await
607 30 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
608 :
609 30 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
610 30 : }
611 : }
612 :
613 : impl DeletionQueue {
614 24 : pub fn new_client(&self) -> DeletionQueueClient {
615 24 : self.client.clone()
616 24 : }
617 :
618 : /// Caller may use the returned object to construct clients with new_client.
619 : /// Caller should tokio::spawn the background() members of the two worker objects returned:
620 : /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
621 : ///
622 : /// If remote_storage is None, then the returned workers will also be None.
623 24 : pub fn new<C>(
624 24 : remote_storage: GenericRemoteStorage,
625 24 : control_plane_client: Option<C>,
626 24 : conf: &'static PageServerConf,
627 24 : ) -> (Self, Option<DeletionQueueWorkers<C>>)
628 24 : where
629 24 : C: ControlPlaneGenerationsApi + Send + Sync,
630 24 : {
631 24 : // Unbounded channel: enables non-async functions to submit deletions. The actual length is
632 24 : // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
633 24 : // enough to avoid this taking pathologically large amount of memory.
634 24 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
635 24 :
636 24 : // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
637 24 : let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
638 24 :
639 24 : // Shallow channel: it carries lists of paths, and we expect the main queueing to
640 24 : // happen in the backend (persistent), not in this queue.
641 24 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
642 24 :
643 24 : let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
644 24 :
645 24 : // The deletion queue has an independent cancellation token to
646 24 : // the general pageserver shutdown token, because it stays alive a bit
647 24 : // longer to flush after Tenants have all been torn down.
648 24 : let cancel = CancellationToken::new();
649 24 :
650 24 : (
651 24 : Self {
652 24 : client: DeletionQueueClient {
653 24 : tx,
654 24 : executor_tx: executor_tx.clone(),
655 24 : lsn_table: lsn_table.clone(),
656 24 : },
657 24 : cancel: cancel.clone(),
658 24 : },
659 24 : Some(DeletionQueueWorkers {
660 24 : frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
661 24 : backend: Validator::new(
662 24 : conf,
663 24 : backend_rx,
664 24 : executor_tx,
665 24 : control_plane_client,
666 24 : lsn_table.clone(),
667 24 : cancel.clone(),
668 24 : ),
669 24 : executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
670 24 : }),
671 24 : )
672 24 : }
673 :
674 0 : pub async fn shutdown(&mut self, timeout: Duration) {
675 0 : match tokio::time::timeout(timeout, self.client.flush()).await {
676 : Ok(Ok(())) => {
677 0 : tracing::info!("Deletion queue flushed successfully on shutdown")
678 : }
679 : Ok(Err(DeletionQueueError::ShuttingDown)) => {
680 : // This is not harmful for correctness, but is unexpected: the deletion
681 : // queue's workers should stay alive as long as there are any client handles instantiated.
682 0 : tracing::warn!("Deletion queue stopped prematurely");
683 : }
684 0 : Err(_timeout) => {
685 0 : tracing::warn!("Timed out flushing deletion queue on shutdown")
686 : }
687 : }
688 :
689 : // We only cancel _after_ flushing: otherwise we would be shutting down the
690 : // components that do the flush.
691 0 : self.cancel.cancel();
692 0 : }
693 : }
694 :
695 : #[cfg(test)]
696 : mod test {
697 : use camino::Utf8Path;
698 : use hex_literal::hex;
699 : use pageserver_api::{shard::ShardIndex, upcall_api::ReAttachResponseTenant};
700 : use std::{io::ErrorKind, time::Duration};
701 : use tracing::info;
702 :
703 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
704 : use tokio::task::JoinHandle;
705 :
706 : use crate::{
707 : control_plane_client::RetryForeverError,
708 : repository::Key,
709 : tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
710 : };
711 :
712 : use super::*;
// Fixed timeline ID shared by the tests in this module.
pub const TIMELINE_ID: TimelineId =
    TimelineId::from_array(hex!("11223344556677881122334455667788"));

// Delta layer name used as the default example layer in tests.
pub const EXAMPLE_LAYER_NAME: LayerName = LayerName::Delta(DeltaLayerName {
    key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
    lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
});

// When you need a second layer in a test. Its LSN range starts where
// EXAMPLE_LAYER_NAME's range ends.
pub const EXAMPLE_LAYER_NAME_ALT: LayerName = LayerName::Delta(DeltaLayerName {
    key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
    lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
});
726 :
// Everything a deletion queue test needs: a tenant harness (used for its
// config), a local-filesystem remote storage, a mock control plane, and a
// running deletion queue.
struct TestSetup {
    harness: TenantHarness,
    // Canonicalized directory backing the LocalFs remote storage.
    remote_fs_dir: Utf8PathBuf,
    storage: GenericRemoteStorage,
    mock_control_plane: MockControlPlane,
    deletion_queue: DeletionQueue,
    // Join handle returned by spawn_with for the queue's workers.
    worker_join: JoinHandle<()>,
}
735 :
736 : impl TestSetup {
737 : /// Simulate a pageserver restart by destroying and recreating the deletion queue
738 6 : async fn restart(&mut self) {
739 6 : let (deletion_queue, workers) = DeletionQueue::new(
740 6 : self.storage.clone(),
741 6 : Some(self.mock_control_plane.clone()),
742 6 : self.harness.conf,
743 6 : );
744 6 :
745 6 : tracing::debug!("Spawning worker for new queue queue");
746 6 : let worker_join = workers
747 6 : .unwrap()
748 6 : .spawn_with(&tokio::runtime::Handle::current());
749 6 :
750 6 : let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
751 6 : let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
752 6 :
753 6 : tracing::debug!("Joining worker from previous queue");
754 6 : old_deletion_queue.cancel.cancel();
755 6 : old_worker_join
756 6 : .await
757 6 : .expect("Failed to join workers for previous deletion queue");
758 6 : }
759 :
760 18 : fn set_latest_generation(&self, gen: Generation) {
761 18 : let tenant_shard_id = self.harness.tenant_shard_id;
762 18 : self.mock_control_plane
763 18 : .latest_generation
764 18 : .lock()
765 18 : .unwrap()
766 18 : .insert(tenant_shard_id, gen);
767 18 : }
768 :
769 : /// Returns remote layer file name, suitable for use in assert_remote_files
770 18 : fn write_remote_layer(
771 18 : &self,
772 18 : file_name: LayerName,
773 18 : gen: Generation,
774 18 : ) -> anyhow::Result<String> {
775 18 : let tenant_shard_id = self.harness.tenant_shard_id;
776 18 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
777 18 : let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
778 18 : std::fs::create_dir_all(&remote_timeline_path)?;
779 18 : let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
780 18 :
781 18 : let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
782 18 :
783 18 : std::fs::write(
784 18 : remote_timeline_path.join(remote_layer_file_name.clone()),
785 18 : content,
786 18 : )?;
787 :
788 18 : Ok(remote_layer_file_name)
789 18 : }
790 : }
791 :
// Test stand-in for the control plane. Clones share the same generation map,
// so tests can change the answers given by a clone that was handed to the queue.
#[derive(Debug, Clone)]
struct MockControlPlane {
    // Latest generation per tenant shard, consulted by `validate`.
    pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
}
796 :
797 : impl MockControlPlane {
798 18 : fn new() -> Self {
799 18 : Self {
800 18 : latest_generation: Arc::default(),
801 18 : }
802 18 : }
803 : }
804 :
805 : impl ControlPlaneGenerationsApi for MockControlPlane {
806 0 : async fn re_attach(
807 0 : &self,
808 0 : _conf: &PageServerConf,
809 0 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
810 0 : unimplemented!()
811 : }
812 :
813 24 : async fn validate(
814 24 : &self,
815 24 : tenants: Vec<(TenantShardId, Generation)>,
816 24 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
817 24 : let mut result = HashMap::new();
818 24 :
819 24 : let latest_generation = self.latest_generation.lock().unwrap();
820 :
821 48 : for (tenant_shard_id, generation) in tenants {
822 24 : if let Some(latest) = latest_generation.get(&tenant_shard_id) {
823 24 : result.insert(tenant_shard_id, *latest == generation);
824 24 : }
825 : }
826 :
827 24 : Ok(result)
828 24 : }
829 : }
830 :
831 18 : async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
832 18 : let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
833 18 : let harness = TenantHarness::create(test_name).await?;
834 :
835 : // We do not load() the harness: we only need its config and remote_storage
836 :
837 : // Set up a GenericRemoteStorage targetting a directory
838 18 : let remote_fs_dir = harness.conf.workdir.join("remote_fs");
839 18 : std::fs::create_dir_all(remote_fs_dir)?;
840 18 : let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
841 18 : let storage_config = RemoteStorageConfig {
842 18 : storage: RemoteStorageKind::LocalFs {
843 18 : local_path: remote_fs_dir.clone(),
844 18 : },
845 18 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
846 18 : };
847 18 : let storage = GenericRemoteStorage::from_config(&storage_config)
848 0 : .await
849 18 : .unwrap();
850 18 :
851 18 : let mock_control_plane = MockControlPlane::new();
852 18 :
853 18 : let (deletion_queue, worker) = DeletionQueue::new(
854 18 : storage.clone(),
855 18 : Some(mock_control_plane.clone()),
856 18 : harness.conf,
857 18 : );
858 18 :
859 18 : let worker = worker.unwrap();
860 18 : let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
861 18 :
862 18 : Ok(TestSetup {
863 18 : harness,
864 18 : remote_fs_dir,
865 18 : storage,
866 18 : mock_control_plane,
867 18 : deletion_queue,
868 18 : worker_join,
869 18 : })
870 18 : }
871 :
872 : // TODO: put this in a common location so that we can share with remote_timeline_client's tests
873 54 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
874 54 : let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
875 54 : expected.sort();
876 54 :
877 54 : let mut found: Vec<String> = Vec::new();
878 54 : let dir = match std::fs::read_dir(remote_path) {
879 54 : Ok(d) => d,
880 0 : Err(e) => {
881 0 : if e.kind() == ErrorKind::NotFound {
882 0 : if expected.is_empty() {
883 : // We are asserting prefix is empty: it is expected that the dir is missing
884 0 : return;
885 : } else {
886 0 : assert_eq!(expected, Vec::<String>::new());
887 0 : unreachable!();
888 : }
889 : } else {
890 0 : panic!("Unexpected error listing {remote_path}: {e}");
891 : }
892 : }
893 : };
894 :
895 54 : for entry in dir.flatten() {
896 48 : let entry_name = entry.file_name();
897 48 : let fname = entry_name.to_str().unwrap();
898 48 : found.push(String::from(fname));
899 48 : }
900 54 : found.sort();
901 54 :
902 54 : assert_eq!(expected, found);
903 54 : }
904 :
905 30 : fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
906 30 : let dir = match std::fs::read_dir(directory) {
907 24 : Ok(d) => d,
908 : Err(_) => {
909 6 : assert_eq!(expected, &Vec::<String>::new());
910 6 : return;
911 : }
912 : };
913 24 : let mut found = Vec::new();
914 54 : for dentry in dir {
915 30 : let dentry = dentry.unwrap();
916 30 : let file_name = dentry.file_name();
917 30 : let file_name_str = file_name.to_string_lossy();
918 30 : found.push(file_name_str.to_string());
919 30 : }
920 24 : found.sort();
921 24 : assert_eq!(expected, found);
922 30 : }
923 :
924 : #[tokio::test]
925 6 : async fn deletion_queue_smoke() -> anyhow::Result<()> {
926 6 : // Basic test that the deletion queue processes the deletions we pass into it
927 6 : let ctx = setup("deletion_queue_smoke")
928 6 : .await
929 6 : .expect("Failed test setup");
930 6 : let client = ctx.deletion_queue.new_client();
931 6 : client.recover(HashMap::new())?;
932 6 :
933 6 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
934 6 : let tenant_shard_id = ctx.harness.tenant_shard_id;
935 6 :
936 6 : let content: Vec<u8> = "victim1 contents".into();
937 6 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
938 6 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
939 6 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
940 6 :
941 6 : // Exercise the distinction between the generation of the layers
942 6 : // we delete, and the generation of the running Tenant.
943 6 : let layer_generation = Generation::new(0xdeadbeef);
944 6 : let now_generation = Generation::new(0xfeedbeef);
945 6 : let layer_metadata =
946 6 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
947 6 :
948 6 : let remote_layer_file_name_1 =
949 6 : format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
950 6 :
951 6 : // Set mock control plane state to valid for our generation
952 6 : ctx.set_latest_generation(now_generation);
953 6 :
954 6 : // Inject a victim file to remote storage
955 6 : info!("Writing");
956 6 : std::fs::create_dir_all(&remote_timeline_path)?;
957 6 : std::fs::write(
958 6 : remote_timeline_path.join(remote_layer_file_name_1.clone()),
959 6 : content,
960 6 : )?;
961 6 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
962 6 :
963 6 : // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
964 6 : info!("Pushing");
965 6 : client
966 6 : .push_layers(
967 6 : tenant_shard_id,
968 6 : TIMELINE_ID,
969 6 : now_generation,
970 6 : [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
971 6 : )
972 6 : .await?;
973 6 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
974 6 :
975 6 : assert_local_files(&[], &deletion_prefix);
976 6 :
977 6 : // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
978 6 : info!("Flushing");
979 6 : client.flush().await?;
980 6 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
981 6 : assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
982 6 :
983 6 : // File should go away when we execute
984 6 : info!("Flush-executing");
985 18 : client.flush_execute().await?;
986 6 : assert_remote_files(&[], &remote_timeline_path);
987 6 : assert_local_files(&["header-01"], &deletion_prefix);
988 6 :
989 6 : // Flushing on an empty queue should succeed immediately, and not write any lists
990 6 : info!("Flush-executing on empty");
991 18 : client.flush_execute().await?;
992 6 : assert_local_files(&["header-01"], &deletion_prefix);
993 6 :
994 6 : Ok(())
995 6 : }
996 :
997 : #[tokio::test]
998 6 : async fn deletion_queue_validation() -> anyhow::Result<()> {
999 6 : let ctx = setup("deletion_queue_validation")
1000 6 : .await
1001 6 : .expect("Failed test setup");
1002 6 : let client = ctx.deletion_queue.new_client();
1003 6 : client.recover(HashMap::new())?;
1004 6 :
1005 6 : // Generation that the control plane thinks is current
1006 6 : let latest_generation = Generation::new(0xdeadbeef);
1007 6 : // Generation that our DeletionQueue thinks the tenant is running with
1008 6 : let stale_generation = latest_generation.previous();
1009 6 : // Generation that our example layer file was written with
1010 6 : let layer_generation = stale_generation.previous();
1011 6 : let layer_metadata =
1012 6 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1013 6 :
1014 6 : ctx.set_latest_generation(latest_generation);
1015 6 :
1016 6 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1017 6 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1018 6 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1019 6 :
1020 6 : // Initial state: a remote layer exists
1021 6 : let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1022 6 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1023 6 :
1024 6 : tracing::debug!("Pushing...");
1025 6 : client
1026 6 : .push_layers(
1027 6 : tenant_shard_id,
1028 6 : TIMELINE_ID,
1029 6 : stale_generation,
1030 6 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1031 6 : )
1032 6 : .await?;
1033 6 :
1034 6 : // We enqueued the operation in a stale generation: it should have failed validation
1035 6 : tracing::debug!("Flushing...");
1036 18 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1037 6 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1038 6 :
1039 6 : tracing::debug!("Pushing...");
1040 6 : client
1041 6 : .push_layers(
1042 6 : tenant_shard_id,
1043 6 : TIMELINE_ID,
1044 6 : latest_generation,
1045 6 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1046 6 : )
1047 6 : .await?;
1048 6 :
1049 6 : // We enqueued the operation in a fresh generation: it should have passed validation
1050 6 : tracing::debug!("Flushing...");
1051 18 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1052 6 : assert_remote_files(&[], &remote_timeline_path);
1053 6 :
1054 6 : Ok(())
1055 6 : }
1056 :
1057 : #[tokio::test]
1058 6 : async fn deletion_queue_recovery() -> anyhow::Result<()> {
1059 6 : // Basic test that the deletion queue processes the deletions we pass into it
1060 6 : let mut ctx = setup("deletion_queue_recovery")
1061 6 : .await
1062 6 : .expect("Failed test setup");
1063 6 : let client = ctx.deletion_queue.new_client();
1064 6 : client.recover(HashMap::new())?;
1065 6 :
1066 6 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1067 6 :
1068 6 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1069 6 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1070 6 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
1071 6 :
1072 6 : let layer_generation = Generation::new(0xdeadbeef);
1073 6 : let now_generation = Generation::new(0xfeedbeef);
1074 6 : let layer_metadata =
1075 6 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1076 6 :
1077 6 : // Inject a deletion in the generation before generation_now: after restart,
1078 6 : // this deletion should _not_ get executed (only the immediately previous
1079 6 : // generation gets that treatment)
1080 6 : let remote_layer_file_name_historical =
1081 6 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1082 6 : client
1083 6 : .push_layers(
1084 6 : tenant_shard_id,
1085 6 : TIMELINE_ID,
1086 6 : now_generation.previous(),
1087 6 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1088 6 : )
1089 6 : .await?;
1090 6 :
1091 6 : // Inject a deletion in the generation before generation_now: after restart,
1092 6 : // this deletion should get executed, because we execute deletions in the
1093 6 : // immediately previous generation on the same node.
1094 6 : let remote_layer_file_name_previous =
1095 6 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
1096 6 : client
1097 6 : .push_layers(
1098 6 : tenant_shard_id,
1099 6 : TIMELINE_ID,
1100 6 : now_generation,
1101 6 : [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
1102 6 : )
1103 6 : .await?;
1104 6 :
1105 6 : client.flush().await?;
1106 6 : assert_remote_files(
1107 6 : &[
1108 6 : &remote_layer_file_name_historical,
1109 6 : &remote_layer_file_name_previous,
1110 6 : ],
1111 6 : &remote_timeline_path,
1112 6 : );
1113 6 :
1114 6 : // Different generatinos for the same tenant will cause two separate
1115 6 : // deletion lists to be emitted.
1116 6 : assert_local_files(
1117 6 : &["0000000000000001-01.list", "0000000000000002-01.list"],
1118 6 : &deletion_prefix,
1119 6 : );
1120 6 :
1121 6 : // Simulate a node restart: the latest generation advances
1122 6 : let now_generation = now_generation.next();
1123 6 : ctx.set_latest_generation(now_generation);
1124 6 :
1125 6 : // Restart the deletion queue
1126 6 : drop(client);
1127 6 : ctx.restart().await;
1128 6 : let client = ctx.deletion_queue.new_client();
1129 6 : client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
1130 6 :
1131 6 : info!("Flush-executing");
1132 18 : client.flush_execute().await?;
1133 6 : // The deletion from immediately prior generation was executed, the one from
1134 6 : // an older generation was not.
1135 6 : assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
1136 6 : Ok(())
1137 6 : }
1138 : }
1139 :
1140 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
1141 : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
1142 : #[cfg(test)]
1143 : pub(crate) mod mock {
1144 : use tracing::info;
1145 :
1146 : use super::*;
1147 : use std::sync::atomic::{AtomicUsize, Ordering};
1148 :
1149 : pub struct ConsumerState {
1150 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
1151 : executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
1152 : cancel: CancellationToken,
1153 : }
1154 :
1155 : impl ConsumerState {
1156 6 : async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
1157 6 : let mut executed = 0;
1158 6 :
1159 6 : info!("Executing all pending deletions");
1160 :
1161 : // Transform all executor messages to generic frontend messages
1162 6 : while let Ok(msg) = self.executor_rx.try_recv() {
1163 0 : match msg {
1164 0 : DeleterMessage::Delete(objects) => {
1165 0 : for path in objects {
1166 0 : match remote_storage.delete(&path, &self.cancel).await {
1167 : Ok(_) => {
1168 0 : debug!("Deleted {path}");
1169 : }
1170 0 : Err(e) => {
1171 0 : error!("Failed to delete {path}, leaking object! ({e})");
1172 : }
1173 : }
1174 0 : executed += 1;
1175 : }
1176 : }
1177 0 : DeleterMessage::Flush(flush_op) => {
1178 0 : flush_op.notify();
1179 0 : }
1180 : }
1181 : }
1182 :
1183 12 : while let Ok(msg) = self.rx.try_recv() {
1184 6 : match msg {
1185 6 : ListWriterQueueMessage::Delete(op) => {
1186 6 : let mut objects = op.objects;
1187 12 : for (layer, meta) in op.layers {
1188 6 : objects.push(remote_layer_path(
1189 6 : &op.tenant_shard_id.tenant_id,
1190 6 : &op.timeline_id,
1191 6 : meta.shard,
1192 6 : &layer,
1193 6 : meta.generation,
1194 6 : ));
1195 6 : }
1196 :
1197 12 : for path in objects {
1198 6 : info!("Executing deletion {path}");
1199 6 : match remote_storage.delete(&path, &self.cancel).await {
1200 : Ok(_) => {
1201 6 : debug!("Deleted {path}");
1202 : }
1203 0 : Err(e) => {
1204 0 : error!("Failed to delete {path}, leaking object! ({e})");
1205 : }
1206 : }
1207 6 : executed += 1;
1208 : }
1209 : }
1210 0 : ListWriterQueueMessage::Flush(op) => {
1211 0 : op.notify();
1212 0 : }
1213 0 : ListWriterQueueMessage::FlushExecute(op) => {
1214 0 : // We have already executed all prior deletions because mock does them inline
1215 0 : op.notify();
1216 0 : }
1217 0 : ListWriterQueueMessage::Recover(_) => {
1218 0 : // no-op in mock
1219 0 : }
1220 : }
1221 6 : info!("All pending deletions have been executed");
1222 : }
1223 :
1224 6 : executed
1225 6 : }
1226 : }
1227 :
1228 : pub struct MockDeletionQueue {
1229 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
1230 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
1231 : executed: Arc<AtomicUsize>,
1232 : remote_storage: Option<GenericRemoteStorage>,
1233 : consumer: std::sync::Mutex<ConsumerState>,
1234 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
1235 : }
1236 :
1237 : impl MockDeletionQueue {
1238 570 : pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
1239 570 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1240 570 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
1241 570 :
1242 570 : let executed = Arc::new(AtomicUsize::new(0));
1243 570 :
1244 570 : Self {
1245 570 : tx,
1246 570 : executor_tx,
1247 570 : executed,
1248 570 : remote_storage,
1249 570 : consumer: std::sync::Mutex::new(ConsumerState {
1250 570 : rx,
1251 570 : executor_rx,
1252 570 : cancel: CancellationToken::new(),
1253 570 : }),
1254 570 : lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
1255 570 : }
1256 570 : }
1257 :
1258 : #[allow(clippy::await_holding_lock)]
1259 6 : pub async fn pump(&self) {
1260 6 : if let Some(remote_storage) = &self.remote_storage {
1261 : // Permit holding mutex across await, because this is only ever
1262 : // called once at a time in tests.
1263 6 : let mut locked = self.consumer.lock().unwrap();
1264 6 : let count = locked.consume(remote_storage).await;
1265 6 : self.executed.fetch_add(count, Ordering::Relaxed);
1266 0 : }
1267 6 : }
1268 :
1269 600 : pub(crate) fn new_client(&self) -> DeletionQueueClient {
1270 600 : DeletionQueueClient {
1271 600 : tx: self.tx.clone(),
1272 600 : executor_tx: self.executor_tx.clone(),
1273 600 : lsn_table: self.lsn_table.clone(),
1274 600 : }
1275 600 : }
1276 : }
1277 :
/// Checks that a `DeletionList` serializes to a fixed, known JSON string (guarding the
/// stability of the on-disk format) and that deserializing that string yields an equal
/// list.
#[test]
fn deletion_list_serialization() -> anyhow::Result<()> {
    let tenant_id: TenantShardId = "ad6c1a56f5680419d3a16ff55d97ec3c".parse()?;
    let timeline_id: TimelineId = "be322c834ed9e709e63b5c9698691910".parse()?;
    let generation = Generation::new(123);

    // A single object under the tenant's timeline prefix
    let mut objects = vec![RemotePath::from_string(&format!(
        "tenants/{tenant_id}/timelines/{timeline_id}/foo"
    ))?];

    let mut example = DeletionList::new(1);
    example.push(&tenant_id, &timeline_id, generation, &mut objects);

    // Serialized form must match the static expectation exactly
    let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}";
    let encoded = serde_json::to_string(&example)?;
    assert_eq!(encoded, expected);

    // ...and must round-trip back to an equal value
    let decoded: DeletionList = serde_json::from_str(&encoded)?;
    assert_eq!(example, decoded);

    Ok(())
}
1307 : }
|