Line data Source code
1 : mod deleter;
2 : mod list_writer;
3 : mod validator;
4 :
5 : use std::collections::HashMap;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use crate::controller_upcall_client::ControlPlaneGenerationsApi;
10 : use crate::metrics;
11 : use crate::tenant::remote_timeline_client::remote_timeline_path;
12 : use crate::tenant::remote_timeline_client::LayerFileMetadata;
13 : use crate::virtual_file::MaybeFatalIo;
14 : use crate::virtual_file::VirtualFile;
15 : use anyhow::Context;
16 : use camino::Utf8PathBuf;
17 : use pageserver_api::shard::TenantShardId;
18 : use remote_storage::{GenericRemoteStorage, RemotePath};
19 : use serde::Deserialize;
20 : use serde::Serialize;
21 : use thiserror::Error;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::Instrument;
24 : use tracing::{debug, error};
25 : use utils::crashsafe::path_with_suffix_extension;
26 : use utils::generation::Generation;
27 : use utils::id::TimelineId;
28 : use utils::lsn::AtomicLsn;
29 : use utils::lsn::Lsn;
30 :
31 : use self::deleter::Deleter;
32 : use self::list_writer::DeletionOp;
33 : use self::list_writer::ListWriter;
34 : use self::list_writer::RecoverOp;
35 : use self::validator::Validator;
36 : use deleter::DeleterMessage;
37 : use list_writer::ListWriterQueueMessage;
38 : use validator::ValidatorQueueMessage;
39 :
40 : use crate::{config::PageServerConf, tenant::storage_layer::LayerName};
41 :
42 : // TODO: make the delay before executing deferred deletions configurable
43 :
44 : /// We aggregate object deletions from many tenants in one place, for several reasons:
45 : /// - Coalesce deletions into fewer DeleteObjects calls
46 : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
47 : /// to flush any outstanding deletions.
48 : /// - Globally control throughput of deletions, as these are a low priority task: they should
49 : ///   not compete with the same S3 clients/connections used for higher priority uploads.
50 : /// - Enable gating deletions on validation of a tenant's generation number, to make
51 : /// it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
52 : ///
53 : /// There are two kinds of deletion: deferred and immediate. A deferred deletion
54 : /// may be intentionally delayed to protect passive readers of S3 data, and is
55 : /// subject to a generation number validation step. An immediate deletion is
56 : /// ready to execute immediately, and is only queued up so that it can be coalesced
57 : /// with other deletions in flight.
58 : ///
59 : /// Deferred deletions pass through three steps:
60 : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
61 : /// DeletionLists, which are persisted to disk.
62 : /// - Validator: accumulate deletion lists, and validate them en masse prior to passing
63 : /// the keys in the list onward for actual deletion. Also validate remote_consistent_lsn
64 : /// updates for running timelines.
65 : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
66 : /// batches of 1000 keys via DeleteObjects.
67 : ///
68 : /// Immediate deletions, such as those issued during timeline deletion, bypass the first
69 : /// two stages and are passed straight into the Deleter.
70 : ///
71 : /// Internally, each stage is joined by a channel to the next. On disk, there is only
72 : /// one queue (of DeletionLists), which is written by the frontend and consumed
73 : /// by the backend.
74 : #[derive(Clone)]
75 : pub struct DeletionQueue {
76 : client: DeletionQueueClient,
77 :
78 : // Parent cancellation token for the tokens passed into background workers
79 : cancel: CancellationToken,
80 : }
81 :
82 : /// Opaque wrapper around individual worker tasks, to avoid making the
83 : /// worker objects themselves public
84 : pub struct DeletionQueueWorkers<C>
85 : where
86 : C: ControlPlaneGenerationsApi + Send + Sync,
87 : {
88 : frontend: ListWriter,
89 : backend: Validator<C>,
90 : executor: Deleter,
91 : }
92 :
93 : impl<C> DeletionQueueWorkers<C>
94 : where
95 : C: ControlPlaneGenerationsApi + Send + Sync + 'static,
96 : {
97 16 : pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
98 16 : let jh_frontend = runtime.spawn(async move {
99 16 : self.frontend
100 16 : .background()
101 16 : .instrument(tracing::info_span!(parent:None, "deletion frontend"))
102 16 : .await
103 16 : });
104 16 : let jh_backend = runtime.spawn(async move {
105 16 : self.backend
106 16 : .background()
107 16 : .instrument(tracing::info_span!(parent:None, "deletion backend"))
108 16 : .await
109 16 : });
110 16 : let jh_executor = runtime.spawn(async move {
111 16 : self.executor
112 16 : .background()
113 16 : .instrument(tracing::info_span!(parent:None, "deletion executor"))
114 16 : .await
115 16 : });
116 16 :
117 16 : runtime.spawn({
118 16 : async move {
119 16 : jh_frontend.await.expect("error joining frontend worker");
120 4 : jh_backend.await.expect("error joining backend worker");
121 4 : drop(jh_executor.await.expect("error joining executor worker"));
122 16 : }
123 16 : })
124 16 : }
125 : }
126 :
127 : /// A FlushOp is just a oneshot channel, where we send the transmit side down
128 : /// another channel, and the receive side will receive a message when the channel
129 : /// we're flushing has reached the FlushOp we sent into it.
130 : ///
131 : /// The only extra behavior beyond the channel is that the notify() method does not
132 : /// return an error when the receive side has been dropped, because in this use case
133 : /// it is harmless (the code that initiated the flush no longer cares about the result).
134 : #[derive(Debug)]
135 : struct FlushOp {
136 : tx: tokio::sync::oneshot::Sender<()>,
137 : }
138 :
139 : impl FlushOp {
140 84 : fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
141 84 : let (tx, rx) = tokio::sync::oneshot::channel::<()>();
142 84 : (Self { tx }, rx)
143 84 : }
144 :
145 88 : fn notify(self) {
146 88 : if self.tx.send(()).is_err() {
147 : // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
148 0 : debug!("deletion queue flush from dropped client");
149 88 : };
150 88 : }
151 : }
152 :
153 : #[derive(Clone, Debug)]
154 : pub struct DeletionQueueClient {
155 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
156 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
157 :
158 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
159 : }
160 :
161 24 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
162 : struct TenantDeletionList {
163 : /// For each Timeline, a list of key fragments to append to the timeline remote path
164 : /// when reconstructing a full key
165 : timelines: HashMap<TimelineId, Vec<String>>,
166 :
167 : /// The generation in which this deletion was emitted: note that this may not be the
168 : /// same as the generation of any layers being deleted. The generation of the layer
169 : /// has already been absorbed into the keys in `objects`
170 : generation: Generation,
171 : }
172 :
173 : impl TenantDeletionList {
174 20 : pub(crate) fn len(&self) -> usize {
175 20 : self.timelines.values().map(|v| v.len()).sum()
176 20 : }
177 : }
178 :
179 : /// Files ending with this suffix will be ignored and erased
180 : /// during recovery as startup.
181 : const TEMP_SUFFIX: &str = "tmp";
182 :
183 48 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
184 : struct DeletionList {
185 : /// Serialization version, for future use
186 : version: u8,
187 :
188 : /// Used for constructing a unique key for each deletion list we write out.
189 : sequence: u64,
190 :
191 : /// To avoid repeating tenant/timeline IDs in every key, we store keys in
192 : /// nested HashMaps by TenantTimelineID. Each Tenant only appears once
193 : /// with one unique generation ID: if someone tries to push a second generation
194 : /// ID for the same tenant, we will start a new DeletionList.
195 : tenants: HashMap<TenantShardId, TenantDeletionList>,
196 :
197 : /// Avoid having to walk `tenants` to calculate the number of keys in
198 : /// the nested deletion lists
199 : size: usize,
200 :
201 : /// Set to true when the list has undergone validation with the control
202 : /// plane and the remaining contents of `tenants` are valid. A list may
203 : /// also be implicitly marked valid by DeletionHeader.validated_sequence
204 : /// advancing to >= DeletionList.sequence
205 : #[serde(default)]
206 : #[serde(skip_serializing_if = "std::ops::Not::not")]
207 : validated: bool,
208 : }
209 :
210 0 : #[derive(Debug, Serialize, Deserialize)]
211 : struct DeletionHeader {
212 : /// Serialization version, for future use
213 : version: u8,
214 :
215 : /// The highest sequence number (inclusive) that has been validated. All deletion
216 : /// lists on disk with a sequence <= this value are safe to execute.
217 : validated_sequence: u64,
218 : }
219 :
220 : impl DeletionHeader {
221 : const VERSION_LATEST: u8 = 1;
222 :
223 16 : fn new(validated_sequence: u64) -> Self {
224 16 : Self {
225 16 : version: Self::VERSION_LATEST,
226 16 : validated_sequence,
227 16 : }
228 16 : }
229 :
230 16 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
231 16 : debug!("Saving deletion list header {:?}", self);
232 16 : let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
233 16 : let header_path = conf.deletion_header_path();
234 16 : let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
235 16 : VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
236 16 : .await
237 16 : .maybe_fatal_err("save deletion header")?;
238 :
239 16 : Ok(())
240 16 : }
241 : }
242 :
243 : impl DeletionList {
244 : const VERSION_LATEST: u8 = 1;
245 40 : fn new(sequence: u64) -> Self {
246 40 : Self {
247 40 : version: Self::VERSION_LATEST,
248 40 : sequence,
249 40 : tenants: HashMap::new(),
250 40 : size: 0,
251 40 : validated: false,
252 40 : }
253 40 : }
254 :
255 56 : fn is_empty(&self) -> bool {
256 56 : self.tenants.is_empty()
257 56 : }
258 :
259 120 : fn len(&self) -> usize {
260 120 : self.size
261 120 : }
262 :
263 : /// Returns true if the push was accepted, false if the caller must start a new
264 : /// deletion list.
265 28 : fn push(
266 28 : &mut self,
267 28 : tenant: &TenantShardId,
268 28 : timeline: &TimelineId,
269 28 : generation: Generation,
270 28 : objects: &mut Vec<RemotePath>,
271 28 : ) -> bool {
272 28 : if objects.is_empty() {
273 :             // Avoid inserting an empty TenantDeletionList: this preserves the property
274 :             // that if we have no keys, then self.tenants is empty (used in Self::is_empty)
275 0 : return true;
276 28 : }
277 28 :
278 28 : let tenant_entry = self
279 28 : .tenants
280 28 : .entry(*tenant)
281 28 : .or_insert_with(|| TenantDeletionList {
282 24 : timelines: HashMap::new(),
283 24 : generation,
284 28 : });
285 28 :
286 28 : if tenant_entry.generation != generation {
287 : // Only one generation per tenant per list: signal to
288 : // caller to start a new list.
289 4 : return false;
290 24 : }
291 24 :
292 24 : let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
293 24 :
294 24 : let timeline_remote_path = remote_timeline_path(tenant, timeline);
295 24 :
296 24 : self.size += objects.len();
297 24 : timeline_entry.extend(objects.drain(..).map(|p| {
298 24 : p.strip_prefix(&timeline_remote_path)
299 24 : .expect("Timeline paths always start with the timeline prefix")
300 24 : .to_string()
301 24 : }));
302 24 : true
303 28 : }
304 :
305 20 : fn into_remote_paths(self) -> Vec<RemotePath> {
306 20 : let mut result = Vec::new();
307 20 : for (tenant, tenant_deletions) in self.tenants.into_iter() {
308 12 : for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
309 12 : let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
310 12 : result.extend(
311 12 : timeline_layers
312 12 : .into_iter()
313 12 : .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
314 12 : );
315 12 : }
316 : }
317 :
318 20 : result
319 20 : }
320 :
321 28 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
322 28 : let path = conf.deletion_list_path(self.sequence);
323 28 : let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
324 28 :
325 28 : let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
326 28 :
327 28 : VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
328 28 : .await
329 28 : .maybe_fatal_err("save deletion list")
330 28 : .map_err(Into::into)
331 28 : }
332 : }
333 :
334 : impl std::fmt::Display for DeletionList {
335 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336 0 : write!(
337 0 : f,
338 0 : "DeletionList<seq={}, tenants={}, keys={}>",
339 0 : self.sequence,
340 0 : self.tenants.len(),
341 0 : self.size
342 0 : )
343 0 : }
344 : }
345 :
346 : struct PendingLsn {
347 : projected: Lsn,
348 : result_slot: Arc<AtomicLsn>,
349 : }
350 :
351 : struct TenantLsnState {
352 : timelines: HashMap<TimelineId, PendingLsn>,
353 :
354 : // In what generation was the most recent update proposed?
355 : generation: Generation,
356 : }
357 :
358 : #[derive(Default)]
359 : struct VisibleLsnUpdates {
360 : tenants: HashMap<TenantShardId, TenantLsnState>,
361 : }
362 :
363 : impl VisibleLsnUpdates {
364 460 : fn new() -> Self {
365 460 : Self {
366 460 : tenants: HashMap::new(),
367 460 : }
368 460 : }
369 : }
370 :
371 : impl std::fmt::Debug for VisibleLsnUpdates {
372 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 0 : write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
374 0 : }
375 : }
376 :
377 : #[derive(Error, Debug)]
378 : pub enum DeletionQueueError {
379 : #[error("Deletion queue unavailable during shutdown")]
380 : ShuttingDown,
381 : }
382 :
383 : impl DeletionQueueClient {
384 : /// This is cancel-safe. If you drop the future before it completes, the message
385 : /// is not pushed, although in the context of the deletion queue it doesn't matter: once
386 : /// we decide to do a deletion the decision is always final.
387 737 : fn do_push<T>(
388 737 : &self,
389 737 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
390 737 : msg: T,
391 737 : ) -> Result<(), DeletionQueueError> {
392 737 : match queue.send(msg) {
393 737 : Ok(_) => Ok(()),
394 0 : Err(e) => {
395 0 : // This shouldn't happen, we should shut down all tenants before
396 0 : // we shut down the global delete queue. If we encounter a bug like this,
397 0 : // we may leak objects as deletions won't be processed.
398 0 : error!("Deletion queue closed while pushing, shutting down? ({e})");
399 0 : Err(DeletionQueueError::ShuttingDown)
400 : }
401 : }
402 737 : }
403 :
404 16 : pub(crate) fn recover(
405 16 : &self,
406 16 : attached_tenants: HashMap<TenantShardId, Generation>,
407 16 : ) -> Result<(), DeletionQueueError> {
408 16 : self.do_push(
409 16 : &self.tx,
410 16 : ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
411 16 : )
412 16 : }
413 :
414 : /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
415 : /// world, it must validate its generation number before doing so. Rather than do this synchronously,
416 : /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
417 : /// recently validated separately.
418 : ///
419 : /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The
420 : /// backend will later wake up and notice that the tenant's generation requires validation.
421 2965 : pub(crate) async fn update_remote_consistent_lsn(
422 2965 : &self,
423 2965 : tenant_shard_id: TenantShardId,
424 2965 : timeline_id: TimelineId,
425 2965 : current_generation: Generation,
426 2965 : lsn: Lsn,
427 2965 : result_slot: Arc<AtomicLsn>,
428 2965 : ) {
429 2965 : let mut locked = self
430 2965 : .lsn_table
431 2965 : .write()
432 2965 : .expect("Lock should never be poisoned");
433 2965 :
434 2965 : let tenant_entry = locked
435 2965 : .tenants
436 2965 : .entry(tenant_shard_id)
437 2965 : .or_insert(TenantLsnState {
438 2965 : timelines: HashMap::new(),
439 2965 : generation: current_generation,
440 2965 : });
441 2965 :
442 2965 : if tenant_entry.generation != current_generation {
443 0 : // Generation might have changed if we were detached and then re-attached: in this case,
444 0 : // state from the previous generation cannot be trusted.
445 0 : tenant_entry.timelines.clear();
446 0 : tenant_entry.generation = current_generation;
447 2965 : }
448 :
449 2965 : tenant_entry.timelines.insert(
450 2965 : timeline_id,
451 2965 : PendingLsn {
452 2965 : projected: lsn,
453 2965 : result_slot,
454 2965 : },
455 2965 : );
456 2965 : }
457 :
458 : /// Submit a list of layers for deletion: this function will return before the deletion is
459 :     /// persistent, but the deletion may be executed at any time after this function is called: do not push
460 : /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
461 : /// references them).
462 : ///
463 : /// The `current_generation` is the generation of this pageserver's current attachment. The
464 : /// generations in `layers` are the generations in which those layers were written.
465 673 : pub(crate) fn push_layers(
466 673 : &self,
467 673 : tenant_shard_id: TenantShardId,
468 673 : timeline_id: TimelineId,
469 673 : current_generation: Generation,
470 673 : layers: Vec<(LayerName, LayerFileMetadata)>,
471 673 : ) -> Result<(), DeletionQueueError> {
472 673 : // None generations are not valid for attached tenants: they must always be attached in
473 673 : // a known generation. None generations are still permitted for layers in the index because
474 673 : // they may be historical.
475 673 : assert!(!current_generation.is_none());
476 :
477 673 : metrics::DELETION_QUEUE
478 673 : .keys_submitted
479 673 : .inc_by(layers.len() as u64);
480 673 : self.do_push(
481 673 : &self.tx,
482 673 : ListWriterQueueMessage::Delete(DeletionOp {
483 673 : tenant_shard_id,
484 673 : timeline_id,
485 673 : layers,
486 673 : generation: current_generation,
487 673 : objects: Vec::new(),
488 673 : }),
489 673 : )
490 673 : }
491 :
492 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
493 48 : async fn do_flush<T>(
494 48 : &self,
495 48 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
496 48 : msg: T,
497 48 : rx: tokio::sync::oneshot::Receiver<()>,
498 48 : ) -> Result<(), DeletionQueueError> {
499 48 : self.do_push(queue, msg)?;
500 48 : if rx.await.is_err() {
501 : // This shouldn't happen if tenants are shut down before deletion queue. If we
502 : // encounter a bug like this, then a flusher will incorrectly believe it has flushed
503 : // when it hasn't, possibly leading to leaking objects.
504 0 : error!("Deletion queue dropped flush op while client was still waiting");
505 0 : Err(DeletionQueueError::ShuttingDown)
506 : } else {
507 48 : Ok(())
508 : }
509 48 : }
510 :
511 : /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
512 : ///
513 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
514 28 : pub async fn flush(&self) -> Result<(), DeletionQueueError> {
515 28 : let (flush_op, rx) = FlushOp::new();
516 28 : self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
517 28 : .await
518 28 : }
519 :
520 : /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
521 : /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
522 : /// detach where flushing is nice but not necessary.
523 : ///
524 : /// This function provides no guarantees of work being done.
525 0 : pub fn flush_advisory(&self) {
526 0 : let (flush_op, _) = FlushOp::new();
527 0 :
528 0 : // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
529 0 : drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
530 0 : }
531 :
532 : // Wait until all previous deletions are executed
533 20 : pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
534 20 : debug!("flush_execute: flushing to deletion lists...");
535 : // Flush any buffered work to deletion lists
536 20 : self.flush().await?;
537 :
538 : // Flush the backend into the executor of deletion lists
539 20 : let (flush_op, rx) = FlushOp::new();
540 20 : debug!("flush_execute: flushing backend...");
541 20 : self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
542 20 : .await?;
543 20 : debug!("flush_execute: finished flushing backend...");
544 :
545 : // Flush any immediate-mode deletions (the above backend flush will only flush
546 : // the executor if deletions had flowed through the backend)
547 20 : debug!("flush_execute: flushing execution...");
548 20 : self.flush_immediate().await?;
549 20 : debug!("flush_execute: finished flushing execution...");
550 20 : Ok(())
551 20 : }
552 :
553 : /// This interface bypasses the persistent deletion queue, and any validation
554 :     /// that this pageserver is still eligible to execute the deletions. It is for
555 : /// use in timeline deletions, where the control plane is telling us we may
556 : /// delete everything in the timeline.
557 : ///
558 : /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
559 0 : pub(crate) async fn push_immediate(
560 0 : &self,
561 0 : objects: Vec<RemotePath>,
562 0 : ) -> Result<(), DeletionQueueError> {
563 0 : metrics::DELETION_QUEUE
564 0 : .keys_submitted
565 0 : .inc_by(objects.len() as u64);
566 0 : self.executor_tx
567 0 : .send(DeleterMessage::Delete(objects))
568 0 : .await
569 0 : .map_err(|_| DeletionQueueError::ShuttingDown)
570 0 : }
571 :
572 : /// Companion to push_immediate. When this returns Ok, all prior objects sent
573 : /// into push_immediate have been deleted from remote storage.
574 20 : pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
575 20 : let (flush_op, rx) = FlushOp::new();
576 20 : self.executor_tx
577 20 : .send(DeleterMessage::Flush(flush_op))
578 20 : .await
579 20 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
580 :
581 20 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
582 20 : }
583 : }
584 :
585 : impl DeletionQueue {
586 16 : pub fn new_client(&self) -> DeletionQueueClient {
587 16 : self.client.clone()
588 16 : }
589 :
590 : /// Caller may use the returned object to construct clients with new_client.
591 : /// Caller should tokio::spawn the background() members of the two worker objects returned:
592 : /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
593 16 : pub fn new<C>(
594 16 : remote_storage: GenericRemoteStorage,
595 16 : controller_upcall_client: Option<C>,
596 16 : conf: &'static PageServerConf,
597 16 : ) -> (Self, DeletionQueueWorkers<C>)
598 16 : where
599 16 : C: ControlPlaneGenerationsApi + Send + Sync,
600 16 : {
601 16 : // Unbounded channel: enables non-async functions to submit deletions. The actual length is
602 16 : // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
603 16 :         // enough to avoid this taking a pathologically large amount of memory.
604 16 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
605 16 :
606 16 : // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
607 16 : let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
608 16 :
609 16 : // Shallow channel: it carries lists of paths, and we expect the main queueing to
610 16 : // happen in the backend (persistent), not in this queue.
611 16 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
612 16 :
613 16 : let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
614 16 :
615 16 :         // The deletion queue has a cancellation token independent of
616 16 : // the general pageserver shutdown token, because it stays alive a bit
617 16 : // longer to flush after Tenants have all been torn down.
618 16 : let cancel = CancellationToken::new();
619 16 :
620 16 : (
621 16 : Self {
622 16 : client: DeletionQueueClient {
623 16 : tx,
624 16 : executor_tx: executor_tx.clone(),
625 16 : lsn_table: lsn_table.clone(),
626 16 : },
627 16 : cancel: cancel.clone(),
628 16 : },
629 16 : DeletionQueueWorkers {
630 16 : frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
631 16 : backend: Validator::new(
632 16 : conf,
633 16 : backend_rx,
634 16 : executor_tx,
635 16 : controller_upcall_client,
636 16 : lsn_table.clone(),
637 16 : cancel.clone(),
638 16 : ),
639 16 : executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
640 16 : },
641 16 : )
642 16 : }
643 :
644 0 : pub async fn shutdown(&mut self, timeout: Duration) {
645 0 : match tokio::time::timeout(timeout, self.client.flush()).await {
646 : Ok(Ok(())) => {
647 0 : tracing::info!("Deletion queue flushed successfully on shutdown")
648 : }
649 : Ok(Err(DeletionQueueError::ShuttingDown)) => {
650 : // This is not harmful for correctness, but is unexpected: the deletion
651 : // queue's workers should stay alive as long as there are any client handles instantiated.
652 0 : tracing::warn!("Deletion queue stopped prematurely");
653 : }
654 0 : Err(_timeout) => {
655 0 : tracing::warn!("Timed out flushing deletion queue on shutdown")
656 : }
657 : }
658 :
659 : // We only cancel _after_ flushing: otherwise we would be shutting down the
660 : // components that do the flush.
661 0 : self.cancel.cancel();
662 0 : }
663 : }
664 :
665 : #[cfg(test)]
666 : mod test {
667 : use camino::Utf8Path;
668 : use hex_literal::hex;
669 : use pageserver_api::{key::Key, shard::ShardIndex, upcall_api::ReAttachResponseTenant};
670 : use std::{io::ErrorKind, time::Duration};
671 : use tracing::info;
672 :
673 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
674 : use tokio::task::JoinHandle;
675 :
676 : use crate::{
677 : controller_upcall_client::RetryForeverError,
678 : tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
679 : };
680 :
681 : use super::*;
682 : pub const TIMELINE_ID: TimelineId =
683 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
684 :
685 : pub const EXAMPLE_LAYER_NAME: LayerName = LayerName::Delta(DeltaLayerName {
686 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
687 : lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
688 : });
689 :
690 : // When you need a second layer in a test.
691 : pub const EXAMPLE_LAYER_NAME_ALT: LayerName = LayerName::Delta(DeltaLayerName {
692 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
693 : lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
694 : });
695 :
696 : struct TestSetup {
697 : harness: TenantHarness,
698 : remote_fs_dir: Utf8PathBuf,
699 : storage: GenericRemoteStorage,
700 : mock_control_plane: MockControlPlane,
701 : deletion_queue: DeletionQueue,
702 : worker_join: JoinHandle<()>,
703 : }
704 :
705 : impl TestSetup {
706 : /// Simulate a pageserver restart by destroying and recreating the deletion queue
707 4 : async fn restart(&mut self) {
708 4 : let (deletion_queue, workers) = DeletionQueue::new(
709 4 : self.storage.clone(),
710 4 : Some(self.mock_control_plane.clone()),
711 4 : self.harness.conf,
712 4 : );
713 4 :
714 4 : tracing::debug!("Spawning worker for new queue queue");
715 4 : let worker_join = workers.spawn_with(&tokio::runtime::Handle::current());
716 4 :
717 4 : let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
718 4 : let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
719 4 :
720 4 : tracing::debug!("Joining worker from previous queue");
721 4 : old_deletion_queue.cancel.cancel();
722 4 : old_worker_join
723 4 : .await
724 4 : .expect("Failed to join workers for previous deletion queue");
725 4 : }
726 :
727 12 : fn set_latest_generation(&self, gen: Generation) {
728 12 : let tenant_shard_id = self.harness.tenant_shard_id;
729 12 : self.mock_control_plane
730 12 : .latest_generation
731 12 : .lock()
732 12 : .unwrap()
733 12 : .insert(tenant_shard_id, gen);
734 12 : }
735 :
736 : /// Returns remote layer file name, suitable for use in assert_remote_files
737 12 : fn write_remote_layer(
738 12 : &self,
739 12 : file_name: LayerName,
740 12 : gen: Generation,
741 12 : ) -> anyhow::Result<String> {
742 12 : let tenant_shard_id = self.harness.tenant_shard_id;
743 12 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
744 12 : let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
745 12 : std::fs::create_dir_all(&remote_timeline_path)?;
746 12 : let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
747 12 :
748 12 : let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
749 12 :
750 12 : std::fs::write(
751 12 : remote_timeline_path.join(remote_layer_file_name.clone()),
752 12 : content,
753 12 : )?;
754 :
755 12 : Ok(remote_layer_file_name)
756 12 : }
757 : }
758 :
759 : #[derive(Debug, Clone)]
760 : struct MockControlPlane {
761 : pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
762 : }
763 :
764 : impl MockControlPlane {
765 12 : fn new() -> Self {
766 12 : Self {
767 12 : latest_generation: Arc::default(),
768 12 : }
769 12 : }
770 : }
771 :
772 : impl ControlPlaneGenerationsApi for MockControlPlane {
773 0 : async fn re_attach(
774 0 : &self,
775 0 : _conf: &PageServerConf,
776 0 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
777 0 : unimplemented!()
778 : }
779 :
780 16 : async fn validate(
781 16 : &self,
782 16 : tenants: Vec<(TenantShardId, Generation)>,
783 16 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
784 16 : let mut result = HashMap::new();
785 16 :
786 16 : let latest_generation = self.latest_generation.lock().unwrap();
787 :
788 32 : for (tenant_shard_id, generation) in tenants {
789 16 : if let Some(latest) = latest_generation.get(&tenant_shard_id) {
790 16 : result.insert(tenant_shard_id, *latest == generation);
791 16 : }
792 : }
793 :
794 16 : Ok(result)
795 16 : }
796 : }
797 :
798 12 : async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
799 12 : let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
800 12 : let harness = TenantHarness::create(test_name).await?;
801 :
802 : // We do not load() the harness: we only need its config and remote_storage
803 :
804 :         // Set up a GenericRemoteStorage targeting a directory
805 12 : let remote_fs_dir = harness.conf.workdir.join("remote_fs");
806 12 : std::fs::create_dir_all(remote_fs_dir)?;
807 12 : let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
808 12 : let storage_config = RemoteStorageConfig {
809 12 : storage: RemoteStorageKind::LocalFs {
810 12 : local_path: remote_fs_dir.clone(),
811 12 : },
812 12 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
813 12 : small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
814 12 : };
815 12 : let storage = GenericRemoteStorage::from_config(&storage_config)
816 12 : .await
817 12 : .unwrap();
818 12 :
819 12 : let mock_control_plane = MockControlPlane::new();
820 12 :
821 12 : let (deletion_queue, worker) = DeletionQueue::new(
822 12 : storage.clone(),
823 12 : Some(mock_control_plane.clone()),
824 12 : harness.conf,
825 12 : );
826 12 :
827 12 : let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
828 12 :
829 12 : Ok(TestSetup {
830 12 : harness,
831 12 : remote_fs_dir,
832 12 : storage,
833 12 : mock_control_plane,
834 12 : deletion_queue,
835 12 : worker_join,
836 12 : })
837 12 : }
838 :
839 : // TODO: put this in a common location so that we can share with remote_timeline_client's tests
840 36 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
841 36 : let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
842 36 : expected.sort();
843 36 :
844 36 : let mut found: Vec<String> = Vec::new();
845 36 : let dir = match std::fs::read_dir(remote_path) {
846 36 : Ok(d) => d,
847 0 : Err(e) => {
848 0 : if e.kind() == ErrorKind::NotFound {
849 0 : if expected.is_empty() {
850 : // We are asserting prefix is empty: it is expected that the dir is missing
851 0 : return;
852 : } else {
853 0 : assert_eq!(expected, Vec::<String>::new());
854 0 : unreachable!();
855 : }
856 : } else {
857 0 : panic!("Unexpected error listing {remote_path}: {e}");
858 : }
859 : }
860 : };
861 :
862 36 : for entry in dir.flatten() {
863 32 : let entry_name = entry.file_name();
864 32 : let fname = entry_name.to_str().unwrap();
865 32 : found.push(String::from(fname));
866 32 : }
867 36 : found.sort();
868 36 :
869 36 : assert_eq!(expected, found);
870 36 : }
871 :
872 20 : fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
873 20 : let dir = match std::fs::read_dir(directory) {
874 16 : Ok(d) => d,
875 : Err(_) => {
876 4 : assert_eq!(expected, &Vec::<String>::new());
877 4 : return;
878 : }
879 : };
880 16 : let mut found = Vec::new();
881 36 : for dentry in dir {
882 20 : let dentry = dentry.unwrap();
883 20 : let file_name = dentry.file_name();
884 20 : let file_name_str = file_name.to_string_lossy();
885 20 : found.push(file_name_str.to_string());
886 20 : }
887 16 : found.sort();
888 16 : assert_eq!(expected, found);
889 20 : }
890 :
891 : #[tokio::test]
892 4 : async fn deletion_queue_smoke() -> anyhow::Result<()> {
893 4 : // Basic test that the deletion queue processes the deletions we pass into it
894 4 : let ctx = setup("deletion_queue_smoke")
895 4 : .await
896 4 : .expect("Failed test setup");
897 4 : let client = ctx.deletion_queue.new_client();
898 4 : client.recover(HashMap::new())?;
899 4 :
900 4 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
901 4 : let tenant_shard_id = ctx.harness.tenant_shard_id;
902 4 :
903 4 : let content: Vec<u8> = "victim1 contents".into();
904 4 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
905 4 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
906 4 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
907 4 :
908 4 : // Exercise the distinction between the generation of the layers
909 4 : // we delete, and the generation of the running Tenant.
910 4 : let layer_generation = Generation::new(0xdeadbeef);
911 4 : let now_generation = Generation::new(0xfeedbeef);
912 4 : let layer_metadata =
913 4 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
914 4 :
915 4 : let remote_layer_file_name_1 =
916 4 : format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
917 4 :
918 4 : // Set mock control plane state to valid for our generation
919 4 : ctx.set_latest_generation(now_generation);
920 4 :
921 4 : // Inject a victim file to remote storage
922 4 : info!("Writing");
923 4 : std::fs::create_dir_all(&remote_timeline_path)?;
924 4 : std::fs::write(
925 4 : remote_timeline_path.join(remote_layer_file_name_1.clone()),
926 4 : content,
927 4 : )?;
928 4 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
929 4 :
930 4 : // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
931 4 : info!("Pushing");
932 4 : client.push_layers(
933 4 : tenant_shard_id,
934 4 : TIMELINE_ID,
935 4 : now_generation,
936 4 : [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
937 4 : )?;
938 4 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
939 4 :
940 4 : assert_local_files(&[], &deletion_prefix);
941 4 :
942 4 : // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
943 4 : info!("Flushing");
944 4 : client.flush().await?;
945 4 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
946 4 : assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
947 4 :
948 4 : // File should go away when we execute
949 4 : info!("Flush-executing");
950 4 : client.flush_execute().await?;
951 4 : assert_remote_files(&[], &remote_timeline_path);
952 4 : assert_local_files(&["header-01"], &deletion_prefix);
953 4 :
954 4 : // Flushing on an empty queue should succeed immediately, and not write any lists
955 4 : info!("Flush-executing on empty");
956 4 : client.flush_execute().await?;
957 4 : assert_local_files(&["header-01"], &deletion_prefix);
958 4 :
959 4 : Ok(())
960 4 : }
961 :
962 : #[tokio::test]
963 4 : async fn deletion_queue_validation() -> anyhow::Result<()> {
964 4 : let ctx = setup("deletion_queue_validation")
965 4 : .await
966 4 : .expect("Failed test setup");
967 4 : let client = ctx.deletion_queue.new_client();
968 4 : client.recover(HashMap::new())?;
969 4 :
970 4 : // Generation that the control plane thinks is current
971 4 : let latest_generation = Generation::new(0xdeadbeef);
972 4 : // Generation that our DeletionQueue thinks the tenant is running with
973 4 : let stale_generation = latest_generation.previous();
974 4 : // Generation that our example layer file was written with
975 4 : let layer_generation = stale_generation.previous();
976 4 : let layer_metadata =
977 4 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
978 4 :
979 4 : ctx.set_latest_generation(latest_generation);
980 4 :
981 4 : let tenant_shard_id = ctx.harness.tenant_shard_id;
982 4 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
983 4 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
984 4 :
985 4 : // Initial state: a remote layer exists
986 4 : let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
987 4 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
988 4 :
989 4 : tracing::debug!("Pushing...");
990 4 : client.push_layers(
991 4 : tenant_shard_id,
992 4 : TIMELINE_ID,
993 4 : stale_generation,
994 4 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
995 4 : )?;
996 4 :
997 4 : // We enqueued the operation in a stale generation: it should have failed validation
998 4 : tracing::debug!("Flushing...");
999 4 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1000 4 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1001 4 :
1002 4 : tracing::debug!("Pushing...");
1003 4 : client.push_layers(
1004 4 : tenant_shard_id,
1005 4 : TIMELINE_ID,
1006 4 : latest_generation,
1007 4 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1008 4 : )?;
1009 4 :
1010 4 : // We enqueued the operation in a fresh generation: it should have passed validation
1011 4 : tracing::debug!("Flushing...");
1012 4 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1013 4 : assert_remote_files(&[], &remote_timeline_path);
1014 4 :
1015 4 : Ok(())
1016 4 : }
1017 :
1018 : #[tokio::test]
1019 4 : async fn deletion_queue_recovery() -> anyhow::Result<()> {
1020 4 :         // Test that the deletion queue recovers persisted deletion lists across a restart
1021 4 : let mut ctx = setup("deletion_queue_recovery")
1022 4 : .await
1023 4 : .expect("Failed test setup");
1024 4 : let client = ctx.deletion_queue.new_client();
1025 4 : client.recover(HashMap::new())?;
1026 4 :
1027 4 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1028 4 :
1029 4 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1030 4 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1031 4 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
1032 4 :
1033 4 : let layer_generation = Generation::new(0xdeadbeef);
1034 4 : let now_generation = Generation::new(0xfeedbeef);
1035 4 : let layer_metadata =
1036 4 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1037 4 :
1038 4 : // Inject a deletion in the generation before generation_now: after restart,
1039 4 : // this deletion should _not_ get executed (only the immediately previous
1040 4 : // generation gets that treatment)
1041 4 : let remote_layer_file_name_historical =
1042 4 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1043 4 : client.push_layers(
1044 4 : tenant_shard_id,
1045 4 : TIMELINE_ID,
1046 4 : now_generation.previous(),
1047 4 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1048 4 : )?;
1049 4 :
1050 4 :         // Inject a deletion in generation_now itself: after restart (which advances
1051 4 :         // the generation), this deletion should get executed, because we execute
1052 4 :         // deletions from the immediately previous generation on the same node.
1053 4 : let remote_layer_file_name_previous =
1054 4 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
1055 4 : client.push_layers(
1056 4 : tenant_shard_id,
1057 4 : TIMELINE_ID,
1058 4 : now_generation,
1059 4 : [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
1060 4 : )?;
1061 4 :
1062 4 : client.flush().await?;
1063 4 : assert_remote_files(
1064 4 : &[
1065 4 : &remote_layer_file_name_historical,
1066 4 : &remote_layer_file_name_previous,
1067 4 : ],
1068 4 : &remote_timeline_path,
1069 4 : );
1070 4 :
1071 4 :         // Different generations for the same tenant will cause two separate
1072 4 : // deletion lists to be emitted.
1073 4 : assert_local_files(
1074 4 : &["0000000000000001-01.list", "0000000000000002-01.list"],
1075 4 : &deletion_prefix,
1076 4 : );
1077 4 :
1078 4 : // Simulate a node restart: the latest generation advances
1079 4 : let now_generation = now_generation.next();
1080 4 : ctx.set_latest_generation(now_generation);
1081 4 :
1082 4 : // Restart the deletion queue
1083 4 : drop(client);
1084 4 : ctx.restart().await;
1085 4 : let client = ctx.deletion_queue.new_client();
1086 4 : client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
1087 4 :
1088 4 : info!("Flush-executing");
1089 4 : client.flush_execute().await?;
1090 4 : // The deletion from immediately prior generation was executed, the one from
1091 4 : // an older generation was not.
1092 4 : assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
1093 4 : Ok(())
1094 4 : }
1095 : }
1096 :
1097 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
1098 : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
1099 : #[cfg(test)]
1100 : pub(crate) mod mock {
1101 : use tracing::info;
1102 :
1103 : use super::*;
1104 : use crate::tenant::remote_timeline_client::remote_layer_path;
1105 : use std::sync::atomic::{AtomicUsize, Ordering};
1106 :
1107 : pub struct ConsumerState {
1108 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
1109 : executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
1110 : cancel: CancellationToken,
1111 : executed: Arc<AtomicUsize>,
1112 : }
1113 :
1114 : impl ConsumerState {
1115 444 : async fn consume(&mut self, remote_storage: &GenericRemoteStorage) {
1116 444 : info!("Executing all pending deletions");
1117 :
1118 :             // Process messages from both the frontend and executor channels, executing deletions inline
1119 1019 : loop {
1120 1019 : use either::Either;
1121 1019 : let msg = tokio::select! {
1122 1019 : left = self.executor_rx.recv() => Either::Left(left),
1123 1019 : right = self.rx.recv() => Either::Right(right),
1124 : };
1125 4 : match msg {
1126 0 : Either::Left(None) => break,
1127 0 : Either::Right(None) => break,
1128 0 : Either::Left(Some(DeleterMessage::Delete(objects))) => {
1129 0 : for path in objects {
1130 0 : match remote_storage.delete(&path, &self.cancel).await {
1131 : Ok(_) => {
1132 0 : debug!("Deleted {path}");
1133 : }
1134 0 : Err(e) => {
1135 0 : error!("Failed to delete {path}, leaking object! ({e})");
1136 : }
1137 : }
1138 0 : self.executed.fetch_add(1, Ordering::Relaxed);
1139 : }
1140 : }
1141 4 : Either::Left(Some(DeleterMessage::Flush(flush_op))) => {
1142 4 : flush_op.notify();
1143 4 : }
1144 578 : Either::Right(Some(ListWriterQueueMessage::Delete(op))) => {
1145 578 : let mut objects = op.objects;
1146 1156 : for (layer, meta) in op.layers {
1147 578 : objects.push(remote_layer_path(
1148 578 : &op.tenant_shard_id.tenant_id,
1149 578 : &op.timeline_id,
1150 578 : meta.shard,
1151 578 : &layer,
1152 578 : meta.generation,
1153 578 : ));
1154 578 : }
1155 :
1156 1149 : for path in objects {
1157 578 : info!("Executing deletion {path}");
1158 578 : match remote_storage.delete(&path, &self.cancel).await {
1159 : Ok(_) => {
1160 571 : debug!("Deleted {path}");
1161 : }
1162 0 : Err(e) => {
1163 0 : error!("Failed to delete {path}, leaking object! ({e})");
1164 : }
1165 : }
1166 571 : self.executed.fetch_add(1, Ordering::Relaxed);
1167 : }
1168 : }
1169 0 : Either::Right(Some(ListWriterQueueMessage::Flush(op))) => {
1170 0 : op.notify();
1171 0 : }
1172 0 : Either::Right(Some(ListWriterQueueMessage::FlushExecute(op))) => {
1173 0 : // We have already executed all prior deletions because mock does them inline
1174 0 : op.notify();
1175 0 : }
1176 0 : Either::Right(Some(ListWriterQueueMessage::Recover(_))) => {
1177 0 : // no-op in mock
1178 0 : }
1179 : }
1180 : }
1181 0 : }
1182 : }
1183 :
1184 : pub struct MockDeletionQueue {
1185 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
1186 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
1187 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
1188 : }
1189 :
1190 : impl MockDeletionQueue {
1191 444 : pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
1192 444 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1193 444 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
1194 444 :
1195 444 : let executed = Arc::new(AtomicUsize::new(0));
1196 444 :
1197 444 : let mut consumer = ConsumerState {
1198 444 : rx,
1199 444 : executor_rx,
1200 444 : cancel: CancellationToken::new(),
1201 444 : executed: executed.clone(),
1202 444 : };
1203 444 :
1204 444 : tokio::spawn(async move {
1205 444 : if let Some(remote_storage) = &remote_storage {
1206 444 : consumer.consume(remote_storage).await;
1207 0 : }
1208 444 : });
1209 444 :
1210 444 : Self {
1211 444 : tx,
1212 444 : executor_tx,
1213 444 : lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
1214 444 : }
1215 444 : }
1216 :
1217 : #[allow(clippy::await_holding_lock)]
1218 4 : pub async fn pump(&self) {
1219 4 : let (tx, rx) = tokio::sync::oneshot::channel();
1220 4 : self.executor_tx
1221 4 : .send(DeleterMessage::Flush(FlushOp { tx }))
1222 4 : .await
1223 4 : .expect("Failed to send flush message");
1224 4 : rx.await.ok();
1225 4 : }
1226 :
1227 464 : pub(crate) fn new_client(&self) -> DeletionQueueClient {
1228 464 : DeletionQueueClient {
1229 464 : tx: self.tx.clone(),
1230 464 : executor_tx: self.executor_tx.clone(),
1231 464 : lsn_table: self.lsn_table.clone(),
1232 464 : }
1233 464 : }
1234 : }
1235 :
1236 : /// Test round-trip serialization/deserialization, and test stability of the format
1237 : /// vs. a static expected string for the serialized version.
1238 : #[test]
1239 4 : fn deletion_list_serialization() -> anyhow::Result<()> {
1240 4 : let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
1241 4 : .to_string()
1242 4 : .parse::<TenantShardId>()?;
1243 4 : let timeline_id = "be322c834ed9e709e63b5c9698691910"
1244 4 : .to_string()
1245 4 : .parse::<TimelineId>()?;
1246 4 : let generation = Generation::new(123);
1247 :
1248 4 : let object =
1249 4 : RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
1250 4 : let mut objects = [object].to_vec();
1251 4 :
1252 4 : let mut example = DeletionList::new(1);
1253 4 : example.push(&tenant_id, &timeline_id, generation, &mut objects);
1254 :
1255 4 : let encoded = serde_json::to_string(&example)?;
1256 :
1257 4 : let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
1258 4 : assert_eq!(encoded, expected);
1259 :
1260 4 : let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
1261 4 : assert_eq!(example, decoded);
1262 :
1263 4 : Ok(())
1264 4 : }
1265 : }
|