TLA Line data Source code
1 : mod deleter;
2 : mod list_writer;
3 : mod validator;
4 :
5 : use std::collections::HashMap;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use crate::control_plane_client::ControlPlaneGenerationsApi;
10 : use crate::metrics;
11 : use crate::tenant::remote_timeline_client::remote_layer_path;
12 : use crate::tenant::remote_timeline_client::remote_timeline_path;
13 : use crate::virtual_file::VirtualFile;
14 : use anyhow::Context;
15 : use camino::Utf8PathBuf;
16 : use hex::FromHex;
17 : use remote_storage::{GenericRemoteStorage, RemotePath};
18 : use serde::Deserialize;
19 : use serde::Serialize;
20 : use serde_with::serde_as;
21 : use thiserror::Error;
22 : use tokio;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::Instrument;
25 : use tracing::{self, debug, error};
26 : use utils::crashsafe::path_with_suffix_extension;
27 : use utils::generation::Generation;
28 : use utils::id::{TenantId, TimelineId};
29 : use utils::lsn::AtomicLsn;
30 : use utils::lsn::Lsn;
31 :
32 : use self::deleter::Deleter;
33 : use self::list_writer::DeletionOp;
34 : use self::list_writer::ListWriter;
35 : use self::list_writer::RecoverOp;
36 : use self::validator::Validator;
37 : use deleter::DeleterMessage;
38 : use list_writer::ListWriterQueueMessage;
39 : use validator::ValidatorQueueMessage;
40 :
41 : use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
42 :
43 : // TODO: make the delay before executing deletions configurable
44 :
45 : /// We aggregate object deletions from many tenants in one place, for several reasons:
46 : /// - Coalesce deletions into fewer DeleteObjects calls
47 : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
48 : /// to flush any outstanding deletions.
49 : /// - Globally control throughput of deletions, as these are a low priority task: do
50 : /// not compete with the same S3 clients/connections used for higher priority uploads.
51 : /// - Enable gating deletions on validation of a tenant's generation number, to make
52 : /// it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
53 : ///
54 : /// There are two kinds of deletion: deferred and immediate. A deferred deletion
55 : /// may be intentionally delayed to protect passive readers of S3 data, and is
56 : /// subject to a generation number validation step. An immediate deletion is
57 : /// ready to execute immediately, and is only queued up so that it can be coalesced
58 : /// with other deletions in flight.
59 : ///
60 : /// Deferred deletions pass through three steps:
61 : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
62 : /// DeletionLists, which are persisted to disk.
63 : /// - Validator: accumulate deletion lists, and validate them en masse prior to passing
64 : /// the keys in the list onward for actual deletion. Also validate remote_consistent_lsn
65 : /// updates for running timelines.
66 : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
67 : /// batches of 1000 keys via DeleteObjects.
68 : ///
69 : /// Non-deferred deletions, such as during timeline deletion, bypass the first
70 : /// two stages and are passed straight into the Deleter.
71 : ///
72 : /// Internally, each stage is joined by a channel to the next. On disk, there is only
73 : /// one queue (of DeletionLists), which is written by the frontend and consumed
74 : /// by the backend.
75 CBC 144 : #[derive(Clone)]
76 : pub struct DeletionQueue {
77 : client: DeletionQueueClient,
78 :
79 : // Parent cancellation token for the tokens passed into background workers
80 : cancel: CancellationToken,
81 : }
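The Deleter stage mentioned in the doc comment above coalesces validated keys and issues them in DeleteObjects requests of at most 1000 keys, the S3 per-request limit. The real implementation lives in the `deleter` submodule; the following is only a minimal sketch of the chunking rule, with plain strings standing in for RemotePath and a hypothetical `batch_for_delete_objects` helper.

fn batch_for_delete_objects(keys: &[String], max_keys_per_request: usize) -> Vec<Vec<String>> {
    // S3 DeleteObjects accepts at most 1000 keys per call, so validated keys
    // are drained in chunks no larger than that.
    keys.chunks(max_keys_per_request)
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let keys: Vec<String> = (0..2500)
        .map(|i| format!("tenants/t/timelines/tl/layer-{i}"))
        .collect();
    let batches = batch_for_delete_objects(&keys, 1000);
    // 2500 keys become three requests: 1000 + 1000 + 500.
    assert_eq!(
        batches.iter().map(|b| b.len()).collect::<Vec<_>>(),
        vec![1000, 1000, 500]
    );
}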
82 :
83 : /// Opaque wrapper around individual worker tasks, to avoid making the
84 : /// worker objects themselves public
85 : pub struct DeletionQueueWorkers<C>
86 : where
87 : C: ControlPlaneGenerationsApi + Send + Sync,
88 : {
89 : frontend: ListWriter,
90 : backend: Validator<C>,
91 : executor: Deleter,
92 : }
93 :
94 : impl<C> DeletionQueueWorkers<C>
95 : where
96 : C: ControlPlaneGenerationsApi + Send + Sync + 'static,
97 : {
98 564 : pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
99 564 : let jh_frontend = runtime.spawn(async move {
100 564 : self.frontend
101 564 : .background()
102 564 : .instrument(tracing::info_span!(parent:None, "deletion frontend"))
103 749 : .await
104 564 : });
105 564 : let jh_backend = runtime.spawn(async move {
106 564 : self.backend
107 564 : .background()
108 564 : .instrument(tracing::info_span!(parent:None, "deletion backend"))
109 612 : .await
110 564 : });
111 564 : let jh_executor = runtime.spawn(async move {
112 564 : self.executor
113 564 : .background()
114 564 : .instrument(tracing::info_span!(parent:None, "deletion executor"))
115 12750 : .await
116 564 : });
117 564 :
118 564 : runtime.spawn({
119 564 : async move {
120 564 : jh_frontend.await.expect("error joining frontend worker");
121 145 : jh_backend.await.expect("error joining backend worker");
122 145 : drop(jh_executor.await.expect("error joining executor worker"));
123 564 : }
124 564 : })
125 564 : }
126 : }
127 :
128 : /// A FlushOp is just a oneshot channel, where we send the transmit side down
129 : /// another channel, and the receive side will receive a message when the channel
130 : /// we're flushing has reached the FlushOp we sent into it.
131 : ///
132 : /// The only extra behavior beyond the channel is that the notify() method does not
133 : /// return an error when the receive side has been dropped, because in this use case
134 : /// it is harmless (the code that initiated the flush no longer cares about the result).
135 UBC 0 : #[derive(Debug)]
136 : struct FlushOp {
137 : tx: tokio::sync::oneshot::Sender<()>,
138 : }
139 :
140 : impl FlushOp {
141 CBC 1189 : fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
142 1189 : let (tx, rx) = tokio::sync::oneshot::channel::<()>();
143 1189 : (Self { tx }, rx)
144 1189 : }
145 :
146 1187 : fn notify(self) {
147 1187 : if self.tx.send(()).is_err() {
148 : // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
149 38 : debug!("deletion queue flush from dropped client");
150 1149 : };
151 1187 : }
152 : }
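The FlushOp mechanism described above is the classic marker-message pattern: push a oneshot sender down the same queue as the work, and whoever drains the queue fires it once everything ahead of it has been handled. A self-contained sketch with plain tokio channels (a hypothetical `Msg` enum, not this module's message types):

use tokio::sync::{mpsc, oneshot};

enum Msg {
    Work(u32),
    Flush(oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Msg>();

    // Worker: handles messages strictly in order, so a Flush is acknowledged
    // only after everything enqueued before it has been processed.
    let worker = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {
                Msg::Work(n) => println!("processed item {n}"),
                // Like FlushOp::notify(), a dropped receiver is not an error.
                Msg::Flush(done) => {
                    let _ = done.send(());
                }
            }
        }
    });

    tx.send(Msg::Work(1)).unwrap();
    tx.send(Msg::Work(2)).unwrap();

    let (flush_tx, flush_rx) = oneshot::channel();
    tx.send(Msg::Flush(flush_tx)).unwrap();
    flush_rx.await.unwrap(); // resolves only after items 1 and 2 are done

    drop(tx);
    worker.await.unwrap();
}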
153 :
154 5566 : #[derive(Clone, Debug)]
155 : pub struct DeletionQueueClient {
156 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
157 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
158 :
159 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
160 : }
161 :
162 48 : #[derive(Debug, Serialize, Deserialize)]
163 : struct TenantDeletionList {
164 : /// For each Timeline, a list of key fragments to append to the timeline remote path
165 : /// when reconstructing a full key
166 : #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
167 : timelines: HashMap<TimelineId, Vec<String>>,
168 :
169 : /// The generation in which this deletion was emitted: note that this may not be the
170 : /// same as the generation of any layers being deleted. The generation of the layer
171 : /// has already been absorbed into the keys in `objects`
172 : generation: Generation,
173 : }
174 :
175 : impl TenantDeletionList {
176 14 : pub(crate) fn len(&self) -> usize {
177 14 : self.timelines.values().map(|v| v.len()).sum()
178 14 : }
179 : }
180 :
181 : /// For HashMaps using a `hex` compatible key, where we would like to encode the key as a string
182 34 : fn to_hex_map<S, V, I>(input: &HashMap<I, V>, serializer: S) -> Result<S::Ok, S::Error>
183 34 : where
184 34 : S: serde::Serializer,
185 34 : V: Serialize,
186 34 : I: AsRef<[u8]>,
187 34 : {
188 34 : let transformed = input.iter().map(|(k, v)| (hex::encode(k), v));
189 34 :
190 34 : transformed
191 34 : .collect::<HashMap<String, &V>>()
192 34 : .serialize(serializer)
193 34 : }
194 :
195 : /// For HashMaps using a FromHex key, where we would like to decode the key
196 16 : fn from_hex_map<'de, D, V, I>(deserializer: D) -> Result<HashMap<I, V>, D::Error>
197 16 : where
198 16 : D: serde::de::Deserializer<'de>,
199 16 : V: Deserialize<'de>,
200 16 : I: FromHex + std::hash::Hash + Eq,
201 16 : {
202 16 : let hex_map = HashMap::<String, V>::deserialize(deserializer)?;
203 16 : hex_map
204 16 : .into_iter()
205 16 : .map(|(k, v)| {
206 16 : I::from_hex(k)
207 16 : .map(|k| (k, v))
208 16 : .map_err(|_| serde::de::Error::custom("Invalid hex ID"))
209 16 : })
210 16 : .collect()
211 16 : }
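A small usage illustration of the two serde helpers above. It assumes it lives in this module so the private functions and the existing `HashMap`/`Serialize`/`Deserialize` imports are in scope; `HexKeyed` and `hex_map_roundtrip_example` are hypothetical names, and `[u8; 4]` stands in for TenantId/TimelineId because it also implements `AsRef<[u8]>` and `FromHex`.

#[derive(Serialize, Deserialize)]
struct HexKeyed {
    #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
    by_id: HashMap<[u8; 4], u32>,
}

fn hex_map_roundtrip_example() {
    let mut by_id = HashMap::new();
    by_id.insert([0xde, 0xad, 0xbe, 0xef], 1u32);

    // Keys are written out as lowercase hex strings: {"by_id":{"deadbeef":1}}
    let json = serde_json::to_string(&HexKeyed { by_id }).unwrap();
    assert!(json.contains("deadbeef"));

    // ...and decoded back into binary keys when reading the map in again.
    let decoded: HexKeyed = serde_json::from_str(&json).unwrap();
    assert_eq!(decoded.by_id[&[0xde, 0xad, 0xbe, 0xef]], 1);
}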
212 :
213 : /// Files ending with this suffix will be ignored and erased
214 : /// during recovery at startup.
215 : const TEMP_SUFFIX: &str = "tmp";
216 :
217 : #[serde_as]
218 80 : #[derive(Debug, Serialize, Deserialize)]
219 : struct DeletionList {
220 : /// Serialization version, for future use
221 : version: u8,
222 :
223 : /// Used for constructing a unique key for each deletion list we write out.
224 : sequence: u64,
225 :
226 : /// To avoid repeating tenant/timeline IDs in every key, we store keys in
227 : /// nested HashMaps keyed by tenant and then timeline ID. Each tenant appears at most once,
228 : /// with a single generation: if someone tries to push a second generation
229 : /// ID for the same tenant, we will start a new DeletionList.
230 : #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
231 : tenants: HashMap<TenantId, TenantDeletionList>,
232 :
233 : /// Avoid having to walk `tenants` to calculate the number of keys in
234 : /// the nested deletion lists
235 : size: usize,
236 :
237 : /// Set to true when the list has undergone validation with the control
238 : /// plane and the remaining contents of `tenants` are valid. A list may
239 : /// also be implicitly marked valid by DeletionHeader.validated_sequence
240 : /// advancing to >= DeletionList.sequence
241 : #[serde(default)]
242 : #[serde(skip_serializing_if = "std::ops::Not::not")]
243 : validated: bool,
244 : }
245 :
246 : #[serde_as]
247 35 : #[derive(Debug, Serialize, Deserialize)]
248 : struct DeletionHeader {
249 : /// Serialization version, for future use
250 : version: u8,
251 :
252 : /// The highest sequence number (inclusive) that has been validated. All deletion
253 : /// lists on disk with a sequence <= this value are safe to execute.
254 : validated_sequence: u64,
255 : }
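To make the `validated_sequence` contract concrete: on recovery, a list can skip re-validation if the header already covers its sequence, or if it was individually persisted with `validated: true`. A standalone sketch of that filtering, with bare `(sequence, validated)` pairs standing in for DeletionLists and a hypothetical `already_validated` helper; the real recovery logic lives in the `list_writer`/`validator` submodules.

fn already_validated(lists: &[(u64, bool)], validated_sequence: u64) -> Vec<u64> {
    lists
        .iter()
        // Covered by the header, or carrying its own validated flag.
        .filter(|(seq, validated)| *seq <= validated_sequence || *validated)
        .map(|(seq, _)| *seq)
        .collect()
}

fn main() {
    let lists = [(1, false), (2, false), (3, true), (4, false)];
    // The header says everything up to sequence 2 was validated; list 3 was
    // persisted already validated; list 4 must be validated again.
    assert_eq!(already_validated(&lists, 2), vec![1, 2, 3]);
}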
256 :
257 : impl DeletionHeader {
258 : const VERSION_LATEST: u8 = 1;
259 :
260 13 : fn new(validated_sequence: u64) -> Self {
261 13 : Self {
262 13 : version: Self::VERSION_LATEST,
263 13 : validated_sequence,
264 13 : }
265 13 : }
266 :
267 13 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
268 UBC 0 : debug!("Saving deletion list header {:?}", self);
269 CBC 13 : let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
270 13 : let header_path = conf.deletion_header_path();
271 13 : let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
272 13 : VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
273 UBC 0 : .await
274 CBC 13 : .map_err(Into::into)
275 13 : }
276 : }
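Both the header and the deletion lists are persisted via `VirtualFile::crashsafe_overwrite`, the usual write-to-temp-then-rename recipe; that is also why `TEMP_SUFFIX` leftovers are erased at startup. A simplified standalone version of the recipe using std::fs (the real helper appends the suffix rather than replacing the extension, and also syncs the parent directory):

use std::fs;
use std::io::Write;
use std::path::Path;

fn overwrite_crashsafe(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    // Write the new contents under a temporary name first...
    let tmp = path.with_extension("tmp");
    let mut file = fs::File::create(&tmp)?;
    file.write_all(bytes)?;
    file.sync_all()?; // make the data durable before it becomes visible
    // ...then atomically swap it into place: a crash leaves either the old
    // file, the new file, or a stray *.tmp that recovery ignores.
    fs::rename(&tmp, path)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    overwrite_crashsafe(Path::new("/tmp/deletion-header.json"), br#"{"version":1}"#)
}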
277 :
278 : impl DeletionList {
279 : const VERSION_LATEST: u8 = 1;
280 579 : fn new(sequence: u64) -> Self {
281 579 : Self {
282 579 : version: Self::VERSION_LATEST,
283 579 : sequence,
284 579 : tenants: HashMap::new(),
285 579 : size: 0,
286 579 : validated: false,
287 579 : }
288 579 : }
289 :
290 511 : fn is_empty(&self) -> bool {
291 511 : self.tenants.is_empty()
292 511 : }
293 :
294 349 : fn len(&self) -> usize {
295 349 : self.size
296 349 : }
297 :
298 : /// Returns true if the push was accepted, false if the caller must start a new
299 : /// deletion list.
300 62 : fn push(
301 62 : &mut self,
302 62 : tenant: &TenantId,
303 62 : timeline: &TimelineId,
304 62 : generation: Generation,
305 62 : objects: &mut Vec<RemotePath>,
306 62 : ) -> bool {
307 62 : if objects.is_empty() {
308 : // Avoid inserting an empty per-timeline entry: this preserves the property
309 : // that if we have no keys, then self.tenants is empty (used in Self::is_empty)
310 13 : return true;
311 49 : }
312 49 :
313 49 : let tenant_entry = self
314 49 : .tenants
315 49 : .entry(*tenant)
316 49 : .or_insert_with(|| TenantDeletionList {
317 16 : timelines: HashMap::new(),
318 16 : generation,
319 49 : });
320 49 :
321 49 : if tenant_entry.generation != generation {
322 : // Only one generation per tenant per list: signal to
323 : // caller to start a new list.
324 1 : return false;
325 48 : }
326 48 :
327 48 : let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
328 48 :
329 48 : let timeline_remote_path = remote_timeline_path(tenant, timeline);
330 48 :
331 48 : self.size += objects.len();
332 1672 : timeline_entry.extend(objects.drain(..).map(|p| {
333 1672 : p.strip_prefix(&timeline_remote_path)
334 1672 : .expect("Timeline paths always start with the timeline prefix")
335 1672 : .to_string()
336 1672 : }));
337 48 : true
338 62 : }
339 :
340 16 : fn into_remote_paths(self) -> Vec<RemotePath> {
341 16 : let mut result = Vec::new();
342 16 : for (tenant, tenant_deletions) in self.tenants.into_iter() {
343 12 : for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
344 12 : let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
345 12 : result.extend(
346 12 : timeline_layers
347 12 : .into_iter()
348 1636 : .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
349 12 : );
350 12 : }
351 : }
352 :
353 16 : result
354 16 : }
355 :
356 19 : async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
357 19 : let path = conf.deletion_list_path(self.sequence);
358 19 : let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
359 19 :
360 19 : let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
361 19 : VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
362 UBC 0 : .await
363 CBC 19 : .map_err(Into::into)
364 19 : }
365 : }
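The on-disk name of each list comes from `PageServerConf::deletion_list_path(sequence)`, which is not shown in this file. Judging from the file names asserted in the tests below ("0000000000000001-01.list" and "header-01"), the pattern is roughly the zero-padded hex sequence plus the format version; the helper below is a hypothetical reconstruction for illustration only.

fn deletion_list_file_name(sequence: u64, version: u8) -> String {
    // 16 hex digits of sequence, then the serialization version.
    format!("{sequence:016x}-{version:02x}.list")
}

fn main() {
    assert_eq!(deletion_list_file_name(1, 1), "0000000000000001-01.list");
    assert_eq!(deletion_list_file_name(2, 1), "0000000000000002-01.list");
}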
366 :
367 : impl std::fmt::Display for DeletionList {
368 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369 0 : write!(
370 0 : f,
371 0 : "DeletionList<seq={}, tenants={}, keys={}>",
372 0 : self.sequence,
373 0 : self.tenants.len(),
374 0 : self.size
375 0 : )
376 0 : }
377 : }
378 :
379 : struct PendingLsn {
380 : projected: Lsn,
381 : result_slot: Arc<AtomicLsn>,
382 : }
383 :
384 : struct TenantLsnState {
385 : timelines: HashMap<TimelineId, PendingLsn>,
386 :
387 : // In what generation was the most recent update proposed?
388 : generation: Generation,
389 : }
390 :
391 CBC 381 : #[derive(Default)]
392 : struct VisibleLsnUpdates {
393 : tenants: HashMap<TenantId, TenantLsnState>,
394 : }
395 :
396 : impl VisibleLsnUpdates {
397 605 : fn new() -> Self {
398 605 : Self {
399 605 : tenants: HashMap::new(),
400 605 : }
401 605 : }
402 : }
403 :
404 : impl std::fmt::Debug for VisibleLsnUpdates {
405 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406 0 : write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
407 0 : }
408 : }
409 :
410 0 : #[derive(Error, Debug)]
411 : pub enum DeletionQueueError {
412 : #[error("Deletion queue unavailable during shutdown")]
413 : ShuttingDown,
414 : }
415 :
416 : impl DeletionQueueClient {
417 0 : pub(crate) fn broken() -> Self {
418 0 : // Channels whose receivers are immediately dropped.
419 0 : let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
420 0 : let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
421 0 : Self {
422 0 : tx,
423 0 : executor_tx,
424 0 : lsn_table: Arc::default(),
425 0 : }
426 0 : }
427 :
428 : /// This is cancel-safe. If you drop the future before it completes, the message
429 : /// is not pushed, although in the context of the deletion queue it doesn't matter: once
430 : /// we decide to do a deletion the decision is always final.
431 CBC 294 : fn do_push<T>(
432 294 : &self,
433 294 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
434 294 : msg: T,
435 294 : ) -> Result<(), DeletionQueueError> {
436 294 : match queue.send(msg) {
437 294 : Ok(_) => Ok(()),
438 UBC 0 : Err(e) => {
439 0 : // This shouldn't happen, we should shut down all tenants before
440 0 : // we shut down the global delete queue. If we encounter a bug like this,
441 0 : // we may leak objects as deletions won't be processed.
442 0 : error!("Deletion queue closed while pushing, shutting down? ({e})");
443 0 : Err(DeletionQueueError::ShuttingDown)
444 : }
445 : }
446 CBC 294 : }
447 :
448 35 : pub(crate) fn recover(
449 35 : &self,
450 35 : attached_tenants: HashMap<TenantId, Generation>,
451 35 : ) -> Result<(), DeletionQueueError> {
452 35 : self.do_push(
453 35 : &self.tx,
454 35 : ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
455 35 : )
456 35 : }
457 :
458 : /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
459 : /// world, it must validate its generation number before doing so. Rather than do this synchronously,
460 : /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
461 : /// recently validated separately.
462 : ///
463 : /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The
464 : /// backend will later wake up and notice that the tenant's generation requires validation.
465 428 : pub(crate) async fn update_remote_consistent_lsn(
466 428 : &self,
467 428 : tenant_id: TenantId,
468 428 : timeline_id: TimelineId,
469 428 : current_generation: Generation,
470 428 : lsn: Lsn,
471 428 : result_slot: Arc<AtomicLsn>,
472 428 : ) {
473 428 : let mut locked = self
474 428 : .lsn_table
475 428 : .write()
476 428 : .expect("Lock should never be poisoned");
477 428 :
478 428 : let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState {
479 428 : timelines: HashMap::new(),
480 428 : generation: current_generation,
481 428 : });
482 428 :
483 428 : if tenant_entry.generation != current_generation {
484 UBC 0 : // Generation might have changed if we were detached and then re-attached: in this case,
485 0 : // state from the previous generation cannot be trusted.
486 0 : tenant_entry.timelines.clear();
487 0 : tenant_entry.generation = current_generation;
488 CBC 428 : }
489 :
490 428 : tenant_entry.timelines.insert(
491 428 : timeline_id,
492 428 : PendingLsn {
493 428 : projected: lsn,
494 428 : result_slot,
495 428 : },
496 428 : );
497 428 : }
498 :
499 : /// Submit a list of layers for deletion: this function will return before the deletion is
500 : /// persistent, but the deletion may be executed at any time after this call: do not push
501 : /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
502 : /// references them).
503 : ///
504 : /// The `current_generation` is the generation of this pageserver's current attachment. The
505 : /// generations in `layers` are the generations in which those layers were written.
506 591 : pub(crate) async fn push_layers(
507 591 : &self,
508 591 : tenant_id: TenantId,
509 591 : timeline_id: TimelineId,
510 591 : current_generation: Generation,
511 591 : layers: Vec<(LayerFileName, Generation)>,
512 591 : ) -> Result<(), DeletionQueueError> {
513 591 : if current_generation.is_none() {
514 UBC 0 : debug!("Enqueuing deletions in legacy mode, skipping queue");
515 CBC 518 : let mut layer_paths = Vec::new();
516 4356 : for (layer, generation) in layers {
517 3838 : layer_paths.push(remote_layer_path(
518 3838 : &tenant_id,
519 3838 : &timeline_id,
520 3838 : &layer,
521 3838 : generation,
522 3838 : ));
523 3838 : }
524 518 : self.push_immediate(layer_paths).await?;
525 518 : return self.flush_immediate().await;
526 73 : }
527 73 :
528 73 : self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
529 591 : }
530 :
531 : /// When a Tenant has a generation, push_layers is always synchronous because
532 : /// the ListValidator channel is an unbounded channel.
533 : ///
534 : /// This can be merged into push_layers when we remove the Generation-less mode
535 : /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
536 73 : pub(crate) fn push_layers_sync(
537 73 : &self,
538 73 : tenant_id: TenantId,
539 73 : timeline_id: TimelineId,
540 73 : current_generation: Generation,
541 73 : layers: Vec<(LayerFileName, Generation)>,
542 73 : ) -> Result<(), DeletionQueueError> {
543 73 : metrics::DELETION_QUEUE
544 73 : .keys_submitted
545 73 : .inc_by(layers.len() as u64);
546 73 : self.do_push(
547 73 : &self.tx,
548 73 : ListWriterQueueMessage::Delete(DeletionOp {
549 73 : tenant_id,
550 73 : timeline_id,
551 73 : layers,
552 73 : generation: current_generation,
553 73 : objects: Vec::new(),
554 73 : }),
555 73 : )
556 73 : }
557 :
558 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
559 186 : async fn do_flush<T>(
560 186 : &self,
561 186 : queue: &tokio::sync::mpsc::UnboundedSender<T>,
562 186 : msg: T,
563 186 : rx: tokio::sync::oneshot::Receiver<()>,
564 186 : ) -> Result<(), DeletionQueueError> {
565 186 : self.do_push(queue, msg)?;
566 186 : if rx.await.is_err() {
567 : // This shouldn't happen if tenants are shut down before deletion queue. If we
568 : // encounter a bug like this, then a flusher will incorrectly believe it has flushed
569 : // when it hasn't, possibly leading to leaking objects.
570 UBC 0 : error!("Deletion queue dropped flush op while client was still waiting");
571 0 : Err(DeletionQueueError::ShuttingDown)
572 : } else {
573 CBC 186 : Ok(())
574 : }
575 186 : }
576 :
577 : /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
578 : ///
579 : /// This is cancel-safe. If you drop the future the flush may still happen in the background.
580 170 : pub async fn flush(&self) -> Result<(), DeletionQueueError> {
581 170 : let (flush_op, rx) = FlushOp::new();
582 170 : self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
583 170 : .await
584 170 : }
585 :
586 : /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
587 : /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
588 : /// detach where flushing is nice but not necessary.
589 : ///
590 : /// This function provides no guarantees of work being done.
591 38 : pub fn flush_advisory(&self) {
592 38 : let (flush_op, _) = FlushOp::new();
593 38 :
594 38 : // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
595 38 : drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
596 38 : }
597 :
598 : // Wait until all previous deletions are executed
599 16 : pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
600 UBC 0 : debug!("flush_execute: flushing to deletion lists...");
601 : // Flush any buffered work to deletion lists
602 CBC 16 : self.flush().await?;
603 :
604 : // Flush the backend into the executor of deletion lists
605 16 : let (flush_op, rx) = FlushOp::new();
606 UBC 0 : debug!("flush_execute: flushing backend...");
607 CBC 16 : self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
608 16 : .await?;
609 UBC 0 : debug!("flush_execute: finished flushing backend...");
610 :
611 : // Flush any immediate-mode deletions (the above backend flush will only flush
612 : // the executor if deletions had flowed through the backend)
613 0 : debug!("flush_execute: flushing execution...");
614 CBC 16 : self.flush_immediate().await?;
615 UBC 0 : debug!("flush_execute: finished flushing execution...");
616 CBC 16 : Ok(())
617 16 : }
618 :
619 : /// This interface bypasses the persistent deletion queue, and any validation
620 : /// that this pageserver is still eligible to execute the deletions. It is for
621 : /// use in timeline deletions, where the control plane is telling us we may
622 : /// delete everything in the timeline.
623 : ///
624 : /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
625 986 : pub(crate) async fn push_immediate(
626 986 : &self,
627 986 : objects: Vec<RemotePath>,
628 986 : ) -> Result<(), DeletionQueueError> {
629 986 : metrics::DELETION_QUEUE
630 986 : .keys_submitted
631 986 : .inc_by(objects.len() as u64);
632 986 : self.executor_tx
633 986 : .send(DeleterMessage::Delete(objects))
634 UBC 0 : .await
635 CBC 986 : .map_err(|_| DeletionQueueError::ShuttingDown)
636 986 : }
637 :
638 : /// Companion to push_immediate. When this returns Ok, all prior objects sent
639 : /// into push_immediate have been deleted from remote storage.
640 950 : pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
641 950 : let (flush_op, rx) = FlushOp::new();
642 950 : self.executor_tx
643 950 : .send(DeleterMessage::Flush(flush_op))
644 UBC 0 : .await
645 CBC 950 : .map_err(|_| DeletionQueueError::ShuttingDown)?;
646 :
647 950 : rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
648 950 : }
649 : }
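The remote_consistent_lsn flow in `update_remote_consistent_lsn` above is a two-slot handshake: the timeline publishes a projected value at will, and only after the generation check passes does the background validator copy it into the shared slot that the rest of the system reads. A standalone sketch of that handshake (hypothetical `PendingLsnUpdate`/`apply_if_generation_valid` names, `AtomicU64` standing in for `AtomicLsn`; the real check lives in the `validator` submodule):

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct PendingLsnUpdate {
    projected: u64,
    result_slot: Arc<AtomicU64>, // what the outside world reads
}

fn apply_if_generation_valid(pending: &PendingLsnUpdate, generation_is_current: bool) {
    if generation_is_current {
        pending.result_slot.store(pending.projected, Ordering::SeqCst);
    }
    // On a stale generation the projected value is simply dropped: the newer
    // attachment of the tenant is responsible for advancing the visible LSN.
}

fn main() {
    let slot = Arc::new(AtomicU64::new(0));
    let pending = PendingLsnUpdate {
        projected: 0x16B5A51,
        result_slot: slot.clone(),
    };

    apply_if_generation_valid(&pending, false);
    assert_eq!(slot.load(Ordering::SeqCst), 0); // not visible yet

    apply_if_generation_valid(&pending, true);
    assert_eq!(slot.load(Ordering::SeqCst), 0x16B5A51); // validated and published
}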
650 :
651 : impl DeletionQueue {
652 1124 : pub fn new_client(&self) -> DeletionQueueClient {
653 1124 : self.client.clone()
654 1124 : }
655 :
656 : /// Caller may use the returned object to construct clients with new_client.
657 : /// Caller should spawn the background() members of the returned DeletionQueueWorkers (e.g. via
658 : /// spawn_with): we don't spawn them inside new() so that the caller can use their runtime/spans of choice.
659 : ///
660 : /// If remote_storage is None, then the returned workers will also be None.
661 564 : pub fn new<C>(
662 564 : remote_storage: Option<GenericRemoteStorage>,
663 564 : control_plane_client: Option<C>,
664 564 : conf: &'static PageServerConf,
665 564 : ) -> (Self, Option<DeletionQueueWorkers<C>>)
666 564 : where
667 564 : C: ControlPlaneGenerationsApi + Send + Sync,
668 564 : {
669 564 : // Unbounded channel: enables non-async functions to submit deletions. The actual length is
670 564 : // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
671 564 : // enough to avoid this taking pathologically large amount of memory.
672 564 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
673 564 :
674 564 : // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
675 564 : let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
676 564 :
677 564 : // Shallow channel: it carries lists of paths, and we expect the main queueing to
678 564 : // happen in the backend (persistent), not in this queue.
679 564 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
680 564 :
681 564 : let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
682 564 :
683 564 : // The deletion queue has a cancellation token independent of
684 564 : // the general pageserver shutdown token, because it stays alive a bit
685 564 : // longer to flush after Tenants have all been torn down.
686 564 : let cancel = CancellationToken::new();
687 :
688 564 : let remote_storage = match remote_storage {
689 : None => {
690 UBC 0 : return (
691 0 : Self {
692 0 : client: DeletionQueueClient {
693 0 : tx,
694 0 : executor_tx,
695 0 : lsn_table: lsn_table.clone(),
696 0 : },
697 0 : cancel,
698 0 : },
699 0 : None,
700 0 : )
701 : }
702 CBC 564 : Some(r) => r,
703 564 : };
704 564 :
705 564 : (
706 564 : Self {
707 564 : client: DeletionQueueClient {
708 564 : tx,
709 564 : executor_tx: executor_tx.clone(),
710 564 : lsn_table: lsn_table.clone(),
711 564 : },
712 564 : cancel: cancel.clone(),
713 564 : },
714 564 : Some(DeletionQueueWorkers {
715 564 : frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
716 564 : backend: Validator::new(
717 564 : conf,
718 564 : backend_rx,
719 564 : executor_tx,
720 564 : control_plane_client,
721 564 : lsn_table.clone(),
722 564 : cancel.clone(),
723 564 : ),
724 564 : executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
725 564 : }),
726 564 : )
727 564 : }
728 :
729 144 : pub async fn shutdown(&mut self, timeout: Duration) {
730 144 : self.cancel.cancel();
731 144 :
732 144 : match tokio::time::timeout(timeout, self.client.flush()).await {
733 : Ok(Ok(())) => {
734 144 : tracing::info!("Deletion queue flushed successfully on shutdown")
735 : }
736 : Ok(Err(DeletionQueueError::ShuttingDown)) => {
737 : // This is not harmful for correctness, but is unexpected: the deletion
738 : // queue's workers should stay alive as long as there are any client handles instantiated.
739 UBC 0 : tracing::warn!("Deletion queue stopped prematurely");
740 : }
741 0 : Err(_timeout) => {
742 0 : tracing::warn!("Timed out flushing deletion queue on shutdown")
743 : }
744 : }
745 CBC 144 : }
746 : }
747 :
748 : #[cfg(test)]
749 : mod test {
750 : use camino::Utf8Path;
751 : use hex_literal::hex;
752 : use std::{io::ErrorKind, time::Duration};
753 : use tracing::info;
754 :
755 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
756 : use tokio::task::JoinHandle;
757 :
758 : use crate::{
759 : control_plane_client::RetryForeverError,
760 : repository::Key,
761 : tenant::{
762 : harness::TenantHarness, remote_timeline_client::remote_timeline_path,
763 : storage_layer::DeltaFileName,
764 : },
765 : };
766 :
767 : use super::*;
768 : pub const TIMELINE_ID: TimelineId =
769 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
770 :
771 : pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName {
772 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
773 : lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
774 : });
775 :
776 : // When you need a second layer in a test.
777 : pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName {
778 : key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
779 : lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
780 : });
781 :
782 : struct TestSetup {
783 : harness: TenantHarness,
784 : remote_fs_dir: Utf8PathBuf,
785 : storage: GenericRemoteStorage,
786 : mock_control_plane: MockControlPlane,
787 : deletion_queue: DeletionQueue,
788 : worker_join: JoinHandle<()>,
789 : }
790 :
791 : impl TestSetup {
792 : /// Simulate a pageserver restart by destroying and recreating the deletion queue
793 1 : async fn restart(&mut self) {
794 1 : let (deletion_queue, workers) = DeletionQueue::new(
795 1 : Some(self.storage.clone()),
796 1 : Some(self.mock_control_plane.clone()),
797 1 : self.harness.conf,
798 1 : );
799 :
800 UBC 0 : tracing::debug!("Spawning worker for new queue queue");
801 CBC 1 : let worker_join = workers
802 1 : .unwrap()
803 1 : .spawn_with(&tokio::runtime::Handle::current());
804 1 :
805 1 : let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
806 1 : let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
807 :
808 UBC 0 : tracing::debug!("Joining worker from previous queue");
809 CBC 1 : old_deletion_queue.cancel.cancel();
810 1 : old_worker_join
811 1 : .await
812 1 : .expect("Failed to join workers for previous deletion queue");
813 1 : }
814 :
815 3 : fn set_latest_generation(&self, gen: Generation) {
816 3 : let tenant_id = self.harness.tenant_id;
817 3 : self.mock_control_plane
818 3 : .latest_generation
819 3 : .lock()
820 3 : .unwrap()
821 3 : .insert(tenant_id, gen);
822 3 : }
823 :
824 : /// Returns remote layer file name, suitable for use in assert_remote_files
825 3 : fn write_remote_layer(
826 3 : &self,
827 3 : file_name: LayerFileName,
828 3 : gen: Generation,
829 3 : ) -> anyhow::Result<String> {
830 3 : let tenant_id = self.harness.tenant_id;
831 3 : let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
832 3 : let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
833 3 : std::fs::create_dir_all(&remote_timeline_path)?;
834 3 : let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
835 3 :
836 3 : let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
837 3 :
838 3 : std::fs::write(
839 3 : remote_timeline_path.join(remote_layer_file_name.clone()),
840 3 : content,
841 3 : )?;
842 :
843 3 : Ok(remote_layer_file_name)
844 3 : }
845 : }
846 :
847 4 : #[derive(Debug, Clone)]
848 : struct MockControlPlane {
849 : pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantId, Generation>>>,
850 : }
851 :
852 : impl MockControlPlane {
853 3 : fn new() -> Self {
854 3 : Self {
855 3 : latest_generation: Arc::default(),
856 3 : }
857 3 : }
858 : }
859 :
860 : #[async_trait::async_trait]
861 : impl ControlPlaneGenerationsApi for MockControlPlane {
862 : #[allow(clippy::diverging_sub_expression)] // False positive via async_trait
863 UBC 0 : async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
864 0 : unimplemented!()
865 0 : }
866 CBC 4 : async fn validate(
867 4 : &self,
868 4 : tenants: Vec<(TenantId, Generation)>,
869 4 : ) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
870 4 : let mut result = HashMap::new();
871 4 :
872 4 : let latest_generation = self.latest_generation.lock().unwrap();
873 :
874 8 : for (tenant_id, generation) in tenants {
875 4 : if let Some(latest) = latest_generation.get(&tenant_id) {
876 4 : result.insert(tenant_id, *latest == generation);
877 4 : }
878 : }
879 :
880 4 : Ok(result)
881 8 : }
882 : }
883 :
884 3 : fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
885 3 : let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
886 3 : let harness = TenantHarness::create(test_name)?;
887 :
888 : // We do not load() the harness: we only need its config and remote_storage
889 :
890 : // Set up a GenericRemoteStorage targeting a directory
891 3 : let remote_fs_dir = harness.conf.workdir.join("remote_fs");
892 3 : std::fs::create_dir_all(remote_fs_dir)?;
893 3 : let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
894 3 : let storage_config = RemoteStorageConfig {
895 3 : max_concurrent_syncs: std::num::NonZeroUsize::new(
896 3 : remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
897 3 : )
898 3 : .unwrap(),
899 3 : max_sync_errors: std::num::NonZeroU32::new(
900 3 : remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
901 3 : )
902 3 : .unwrap(),
903 3 : storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
904 3 : };
905 3 : let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
906 3 :
907 3 : let mock_control_plane = MockControlPlane::new();
908 3 :
909 3 : let (deletion_queue, worker) = DeletionQueue::new(
910 3 : Some(storage.clone()),
911 3 : Some(mock_control_plane.clone()),
912 3 : harness.conf,
913 3 : );
914 3 :
915 3 : let worker = worker.unwrap();
916 3 : let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
917 3 :
918 3 : Ok(TestSetup {
919 3 : harness,
920 3 : remote_fs_dir,
921 3 : storage,
922 3 : mock_control_plane,
923 3 : deletion_queue,
924 3 : worker_join,
925 3 : })
926 3 : }
927 :
928 : // TODO: put this in a common location so that we can share with remote_timeline_client's tests
929 9 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
930 9 : let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
931 9 : expected.sort();
932 9 :
933 9 : let mut found: Vec<String> = Vec::new();
934 9 : let dir = match std::fs::read_dir(remote_path) {
935 9 : Ok(d) => d,
936 UBC 0 : Err(e) => {
937 0 : if e.kind() == ErrorKind::NotFound {
938 0 : if expected.is_empty() {
939 : // We are asserting prefix is empty: it is expected that the dir is missing
940 0 : return;
941 : } else {
942 0 : assert_eq!(expected, Vec::<String>::new());
943 0 : unreachable!();
944 : }
945 : } else {
946 0 : panic!("Unexpected error listing {remote_path}: {e}");
947 : }
948 : }
949 : };
950 :
951 CBC 9 : for entry in dir.flatten() {
952 8 : let entry_name = entry.file_name();
953 8 : let fname = entry_name.to_str().unwrap();
954 8 : found.push(String::from(fname));
955 8 : }
956 9 : found.sort();
957 9 :
958 9 : assert_eq!(expected, found);
959 9 : }
960 :
961 5 : fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
962 5 : let dir = match std::fs::read_dir(directory) {
963 4 : Ok(d) => d,
964 : Err(_) => {
965 1 : assert_eq!(expected, &Vec::<String>::new());
966 1 : return;
967 : }
968 : };
969 4 : let mut found = Vec::new();
970 9 : for dentry in dir {
971 5 : let dentry = dentry.unwrap();
972 5 : let file_name = dentry.file_name();
973 5 : let file_name_str = file_name.to_string_lossy();
974 5 : found.push(file_name_str.to_string());
975 5 : }
976 4 : found.sort();
977 4 : assert_eq!(expected, found);
978 5 : }
979 :
980 1 : #[tokio::test]
981 1 : async fn deletion_queue_smoke() -> anyhow::Result<()> {
982 1 : // Basic test that the deletion queue processes the deletions we pass into it
983 1 : let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
984 1 : let client = ctx.deletion_queue.new_client();
985 1 : client.recover(HashMap::new())?;
986 :
987 1 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
988 1 : let tenant_id = ctx.harness.tenant_id;
989 1 :
990 1 : let content: Vec<u8> = "victim1 contents".into();
991 1 : let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
992 1 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
993 1 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
994 1 :
995 1 : // Exercise the distinction between the generation of the layers
996 1 : // we delete, and the generation of the running Tenant.
997 1 : let layer_generation = Generation::new(0xdeadbeef);
998 1 : let now_generation = Generation::new(0xfeedbeef);
999 1 :
1000 1 : let remote_layer_file_name_1 =
1001 1 : format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
1002 1 :
1003 1 : // Set mock control plane state to valid for our generation
1004 1 : ctx.set_latest_generation(now_generation);
1005 1 :
1006 1 : // Inject a victim file to remote storage
1007 1 : info!("Writing");
1008 1 : std::fs::create_dir_all(&remote_timeline_path)?;
1009 1 : std::fs::write(
1010 1 : remote_timeline_path.join(remote_layer_file_name_1.clone()),
1011 1 : content,
1012 1 : )?;
1013 1 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
1014 1 :
1015 1 : // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
1016 1 : info!("Pushing");
1017 1 : client
1018 1 : .push_layers(
1019 1 : tenant_id,
1020 1 : TIMELINE_ID,
1021 1 : now_generation,
1022 1 : [(layer_file_name_1.clone(), layer_generation)].to_vec(),
1023 1 : )
1024 UBC 0 : .await?;
1025 CBC 1 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
1026 1 :
1027 1 : assert_local_files(&[], &deletion_prefix);
1028 1 :
1029 1 : // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
1030 1 : info!("Flushing");
1031 1 : client.flush().await?;
1032 1 : assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
1033 1 : assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
1034 1 :
1035 1 : // File should go away when we execute
1036 1 : info!("Flush-executing");
1037 3 : client.flush_execute().await?;
1038 1 : assert_remote_files(&[], &remote_timeline_path);
1039 1 : assert_local_files(&["header-01"], &deletion_prefix);
1040 1 :
1041 1 : // Flushing on an empty queue should succeed immediately, and not write any lists
1042 1 : info!("Flush-executing on empty");
1043 3 : client.flush_execute().await?;
1044 1 : assert_local_files(&["header-01"], &deletion_prefix);
1045 1 :
1046 1 : Ok(())
1047 : }
1048 :
1049 1 : #[tokio::test]
1050 1 : async fn deletion_queue_validation() -> anyhow::Result<()> {
1051 1 : let ctx = setup("deletion_queue_validation").expect("Failed test setup");
1052 1 : let client = ctx.deletion_queue.new_client();
1053 1 : client.recover(HashMap::new())?;
1054 :
1055 : // Generation that the control plane thinks is current
1056 1 : let latest_generation = Generation::new(0xdeadbeef);
1057 1 : // Generation that our DeletionQueue thinks the tenant is running with
1058 1 : let stale_generation = latest_generation.previous();
1059 1 : // Generation that our example layer file was written with
1060 1 : let layer_generation = stale_generation.previous();
1061 1 :
1062 1 : ctx.set_latest_generation(latest_generation);
1063 1 :
1064 1 : let tenant_id = ctx.harness.tenant_id;
1065 1 : let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
1066 1 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1067 :
1068 : // Initial state: a remote layer exists
1069 1 : let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1070 1 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1071 1 :
1072 1 : tracing::debug!("Pushing...");
1073 1 : client
1074 1 : .push_layers(
1075 1 : tenant_id,
1076 1 : TIMELINE_ID,
1077 1 : stale_generation,
1078 1 : [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
1079 1 : )
1080 UBC 0 : .await?;
1081 :
1082 : // We enqueued the operation in a stale generation: it should have failed validation
1083 CBC 1 : tracing::debug!("Flushing...");
1084 3 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1085 1 : assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
1086 1 :
1087 1 : tracing::debug!("Pushing...");
1088 1 : client
1089 1 : .push_layers(
1090 1 : tenant_id,
1091 1 : TIMELINE_ID,
1092 1 : latest_generation,
1093 1 : [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
1094 1 : )
1095 UBC 0 : .await?;
1096 :
1097 : // We enqueued the operation in a fresh generation: it should have passed validation
1098 CBC 1 : tracing::debug!("Flushing...");
1099 3 : tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
1100 1 : assert_remote_files(&[], &remote_timeline_path);
1101 1 :
1102 1 : Ok(())
1103 : }
1104 :
1105 1 : #[tokio::test]
1106 1 : async fn deletion_queue_recovery() -> anyhow::Result<()> {
1107 1 : // Test that deletion lists written before a restart are recovered and applied correctly
1108 1 : let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
1109 1 : let client = ctx.deletion_queue.new_client();
1110 1 : client.recover(HashMap::new())?;
1111 :
1112 1 : let tenant_id = ctx.harness.tenant_id;
1113 1 :
1114 1 : let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
1115 1 : let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
1116 1 : let deletion_prefix = ctx.harness.conf.deletion_prefix();
1117 1 :
1118 1 : let layer_generation = Generation::new(0xdeadbeef);
1119 1 : let now_generation = Generation::new(0xfeedbeef);
1120 :
1121 : // Inject a deletion in the generation before generation_now: after restart,
1122 : // this deletion should _not_ get executed (only the immediately previous
1123 : // generation gets that treatment)
1124 1 : let remote_layer_file_name_historical =
1125 1 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
1126 1 : client
1127 1 : .push_layers(
1128 1 : tenant_id,
1129 1 : TIMELINE_ID,
1130 1 : now_generation.previous(),
1131 1 : [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
1132 1 : )
1133 UBC 0 : .await?;
1134 :
1135 : // Inject a deletion in generation_now itself: after restart,
1136 : // this deletion should get executed, because we execute deletions in the
1137 : // immediately previous generation on the same node.
1138 CBC 1 : let remote_layer_file_name_previous =
1139 1 : ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
1140 1 : client
1141 1 : .push_layers(
1142 1 : tenant_id,
1143 1 : TIMELINE_ID,
1144 1 : now_generation,
1145 1 : [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
1146 1 : )
1147 UBC 0 : .await?;
1148 :
1149 CBC 1 : client.flush().await?;
1150 1 : assert_remote_files(
1151 1 : &[
1152 1 : &remote_layer_file_name_historical,
1153 1 : &remote_layer_file_name_previous,
1154 1 : ],
1155 1 : &remote_timeline_path,
1156 1 : );
1157 1 :
1158 1 : // Different generations for the same tenant will cause two separate
1159 1 : // deletion lists to be emitted.
1160 1 : assert_local_files(
1161 1 : &["0000000000000001-01.list", "0000000000000002-01.list"],
1162 1 : &deletion_prefix,
1163 1 : );
1164 1 :
1165 1 : // Simulate a node restart: the latest generation advances
1166 1 : let now_generation = now_generation.next();
1167 1 : ctx.set_latest_generation(now_generation);
1168 1 :
1169 1 : // Restart the deletion queue
1170 1 : drop(client);
1171 1 : ctx.restart().await;
1172 1 : let client = ctx.deletion_queue.new_client();
1173 1 : client.recover(HashMap::from([(tenant_id, now_generation)]))?;
1174 :
1175 1 : info!("Flush-executing");
1176 3 : client.flush_execute().await?;
1177 : // The deletion from immediately prior generation was executed, the one from
1178 : // an older generation was not.
1179 1 : assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
1180 1 : Ok(())
1181 : }
1182 : }
1183 :
1184 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
1185 : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
1186 : #[cfg(test)]
1187 : pub(crate) mod mock {
1188 : use tracing::info;
1189 :
1190 : use crate::tenant::remote_timeline_client::remote_layer_path;
1191 :
1192 : use super::*;
1193 : use std::sync::{
1194 : atomic::{AtomicUsize, Ordering},
1195 : Arc,
1196 : };
1197 :
1198 : pub struct ConsumerState {
1199 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
1200 : executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
1201 : }
1202 :
1203 : impl ConsumerState {
1204 1 : async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
1205 1 : let mut executed = 0;
1206 :
1207 1 : info!("Executing all pending deletions");
1208 :
1209 : // Execute any deletions that were sent directly to the executor channel (immediate mode)
1210 1 : while let Ok(msg) = self.executor_rx.try_recv() {
1211 UBC 0 : match msg {
1212 0 : DeleterMessage::Delete(objects) => {
1213 0 : for path in objects {
1214 0 : match remote_storage.delete(&path).await {
1215 : Ok(_) => {
1216 0 : debug!("Deleted {path}");
1217 : }
1218 0 : Err(e) => {
1219 0 : error!("Failed to delete {path}, leaking object! ({e})");
1220 : }
1221 : }
1222 0 : executed += 1;
1223 : }
1224 : }
1225 0 : DeleterMessage::Flush(flush_op) => {
1226 0 : flush_op.notify();
1227 0 : }
1228 : }
1229 : }
1230 :
1231 CBC 2 : while let Ok(msg) = self.rx.try_recv() {
1232 1 : match msg {
1233 1 : ListWriterQueueMessage::Delete(op) => {
1234 1 : let mut objects = op.objects;
1235 2 : for (layer, generation) in op.layers {
1236 1 : objects.push(remote_layer_path(
1237 1 : &op.tenant_id,
1238 1 : &op.timeline_id,
1239 1 : &layer,
1240 1 : generation,
1241 1 : ));
1242 1 : }
1243 :
1244 2 : for path in objects {
1245 1 : info!("Executing deletion {path}");
1246 1 : match remote_storage.delete(&path).await {
1247 : Ok(_) => {
1248 UBC 0 : debug!("Deleted {path}");
1249 : }
1250 0 : Err(e) => {
1251 0 : error!("Failed to delete {path}, leaking object! ({e})");
1252 : }
1253 : }
1254 CBC 1 : executed += 1;
1255 : }
1256 : }
1257 UBC 0 : ListWriterQueueMessage::Flush(op) => {
1258 0 : op.notify();
1259 0 : }
1260 0 : ListWriterQueueMessage::FlushExecute(op) => {
1261 0 : // We have already executed all prior deletions because mock does them inline
1262 0 : op.notify();
1263 0 : }
1264 0 : ListWriterQueueMessage::Recover(_) => {
1265 0 : // no-op in mock
1266 0 : }
1267 : }
1268 CBC 1 : info!("All pending deletions have been executed");
1269 : }
1270 :
1271 1 : executed
1272 1 : }
1273 : }
1274 :
1275 : pub struct MockDeletionQueue {
1276 : tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
1277 : executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
1278 : executed: Arc<AtomicUsize>,
1279 : remote_storage: Option<GenericRemoteStorage>,
1280 : consumer: std::sync::Mutex<ConsumerState>,
1281 : lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
1282 : }
1283 :
1284 : impl MockDeletionQueue {
1285 41 : pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
1286 41 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1287 41 : let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
1288 41 :
1289 41 : let executed = Arc::new(AtomicUsize::new(0));
1290 41 :
1291 41 : Self {
1292 41 : tx,
1293 41 : executor_tx,
1294 41 : executed,
1295 41 : remote_storage,
1296 41 : consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
1297 41 : lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
1298 41 : }
1299 41 : }
1300 :
1301 : #[allow(clippy::await_holding_lock)]
1302 1 : pub async fn pump(&self) {
1303 1 : if let Some(remote_storage) = &self.remote_storage {
1304 : // Permit holding mutex across await, because this is only ever
1305 : // called once at a time in tests.
1306 1 : let mut locked = self.consumer.lock().unwrap();
1307 1 : let count = locked.consume(remote_storage).await;
1308 1 : self.executed.fetch_add(count, Ordering::Relaxed);
1309 UBC 0 : }
1310 CBC 1 : }
1311 :
1312 47 : pub(crate) fn new_client(&self) -> DeletionQueueClient {
1313 47 : DeletionQueueClient {
1314 47 : tx: self.tx.clone(),
1315 47 : executor_tx: self.executor_tx.clone(),
1316 47 : lsn_table: self.lsn_table.clone(),
1317 47 : }
1318 47 : }
1319 : }
1320 : }