Line data Source code
1 : mod deleter;
2 : mod list_writer;
3 : mod validator;
4 :
5 : use std::collections::HashMap;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use crate::control_plane_client::ControlPlaneGenerationsApi;
10 : use crate::metrics;
11 : use crate::tenant::remote_timeline_client::remote_layer_path;
12 : use crate::tenant::remote_timeline_client::remote_timeline_path;
13 : use crate::tenant::remote_timeline_client::LayerFileMetadata;
14 : use crate::virtual_file::MaybeFatalIo;
15 : use crate::virtual_file::VirtualFile;
16 : use anyhow::Context;
17 : use camino::Utf8PathBuf;
18 : use pageserver_api::shard::TenantShardId;
19 : use remote_storage::{GenericRemoteStorage, RemotePath};
20 : use serde::Deserialize;
21 : use serde::Serialize;
22 : use thiserror::Error;
23 : use tokio;
24 : use tokio_util::sync::CancellationToken;
25 : use tracing::Instrument;
26 : use tracing::{self, debug, error};
27 : use utils::crashsafe::path_with_suffix_extension;
28 : use utils::generation::Generation;
29 : use utils::id::TimelineId;
30 : use utils::lsn::AtomicLsn;
31 : use utils::lsn::Lsn;
32 :
33 : use self::deleter::Deleter;
34 : use self::list_writer::DeletionOp;
35 : use self::list_writer::ListWriter;
36 : use self::list_writer::RecoverOp;
37 : use self::validator::Validator;
38 : use deleter::DeleterMessage;
39 : use list_writer::ListWriterQueueMessage;
40 : use validator::ValidatorQueueMessage;
41 :
42 : use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
43 :
44 : // TODO: make the delay before executing deletions configurable
45 :
46 : /// We aggregate object deletions from many tenants in one place, for several reasons:
47 : /// - Coalesce deletions into fewer DeleteObjects calls
48 : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
49 : /// to flush any outstanding deletions.
50 : /// - Globally control throughput of deletions, as these are a low priority task: do
51 : /// not compete with the same S3 clients/connections used for higher priority uploads.
52 : /// - Enable gating deletions on validation of a tenant's generation number, to make
53 : /// it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
54 : ///
55 : /// There are two kinds of deletion: deferred and immediate. A deferred deletion
56 : /// may be intentionally delayed to protect passive readers of S3 data, and is
57 : /// subject to a generation number validation step. An immediate deletion is
58 : /// ready to execute immediately, and is only queued up so that it can be coalesced
59 : /// with other deletions in flight.
60 : ///
61 : /// Deferred deletions pass through three steps:
62 : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
63 : /// DeletionLists, which are persisted to disk.
64 : /// - Validator: accumulate deletion lists, and validate them en-masse prior to passing
65 : /// the keys in the list onward for actual deletion. Also validate remote_consistent_lsn
66 : /// updates for running timelines.
67 : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
68 : /// batches of 1000 keys via DeleteObjects.
69 : ///
70 : /// Immediate deletions, such as those issued during timeline deletion, bypass the first
71 : /// two stages and are passed straight into the Deleter.
72 : ///
73 : /// Internally, each stage is joined by a channel to the next. On disk, there is only
74 : /// one queue (of DeletionLists), which is written by the frontend and consumed
75 : /// by the backend.
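///
/// A sketch of typical use of the deferred path, mirroring the smoke test below
/// (variable names are illustrative; not compiled here):
///
/// ```ignore
/// // Enqueue layer deletions; they become durable once written to a DeletionList.
/// client.push_layers(tenant_shard_id, timeline_id, current_generation, layers).await?;
/// // Persist any outstanding deletions to a DeletionList on disk.
/// client.flush().await?;
/// // Validate with the control plane and execute the resulting DeleteObjects calls.
/// client.flush_execute().await?;
/// ```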
76 183 : #[derive(Clone)]
77 : pub struct DeletionQueue {
78 : client: DeletionQueueClient,
79 :
80 : // Parent cancellation token for the tokens passed into background workers
81 : cancel: CancellationToken,
82 : }
83 :
84 : /// Opaque wrapper around individual worker tasks, to avoid making the
85 : /// worker objects themselves public
86 : pub struct DeletionQueueWorkers<C>
87 : where
88 : C: ControlPlaneGenerationsApi + Send + Sync,
89 : {
90 : frontend: ListWriter,
91 : backend: Validator<C>,
92 : executor: Deleter,
93 : }
94 :
95 : impl<C> DeletionQueueWorkers<C>
96 : where
97 : C: ControlPlaneGenerationsApi + Send + Sync + 'static,
98 : {
99 633 : pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
100 633 : let jh_frontend = runtime.spawn(async move {
101 633 : self.frontend
102 633 : .background()
103 633 : .instrument(tracing::info_span!(parent:None, "deletion frontend"))
104 3970 : .await
105 633 : });
106 633 : let jh_backend = runtime.spawn(async move {
107 633 : self.backend
108 633 : .background()
109 633 : .instrument(tracing::info_span!(parent:None, "deletion backend"))
110 2035 : .await
111 633 : });
112 633 : let jh_executor = runtime.spawn(async move {
113 633 : self.executor
114 633 : .background()
115 633 : .instrument(tracing::info_span!(parent:None, "deletion executor"))
116 11233 : .await
117 633 : });
118 633 :
119 633 : runtime.spawn({
120 633 : async move {
121 633 : jh_frontend.await.expect("error joining frontend worker");
122 3 : jh_backend.await.expect("error joining backend worker");
123 3 : drop(jh_executor.await.expect("error joining executor worker"));
124 633 : }
125 633 : })
126 633 : }
127 : }
128 :
129 : /// A FlushOp is just a oneshot channel, where we send the transmit side down
130 : /// another channel, and the receive side will receive a message when the channel
131 : /// we're flushing has reached the FlushOp we sent into it.
132 : ///
133 : /// The only extra behavior beyond the channel is that the notify() method does not
134 : /// return an error when the receive side has been dropped, because in this use case
135 : /// it is harmless (the code that initiated the flush no longer cares about the result).
136 0 : #[derive(Debug)]
137 : struct FlushOp {
138 : tx: tokio::sync::oneshot::Sender<()>,
139 : }
140 :
141 : impl FlushOp {
142 909 : fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
143 909 : let (tx, rx) = tokio::sync::oneshot::channel::<()>();
144 909 : (Self { tx }, rx)
145 909 : }
146 :
147 907 : fn notify(self) {
148 907 : if self.tx.send(()).is_err() {
149 : // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
150 163 : debug!("deletion queue flush from dropped client");
151 744 : };
152 907 : }
153 : }
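// Illustrative use of FlushOp (a sketch with hypothetical sender names, not part of
// this module's API): the caller keeps the oneshot receiver and sends the FlushOp
// down the queue it wants to drain; the worker calls notify() when it dequeues it,
// which completes the caller's `rx.await`.
//
//   let (flush_op, rx) = FlushOp::new();
//   queue_tx.send(ListWriterQueueMessage::Flush(flush_op))?;
//   rx.await.ok(); // resolves once all messages queued before the FlushOp were handled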
154 :
155 5513 : #[derive(Clone, Debug)]
156 : pub struct DeletionQueueClient {
157 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
158 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
159 :
160 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
161 : }
162 :
163 180 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
164 : struct TenantDeletionList {
165 : /// For each Timeline, a list of key fragments to append to the timeline remote path
166 : /// when reconstructing a full key
167 : timelines: HashMap<TimelineId, Vec<String>>,
168 :
169 : /// The generation in which this deletion was emitted: note that this may not be the
170 : /// same as the generation of any layers being deleted. The generation of the layer
171 : /// has already been absorbed into the keys in `objects`
172 : generation: Generation,
173 : }
174 :
175 : impl TenantDeletionList {
176 40 : pub(crate) fn len(&self) -> usize {
177 54 : self.timelines.values().map(|v| v.len()).sum()
178 40 : }
179 : }
180 :
181 : /// Files ending with this suffix will be ignored and erased
182 : /// during recovery at startup.
183 : const TEMP_SUFFIX: &str = "tmp";
184 :
185 297 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
186 : struct DeletionList {
187 : /// Serialization version, for future use
188 : version: u8,
189 :
190 : /// Used for constructing a unique key for each deletion list we write out.
191 : sequence: u64,
192 :
193 : /// To avoid repeating tenant/timeline IDs in every key, we store keys in
193 : /// nested HashMaps keyed by tenant shard and timeline ID. Each Tenant only appears once
195 : /// with one unique generation ID: if someone tries to push a second generation
196 : /// ID for the same tenant, we will start a new DeletionList.
197 : tenants: HashMap<TenantShardId, TenantDeletionList>,
198 :
199 : /// Avoid having to walk `tenants` to calculate the number of keys in
200 : /// the nested deletion lists
201 : size: usize,
202 :
203 : /// Set to true when the list has undergone validation with the control
204 : /// plane and the remaining contents of `tenants` are valid. A list may
205 : /// also be implicitly marked valid by DeletionHeader.validated_sequence
206 : /// advancing to >= DeletionList.sequence
207 : #[serde(default)]
208 : #[serde(skip_serializing_if = "std::ops::Not::not")]
209 : validated: bool,
210 : }
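// For reference, a DeletionList with a single key serializes to JSON in the shape
// checked by `deletion_list_serialization` below (IDs abbreviated for illustration):
//
//   {"version":1,"sequence":1,
//    "tenants":{"<tenant_shard_id>":{"timelines":{"<timeline_id>":["foo"]},"generation":123}},
//    "size":1}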
211 :
212 90 : #[derive(Debug, Serialize, Deserialize)]
213 : struct DeletionHeader {
214 : /// Serialization version, for future use
215 : version: u8,
216 :
217 : /// The highest sequence number (inclusive) that has been validated. All deletion
218 : /// lists on disk with a sequence <= this value are safe to execute.
219 : validated_sequence: u64,
220 : }
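// The header is a small JSON document stored alongside the deletion lists; with the
// derived Serialize impl it looks like this (sequence number illustrative):
//
//   {"version":1,"validated_sequence":42}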
221 :
222 : impl DeletionHeader {
223 : const VERSION_LATEST: u8 = 1;
224 :
225 38 : fn new(validated_sequence: u64) -> Self {
226 38 : Self {
227 38 : version: Self::VERSION_LATEST,
228 38 : validated_sequence,
229 38 : }
230 38 : }
231 :
232 38 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
233 0 : debug!("Saving deletion list header {:?}", self);
234 38 : let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
235 38 : let header_path = conf.deletion_header_path();
236 38 : let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
237 38 : VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
238 38 : .await
239 38 : .maybe_fatal_err("save deletion header")?;
240 :
241 38 : Ok(())
242 38 : }
243 : }
244 :
245 : impl DeletionList {
246 : const VERSION_LATEST: u8 = 1;
247 690 : fn new(sequence: u64) -> Self {
248 690 : Self {
249 690 : version: Self::VERSION_LATEST,
250 690 : sequence,
251 690 : tenants: HashMap::new(),
252 690 : size: 0,
253 690 : validated: false,
254 690 : }
255 690 : }
256 :
257 835 : fn is_empty(&self) -> bool {
258 835 : self.tenants.is_empty()
259 835 : }
260 :
261 5252 : fn len(&self) -> usize {
262 5252 : self.size
263 5252 : }
264 :
265 : /// Returns true if the push was accepted, false if the caller must start a new
266 : /// deletion list.
267 4100 : fn push(
268 4100 : &mut self,
269 4100 : tenant: &TenantShardId,
270 4100 : timeline: &TimelineId,
271 4100 : generation: Generation,
272 4100 : objects: &mut Vec<RemotePath>,
273 4100 : ) -> bool {
274 4100 : if objects.is_empty() {
275 : // Avoid inserting an empty TenantDeletionList: this preserves the property
276 : // that if we have no keys, then self.tenants is empty (used in Self::is_empty)
277 422 : return true;
278 3678 : }
279 3678 :
280 3678 : let tenant_entry = self
281 3678 : .tenants
282 3678 : .entry(*tenant)
283 3678 : .or_insert_with(|| TenantDeletionList {
284 112 : timelines: HashMap::new(),
285 112 : generation,
286 3678 : });
287 3678 :
288 3678 : if tenant_entry.generation != generation {
289 : // Only one generation per tenant per list: signal to
290 : // caller to start a new list.
291 3 : return false;
292 3675 : }
293 3675 :
294 3675 : let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
295 3675 :
296 3675 : let timeline_remote_path = remote_timeline_path(tenant, timeline);
297 3675 :
298 3675 : self.size += objects.len();
299 3675 : timeline_entry.extend(objects.drain(..).map(|p| {
300 3675 : p.strip_prefix(&timeline_remote_path)
301 3675 : .expect("Timeline paths always start with the timeline prefix")
302 3675 : .to_string()
303 3675 : }));
304 3675 : true
305 4100 : }
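// Illustrative handling of the `false` return value, simplified from the ListWriter
// (names hypothetical): when a tenant shows up with a new generation, the in-progress
// list is persisted and a fresh one is started.
//
//   if !pending.push(&tenant, &timeline, generation, &mut objects) {
//       pending.save(conf).await?;
//       pending = DeletionList::new(pending.sequence + 1);
//       assert!(pending.push(&tenant, &timeline, generation, &mut objects));
//   }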
306 :
307 42 : fn into_remote_paths(self) -> Vec<RemotePath> {
308 42 : let mut result = Vec::new();
309 42 : for (tenant, tenant_deletions) in self.tenants.into_iter() {
310 42 : for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
311 42 : let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
312 42 : result.extend(
313 42 : timeline_layers
314 42 : .into_iter()
315 1082 : .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
316 42 : );
317 42 : }
318 : }
319 :
320 42 : result
321 42 : }
322 :
323 69 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
324 69 : let path = conf.deletion_list_path(self.sequence);
325 69 : let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
326 69 :
327 69 : let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
328 69 :
329 69 : VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
330 69 : .await
331 69 : .maybe_fatal_err("save deletion list")
332 69 : .map_err(Into::into)
333 69 : }
334 : }
335 :
336 : impl std::fmt::Display for DeletionList {
337 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338 0 : write!(
339 0 : f,
340 0 : "DeletionList<seq={}, tenants={}, keys={}>",
341 0 : self.sequence,
342 0 : self.tenants.len(),
343 0 : self.size
344 0 : )
345 0 : }
346 : }
347 :
348 : struct PendingLsn {
349 : projected: Lsn,
350 : result_slot: Arc<AtomicLsn>,
351 : }
352 :
353 : struct TenantLsnState {
354 : timelines: HashMap<TimelineId, PendingLsn>,
355 :
356 : // In what generation was the most recent update proposed?
357 : generation: Generation,
358 : }
359 :
360 812 : #[derive(Default)]
361 : struct VisibleLsnUpdates {
362 : tenants: HashMap<TenantShardId, TenantLsnState>,
363 : }
364 :
365 : impl VisibleLsnUpdates {
366 717 : fn new() -> Self {
367 717 : Self {
368 717 : tenants: HashMap::new(),
369 717 : }
370 717 : }
371 : }
372 :
373 : impl std::fmt::Debug for VisibleLsnUpdates {
374 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375 0 : write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
376 0 : }
377 : }
378 :
379 0 : #[derive(Error, Debug)]
380 : pub enum DeletionQueueError {
381 : #[error("Deletion queue unavailable during shutdown")]
382 : ShuttingDown,
383 : }
384 :
385 : impl DeletionQueueClient {
386 0 : pub(crate) fn broken() -> Self {
387 0 : // Channels whose receivers are immediately dropped.
388 0 : let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
389 0 : let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
390 0 : Self {
391 0 : tx,
392 0 : executor_tx,
393 0 : lsn_table: Arc::default(),
394 0 : }
395 0 : }
396 :
397 : /// This is cancel-safe. If you drop the future before it completes, the message
398 : /// is not pushed, although in the context of the deletion queue it doesn't matter: once
399 : /// we decide to do a deletion the decision is always final.
400 5140 : fn do_push<T>(
401 5140 : &self,
402 5140 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
403 5140 : msg: T,
404 5140 : ) -> Result<(), DeletionQueueError> {
405 5140 : match queue.send(msg) {
406 5140 : Ok(_) => Ok(()),
407 0 : Err(e) => {
408 0 : // This shouldn't happen, we should shut down all tenants before
409 0 : // we shut down the global delete queue. If we encounter a bug like this,
410 0 : // we may leak objects as deletions won't be processed.
411 0 : error!("Deletion queue closed while pushing, shutting down? ({e})");
412 0 : Err(DeletionQueueError::ShuttingDown)
413 : }
414 : }
415 5140 : }
416 :
417 632 : pub(crate) fn recover(
418 632 : &self,
419 632 : attached_tenants: HashMap<TenantShardId, Generation>,
420 632 : ) -> Result<(), DeletionQueueError> {
421 632 : self.do_push(
422 632 : &self.tx,
423 632 : ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
424 632 : )
425 632 : }
426 :
427 : /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
428 : /// world, it must validate its generation number before doing so. Rather than do this synchronously,
429 : /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
430 : /// recently validated separately.
431 : ///
432 : /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The
433 : /// backend will later wake up and notice that the tenant's generation requires validation.
434 5964 : pub(crate) async fn update_remote_consistent_lsn(
435 5964 : &self,
436 5964 : tenant_shard_id: TenantShardId,
437 5964 : timeline_id: TimelineId,
438 5964 : current_generation: Generation,
439 5964 : lsn: Lsn,
440 5964 : result_slot: Arc<AtomicLsn>,
441 5964 : ) {
442 5964 : let mut locked = self
443 5964 : .lsn_table
444 5964 : .write()
445 5964 : .expect("Lock should never be poisoned");
446 5964 :
447 5964 : let tenant_entry = locked
448 5964 : .tenants
449 5964 : .entry(tenant_shard_id)
450 5964 : .or_insert(TenantLsnState {
451 5964 : timelines: HashMap::new(),
452 5964 : generation: current_generation,
453 5964 : });
454 5964 :
455 5964 : if tenant_entry.generation != current_generation {
456 2 : // Generation might have changed if we were detached and then re-attached: in this case,
457 2 : // state from the previous generation cannot be trusted.
458 2 : tenant_entry.timelines.clear();
459 2 : tenant_entry.generation = current_generation;
460 5962 : }
461 :
462 5964 : tenant_entry.timelines.insert(
463 5964 : timeline_id,
464 5964 : PendingLsn {
465 5964 : projected: lsn,
466 5964 : result_slot,
467 5964 : },
468 5964 : );
469 5964 : }
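// Illustrative caller pattern (hypothetical names): a timeline publishes its candidate
// LSN together with a shared slot, and later reads back whichever value the validator
// has actually confirmed.
//
//   let validated = Arc::new(AtomicLsn::new(0));
//   client.update_remote_consistent_lsn(tenant_shard_id, timeline_id, generation, lsn, validated.clone()).await;
//   // ... once the backend has validated the generation ...
//   let visible = validated.load();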
470 :
471 : /// Submit a list of layers for deletion: this function will return before the deletion is
472 : /// persistent, but it may be executed at any time after this function enters: do not push
473 : /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
474 : /// references them).
475 : ///
476 : /// The `current_generation` is the generation of this pageserver's current attachment. The
477 : /// generations in `layers` are the generations in which those layers were written.
478 4319 : pub(crate) async fn push_layers(
479 4319 : &self,
480 4319 : tenant_shard_id: TenantShardId,
481 4319 : timeline_id: TimelineId,
482 4319 : current_generation: Generation,
483 4319 : layers: Vec<(LayerFileName, LayerFileMetadata)>,
484 4319 : ) -> Result<(), DeletionQueueError> {
485 4319 : if current_generation.is_none() {
486 0 : debug!("Enqueuing deletions in legacy mode, skipping queue");
487 :
488 60 : let mut layer_paths = Vec::new();
489 120 : for (layer, meta) in layers {
490 60 : layer_paths.push(remote_layer_path(
491 60 : &tenant_shard_id.tenant_id,
492 60 : &timeline_id,
493 60 : meta.shard,
494 60 : &layer,
495 60 : meta.generation,
496 60 : ));
497 60 : }
498 60 : self.push_immediate(layer_paths).await?;
499 105 : return self.flush_immediate().await;
500 4259 : }
501 4259 :
502 4259 : self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
503 4319 : }
504 :
505 : /// When a Tenant has a generation, push_layers is always synchronous because
506 : /// the ListWriter channel is an unbounded channel.
507 : ///
508 : /// This can be merged into push_layers when we remove the Generation-less mode
509 : /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
510 4259 : pub(crate) fn push_layers_sync(
511 4259 : &self,
512 4259 : tenant_shard_id: TenantShardId,
513 4259 : timeline_id: TimelineId,
514 4259 : current_generation: Generation,
515 4259 : layers: Vec<(LayerFileName, LayerFileMetadata)>,
516 4259 : ) -> Result<(), DeletionQueueError> {
517 4259 : metrics::DELETION_QUEUE
518 4259 : .keys_submitted
519 4259 : .inc_by(layers.len() as u64);
520 4259 : self.do_push(
521 4259 : &self.tx,
522 4259 : ListWriterQueueMessage::Delete(DeletionOp {
523 4259 : tenant_shard_id,
524 4259 : timeline_id,
525 4259 : layers,
526 4259 : generation: current_generation,
527 4259 : objects: Vec::new(),
528 4259 : }),
529 4259 : )
530 4259 : }
531 :
532 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
533 249 : async fn do_flush<T>(
534 249 : &self,
535 249 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
536 249 : msg: T,
537 249 : rx: tokio::sync::oneshot::Receiver<()>,
538 249 : ) -> Result<(), DeletionQueueError> {
539 249 : self.do_push(queue, msg)?;
540 250 : if rx.await.is_err() {
541 : // This shouldn't happen if tenants are shut down before deletion queue. If we
542 : // encounter a bug like this, then a flusher will incorrectly believe it has flushed
543 : // when it hasn't, possibly leading to leaking objects.
544 0 : error!("Deletion queue dropped flush op while client was still waiting");
545 0 : Err(DeletionQueueError::ShuttingDown)
546 : } else {
547 249 : Ok(())
548 : }
549 249 : }
550 :
551 : /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
552 : ///
553 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
554 222 : pub async fn flush(&self) -> Result<(), DeletionQueueError> {
555 222 : let (flush_op, rx) = FlushOp::new();
556 222 : self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
557 223 : .await
558 222 : }
559 :
560 : /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
561 : /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
562 : /// detach where flushing is nice but not necessary.
563 : ///
564 : /// This function provides no guarantees of work being done.
565 163 : pub fn flush_advisory(&self) {
566 163 : let (flush_op, _) = FlushOp::new();
567 163 :
568 163 : // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
569 163 : drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
570 163 : }
571 :
572 : // Wait until all previous deletions are executed
573 27 : pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
574 0 : debug!("flush_execute: flushing to deletion lists...");
575 : // Flush any buffered work to deletion lists
576 27 : self.flush().await?;
577 :
578 : // Flush the backend into the executor of deletion lists
579 27 : let (flush_op, rx) = FlushOp::new();
580 0 : debug!("flush_execute: flushing backend...");
581 27 : self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
582 27 : .await?;
583 0 : debug!("flush_execute: finished flushing backend...");
584 :
585 : // Flush any immediate-mode deletions (the above backend flush will only flush
586 : // the executor if deletions had flowed through the backend)
587 0 : debug!("flush_execute: flushing execution...");
588 27 : self.flush_immediate().await?;
589 0 : debug!("flush_execute: finished flushing execution...");
590 27 : Ok(())
591 27 : }
592 :
593 : /// This interface bypasses the persistent deletion queue, and any validation
594 : /// that this pageserver is still eligible to execute the deletions. It is for
595 : /// use in timeline deletions, where the control plane is telling us we may
596 : /// delete everything in the timeline.
597 : ///
598 : /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
599 722 : pub(crate) async fn push_immediate(
600 722 : &self,
601 722 : objects: Vec<RemotePath>,
602 722 : ) -> Result<(), DeletionQueueError> {
603 722 : metrics::DELETION_QUEUE
604 722 : .keys_submitted
605 722 : .inc_by(objects.len() as u64);
606 722 : self.executor_tx
607 722 : .send(DeleterMessage::Delete(objects))
608 45 : .await
609 722 : .map_err(|_| DeletionQueueError::ShuttingDown)
610 722 : }
611 :
612 : /// Companion to push_immediate. When this returns Ok, all prior objects sent
613 : /// into push_immediate have been deleted from remote storage.
614 457 : pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
615 457 : let (flush_op, rx) = FlushOp::new();
616 457 : self.executor_tx
617 457 : .send(DeleterMessage::Flush(flush_op))
618 45 : .await
619 457 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
620 :
621 457 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
622 457 : }
623 : }
624 :
625 : impl DeletionQueue {
626 1258 : pub fn new_client(&self) -> DeletionQueueClient {
627 1258 : self.client.clone()
628 1258 : }
629 :
630 : /// Caller may use the returned object to construct clients with new_client.
631 : /// Caller should spawn the background tasks of the returned DeletionQueueWorkers (e.g. via spawn_with):
632 : /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
633 : ///
634 : /// If remote_storage is None, then the returned workers will also be None.
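///
/// A minimal construction sketch, mirroring the test `setup()` below (not compiled here):
///
/// ```ignore
/// let (deletion_queue, workers) = DeletionQueue::new(Some(storage), Some(control_plane), conf);
/// let worker_join = workers
///     .expect("workers are Some because remote_storage is Some")
///     .spawn_with(&tokio::runtime::Handle::current());
/// let client = deletion_queue.new_client();
/// ```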
635 633 : pub fn new<C>(
636 633 : remote_storage: Option<GenericRemoteStorage>,
637 633 : control_plane_client: Option<C>,
638 633 : conf: &'static PageServerConf,
639 633 : ) -> (Self, Option<DeletionQueueWorkers<C>>)
640 633 : where
641 633 : C: ControlPlaneGenerationsApi + Send + Sync,
642 633 : {
643 633 : // Unbounded channel: enables non-async functions to submit deletions. The actual length is
644 633 : // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
645 633 : // enough to avoid this taking pathologically large amount of memory.
646 633 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
647 633 :
648 633 : // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
649 633 : let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
650 633 :
651 633 : // Shallow channel: it carries lists of paths, and we expect the main queueing to
652 633 : // happen in the backend (persistent), not in this queue.
653 633 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
654 633 :
655 633 : let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
656 633 :
657 633 : // The deletion queue has an independent cancellation token to
658 633 : // the general pageserver shutdown token, because it stays alive a bit
659 633 : // longer to flush after Tenants have all been torn down.
660 633 : let cancel = CancellationToken::new();
661 :
662 633 : let remote_storage = match remote_storage {
663 : None => {
664 0 : return (
665 0 : Self {
666 0 : client: DeletionQueueClient {
667 0 : tx,
668 0 : executor_tx,
669 0 : lsn_table: lsn_table.clone(),
670 0 : },
671 0 : cancel,
672 0 : },
673 0 : None,
674 0 : )
675 : }
676 633 : Some(r) => r,
677 633 : };
678 633 :
679 633 : (
680 633 : Self {
681 633 : client: DeletionQueueClient {
682 633 : tx,
683 633 : executor_tx: executor_tx.clone(),
684 633 : lsn_table: lsn_table.clone(),
685 633 : },
686 633 : cancel: cancel.clone(),
687 633 : },
688 633 : Some(DeletionQueueWorkers {
689 633 : frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
690 633 : backend: Validator::new(
691 633 : conf,
692 633 : backend_rx,
693 633 : executor_tx,
694 633 : control_plane_client,
695 633 : lsn_table.clone(),
696 633 : cancel.clone(),
697 633 : ),
698 633 : executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
699 633 : }),
700 633 : )
701 633 : }
702 :
703 183 : pub async fn shutdown(&mut self, timeout: Duration) {
704 184 : match tokio::time::timeout(timeout, self.client.flush()).await {
705 : Ok(Ok(())) => {
706 183 : tracing::info!("Deletion queue flushed successfully on shutdown")
707 : }
708 : Ok(Err(DeletionQueueError::ShuttingDown)) => {
709 : // This is not harmful for correctness, but is unexpected: the deletion
710 : // queue's workers should stay alive as long as there are any client handles instantiated.
711 0 : tracing::warn!("Deletion queue stopped prematurely");
712 : }
713 0 : Err(_timeout) => {
714 0 : tracing::warn!("Timed out flushing deletion queue on shutdown")
715 : }
716 : }
717 :
718 : // We only cancel _after_ flushing: otherwise we would be shutting down the
719 : // components that do the flush.
720 183 : self.cancel.cancel();
721 183 : }
722 : }
723 :
724 : #[cfg(test)]
725 : mod test {
726 : use camino::Utf8Path;
727 : use hex_literal::hex;
728 : use pageserver_api::shard::ShardIndex;
729 : use std::{io::ErrorKind, time::Duration};
730 : use tracing::info;
731 :
732 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
733 : use tokio::task::JoinHandle;
734 :
735 : use crate::{
736 : control_plane_client::RetryForeverError,
737 : repository::Key,
738 : tenant::{
739 : harness::TenantHarness, remote_timeline_client::remote_timeline_path,
740 : storage_layer::DeltaFileName,
741 : },
742 : };
743 :
744 : use super::*;
745 : pub const TIMELINE_ID: TimelineId =
746 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
747 :
748 : pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName {
749 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
750 : lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
751 : });
752 :
753 : // When you need a second layer in a test.
754 : pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName {
755 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
756 : lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
757 : });
758 :
759 : struct TestSetup {
760 : harness: TenantHarness,
761 : remote_fs_dir: Utf8PathBuf,
762 : storage: GenericRemoteStorage,
763 : mock_control_plane: MockControlPlane,
764 : deletion_queue: DeletionQueue,
765 : worker_join: JoinHandle<()>,
766 : }
767 :
768 : impl TestSetup {
769 : /// Simulate a pageserver restart by destroying and recreating the deletion queue
770 2 : async fn restart(&mut self) {
771 2 : let (deletion_queue, workers) = DeletionQueue::new(
772 2 : Some(self.storage.clone()),
773 2 : Some(self.mock_control_plane.clone()),
774 2 : self.harness.conf,
775 2 : );
776 :
777 0 : tracing::debug!("Spawning worker for new queue queue");
778 2 : let worker_join = workers
779 2 : .unwrap()
780 2 : .spawn_with(&tokio::runtime::Handle::current());
781 2 :
782 2 : let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
783 2 : let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
784 :
785 0 : tracing::debug!("Joining worker from previous queue");
786 2 : old_deletion_queue.cancel.cancel();
787 2 : old_worker_join
788 2 : .await
789 2 : .expect("Failed to join workers for previous deletion queue");
790 2 : }
791 :
792 6 : fn set_latest_generation(&self, gen: Generation) {
793 6 : let tenant_shard_id = self.harness.tenant_shard_id;
794 6 : self.mock_control_plane
795 6 : .latest_generation
796 6 : .lock()
797 6 : .unwrap()
798 6 : .insert(tenant_shard_id, gen);
799 6 : }
800 :
801 : /// Returns remote layer file name, suitable for use in assert_remote_files
802 6 : fn write_remote_layer(
803 6 : &self,
804 6 : file_name: LayerFileName,
805 6 : gen: Generation,
806 6 : ) -> anyhow::Result<String> {
807 6 : let tenant_shard_id = self.harness.tenant_shard_id;
808 6 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
809 6 : let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
810 6 : std::fs::create_dir_all(&remote_timeline_path)?;
811 6 : let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
812 6 :
813 6 : let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
814 6 :
815 6 : std::fs::write(
816 6 : remote_timeline_path.join(remote_layer_file_name.clone()),
817 6 : content,
818 6 : )?;
819 :
820 6 : Ok(remote_layer_file_name)
821 6 : }
822 : }
823 :
824 8 : #[derive(Debug, Clone)]
825 : struct MockControlPlane {
826 : pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
827 : }
828 :
829 : impl MockControlPlane {
830 6 : fn new() -> Self {
831 6 : Self {
832 6 : latest_generation: Arc::default(),
833 6 : }
834 6 : }
835 : }
836 :
837 : impl ControlPlaneGenerationsApi for MockControlPlane {
838 : #[allow(clippy::diverging_sub_expression)] // False positive via async_trait
839 0 : async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
840 0 : unimplemented!()
841 0 : }
842 8 : async fn validate(
843 8 : &self,
844 8 : tenants: Vec<(TenantShardId, Generation)>,
845 8 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
846 8 : let mut result = HashMap::new();
847 8 :
848 8 : let latest_generation = self.latest_generation.lock().unwrap();
849 :
850 16 : for (tenant_shard_id, generation) in tenants {
851 8 : if let Some(latest) = latest_generation.get(&tenant_shard_id) {
852 8 : result.insert(tenant_shard_id, *latest == generation);
853 8 : }
854 : }
855 :
856 8 : Ok(result)
857 8 : }
858 : }
859 :
860 6 : fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
861 6 : let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
862 6 : let harness = TenantHarness::create(test_name)?;
863 :
864 : // We do not load() the harness: we only need its config and remote_storage
865 :
866 : // Set up a GenericRemoteStorage targeting a directory
867 6 : let remote_fs_dir = harness.conf.workdir.join("remote_fs");
868 6 : std::fs::create_dir_all(remote_fs_dir)?;
869 6 : let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
870 6 : let storage_config = RemoteStorageConfig {
871 6 : storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
872 6 : };
873 6 : let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
874 6 :
875 6 : let mock_control_plane = MockControlPlane::new();
876 6 :
877 6 : let (deletion_queue, worker) = DeletionQueue::new(
878 6 : Some(storage.clone()),
879 6 : Some(mock_control_plane.clone()),
880 6 : harness.conf,
881 6 : );
882 6 :
883 6 : let worker = worker.unwrap();
884 6 : let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
885 6 :
886 6 : Ok(TestSetup {
887 6 : harness,
888 6 : remote_fs_dir,
889 6 : storage,
890 6 : mock_control_plane,
891 6 : deletion_queue,
892 6 : worker_join,
893 6 : })
894 6 : }
895 :
896 : // TODO: put this in a common location so that we can share with remote_timeline_client's tests
897 18 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
898 18 : let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
899 18 : expected.sort();
900 18 :
901 18 : let mut found: Vec<String> = Vec::new();
902 18 : let dir = match std::fs::read_dir(remote_path) {
903 18 : Ok(d) => d,
904 0 : Err(e) => {
905 0 : if e.kind() == ErrorKind::NotFound {
906 0 : if expected.is_empty() {
907 : // We are asserting prefix is empty: it is expected that the dir is missing
908 0 : return;
909 : } else {
910 0 : assert_eq!(expected, Vec::<String>::new());
911 0 : unreachable!();
912 : }
913 : } else {
914 0 : panic!("Unexpected error listing {remote_path}: {e}");
915 : }
916 : }
917 : };
918 :
919 18 : for entry in dir.flatten() {
920 16 : let entry_name = entry.file_name();
921 16 : let fname = entry_name.to_str().unwrap();
922 16 : found.push(String::from(fname));
923 16 : }
924 18 : found.sort();
925 18 :
926 18 : assert_eq!(expected, found);
927 18 : }
928 :
929 10 : fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
930 10 : let dir = match std::fs::read_dir(directory) {
931 8 : Ok(d) => d,
932 : Err(_) => {
933 2 : assert_eq!(expected, &Vec::<String>::new());
934 2 : return;
935 : }
936 : };
937 8 : let mut found = Vec::new();
938 18 : for dentry in dir {
939 10 : let dentry = dentry.unwrap();
940 10 : let file_name = dentry.file_name();
941 10 : let file_name_str = file_name.to_string_lossy();
942 10 : found.push(file_name_str.to_string());
943 10 : }
944 8 : found.sort();
945 8 : assert_eq!(expected, found);
946 10 : }
947 :
948 2 : #[tokio::test]
949 2 : async fn deletion_queue_smoke() -> anyhow::Result<()> {
950 2 : // Basic test that the deletion queue processes the deletions we pass into it
951 2 : let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
952 2 : let client = ctx.deletion_queue.new_client();
953 2 : client.recover(HashMap::new())?;
954 2 :
955 2 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
956 2 : let tenant_shard_id = ctx.harness.tenant_shard_id;
957 2 :
958 2 : let content: Vec<u8> = "victim1 contents".into();
959 2 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
960 2 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
961 2 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
962 2 :
963 2 : // Exercise the distinction between the generation of the layers
964 2 : // we delete, and the generation of the running Tenant.
965 2 : let layer_generation = Generation::new(0xdeadbeef);
966 2 : let now_generation = Generation::new(0xfeedbeef);
967 2 : let layer_metadata =
968 2 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
969 2 :
970 2 : let remote_layer_file_name_1 =
971 2 : format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
972 2 :
973 2 : // Set mock control plane state to valid for our generation
974 2 : ctx.set_latest_generation(now_generation);
975 2 :
976 2 : // Inject a victim file to remote storage
977 2 : info!("Writing");
978 2 : std::fs::create_dir_all(&remote_timeline_path)?;
979 2 : std::fs::write(
980 2 : remote_timeline_path.join(remote_layer_file_name_1.clone()),
981 2 : content,
982 2 : )?;
983 2 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
984 2 :
985 2 : // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
986 2 : info!("Pushing");
987 2 : client
988 2 : .push_layers(
989 2 : tenant_shard_id,
990 2 : TIMELINE_ID,
991 2 : now_generation,
992 2 : [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
993 2 : )
994 2 : .await?;
995 2 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
996 2 :
997 2 : assert_local_files(&[], &deletion_prefix);
998 2 :
999 2 : // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
1000 2 : info!("Flushing");
1001 2 : client.flush().await?;
1002 2 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
1003 2 : assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
1004 2 :
1005 2 : // File should go away when we execute
1006 2 : info!("Flush-executing");
1007 6 : client.flush_execute().await?;
1008 2 : assert_remote_files(&[], &remote_timeline_path);
1009 2 : assert_local_files(&["header-01"], &deletion_prefix);
1010 2 :
1011 2 : // Flushing on an empty queue should succeed immediately, and not write any lists
1012 2 : info!("Flush-executing on empty");
1013 6 : client.flush_execute().await?;
1014 2 : assert_local_files(&["header-01"], &deletion_prefix);
1015 2 :
1016 2 : Ok(())
1017 2 : }
1018 :
1019 2 : #[tokio::test]
1020 2 : async fn deletion_queue_validation() -> anyhow::Result<()> {
1021 2 : let ctx = setup("deletion_queue_validation").expect("Failed test setup");
1022 2 : let client = ctx.deletion_queue.new_client();
1023 2 : client.recover(HashMap::new())?;
1024 2 :
1025 2 : // Generation that the control plane thinks is current
1026 2 : let latest_generation = Generation::new(0xdeadbeef);
1027 2 : // Generation that our DeletionQueue thinks the tenant is running with
1028 2 : let stale_generation = latest_generation.previous();
1029 2 : // Generation that our example layer file was written with
1030 2 : let layer_generation = stale_generation.previous();
1031 2 : let layer_metadata =
1032 2 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1033 2 :
1034 2 : ctx.set_latest_generation(latest_generation);
1035 2 :
1036 2 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1037 2 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1038 2 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1039 2 :
1040 2 : // Initial state: a remote layer exists
1041 2 : let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1042 2 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1043 2 :
1044 2 : tracing::debug!("Pushing...");
1045 2 : client
1046 2 : .push_layers(
1047 2 : tenant_shard_id,
1048 2 : TIMELINE_ID,
1049 2 : stale_generation,
1050 2 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1051 2 : )
1052 2 : .await?;
1053 2 :
1054 2 : // We enqueued the operation in a stale generation: it should have failed validation
1055 2 : tracing::debug!("Flushing...");
1056 6 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1057 2 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1058 2 :
1059 2 : tracing::debug!("Pushing...");
1060 2 : client
1061 2 : .push_layers(
1062 2 : tenant_shard_id,
1063 2 : TIMELINE_ID,
1064 2 : latest_generation,
1065 2 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1066 2 : )
1067 2 : .await?;
1068 2 :
1069 2 : // We enqueued the operation in a fresh generation: it should have passed validation
1070 2 : tracing::debug!("Flushing...");
1071 6 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1072 2 : assert_remote_files(&[], &remote_timeline_path);
1073 2 :
1074 2 : Ok(())
1075 2 : }
1076 :
1077 2 : #[tokio::test]
1078 2 : async fn deletion_queue_recovery() -> anyhow::Result<()> {
1079 2 : // Basic test that the deletion queue processes the deletions we pass into it
1080 2 : let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
1081 2 : let client = ctx.deletion_queue.new_client();
1082 2 : client.recover(HashMap::new())?;
1083 2 :
1084 2 : let tenant_shard_id = ctx.harness.tenant_shard_id;
1085 2 :
1086 2 : let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
1087 2 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1088 2 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
1089 2 :
1090 2 : let layer_generation = Generation::new(0xdeadbeef);
1091 2 : let now_generation = Generation::new(0xfeedbeef);
1092 2 : let layer_metadata =
1093 2 : LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
1094 2 :
1095 2 : // Inject a deletion in the generation before generation_now: after restart
1096 2 : // (which advances the generation), this deletion should _not_ get executed
1097 2 : // (only the immediately previous generation gets that treatment)
1098 2 : let remote_layer_file_name_historical =
1099 2 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1100 2 : client
1101 2 : .push_layers(
1102 2 : tenant_shard_id,
1103 2 : TIMELINE_ID,
1104 2 : now_generation.previous(),
1105 2 : [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
1106 2 : )
1107 2 : .await?;
1108 2 :
1109 2 : // Inject a deletion in generation_now: after restart (which advances the generation),
1110 2 : // this deletion should get executed, because we execute deletions in the
1111 2 : // immediately previous generation on the same node.
1112 2 : let remote_layer_file_name_previous =
1113 2 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
1114 2 : client
1115 2 : .push_layers(
1116 2 : tenant_shard_id,
1117 2 : TIMELINE_ID,
1118 2 : now_generation,
1119 2 : [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
1120 2 : )
1121 2 : .await?;
1122 2 :
1123 2 : client.flush().await?;
1124 2 : assert_remote_files(
1125 2 : &[
1126 2 : &remote_layer_file_name_historical,
1127 2 : &remote_layer_file_name_previous,
1128 2 : ],
1129 2 : &remote_timeline_path,
1130 2 : );
1131 2 :
1132 2 : // Different generations for the same tenant will cause two separate
1133 2 : // deletion lists to be emitted.
1134 2 : assert_local_files(
1135 2 : &["0000000000000001-01.list", "0000000000000002-01.list"],
1136 2 : &deletion_prefix,
1137 2 : );
1138 2 :
1139 2 : // Simulate a node restart: the latest generation advances
1140 2 : let now_generation = now_generation.next();
1141 2 : ctx.set_latest_generation(now_generation);
1142 2 :
1143 2 : // Restart the deletion queue
1144 2 : drop(client);
1145 2 : ctx.restart().await;
1146 2 : let client = ctx.deletion_queue.new_client();
1147 2 : client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
1148 2 :
1149 2 : info!("Flush-executing");
1150 6 : client.flush_execute().await?;
1151 2 : // The deletion from immediately prior generation was executed, the one from
1152 2 : // an older generation was not.
1153 2 : assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
1154 2 : Ok(())
1155 2 : }
1156 : }
1157 :
1158 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
1159 : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
1160 : #[cfg(test)]
1161 : pub(crate) mod mock {
1162 : use tracing::info;
1163 :
1164 : use crate::tenant::remote_timeline_client::remote_layer_path;
1165 :
1166 : use super::*;
1167 : use std::sync::{
1168 : atomic::{AtomicUsize, Ordering},
1169 : Arc,
1170 : };
1171 :
1172 : pub struct ConsumerState {
1173 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
1174 : executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
1175 : }
1176 :
1177 : impl ConsumerState {
1178 2 : async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
1179 2 : let mut executed = 0;
1180 :
1181 2 : info!("Executing all pending deletions");
1182 :
1183 : // Drain any messages that were sent directly to the executor (immediate deletions) and execute them
1184 2 : while let Ok(msg) = self.executor_rx.try_recv() {
1185 0 : match msg {
1186 0 : DeleterMessage::Delete(objects) => {
1187 0 : for path in objects {
1188 0 : match remote_storage.delete(&path).await {
1189 : Ok(_) => {
1190 0 : debug!("Deleted {path}");
1191 : }
1192 0 : Err(e) => {
1193 0 : error!("Failed to delete {path}, leaking object! ({e})");
1194 : }
1195 : }
1196 0 : executed += 1;
1197 : }
1198 : }
1199 0 : DeleterMessage::Flush(flush_op) => {
1200 0 : flush_op.notify();
1201 0 : }
1202 : }
1203 : }
1204 :
1205 4 : while let Ok(msg) = self.rx.try_recv() {
1206 2 : match msg {
1207 2 : ListWriterQueueMessage::Delete(op) => {
1208 2 : let mut objects = op.objects;
1209 4 : for (layer, meta) in op.layers {
1210 2 : objects.push(remote_layer_path(
1211 2 : &op.tenant_shard_id.tenant_id,
1212 2 : &op.timeline_id,
1213 2 : meta.shard,
1214 2 : &layer,
1215 2 : meta.generation,
1216 2 : ));
1217 2 : }
1218 :
1219 4 : for path in objects {
1220 2 : info!("Executing deletion {path}");
1221 2 : match remote_storage.delete(&path).await {
1222 : Ok(_) => {
1223 0 : debug!("Deleted {path}");
1224 : }
1225 0 : Err(e) => {
1226 0 : error!("Failed to delete {path}, leaking object! ({e})");
1227 : }
1228 : }
1229 2 : executed += 1;
1230 : }
1231 : }
1232 0 : ListWriterQueueMessage::Flush(op) => {
1233 0 : op.notify();
1234 0 : }
1235 0 : ListWriterQueueMessage::FlushExecute(op) => {
1236 0 : // We have already executed all prior deletions because mock does them inline
1237 0 : op.notify();
1238 0 : }
1239 0 : ListWriterQueueMessage::Recover(_) => {
1240 0 : // no-op in mock
1241 0 : }
1242 : }
1243 2 : info!("All pending deletions have been executed");
1244 : }
1245 :
1246 2 : executed
1247 2 : }
1248 : }
1249 :
1250 : pub struct MockDeletionQueue {
1251 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
1252 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
1253 : executed: Arc<AtomicUsize>,
1254 : remote_storage: Option<GenericRemoteStorage>,
1255 : consumer: std::sync::Mutex<ConsumerState>,
1256 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
1257 : }
1258 :
1259 : impl MockDeletionQueue {
1260 84 : pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
1261 84 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1262 84 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
1263 84 :
1264 84 : let executed = Arc::new(AtomicUsize::new(0));
1265 84 :
1266 84 : Self {
1267 84 : tx,
1268 84 : executor_tx,
1269 84 : executed,
1270 84 : remote_storage,
1271 84 : consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
1272 84 : lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
1273 84 : }
1274 84 : }
1275 :
1276 : #[allow(clippy::await_holding_lock)]
1277 2 : pub async fn pump(&self) {
1278 2 : if let Some(remote_storage) = &self.remote_storage {
1279 : // Permit holding mutex across await, because this is only ever
1280 : // called once at a time in tests.
1281 2 : let mut locked = self.consumer.lock().unwrap();
1282 2 : let count = locked.consume(remote_storage).await;
1283 2 : self.executed.fetch_add(count, Ordering::Relaxed);
1284 0 : }
1285 2 : }
1286 :
1287 96 : pub(crate) fn new_client(&self) -> DeletionQueueClient {
1288 96 : DeletionQueueClient {
1289 96 : tx: self.tx.clone(),
1290 96 : executor_tx: self.executor_tx.clone(),
1291 96 : lsn_table: self.lsn_table.clone(),
1292 96 : }
1293 96 : }
1294 : }
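// Illustrative use of the mock in a test (setup is hypothetical): deletions pushed via
// the client sit in the channels until pump() drains and executes them inline.
//
//   let mock = MockDeletionQueue::new(Some(remote_storage));
//   let client = mock.new_client();
//   client.push_immediate(paths).await?;
//   mock.pump().await;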
1295 :
1296 : /// Test round-trip serialization/deserialization, and test stability of the format
1297 : /// vs. a static expected string for the serialized version.
1298 2 : #[test]
1299 2 : fn deletion_list_serialization() -> anyhow::Result<()> {
1300 2 : let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
1301 2 : .to_string()
1302 2 : .parse::<TenantShardId>()?;
1303 2 : let timeline_id = "be322c834ed9e709e63b5c9698691910"
1304 2 : .to_string()
1305 2 : .parse::<TimelineId>()?;
1306 2 : let generation = Generation::new(123);
1307 :
1308 2 : let object =
1309 2 : RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
1310 2 : let mut objects = [object].to_vec();
1311 2 :
1312 2 : let mut example = DeletionList::new(1);
1313 2 : example.push(&tenant_id, &timeline_id, generation, &mut objects);
1314 :
1315 2 : let encoded = serde_json::to_string(&example)?;
1316 :
1317 2 : let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
1318 2 : assert_eq!(encoded, expected);
1319 :
1320 2 : let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
1321 2 : assert_eq!(example, decoded);
1322 :
1323 2 : Ok(())
1324 2 : }
1325 : }
|