Line data Source code
1 : mod deleter;
2 : mod list_writer;
3 : mod validator;
4 :
5 : use std::collections::HashMap;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use anyhow::Context;
10 : use camino::Utf8PathBuf;
11 : use deleter::DeleterMessage;
12 : use list_writer::ListWriterQueueMessage;
13 : use pageserver_api::shard::TenantShardId;
14 : use remote_storage::{GenericRemoteStorage, RemotePath};
15 : use serde::{Deserialize, Serialize};
16 : use thiserror::Error;
17 : use tokio_util::sync::CancellationToken;
18 : use tracing::{Instrument, debug, error};
19 : use utils::crashsafe::path_with_suffix_extension;
20 : use utils::generation::Generation;
21 : use utils::id::TimelineId;
22 : use utils::lsn::{AtomicLsn, Lsn};
23 : use validator::ValidatorQueueMessage;
24 :
25 : use self::deleter::Deleter;
26 : use self::list_writer::{DeletionOp, ListWriter, RecoverOp};
27 : use self::validator::Validator;
28 : use crate::config::PageServerConf;
29 : use crate::controller_upcall_client::StorageControllerUpcallApi;
30 : use crate::metrics;
31 : use crate::tenant::remote_timeline_client::{LayerFileMetadata, remote_timeline_path};
32 : use crate::tenant::storage_layer::LayerName;
33 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
34 :
35 : // TODO: make the delay before executing deletions configurable
36 :
37 : /// We aggregate object deletions from many tenants in one place, for several reasons:
38 : /// - Coalesce deletions into fewer DeleteObjects calls
39 : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
40 : /// to flush any outstanding deletions.
41 : /// - Globally control throughput of deletions, as these are a low priority task: do
42 : /// not compete with the same S3 clients/connections used for higher priority uploads.
43 : /// - Enable gating deletions on validation of a tenant's generation number, to make
44 : /// it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
45 : ///
46 : /// There are two kinds of deletion: deferred and immediate. A deferred deletion
47 : /// may be intentionally delayed to protect passive readers of S3 data, and is
48 : /// subject to a generation number validation step. An immediate deletion is
49 : /// ready to execute immediately, and is only queued up so that it can be coalesced
50 : /// with other deletions in flight.
51 : ///
52 : /// Deferred deletions pass through three steps:
53 : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
54 : /// DeletionLists, which are persisted to disk.
55 : /// - Validator: accumulate deletion lists, and validate them en-masse prior to passing
56 : /// the keys in the list onward for actual deletion. Also validate remote_consistent_lsn
57 : /// updates for running timelines.
58 : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
59 : /// batches of 1000 keys via DeleteObjects.
60 : ///
61 : /// Non-deferred deletions, such as during timeline deletion, bypass the first
62 : /// two stages and are passed straight into the Deleter.
63 : ///
64 : /// Internally, each stage is joined by a channel to the next. On disk, there is only
65 : /// one queue (of DeletionLists), which is written by the frontend and consumed
66 : /// by the backend.
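///
/// A minimal usage sketch, mirroring the test setup at the bottom of this file
/// (`storage`, `controller_client`, `conf` and the push arguments are assumed to
/// exist in the caller's context):
///
/// ```ignore
/// let (queue, workers) = DeletionQueue::new(storage, Some(controller_client), conf);
/// let _join = workers.spawn_with(&tokio::runtime::Handle::current());
/// let client = queue.new_client();
/// client.recover(HashMap::new())?;
/// client.push_layers(tenant_shard_id, timeline_id, current_generation, layers)?;
/// client.flush_execute().await?; // wait until the deletions have been executed
/// ```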
67 : #[derive(Clone)]
68 : pub struct DeletionQueue {
69 : client: DeletionQueueClient,
70 :
71 : // Parent cancellation token for the tokens passed into background workers
72 : cancel: CancellationToken,
73 : }
74 :
75 : /// Opaque wrapper around individual worker tasks, to avoid making the
76 : /// worker objects themselves public
77 : pub struct DeletionQueueWorkers<C>
78 : where
79 : C: StorageControllerUpcallApi + Send + Sync,
80 : {
81 : frontend: ListWriter,
82 : backend: Validator<C>,
83 : executor: Deleter,
84 : }
85 :
86 : impl<C> DeletionQueueWorkers<C>
87 : where
88 : C: StorageControllerUpcallApi + Send + Sync + 'static,
89 : {
90 48 : pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
91 48 : let jh_frontend = runtime.spawn(async move {
92 48 : self.frontend
93 48 : .background()
94 48 : .instrument(tracing::info_span!(parent:None, "deletion frontend"))
95 48 : .await
96 48 : });
97 48 : let jh_backend = runtime.spawn(async move {
98 48 : self.backend
99 48 : .background()
100 48 : .instrument(tracing::info_span!(parent:None, "deletion backend"))
101 48 : .await
102 48 : });
103 48 : let jh_executor = runtime.spawn(async move {
104 48 : self.executor
105 48 : .background()
106 48 : .instrument(tracing::info_span!(parent:None, "deletion executor"))
107 48 : .await
108 48 : });
109 48 :
110 48 : runtime.spawn({
111 48 : async move {
112 48 : jh_frontend.await.expect("error joining frontend worker");
113 12 : jh_backend.await.expect("error joining backend worker");
114 12 : drop(jh_executor.await.expect("error joining executor worker"));
115 48 : }
116 48 : })
117 48 : }
118 : }
119 :
120 : /// A FlushOp is just a oneshot channel, where we send the transmit side down
121 : /// another channel, and the receive side will receive a message when the channel
122 : /// we're flushing has reached the FlushOp we sent into it.
123 : ///
124 : /// The only extra behavior beyond the channel is that the notify() method does not
125 : /// return an error when the receive side has been dropped, because in this use case
126 : /// it is harmless (the code that initiated the flush no longer cares about the result).
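///
/// A sketch of the pattern as used by the flush methods below (`queue_tx` and
/// `QueueMessage` stand in for whichever channel and message type is flushed):
///
/// ```ignore
/// let (flush_op, rx) = FlushOp::new();
/// queue_tx.send(QueueMessage::Flush(flush_op))?; // the worker calls flush_op.notify()
/// rx.await.ok(); // resolves once the worker has drained up to our marker
/// ```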
127 : #[derive(Debug)]
128 : struct FlushOp {
129 : tx: tokio::sync::oneshot::Sender<()>,
130 : }
131 :
132 : impl FlushOp {
133 252 : fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
134 252 : let (tx, rx) = tokio::sync::oneshot::channel::<()>();
135 252 : (Self { tx }, rx)
136 252 : }
137 :
138 264 : fn notify(self) {
139 264 : if self.tx.send(()).is_err() {
140 : // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
141 0 : debug!("deletion queue flush from dropped client");
142 264 : };
143 264 : }
144 : }
145 :
146 : #[derive(Clone, Debug)]
147 : pub struct DeletionQueueClient {
148 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
149 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
150 :
151 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
152 : }
153 :
154 72 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
155 : struct TenantDeletionList {
156 : /// For each Timeline, a list of key fragments to append to the timeline remote path
157 : /// when reconstructing a full key
158 : timelines: HashMap<TimelineId, Vec<String>>,
159 :
160 : /// The generation in which this deletion was emitted: note that this may not be the
161 : /// same as the generation of any layers being deleted. The generation of the layer
162 : /// has already been absorbed into the key fragments stored in `timelines`
163 : generation: Generation,
164 : }
165 :
166 : impl TenantDeletionList {
167 60 : pub(crate) fn len(&self) -> usize {
168 60 : self.timelines.values().map(|v| v.len()).sum()
169 60 : }
170 : }
171 :
172 : /// Files ending with this suffix will be ignored and erased
173 : /// during recovery at startup.
174 : const TEMP_SUFFIX: &str = "tmp";
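
// Illustrative write path for the files persisted below: content is first written
// under a temporary name and then moved into place, so a crash leaves at worst a
// "*.tmp" file to erase on the next start. Example names (the ".list"/"header"
// names appear in the tests at the bottom of this file; the ".tmp" variants
// follow from TEMP_SUFFIX):
//
//     0000000000000001-01.list.tmp  ->  0000000000000001-01.list
//     header-01.tmp                 ->  header-01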
175 :
176 144 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
177 : struct DeletionList {
178 : /// Serialization version, for future use
179 : version: u8,
180 :
181 : /// Used for constructing a unique key for each deletion list we write out.
182 : sequence: u64,
183 :
184 : /// To avoid repeating tenant/timeline IDs in every key, we store keys in
185 : /// nested HashMaps by tenant and then timeline ID. Each tenant only appears once
186 : /// with one unique generation ID: if someone tries to push a second generation
187 : /// ID for the same tenant, we will start a new DeletionList.
188 : tenants: HashMap<TenantShardId, TenantDeletionList>,
189 :
190 : /// Avoid having to walk `tenants` to calculate the number of keys in
191 : /// the nested deletion lists
192 : size: usize,
193 :
194 : /// Set to true when the list has undergone validation with the control
195 : /// plane and the remaining contents of `tenants` are valid. A list may
196 : /// also be implicitly marked valid by DeletionHeader.validated_sequence
197 : /// advancing to >= DeletionList.sequence
198 : #[serde(default)]
199 : #[serde(skip_serializing_if = "std::ops::Not::not")]
200 : validated: bool,
201 : }
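
// A sketch of one persisted list as JSON; this is the exact expected string from
// the `deletion_list_serialization` round-trip test at the bottom of this file,
// reflowed for readability:
//
//     {"version":1,"sequence":1,
//      "tenants":{"ad6c1a56f5680419d3a16ff55d97ec3c":
//                   {"timelines":{"be322c834ed9e709e63b5c9698691910":["foo"]},
//                    "generation":123}},
//      "size":1}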
202 :
203 0 : #[derive(Debug, Serialize, Deserialize)]
204 : struct DeletionHeader {
205 : /// Serialization version, for future use
206 : version: u8,
207 :
208 : /// The highest sequence number (inclusive) that has been validated. All deletion
209 : /// lists on disk with a sequence <= this value are safe to execute.
210 : validated_sequence: u64,
211 : }
212 :
213 : impl DeletionHeader {
214 : const VERSION_LATEST: u8 = 1;
215 :
216 48 : fn new(validated_sequence: u64) -> Self {
217 48 : Self {
218 48 : version: Self::VERSION_LATEST,
219 48 : validated_sequence,
220 48 : }
221 48 : }
222 :
223 48 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
224 48 : debug!("Saving deletion list header {:?}", self);
225 48 : let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
226 48 : let header_path = conf.deletion_header_path();
227 48 : let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
228 48 : VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
229 48 : .await
230 48 : .maybe_fatal_err("save deletion header")?;
231 :
232 48 : Ok(())
233 48 : }
234 : }
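
// Illustrative consumption of the header's watermark during recovery (assumed
// names; the real logic lives in the `list_writer` and `validator` modules): a
// list at or below `validated_sequence` may skip generation validation.
//
//     if list.sequence <= header.validated_sequence {
//         // Already validated in a previous run: safe to execute directly.
//         executor_tx.send(DeleterMessage::Delete(list.into_remote_paths())).await?;
//     }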
235 :
236 : impl DeletionList {
237 : const VERSION_LATEST: u8 = 1;
238 120 : fn new(sequence: u64) -> Self {
239 120 : Self {
240 120 : version: Self::VERSION_LATEST,
241 120 : sequence,
242 120 : tenants: HashMap::new(),
243 120 : size: 0,
244 120 : validated: false,
245 120 : }
246 120 : }
247 :
248 156 : fn is_empty(&self) -> bool {
249 156 : self.tenants.is_empty()
250 156 : }
251 :
252 360 : fn len(&self) -> usize {
253 360 : self.size
254 360 : }
255 :
256 : /// Returns true if the push was accepted, false if the caller must start a new
257 : /// deletion list.
258 84 : fn push(
259 84 : &mut self,
260 84 : tenant: &TenantShardId,
261 84 : timeline: &TimelineId,
262 84 : generation: Generation,
263 84 : objects: &mut Vec<RemotePath>,
264 84 : ) -> bool {
265 84 : if objects.is_empty() {
266 : // Avoid inserting an empty timeline entry: this preserves the property
267 : // that if we have no keys, then self.tenants is empty (used in Self::is_empty)
268 0 : return true;
269 84 : }
270 84 :
271 84 : let tenant_entry = self
272 84 : .tenants
273 84 : .entry(*tenant)
274 84 : .or_insert_with(|| TenantDeletionList {
275 72 : timelines: HashMap::new(),
276 72 : generation,
277 84 : });
278 84 :
279 84 : if tenant_entry.generation != generation {
280 : // Only one generation per tenant per list: signal to
281 : // caller to start a new list.
282 12 : return false;
283 72 : }
284 72 :
285 72 : let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
286 72 :
287 72 : let timeline_remote_path = remote_timeline_path(tenant, timeline);
288 72 :
289 72 : self.size += objects.len();
290 72 : timeline_entry.extend(objects.drain(..).map(|p| {
291 72 : p.strip_prefix(&timeline_remote_path)
292 72 : .expect("Timeline paths always start with the timeline prefix")
293 72 : .to_string()
294 72 : }));
295 72 : true
296 84 : }
297 :
298 60 : fn into_remote_paths(self) -> Vec<RemotePath> {
299 60 : let mut result = Vec::new();
300 60 : for (tenant, tenant_deletions) in self.tenants.into_iter() {
301 36 : for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
302 36 : let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
303 36 : result.extend(
304 36 : timeline_layers
305 36 : .into_iter()
306 36 : .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
307 36 : );
308 36 : }
309 : }
310 :
311 60 : result
312 60 : }
313 :
314 84 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
315 84 : let path = conf.deletion_list_path(self.sequence);
316 84 : let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
317 84 :
318 84 : let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
319 84 :
320 84 : VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
321 84 : .await
322 84 : .maybe_fatal_err("save deletion list")
323 84 : .map_err(Into::into)
324 84 : }
325 : }
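
// Sketch of the caller contract for `DeletionList::push` above (assumed names;
// the real caller is the ListWriter in the `list_writer` module): a `false`
// return means "seal the current list and retry against a fresh one".
//
//     if !pending.push(&tenant, &timeline, generation, &mut objects) {
//         let sealed = std::mem::replace(&mut pending, DeletionList::new(next_sequence));
//         sealed.save(conf).await?;
//         assert!(pending.push(&tenant, &timeline, generation, &mut objects));
//     }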
326 :
327 : impl std::fmt::Display for DeletionList {
328 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329 0 : write!(
330 0 : f,
331 0 : "DeletionList<seq={}, tenants={}, keys={}>",
332 0 : self.sequence,
333 0 : self.tenants.len(),
334 0 : self.size
335 0 : )
336 0 : }
337 : }
338 :
339 : struct PendingLsn {
340 : projected: Lsn,
341 : result_slot: Arc<AtomicLsn>,
342 : }
343 :
344 : struct TenantLsnState {
345 : timelines: HashMap<TimelineId, PendingLsn>,
346 :
347 : // In what generation was the most recent update proposed?
348 : generation: Generation,
349 : }
350 :
351 : #[derive(Default)]
352 : struct VisibleLsnUpdates {
353 : tenants: HashMap<TenantShardId, TenantLsnState>,
354 : }
355 :
356 : impl VisibleLsnUpdates {
357 1440 : fn new() -> Self {
358 1440 : Self {
359 1440 : tenants: HashMap::new(),
360 1440 : }
361 1440 : }
362 : }
363 :
364 : impl std::fmt::Debug for VisibleLsnUpdates {
365 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 0 : write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
367 0 : }
368 : }
369 :
370 : #[derive(Error, Debug)]
371 : pub enum DeletionQueueError {
372 : #[error("Deletion queue unavailable during shutdown")]
373 : ShuttingDown,
374 : }
375 :
376 : impl DeletionQueueClient {
377 : /// This is trivially cancel-safe: the send is synchronous, so there is no future to
378 : /// drop half-way. In the context of the deletion queue this matters little anyway: once
379 : /// we decide to do a deletion the decision is always final.
380 2119 : fn do_push<T>(
381 2119 : &self,
382 2119 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
383 2119 : msg: T,
384 2119 : ) -> Result<(), DeletionQueueError> {
385 2119 : match queue.send(msg) {
386 2119 : Ok(_) => Ok(()),
387 0 : Err(e) => {
388 0 : // This shouldn't happen, we should shut down all tenants before
389 0 : // we shut down the global delete queue. If we encounter a bug like this,
390 0 : // we may leak objects as deletions won't be processed.
391 0 : error!("Deletion queue closed while pushing, shutting down? ({e})");
392 0 : Err(DeletionQueueError::ShuttingDown)
393 : }
394 : }
395 2119 : }
396 :
397 48 : pub(crate) fn recover(
398 48 : &self,
399 48 : attached_tenants: HashMap<TenantShardId, Generation>,
400 48 : ) -> Result<(), DeletionQueueError> {
401 48 : self.do_push(
402 48 : &self.tx,
403 48 : ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
404 48 : )
405 48 : }
406 :
407 : /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
408 : /// world, it must validate its generation number before doing so. Rather than do this synchronously,
409 : /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
410 : /// recently validated separately.
411 : ///
412 : /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The
413 : /// backend will later wake up and notice that the tenant's generation requires validation.
414 8905 : pub(crate) async fn update_remote_consistent_lsn(
415 8905 : &self,
416 8905 : tenant_shard_id: TenantShardId,
417 8905 : timeline_id: TimelineId,
418 8905 : current_generation: Generation,
419 8905 : lsn: Lsn,
420 8905 : result_slot: Arc<AtomicLsn>,
421 8905 : ) {
422 8905 : let mut locked = self
423 8905 : .lsn_table
424 8905 : .write()
425 8905 : .expect("Lock should never be poisoned");
426 8905 :
427 8905 : let tenant_entry = locked
428 8905 : .tenants
429 8905 : .entry(tenant_shard_id)
430 8905 : .or_insert(TenantLsnState {
431 8905 : timelines: HashMap::new(),
432 8905 : generation: current_generation,
433 8905 : });
434 8905 :
435 8905 : if tenant_entry.generation != current_generation {
436 0 : // Generation might have changed if we were detached and then re-attached: in this case,
437 0 : // state from the previous generation cannot be trusted.
438 0 : tenant_entry.timelines.clear();
439 0 : tenant_entry.generation = current_generation;
440 8905 : }
441 :
442 8905 : tenant_entry.timelines.insert(
443 8905 : timeline_id,
444 8905 : PendingLsn {
445 8905 : projected: lsn,
446 8905 : result_slot,
447 8905 : },
448 8905 : );
449 8905 : }
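
// Illustrative reader side of the protocol above (assumed flow): once the
// Validator has checked the generation, it copies `projected` into
// `result_slot`, and the timeline observes the validated LSN through the same
// Arc it passed in:
//
//     let visible: Lsn = result_slot.load();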
450 :
451 : /// Submit a list of layers for deletion: this function will return before the deletion is
452 : /// persistent, but it may be executed at any time after this function enters: do not push
453 : /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
454 : /// references them).
455 : ///
456 : /// The `current_generation` is the generation of this pageserver's current attachment. The
457 : /// generations in `layers` are the generations in which those layers were written.
458 1927 : pub(crate) fn push_layers(
459 1927 : &self,
460 1927 : tenant_shard_id: TenantShardId,
461 1927 : timeline_id: TimelineId,
462 1927 : current_generation: Generation,
463 1927 : layers: Vec<(LayerName, LayerFileMetadata)>,
464 1927 : ) -> Result<(), DeletionQueueError> {
465 1927 : // None generations are not valid for attached tenants: they must always be attached in
466 1927 : // a known generation. None generations are still permitted for layers in the index because
467 1927 : // they may be historical.
468 1927 : assert!(!current_generation.is_none());
469 :
470 1927 : metrics::DELETION_QUEUE
471 1927 : .keys_submitted
472 1927 : .inc_by(layers.len() as u64);
473 1927 : self.do_push(
474 1927 : &self.tx,
475 1927 : ListWriterQueueMessage::Delete(DeletionOp {
476 1927 : tenant_shard_id,
477 1927 : timeline_id,
478 1927 : layers,
479 1927 : generation: current_generation,
480 1927 : objects: Vec::new(),
481 1927 : }),
482 1927 : )
483 1927 : }
484 :
485 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
486 144 : async fn do_flush<T>(
487 144 : &self,
488 144 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
489 144 : msg: T,
490 144 : rx: tokio::sync::oneshot::Receiver<()>,
491 144 : ) -> Result<(), DeletionQueueError> {
492 144 : self.do_push(queue, msg)?;
493 144 : if rx.await.is_err() {
494 : // This shouldn't happen if tenants are shut down before deletion queue. If we
495 : // encounter a bug like this, then a flusher will incorrectly believe it has flushed
496 : // when it hasn't, possibly leading to leaking objects.
497 0 : error!("Deletion queue dropped flush op while client was still waiting");
498 0 : Err(DeletionQueueError::ShuttingDown)
499 : } else {
500 144 : Ok(())
501 : }
502 144 : }
503 :
504 : /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
505 : ///
506 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
507 84 : pub async fn flush(&self) -> Result<(), DeletionQueueError> {
508 84 : let (flush_op, rx) = FlushOp::new();
509 84 : self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
510 84 : .await
511 84 : }
512 :
513 : /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
514 : /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
515 : /// detach where flushing is nice but not necessary.
516 : ///
517 : /// This function provides no guarantees of work being done.
518 0 : pub fn flush_advisory(&self) {
519 0 : let (flush_op, _) = FlushOp::new();
520 0 :
521 0 : // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
522 0 : drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
523 0 : }
524 :
525 : /// Wait until all previous deletions are executed
526 60 : pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
527 60 : debug!("flush_execute: flushing to deletion lists...");
528 : // Flush any buffered work to deletion lists
529 60 : self.flush().await?;
530 :
531 : // Flush the backend into the executor of deletion lists
532 60 : let (flush_op, rx) = FlushOp::new();
533 60 : debug!("flush_execute: flushing backend...");
534 60 : self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
535 60 : .await?;
536 60 : debug!("flush_execute: finished flushing backend...");
537 :
538 : // Flush any immediate-mode deletions (the above backend flush will only flush
539 : // the executor if deletions had flowed through the backend)
540 60 : debug!("flush_execute: flushing execution...");
541 60 : self.flush_immediate().await?;
542 60 : debug!("flush_execute: finished flushing execution...");
543 60 : Ok(())
544 60 : }
545 :
546 : /// This interface bypasses the persistent deletion queue, and any validation
547 : /// that this pageserver is still eligible to execute the deletions. It is for
548 : /// use in timeline deletions, where the control plane is telling us we may
549 : /// delete everything in the timeline.
550 : ///
551 : /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
552 0 : pub(crate) async fn push_immediate(
553 0 : &self,
554 0 : objects: Vec<RemotePath>,
555 0 : ) -> Result<(), DeletionQueueError> {
556 0 : metrics::DELETION_QUEUE
557 0 : .keys_submitted
558 0 : .inc_by(objects.len() as u64);
559 0 : self.executor_tx
560 0 : .send(DeleterMessage::Delete(objects))
561 0 : .await
562 0 : .map_err(|_| DeletionQueueError::ShuttingDown)
563 0 : }
564 :
565 : /// Companion to push_immediate. When this returns Ok, all prior objects sent
566 : /// into push_immediate have been deleted from remote storage.
567 60 : pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
568 60 : let (flush_op, rx) = FlushOp::new();
569 60 : self.executor_tx
570 60 : .send(DeleterMessage::Flush(flush_op))
571 60 : .await
572 60 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
573 :
574 60 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
575 60 : }
576 : }
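
// Immediate-path usage sketch (timeline deletion only; see the warnings on
// `push_immediate` above). Objects bypass persistence and generation validation:
//
//     client.push_immediate(remote_paths).await?;
//     client.flush_immediate().await?; // all prior immediate objects are now deleted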
577 :
578 : impl DeletionQueue {
579 48 : pub fn new_client(&self) -> DeletionQueueClient {
580 48 : self.client.clone()
581 48 : }
582 :
583 : /// Caller may use the returned object to construct clients with new_client.
584 : /// Caller should spawn the returned DeletionQueueWorkers' background tasks (e.g. via spawn_with):
585 : /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
586 48 : pub fn new<C>(
587 48 : remote_storage: GenericRemoteStorage,
588 48 : controller_upcall_client: Option<C>,
589 48 : conf: &'static PageServerConf,
590 48 : ) -> (Self, DeletionQueueWorkers<C>)
591 48 : where
592 48 : C: StorageControllerUpcallApi + Send + Sync,
593 48 : {
594 48 : // Unbounded channel: enables non-async functions to submit deletions. The actual length is
595 48 : // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
596 48 : // enough to avoid this taking a pathologically large amount of memory.
597 48 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
598 48 :
599 48 : // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
600 48 : let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
601 48 :
602 48 : // Shallow channel: it carries lists of paths, and we expect the main queueing to
603 48 : // happen in the backend (persistent), not in this queue.
604 48 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
605 48 :
606 48 : let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
607 48 :
608 48 : // The deletion queue has a cancellation token independent of
609 48 : // the general pageserver shutdown token, because it stays alive a bit
610 48 : // longer to flush after Tenants have all been torn down.
611 48 : let cancel = CancellationToken::new();
612 48 :
613 48 : (
614 48 : Self {
615 48 : client: DeletionQueueClient {
616 48 : tx,
617 48 : executor_tx: executor_tx.clone(),
618 48 : lsn_table: lsn_table.clone(),
619 48 : },
620 48 : cancel: cancel.clone(),
621 48 : },
622 48 : DeletionQueueWorkers {
623 48 : frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
624 48 : backend: Validator::new(
625 48 : conf,
626 48 : backend_rx,
627 48 : executor_tx,
628 48 : controller_upcall_client,
629 48 : lsn_table.clone(),
630 48 : cancel.clone(),
631 48 : ),
632 48 : executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
633 48 : },
634 48 : )
635 48 : }
636 :
637 0 : pub async fn shutdown(&mut self, timeout: Duration) {
638 0 : match tokio::time::timeout(timeout, self.client.flush()).await {
639 : Ok(Ok(())) => {
640 0 : tracing::info!("Deletion queue flushed successfully on shutdown")
641 : }
642 : Ok(Err(DeletionQueueError::ShuttingDown)) => {
643 : // This is not harmful for correctness, but is unexpected: the deletion
644 : // queue's workers should stay alive as long as there are any client handles instantiated.
645 0 : tracing::warn!("Deletion queue stopped prematurely");
646 : }
647 0 : Err(_timeout) => {
648 0 : tracing::warn!("Timed out flushing deletion queue on shutdown")
649 : }
650 : }
651 :
652 : // We only cancel _after_ flushing: otherwise we would be shutting down the
653 : // components that do the flush.
654 0 : self.cancel.cancel();
655 0 : }
656 : }
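
// Shutdown-ordering sketch for callers (`tenant_manager` is an assumed stand-in
// for whatever owns the producers): tenants must be torn down before the queue,
// and `shutdown` flushes before cancelling its own workers.
//
//     tenant_manager.shutdown().await;                        // stop producers first
//     deletion_queue.shutdown(Duration::from_secs(5)).await;  // flush, then cancel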
657 :
658 : #[cfg(test)]
659 : mod test {
660 : use std::io::ErrorKind;
661 : use std::time::Duration;
662 :
663 : use camino::Utf8Path;
664 : use hex_literal::hex;
665 : use pageserver_api::key::Key;
666 : use pageserver_api::shard::ShardIndex;
667 : use pageserver_api::upcall_api::ReAttachResponseTenant;
668 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
669 : use tokio::task::JoinHandle;
670 : use tracing::info;
671 :
672 : use super::*;
673 : use crate::controller_upcall_client::RetryForeverError;
674 : use crate::tenant::harness::TenantHarness;
675 : use crate::tenant::storage_layer::DeltaLayerName;
676 : pub const TIMELINE_ID: TimelineId =
677 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
678 :
679 : pub const EXAMPLE_LAYER_NAME: LayerName = LayerName::Delta(DeltaLayerName {
680 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
681 : lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
682 : });
683 :
684 : // When you need a second layer in a test.
685 : pub const EXAMPLE_LAYER_NAME_ALT: LayerName = LayerName::Delta(DeltaLayerName {
686 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
687 : lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
688 : });
689 :
690 : struct TestSetup {
691 : harness: TenantHarness,
692 : remote_fs_dir: Utf8PathBuf,
693 : storage: GenericRemoteStorage,
694 : mock_control_plane: MockStorageController,
695 : deletion_queue: DeletionQueue,
696 : worker_join: JoinHandle<()>,
697 : }
698 :
699 : impl TestSetup {
700 : /// Simulate a pageserver restart by destroying and recreating the deletion queue
701 12 : async fn restart(&mut self) {
702 12 : let (deletion_queue, workers) = DeletionQueue::new(
703 12 : self.storage.clone(),
704 12 : Some(self.mock_control_plane.clone()),
705 12 : self.harness.conf,
706 12 : );
707 12 :
708 12 : tracing::debug!("Spawning worker for new queue queue");
709 12 : let worker_join = workers.spawn_with(&tokio::runtime::Handle::current());
710 12 :
711 12 : let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
712 12 : let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
713 12 :
714 12 : tracing::debug!("Joining worker from previous queue");
715 12 : old_deletion_queue.cancel.cancel();
716 12 : old_worker_join
717 12 : .await
718 12 : .expect("Failed to join workers for previous deletion queue");
719 12 : }
720 :
721 36 : fn set_latest_generation(&self, gen_: Generation) {
722 36 : let tenant_shard_id = self.harness.tenant_shard_id;
723 36 : self.mock_control_plane
724 36 : .latest_generation
725 36 : .lock()
726 36 : .unwrap()
727 36 : .insert(tenant_shard_id, gen_);
728 36 : }
729 :
730 : /// Returns remote layer file name, suitable for use in assert_remote_files
731 36 : fn write_remote_layer(
732 36 : &self,
733 36 : file_name: LayerName,
734 36 : gen_: Generation,
735 36 : ) -> anyhow::Result<String> {
736 36 : let tenant_shard_id = self.harness.tenant_shard_id;
737 36 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
738 36 : let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
739 36 : std::fs::create_dir_all(&remote_timeline_path)?;
740 36 : let remote_layer_file_name = format!("{}{}", file_name, gen_.get_suffix());
741 36 :
742 36 : let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
743 36 :
744 36 : std::fs::write(
745 36 : remote_timeline_path.join(remote_layer_file_name.clone()),
746 36 : content,
747 36 : )?;
748 :
749 36 : Ok(remote_layer_file_name)
750 36 : }
751 : }
752 :
753 : #[derive(Debug, Clone)]
754 : struct MockStorageController {
755 : pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
756 : }
757 :
758 : impl MockStorageController {
759 36 : fn new() -> Self {
760 36 : Self {
761 36 : latest_generation: Arc::default(),
762 36 : }
763 36 : }
764 : }
765 :
766 : impl StorageControllerUpcallApi for MockStorageController {
767 0 : async fn re_attach(
768 0 : &self,
769 0 : _conf: &PageServerConf,
770 0 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
771 0 : unimplemented!()
772 : }
773 :
774 48 : async fn validate(
775 48 : &self,
776 48 : tenants: Vec<(TenantShardId, Generation)>,
777 48 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
778 48 : let mut result = HashMap::new();
779 48 :
780 48 : let latest_generation = self.latest_generation.lock().unwrap();
781 :
782 96 : for (tenant_shard_id, generation) in tenants {
783 48 : if let Some(latest) = latest_generation.get(&tenant_shard_id) {
784 48 : result.insert(tenant_shard_id, *latest == generation);
785 48 : }
786 : }
787 :
788 48 : Ok(result)
789 48 : }
790 :
791 0 : async fn put_timeline_import_status(
792 0 : &self,
793 0 : _tenant_shard_id: TenantShardId,
794 0 : _timeline_id: TimelineId,
795 0 : _status: pageserver_api::models::ShardImportStatus,
796 0 : ) -> Result<(), RetryForeverError> {
797 0 : unimplemented!()
798 : }
799 : }
800 :
801 36 : async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
802 36 : let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
803 36 : let harness = TenantHarness::create(test_name).await?;
804 :
805 : // We do not load() the harness: we only need its config and remote_storage
806 :
807 : // Set up a GenericRemoteStorage targeting a directory
808 36 : let remote_fs_dir = harness.conf.workdir.join("remote_fs");
809 36 : std::fs::create_dir_all(remote_fs_dir)?;
810 36 : let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
811 36 : let storage_config = RemoteStorageConfig {
812 36 : storage: RemoteStorageKind::LocalFs {
813 36 : local_path: remote_fs_dir.clone(),
814 36 : },
815 36 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
816 36 : small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
817 36 : };
818 36 : let storage = GenericRemoteStorage::from_config(&storage_config)
819 36 : .await
820 36 : .unwrap();
821 36 :
822 36 : let mock_control_plane = MockStorageController::new();
823 36 :
824 36 : let (deletion_queue, worker) = DeletionQueue::new(
825 36 : storage.clone(),
826 36 : Some(mock_control_plane.clone()),
827 36 : harness.conf,
828 36 : );
829 36 :
830 36 : let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
831 36 :
832 36 : Ok(TestSetup {
833 36 : harness,
834 36 : remote_fs_dir,
835 36 : storage,
836 36 : mock_control_plane,
837 36 : deletion_queue,
838 36 : worker_join,
839 36 : })
840 36 : }
841 :
842 : // TODO: put this in a common location so that we can share it with remote_timeline_client's tests
843 108 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
844 108 : let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
845 108 : expected.sort();
846 108 :
847 108 : let mut found: Vec<String> = Vec::new();
848 108 : let dir = match std::fs::read_dir(remote_path) {
849 108 : Ok(d) => d,
850 0 : Err(e) => {
851 0 : if e.kind() == ErrorKind::NotFound {
852 0 : if expected.is_empty() {
853 : // We are asserting prefix is empty: it is expected that the dir is missing
854 0 : return;
855 : } else {
856 0 : assert_eq!(expected, Vec::<String>::new());
857 0 : unreachable!();
858 : }
859 : } else {
860 0 : panic!("Unexpected error listing {remote_path}: {e}");
861 : }
862 : }
863 : };
864 :
865 108 : for entry in dir.flatten() {
866 96 : let entry_name = entry.file_name();
867 96 : let fname = entry_name.to_str().unwrap();
868 96 : found.push(String::from(fname));
869 96 : }
870 108 : found.sort();
871 108 :
872 108 : assert_eq!(expected, found);
873 108 : }
874 :
875 60 : fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
876 60 : let dir = match std::fs::read_dir(directory) {
877 48 : Ok(d) => d,
878 : Err(_) => {
879 12 : assert_eq!(expected, &Vec::<String>::new());
880 12 : return;
881 : }
882 : };
883 48 : let mut found = Vec::new();
884 108 : for dentry in dir {
885 60 : let dentry = dentry.unwrap();
886 60 : let file_name = dentry.file_name();
887 60 : let file_name_str = file_name.to_string_lossy();
888 60 : found.push(file_name_str.to_string());
889 60 : }
890 48 : found.sort();
891 48 : assert_eq!(expected, found);
892 60 : }
893 :
894 : #[tokio::test]
895 12 : async fn deletion_queue_smoke() -> anyhow::Result<()> {
896 12 : // Basic test that the deletion queue processes the deletions we pass into it
897 12 : let ctx = setup("deletion_queue_smoke")
898 12 : .await
899 12 : .expect("Failed test setup");
900 12 : let client = ctx.deletion_queue.new_client();
901 12 : client.recover(HashMap::new())?;
902 12 :
903 12 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
904 12 : let tenant_shard_id = ctx.harness.tenant_shard_id;
905 12 :
906 12 : let content: Vec<u8> = "victim1 contents".into();
907 12 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
908 12 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
909 12 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
910 12 :
911 12 : // Exercise the distinction between the generation of the layers
912 12 : // we delete, and the generation of the running Tenant.
913 12 : let layer_generation = Generation::new(0xdeadbeef);
914 12 : let now_generation = Generation::new(0xfeedbeef);
915 12 : let layer_metadata =
916 12 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
917 12 :
918 12 : let remote_layer_file_name_1 =
919 12 : format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
920 12 :
921 12 : // Set mock control plane state to valid for our generation
922 12 : ctx.set_latest_generation(now_generation);
923 12 :
924 12 : // Inject a victim file to remote storage
925 12 : info!("Writing");
926 12 : std::fs::create_dir_all(&remote_timeline_path)?;
927 12 : std::fs::write(
928 12 : remote_timeline_path.join(remote_layer_file_name_1.clone()),
929 12 : content,
930 12 : )?;
931 12 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
932 12 :
933 12 : // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
934 12 : info!("Pushing");
935 12 : client.push_layers(
936 12 : tenant_shard_id,
937 12 : TIMELINE_ID,
938 12 : now_generation,
939 12 : [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
940 12 : )?;
941 12 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
942 12 :
943 12 : assert_local_files(&[], &deletion_prefix);
944 12 :
945 12 : // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
946 12 : info!("Flushing");
947 12 : client.flush().await?;
948 12 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
949 12 : assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
950 12 :
951 12 : // File should go away when we execute
952 12 : info!("Flush-executing");
953 12 : client.flush_execute().await?;
954 12 : assert_remote_files(&[], &remote_timeline_path);
955 12 : assert_local_files(&["header-01"], &deletion_prefix);
956 12 :
957 12 : // Flushing on an empty queue should succeed immediately, and not write any lists
958 12 : info!("Flush-executing on empty");
959 12 : client.flush_execute().await?;
960 12 : assert_local_files(&["header-01"], &deletion_prefix);
961 12 :
962 12 : Ok(())
963 12 : }
964 :
965 : #[tokio::test]
966 12 : async fn deletion_queue_validation() -> anyhow::Result<()> {
967 12 : let ctx = setup("deletion_queue_validation")
968 12 : .await
969 12 : .expect("Failed test setup");
970 12 : let client = ctx.deletion_queue.new_client();
971 12 : client.recover(HashMap::new())?;
972 12 :
973 12 : // Generation that the control plane thinks is current
974 12 : let latest_generation = Generation::new(0xdeadbeef);
975 12 : // Generation that our DeletionQueue thinks the tenant is running with
976 12 : let stale_generation = latest_generation.previous();
977 12 : // Generation that our example layer file was written with
978 12 : let layer_generation = stale_generation.previous();
979 12 : let layer_metadata =
980 12 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
981 12 :
982 12 : ctx.set_latest_generation(latest_generation);
983 12 :
984 12 : let tenant_shard_id = ctx.harness.tenant_shard_id;
985 12 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
986 12 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
987 12 :
988 12 : // Initial state: a remote layer exists
989 12 : let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
990 12 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
991 12 :
992 12 : tracing::debug!("Pushing...");
993 12 : client.push_layers(
994 12 : tenant_shard_id,
995 12 : TIMELINE_ID,
996 12 : stale_generation,
997 12 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
998 12 : )?;
999 12 :
1000 12 : // We enqueued the operation in a stale generation: it should have failed validation
1001 12 : tracing::debug!("Flushing...");
1002 12 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1003 12 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1004 12 :
1005 12 : tracing::debug!("Pushing...");
1006 12 : client.push_layers(
1007 12 : tenant_shard_id,
1008 12 : TIMELINE_ID,
1009 12 : latest_generation,
1010 12 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1011 12 : )?;
1012 12 :
1013 12 : // We enqueued the operation in a fresh generation: it should have passed validation
1014 12 : tracing::debug!("Flushing...");
1015 12 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1016 12 : assert_remote_files(&[], &remote_timeline_path);
1017 12 :
1018 12 : Ok(())
1019 12 : }
1020 :
1021 : #[tokio::test]
1022 12 : async fn deletion_queue_recovery() -> anyhow::Result<()> {
1023 12 : // Test that persisted deletion lists are recovered and filtered by generation across a restart
1024 12 : let mut ctx = setup("deletion_queue_recovery")
1025 12 : .await
1026 12 : .expect("Failed test setup");
1027 12 : let client = ctx.deletion_queue.new_client();
1028 12 : client.recover(HashMap::new())?;
1029 12 :
1030 12 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1031 12 :
1032 12 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1033 12 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1034 12 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
1035 12 :
1036 12 : let layer_generation = Generation::new(0xdeadbeef);
1037 12 : let now_generation = Generation::new(0xfeedbeef);
1038 12 : let layer_metadata =
1039 12 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1040 12 :
1041 12 : // Inject a deletion in the generation before generation_now: after restart,
1042 12 : // this deletion should _not_ get executed (only the immediately previous
1043 12 : // generation gets that treatment)
1044 12 : let remote_layer_file_name_historical =
1045 12 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1046 12 : client.push_layers(
1047 12 : tenant_shard_id,
1048 12 : TIMELINE_ID,
1049 12 : now_generation.previous(),
1050 12 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1051 12 : )?;
1052 12 :
1053 12 : // Inject a deletion in generation_now itself: after restart,
1054 12 : // this deletion should get executed, because we execute deletions in the
1055 12 : // immediately previous generation on the same node.
1056 12 : let remote_layer_file_name_previous =
1057 12 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
1058 12 : client.push_layers(
1059 12 : tenant_shard_id,
1060 12 : TIMELINE_ID,
1061 12 : now_generation,
1062 12 : [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
1063 12 : )?;
1064 12 :
1065 12 : client.flush().await?;
1066 12 : assert_remote_files(
1067 12 : &[
1068 12 : &remote_layer_file_name_historical,
1069 12 : &remote_layer_file_name_previous,
1070 12 : ],
1071 12 : &remote_timeline_path,
1072 12 : );
1073 12 :
1074 12 : // Different generations for the same tenant will cause two separate
1075 12 : // deletion lists to be emitted.
1076 12 : assert_local_files(
1077 12 : &["0000000000000001-01.list", "0000000000000002-01.list"],
1078 12 : &deletion_prefix,
1079 12 : );
1080 12 :
1081 12 : // Simulate a node restart: the latest generation advances
1082 12 : let now_generation = now_generation.next();
1083 12 : ctx.set_latest_generation(now_generation);
1084 12 :
1085 12 : // Restart the deletion queue
1086 12 : drop(client);
1087 12 : ctx.restart().await;
1088 12 : let client = ctx.deletion_queue.new_client();
1089 12 : client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
1090 12 :
1091 12 : info!("Flush-executing");
1092 12 : client.flush_execute().await?;
1093 12 : // The deletion from immediately prior generation was executed, the one from
1094 12 : // an older generation was not.
1095 12 : assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
1096 12 : Ok(())
1097 12 : }
1098 : }
1099 :
1100 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
1101 : /// or coalescing; deletions are executed inline by a spawned consumer, and pump() flushes it.
1102 : #[cfg(test)]
1103 : pub(crate) mod mock {
1104 : use std::sync::atomic::{AtomicUsize, Ordering};
1105 :
1106 : use tracing::info;
1107 :
1108 : use super::*;
1109 : use crate::tenant::remote_timeline_client::remote_layer_path;
1110 :
1111 : pub struct ConsumerState {
1112 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
1113 : executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
1114 : cancel: CancellationToken,
1115 : executed: Arc<AtomicUsize>,
1116 : }
1117 :
1118 : impl ConsumerState {
1119 1392 : async fn consume(&mut self, remote_storage: &GenericRemoteStorage) {
1120 1392 : info!("Executing all pending deletions");
1121 :
1122 : // Drain both the frontend and executor queues, executing deletions inline
1123 3114 : loop {
1124 3114 : use either::Either;
1125 3114 : let msg = tokio::select! {
1126 3114 : left = self.executor_rx.recv() => Either::Left(left),
1127 3114 : right = self.rx.recv() => Either::Right(right),
1128 : };
1129 12 : match msg {
1130 0 : Either::Left(None) => break,
1131 0 : Either::Right(None) => break,
1132 0 : Either::Left(Some(DeleterMessage::Delete(objects))) => {
1133 0 : for path in objects {
1134 0 : match remote_storage.delete(&path, &self.cancel).await {
1135 : Ok(_) => {
1136 0 : debug!("Deleted {path}");
1137 : }
1138 0 : Err(e) => {
1139 0 : error!("Failed to delete {path}, leaking object! ({e})");
1140 : }
1141 : }
1142 0 : self.executed.fetch_add(1, Ordering::Relaxed);
1143 : }
1144 : }
1145 12 : Either::Left(Some(DeleterMessage::Flush(flush_op))) => {
1146 12 : flush_op.notify();
1147 12 : }
1148 1736 : Either::Right(Some(ListWriterQueueMessage::Delete(op))) => {
1149 1736 : let mut objects = op.objects;
1150 3472 : for (layer, meta) in op.layers {
1151 1736 : objects.push(remote_layer_path(
1152 1736 : &op.tenant_shard_id.tenant_id,
1153 1736 : &op.timeline_id,
1154 1736 : meta.shard,
1155 1736 : &layer,
1156 1736 : meta.generation,
1157 1736 : ));
1158 1736 : }
1159 :
1160 3446 : for path in objects {
1161 1736 : info!("Executing deletion {path}");
1162 1736 : match remote_storage.delete(&path, &self.cancel).await {
1163 : Ok(_) => {
1164 1710 : debug!("Deleted {path}");
1165 : }
1166 0 : Err(e) => {
1167 0 : error!("Failed to delete {path}, leaking object! ({e})");
1168 : }
1169 : }
1170 1710 : self.executed.fetch_add(1, Ordering::Relaxed);
1171 : }
1172 : }
1173 0 : Either::Right(Some(ListWriterQueueMessage::Flush(op))) => {
1174 0 : op.notify();
1175 0 : }
1176 0 : Either::Right(Some(ListWriterQueueMessage::FlushExecute(op))) => {
1177 0 : // We have already executed all prior deletions because mock does them inline
1178 0 : op.notify();
1179 0 : }
1180 0 : Either::Right(Some(ListWriterQueueMessage::Recover(_))) => {
1181 0 : // no-op in mock
1182 0 : }
1183 : }
1184 : }
1185 0 : }
1186 : }
1187 :
1188 : pub struct MockDeletionQueue {
1189 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
1190 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
1191 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
1192 : }
1193 :
1194 : impl MockDeletionQueue {
1195 1392 : pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
1196 1392 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1197 1392 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
1198 1392 :
1199 1392 : let executed = Arc::new(AtomicUsize::new(0));
1200 1392 :
1201 1392 : let mut consumer = ConsumerState {
1202 1392 : rx,
1203 1392 : executor_rx,
1204 1392 : cancel: CancellationToken::new(),
1205 1392 : executed: executed.clone(),
1206 1392 : };
1207 1392 :
1208 1392 : tokio::spawn(async move {
1209 1392 : if let Some(remote_storage) = &remote_storage {
1210 1392 : consumer.consume(remote_storage).await;
1211 0 : }
1212 1392 : });
1213 1392 :
1214 1392 : Self {
1215 1392 : tx,
1216 1392 : executor_tx,
1217 1392 : lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
1218 1392 : }
1219 1392 : }
1220 :
1221 : #[allow(clippy::await_holding_lock)]
1222 12 : pub async fn pump(&self) {
1223 12 : let (tx, rx) = tokio::sync::oneshot::channel();
1224 12 : self.executor_tx
1225 12 : .send(DeleterMessage::Flush(FlushOp { tx }))
1226 12 : .await
1227 12 : .expect("Failed to send flush message");
1228 12 : rx.await.ok();
1229 12 : }
1230 :
1231 1452 : pub(crate) fn new_client(&self) -> DeletionQueueClient {
1232 1452 : DeletionQueueClient {
1233 1452 : tx: self.tx.clone(),
1234 1452 : executor_tx: self.executor_tx.clone(),
1235 1452 : lsn_table: self.lsn_table.clone(),
1236 1452 : }
1237 1452 : }
1238 : }
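
// Typical usage sketch in a test (assumed context): deletions are executed by
// the spawned consumer as they arrive; pump() flushes the executor channel.
//
//     let mock = MockDeletionQueue::new(Some(storage.clone()));
//     let client = mock.new_client();
//     client.push_layers(tenant_shard_id, timeline_id, generation, layers)?;
//     mock.pump().await;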
1239 :
1240 : /// Test round-trip serialization/deserialization, and test stability of the format
1241 : /// vs. a static expected string for the serialized version.
1242 : #[test]
1243 12 : fn deletion_list_serialization() -> anyhow::Result<()> {
1244 12 : let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
1245 12 : .to_string()
1246 12 : .parse::<TenantShardId>()?;
1247 12 : let timeline_id = "be322c834ed9e709e63b5c9698691910"
1248 12 : .to_string()
1249 12 : .parse::<TimelineId>()?;
1250 12 : let generation = Generation::new(123);
1251 :
1252 12 : let object =
1253 12 : RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
1254 12 : let mut objects = [object].to_vec();
1255 12 :
1256 12 : let mut example = DeletionList::new(1);
1257 12 : example.push(&tenant_id, &timeline_id, generation, &mut objects);
1258 :
1259 12 : let encoded = serde_json::to_string(&example)?;
1260 :
1261 12 : let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
1262 12 : assert_eq!(encoded, expected);
1263 :
1264 12 : let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
1265 12 : assert_eq!(example, decoded);
1266 :
1267 12 : Ok(())
1268 12 : }
1269 : }