Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], to get the list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads or deletions.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
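The layout and the "index is the source of truth" rule can be sketched as follows. `remote_layer_path` and `ToyIndex` are hypothetical helpers. They model only the `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>` layout stated above and do not model any additional naming details the real implementation may have. The point is that the index alone determines which objects belong to the timeline, so keys can be computed without listing the bucket.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper that builds a key following the layout described in the text.
pub fn remote_layer_path(tenant_id: &str, timeline_id: &str, layer_file_name: &str) -> String {
    format!("tenants/{tenant_id}/timelines/{timeline_id}/{layer_file_name}")
}

/// Simplified index: only the set of layer file names it references.
pub struct ToyIndex {
    pub layers: BTreeSet<String>,
}

impl ToyIndex {
    /// An object is part of the remote timeline state only if the index lists it,
    /// regardless of whether such an object exists in the bucket.
    pub fn references(&self, layer_file_name: &str) -> bool {
        self.layers.contains(layer_file_name)
    }

    /// Keys of every referenced layer, derived without any remote listing call.
    pub fn remote_keys(&self, tenant_id: &str, timeline_id: &str) -> Vec<String> {
        self.layers
            .iter()
            .map(|name| remote_layer_path(tenant_id, timeline_id, name))
            .collect()
    }
}
```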
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible for
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that require preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
72 : //!
73 : //! The caller should be careful with deletion, though. They should not delete
74 : //! local files that have been scheduled for upload but not yet finished uploading.
75 : //! Otherwise the upload will fail. To wait for an upload to finish, use
76 : //! [`RemoteTimelineClient::wait_completion`] (more on that later).
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after-write consistency in the remote storage.
81 : //! - Layer files are immutable.
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
88 : //! that this doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in
95 : //! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
96 : //! It is initialized based on the [`IndexPart`] that was passed during init
97 : //! and updated with every `schedule_*` function call.
98 : //! All this is necessary to compute the future [`IndexPart`]s
99 : //! when scheduling an operation while other operations that also affect the
100 : //! remote [`IndexPart`] are in flight.
101 : //!
102 : //! # Retries & Error Handling
103 : //!
104 : //! The client retries operations indefinitely, using exponential back-off.
105 : //! There is no way to force a retry, i.e., interrupt the back-off.
106 : //! This could be built easily.
107 : //!
108 : //! # Cancellation
109 : //!
110 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
111 : //! the client's tenant and timeline.
112 : //! Dropping the client will drop queued operations but not executing operations.
113 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
114 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
115 : //!
116 : //! # Completion
117 : //!
118 : //! Once an operation has completed, we update
119 : //! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately,
120 : //! and submit a request through the DeletionQueue to update
121 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
122 : //! validated that our generation is not stale. It is this visible value
123 : //! that is advertised to safekeepers as a signal that they can
124 : //! delete the WAL up to that LSN.
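The two LSNs described above move at different times, which a small state machine can show. `ConsistentLsn` is hypothetical, and LSNs are plain `u64` values here instead of `utils::lsn::Lsn`. The projected value advances when an upload completes. The visible value, the one that may be advertised to safekeepers, advances only after generation validation succeeds.

```rust
/// Hypothetical model of the projected and visible remote consistent LSNs.
#[derive(Debug, Default)]
pub struct ConsistentLsn {
    pub projected: Option<u64>,
    pub visible: u64,
}

impl ConsistentLsn {
    /// Called when an upload covering `lsn` has completed. Never moves backwards.
    pub fn on_upload_complete(&mut self, lsn: u64) {
        self.projected = Some(self.projected.map_or(lsn, |p| p.max(lsn)));
    }

    /// Called once generation validation has run. `generation_is_current` is its result.
    /// Returns the LSN that may be advertised to safekeepers.
    pub fn on_validated(&mut self, lsn: u64, generation_is_current: bool) -> u64 {
        if generation_is_current && lsn > self.visible {
            self.visible = lsn;
        }
        self.visible
    }
}
```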
125 : //!
126 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
127 : //! for all pending operations to complete. It does not prevent more
128 : //! operations from getting scheduled.
129 : //!
130 : //! # Crash Consistency
131 : //!
132 : //! We do not persist the upload queue state.
133 : //! If we drop the client, or crash, all unfinished operations are lost.
134 : //!
135 : //! To recover, the following steps need to be taken:
136 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
137 : //! consistent remote state, assuming the user scheduled the operations in
138 : //! the correct order.
139 : //! - Initialize the upload queue with that [`IndexPart`].
140 : //! - Reschedule all lost operations by comparing the local filesystem state
141 : //! and remote state as per [`IndexPart`]. This is done in
142 : //! [`Tenant::timeline_init_and_sync`].
143 : //!
144 : //! Note that if we crash during file deletion between the index update
145 : //! that removes the file from the list of files, and deleting the remote file,
146 : //! the file is leaked in the remote storage. Similarly, if a new file is created
147 : //! and uploaded, but the pageserver dies permanently before updating the
148 : //! remote index file, the new file is leaked in remote storage. We accept and
149 : //! tolerate that for now.
150 : //! Note further that we cannot easily fix this by scheduling deletes for every
151 : //! file that is present only on the remote, because we cannot distinguish the
152 : //! following two cases:
153 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
154 : //! but crashed before it finished remotely.
155 : //! - (2) We never had the file locally because we haven't downloaded it on demand
156 : //! yet.
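The comparison step of the recovery procedure can be sketched with set differences. `reconcile` and `Reconciliation` are hypothetical, and the real logic in [`Tenant::timeline_init_and_sync`] handles more than file names. Local-only files are safe to reschedule for upload. Remote-only files are only reported, because the two cases listed above cannot be told apart.

```rust
use std::collections::BTreeSet;

/// Result of comparing local layer files with the remote `IndexPart` listing.
#[derive(Debug, PartialEq, Eq)]
pub struct Reconciliation {
    /// Present locally only: safe to (re)schedule for upload.
    pub to_upload: Vec<String>,
    /// Present remotely only: either a deletion lost in a crash, or a layer
    /// not yet downloaded on demand. These are left alone.
    pub remote_only: Vec<String>,
}

pub fn reconcile(local: &BTreeSet<String>, remote: &BTreeSet<String>) -> Reconciliation {
    Reconciliation {
        to_upload: local.difference(remote).cloned().collect(),
        remote_only: remote.difference(local).cloned().collect(),
    }
}
```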
157 : //!
158 : //! # Downloads
159 : //!
160 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
161 : //! downloading files from the remote storage. Downloads are performed immediately
162 : //! against the `RemoteStorage`, independently of the upload queue.
163 : //!
164 : //! When we attach a tenant, we perform the following steps:
165 : //! - create `Tenant` object in `TenantState::Attaching` state
166 : //! - List timelines that are present in remote storage, and for each:
167 : //! - download their remote [`IndexPart`]s
168 : //! - create `Timeline` struct and a `RemoteTimelineClient`
169 : //! - initialize the client's upload queue with its `IndexPart`
170 : //! - schedule uploads for layers that are only present locally.
171 : //! - After the above is done for each timeline, open the tenant for business by
172 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
173 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
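The last attach step is a guarded state transition. `ToyTenant` below is hypothetical, and its `TenantState` has only the two states named above, while the real enum may have more. The tenant leaves `Attaching` only once every timeline's upload queue has been initialized.

```rust
/// Hypothetical subset of tenant states relevant to attach.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TenantState {
    Attaching,
    Active,
}

#[derive(Debug)]
pub struct ToyTenant {
    pub state: TenantState,
    /// One flag per timeline: has its upload queue been initialized?
    pub queues_initialized: Vec<bool>,
}

impl ToyTenant {
    pub fn new(timelines: usize) -> Self {
        ToyTenant { state: TenantState::Attaching, queues_initialized: vec![false; timelines] }
    }

    pub fn init_timeline(&mut self, idx: usize) {
        self.queues_initialized[idx] = true;
    }

    /// Moves to `Active` only from `Attaching`, and only when all timelines are ready.
    pub fn try_activate(&mut self) -> Result<(), String> {
        if self.state != TenantState::Attaching {
            return Err(format!("cannot activate from {:?}", self.state));
        }
        if let Some(i) = self.queues_initialized.iter().position(|ok| !ok) {
            return Err(format!("timeline {i} not initialized yet"));
        }
        self.state = TenantState::Active;
        Ok(())
    }
}
```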
174 : //!
175 : //! # Operating Without Remote Storage
176 : //!
177 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
178 : //! not created and the uploads are skipped.
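Skipping uploads when no client exists maps naturally onto an `Option`. This sketch assumes the timeline holds its client as an `Option`. `FakeClient` and `ToyTimeline` are hypothetical stand-ins that only record scheduled layer names.

```rust
/// Hypothetical stand-in for the remote client. It only records scheduled uploads.
#[derive(Default)]
pub struct FakeClient {
    pub scheduled: Vec<String>,
}

pub struct ToyTimeline {
    /// `None` when the pageserver runs without remote storage.
    pub remote_client: Option<FakeClient>,
}

impl ToyTimeline {
    /// Schedules an upload if a client exists. Returns whether one was scheduled.
    pub fn on_layer_created(&mut self, layer_name: &str) -> bool {
        if let Some(client) = self.remote_client.as_mut() {
            client.scheduled.push(layer_name.to_string());
            true
        } else {
            false
        }
    }
}
```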
179 : //!
180 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
181 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
182 :
183 : pub(crate) mod download;
184 : pub mod index;
185 : pub(crate) mod upload;
186 :
187 : use anyhow::Context;
188 : use camino::Utf8Path;
189 : use chrono::{NaiveDateTime, Utc};
190 :
191 : pub(crate) use download::download_initdb_tar_zst;
192 : use pageserver_api::shard::{ShardIndex, TenantShardId};
193 : use scopeguard::ScopeGuard;
194 : use tokio_util::sync::CancellationToken;
195 : pub(crate) use upload::upload_initdb_dir;
196 : use utils::backoff::{
197 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
198 : };
199 :
200 : use std::collections::{HashMap, VecDeque};
201 : use std::sync::atomic::{AtomicU32, Ordering};
202 : use std::sync::{Arc, Mutex};
203 : use std::time::Duration;
204 :
205 : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath, TimeoutOrCancel};
206 : use std::ops::DerefMut;
207 : use tracing::{debug, error, info, instrument, warn};
208 : use tracing::{info_span, Instrument};
209 : use utils::lsn::Lsn;
210 :
211 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
212 : use crate::metrics::{
213 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
214 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
215 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
216 : };
217 : use crate::task_mgr::shutdown_token;
218 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
219 : use crate::tenant::remote_timeline_client::download::download_retry;
220 : use crate::tenant::storage_layer::AsLayerDesc;
221 : use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
222 : use crate::tenant::TIMELINES_SEGMENT_NAME;
223 : use crate::{
224 : config::PageServerConf,
225 : task_mgr,
226 : task_mgr::TaskKind,
227 : task_mgr::BACKGROUND_RUNTIME,
228 : tenant::metadata::TimelineMetadata,
229 : tenant::upload_queue::{
230 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
231 : },
232 : TENANT_HEATMAP_BASENAME,
233 : };
234 :
235 : use utils::id::{TenantId, TimelineId};
236 :
237 : use self::index::IndexPart;
238 :
239 : use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
240 : use super::upload_queue::SetDeletedFlagProgress;
241 : use super::Generation;
242 :
243 : pub(crate) use download::{is_temp_download_file, list_remote_timelines};
244 : pub(crate) use index::LayerFileMetadata;
245 :
246 : // Occasional network issues and such can cause remote operations to fail, and
247 : // that's expected. If a download fails, we log it at info-level, and retry.
248 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
249 : // level instead, as repeated failures can mean a more serious problem. If it
250 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
251 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
252 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
253 :
254 : // Similarly log failed uploads and deletions at WARN level, after this many
255 : // retries. Uploads and deletions are retried forever, though.
256 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
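The logging and give-up policy described in the comments above can be written as two small decision functions. This is a sketch. The constant values are copied from above, but the use of strict `>` comparisons is an assumption about where the boundaries fall, and `AfterFailure` and both functions are hypothetical.

```rust
// Values copied from the constants above.
const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
const FAILED_REMOTE_OP_RETRIES: u32 = 10;
const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AfterFailure {
    RetryLogInfo,
    RetryLogWarn,
    GiveUp,
}

/// Decision after `failures` consecutive failed download attempts (counted from 1).
pub fn after_failed_download(failures: u32) -> AfterFailure {
    if failures > FAILED_REMOTE_OP_RETRIES {
        AfterFailure::GiveUp
    } else if failures > FAILED_DOWNLOAD_WARN_THRESHOLD {
        AfterFailure::RetryLogWarn
    } else {
        AfterFailure::RetryLogInfo
    }
}

/// Uploads and deletions never give up. Only the log level changes.
pub fn after_failed_upload(failures: u32) -> AfterFailure {
    if failures > FAILED_UPLOAD_WARN_THRESHOLD {
        AfterFailure::RetryLogWarn
    } else {
        AfterFailure::RetryLogInfo
    }
}
```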
257 :
258 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
259 :
260 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
261 :
262 : /// Default buffer size when interfacing with [`tokio::fs::File`].
263 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
264 :
265 : /// Doing non-essential flushes of deletion queue is subject to this timeout, after
266 : /// which we warn and skip.
267 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
268 :
269 : pub enum MaybeDeletedIndexPart {
270 : IndexPart(IndexPart),
271 : Deleted(IndexPart),
272 : }
273 :
274 0 : #[derive(Debug, thiserror::Error)]
275 : pub enum PersistIndexPartWithDeletedFlagError {
276 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
277 : AlreadyInProgress(NaiveDateTime),
278 : #[error("the deleted_flag was already set, value is {0:?}")]
279 : AlreadyDeleted(NaiveDateTime),
280 : #[error(transparent)]
281 : Other(#[from] anyhow::Error),
282 : }
283 :
284 : /// A client for accessing a timeline's data in remote storage.
285 : ///
286 : /// This takes care of managing the number of connections, and balancing them
287 : /// across tenants. This also handles retries of failed uploads.
288 : ///
289 : /// Upload and delete requests are ordered so that before a deletion is
290 : /// performed, we wait for all preceding uploads to finish. This ensures
291 : /// that if you perform a compaction operation that reshuffles data in layer
292 : /// files, we don't have a transient state where the old files have already been
293 : /// deleted, but new files have not yet been uploaded.
294 : ///
295 : /// Similarly, this enforces an order between index-file uploads, and layer
296 : /// uploads. Before an index-file upload is performed, all preceding layer
297 : /// uploads must be finished.
298 : ///
299 : /// This also maintains a list of remote files, and automatically includes that
300 : /// in the index part file, whenever timeline metadata is uploaded.
301 : ///
302 : /// Downloads are not queued, they are performed immediately.
303 : pub struct RemoteTimelineClient {
304 : conf: &'static PageServerConf,
305 :
306 : runtime: tokio::runtime::Handle,
307 :
308 : tenant_shard_id: TenantShardId,
309 : timeline_id: TimelineId,
310 : generation: Generation,
311 :
312 : upload_queue: Mutex<UploadQueue>,
313 :
314 : metrics: Arc<RemoteTimelineClientMetrics>,
315 :
316 : storage_impl: GenericRemoteStorage,
317 :
318 : deletion_queue_client: DeletionQueueClient,
319 :
320 : cancel: CancellationToken,
321 : }
322 :
323 : impl RemoteTimelineClient {
324 : ///
325 : /// Create a remote storage client for the given timeline.
326 : ///
327 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
328 : /// by calling [`Self::init_upload_queue`] or [`Self::init_upload_queue_for_empty_remote`].
329 : ///
330 320 : pub fn new(
331 320 : remote_storage: GenericRemoteStorage,
332 320 : deletion_queue_client: DeletionQueueClient,
333 320 : conf: &'static PageServerConf,
334 320 : tenant_shard_id: TenantShardId,
335 320 : timeline_id: TimelineId,
336 320 : generation: Generation,
337 320 : ) -> RemoteTimelineClient {
338 320 : RemoteTimelineClient {
339 320 : conf,
340 320 : runtime: if cfg!(test) {
341 : // remote_timeline_client.rs tests rely on current-thread runtime
342 320 : tokio::runtime::Handle::current()
343 : } else {
344 0 : BACKGROUND_RUNTIME.handle().clone()
345 : },
346 320 : tenant_shard_id,
347 320 : timeline_id,
348 320 : generation,
349 320 : storage_impl: remote_storage,
350 320 : deletion_queue_client,
351 320 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
352 320 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
353 320 : &tenant_shard_id,
354 320 : &timeline_id,
355 320 : )),
356 320 : cancel: CancellationToken::new(),
357 320 : }
358 320 : }
359 :
360 : /// Initialize the upload queue for a remote storage that already received
361 : /// an index file upload, i.e., it's not empty.
362 : /// The given `index_part` must be the one on the remote.
363 6 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
364 6 : let mut upload_queue = self.upload_queue.lock().unwrap();
365 6 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
366 6 : self.update_remote_physical_size_gauge(Some(index_part));
367 6 : info!(
368 6 : "initialized upload queue from remote index with {} layer files",
369 6 : index_part.layer_metadata.len()
370 6 : );
371 6 : Ok(())
372 6 : }
373 :
374 : /// Initialize the upload queue for the case where the remote storage is empty,
375 : /// i.e., it doesn't have an `IndexPart`.
376 314 : pub fn init_upload_queue_for_empty_remote(
377 314 : &self,
378 314 : local_metadata: &TimelineMetadata,
379 314 : ) -> anyhow::Result<()> {
380 314 : let mut upload_queue = self.upload_queue.lock().unwrap();
381 314 : upload_queue.initialize_empty_remote(local_metadata)?;
382 314 : self.update_remote_physical_size_gauge(None);
383 314 : info!("initialized upload queue as empty");
384 314 : Ok(())
385 314 : }
386 :
387 : /// Initialize the queue in stopped state. Used in startup path
388 : /// to continue deletion operation interrupted by pageserver crash or restart.
389 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
390 0 : &self,
391 0 : index_part: &IndexPart,
392 0 : ) -> anyhow::Result<()> {
393 : // FIXME: consider newtype for DeletedIndexPart.
394 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
395 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
396 0 : ))?;
397 :
398 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
399 0 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
400 0 : self.update_remote_physical_size_gauge(Some(index_part));
401 0 : self.stop_impl(&mut upload_queue);
402 0 :
403 0 : upload_queue
404 0 : .stopped_mut()
405 0 : .expect("stopped above")
406 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
407 0 :
408 0 : Ok(())
409 0 : }
410 :
411 0 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
412 0 : match &mut *self.upload_queue.lock().unwrap() {
413 0 : UploadQueue::Uninitialized => None,
414 0 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
415 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
416 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
417 0 : .upload_queue_for_deletion
418 0 : .get_last_remote_consistent_lsn_projected(),
419 : }
420 0 : }
421 :
422 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
423 0 : match &mut *self.upload_queue.lock().unwrap() {
424 0 : UploadQueue::Uninitialized => None,
425 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
426 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
427 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
428 0 : q.upload_queue_for_deletion
429 0 : .get_last_remote_consistent_lsn_visible(),
430 0 : ),
431 : }
432 0 : }
433 :
434 1247 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
435 1247 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
436 933 : current_remote_index_part
437 933 : .layer_metadata
438 933 : .values()
439 933 : // If we don't have the file size for the layer, don't account for it in the metric.
440 4323 : .map(|ilmd| ilmd.file_size)
441 933 : .sum()
442 : } else {
443 314 : 0
444 : };
445 1247 : self.metrics.remote_physical_size_set(size);
446 1247 : }
447 :
448 2 : pub fn get_remote_physical_size(&self) -> u64 {
449 2 : self.metrics.remote_physical_size_get()
450 2 : }
451 :
452 : //
453 : // Download operations.
454 : //
455 : // These don't use the per-timeline queue. They do use the global semaphore in
456 : // S3Bucket, to limit the total number of concurrent operations, though.
457 : //
458 :
459 : /// Download index file
460 20 : pub async fn download_index_file(
461 20 : &self,
462 20 : cancel: &CancellationToken,
463 20 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
464 20 : let _unfinished_gauge_guard = self.metrics.call_begin(
465 20 : &RemoteOpFileKind::Index,
466 20 : &RemoteOpKind::Download,
467 20 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
468 20 : reason: "no need for a downloads gauge",
469 20 : },
470 20 : );
471 :
472 20 : let index_part = download::download_index_part(
473 20 : &self.storage_impl,
474 20 : &self.tenant_shard_id,
475 20 : &self.timeline_id,
476 20 : self.generation,
477 20 : cancel,
478 20 : )
479 20 : .measure_remote_op(
480 20 : RemoteOpFileKind::Index,
481 20 : RemoteOpKind::Download,
482 20 : Arc::clone(&self.metrics),
483 20 : )
484 106 : .await?;
485 :
486 20 : if index_part.deleted_at.is_some() {
487 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
488 : } else {
489 20 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
490 : }
491 20 : }
492 :
493 : /// Download the layer file named `layer_file_name` from remote storage into the local filesystem.
494 : ///
495 : /// `layer_metadata` is the metadata from the remote index file.
496 : ///
497 : /// On success, returns the size of the downloaded file.
498 6 : pub async fn download_layer_file(
499 6 : &self,
500 6 : layer_file_name: &LayerFileName,
501 6 : layer_metadata: &LayerFileMetadata,
502 6 : cancel: &CancellationToken,
503 6 : ) -> anyhow::Result<u64> {
504 6 : let downloaded_size = {
505 6 : let _unfinished_gauge_guard = self.metrics.call_begin(
506 6 : &RemoteOpFileKind::Layer,
507 6 : &RemoteOpKind::Download,
508 6 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
509 6 : reason: "no need for a downloads gauge",
510 6 : },
511 6 : );
512 6 : download::download_layer_file(
513 6 : self.conf,
514 6 : &self.storage_impl,
515 6 : self.tenant_shard_id,
516 6 : self.timeline_id,
517 6 : layer_file_name,
518 6 : layer_metadata,
519 6 : cancel,
520 6 : )
521 6 : .measure_remote_op(
522 6 : RemoteOpFileKind::Layer,
523 6 : RemoteOpKind::Download,
524 6 : Arc::clone(&self.metrics),
525 6 : )
526 87 : .await?
527 : };
528 :
529 6 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
530 6 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
531 6 :
532 6 : Ok(downloaded_size)
533 6 : }
534 :
535 : //
536 : // Upload operations.
537 : //
538 :
539 : ///
540 : /// Launch an index-file upload operation in the background, with
541 : /// updated metadata.
542 : ///
543 : /// The upload will be added to the queue immediately, but it
544 : /// won't be performed until all previously scheduled layer file
545 : /// upload operations have completed successfully. This is to
546 : /// ensure that when the index file claims that layers X, Y and Z
547 : /// exist in remote storage, they really do. To wait for the upload
548 : /// to complete, use `wait_completion`.
549 : ///
550 : /// If there were any changes to the list of files, i.e. if any
551 : /// layer file uploads were scheduled, since the last index file
552 : /// upload, those will be included too.
553 908 : pub fn schedule_index_upload_for_metadata_update(
554 908 : self: &Arc<Self>,
555 908 : metadata: &TimelineMetadata,
556 908 : ) -> anyhow::Result<()> {
557 908 : let mut guard = self.upload_queue.lock().unwrap();
558 908 : let upload_queue = guard.initialized_mut()?;
559 :
560 : // As documented in the struct definition, it's ok for latest_metadata to be
561 : // ahead of what's _actually_ on the remote during index upload.
562 908 : upload_queue.latest_metadata = metadata.clone();
563 908 :
564 908 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
565 908 :
566 908 : Ok(())
567 908 : }
568 :
569 : ///
570 : /// Launch an index-file upload operation in the background, if necessary.
571 : ///
572 : /// Use this function to schedule the update of the index file after
573 : /// scheduling file uploads or deletions. If no file uploads or deletions
574 : /// have been scheduled since the last index file upload, this does
575 : /// nothing.
576 : ///
577 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
578 : /// the upload to the upload queue and returns quickly.
579 516 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
580 516 : let mut guard = self.upload_queue.lock().unwrap();
581 516 : let upload_queue = guard.initialized_mut()?;
582 :
583 516 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
584 0 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
585 516 : }
586 :
587 516 : Ok(())
588 516 : }
589 :
590 : /// Launch an index-file upload operation in the background (internal function)
591 954 : fn schedule_index_upload(
592 954 : self: &Arc<Self>,
593 954 : upload_queue: &mut UploadQueueInitialized,
594 954 : metadata: TimelineMetadata,
595 954 : ) {
596 954 : let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
597 954 :
598 954 : info!(
599 954 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
600 954 : upload_queue.latest_files.len(),
601 954 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
602 954 : );
603 :
604 954 : let index_part = IndexPart::new(
605 954 : upload_queue.latest_files.clone(),
606 954 : disk_consistent_lsn,
607 954 : metadata,
608 954 : );
609 954 : let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
610 954 : self.metric_begin(&op);
611 954 : upload_queue.queued_operations.push_back(op);
612 954 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
613 954 :
614 954 : // Launch the task immediately, if possible
615 954 : self.launch_queued_tasks(upload_queue);
616 954 : }
617 :
618 : ///
619 : /// Launch an upload operation in the background.
620 : ///
621 714 : pub(crate) fn schedule_layer_file_upload(
622 714 : self: &Arc<Self>,
623 714 : layer: ResidentLayer,
624 714 : ) -> anyhow::Result<()> {
625 714 : let mut guard = self.upload_queue.lock().unwrap();
626 714 : let upload_queue = guard.initialized_mut()?;
627 :
628 714 : self.schedule_layer_file_upload0(upload_queue, layer);
629 714 : self.launch_queued_tasks(upload_queue);
630 714 : Ok(())
631 714 : }
632 :
633 956 : fn schedule_layer_file_upload0(
634 956 : self: &Arc<Self>,
635 956 : upload_queue: &mut UploadQueueInitialized,
636 956 : layer: ResidentLayer,
637 956 : ) {
638 956 : let metadata = layer.metadata();
639 956 :
640 956 : upload_queue
641 956 : .latest_files
642 956 : .insert(layer.layer_desc().filename(), metadata.clone());
643 956 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
644 956 :
645 956 : info!(
646 956 : "scheduled layer file upload {layer} gen={:?} shard={:?}",
647 956 : metadata.generation, metadata.shard
648 956 : );
649 956 : let op = UploadOp::UploadLayer(layer, metadata);
650 956 : self.metric_begin(&op);
651 956 : upload_queue.queued_operations.push_back(op);
652 956 : }
653 :
654 : /// Launch a delete operation in the background.
655 : ///
656 : /// The operation does not modify local filesystem state.
657 : ///
658 : /// Note: This schedules an index file upload before the deletions. The
659 : /// deletion won't actually be performed until all previously scheduled
660 : /// upload operations, and the index file upload, have completed
661 : /// successfully.
662 8 : pub fn schedule_layer_file_deletion(
663 8 : self: &Arc<Self>,
664 8 : names: &[LayerFileName],
665 8 : ) -> anyhow::Result<()> {
666 8 : let mut guard = self.upload_queue.lock().unwrap();
667 8 : let upload_queue = guard.initialized_mut()?;
668 :
669 8 : let with_metadata =
670 8 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
671 8 :
672 8 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
673 8 :
674 8 : // Launch the tasks immediately, if possible
675 8 : self.launch_queued_tasks(upload_queue);
676 8 : Ok(())
677 8 : }
678 :
679 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
680 : /// layer files, leaving them dangling.
681 : ///
682 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
683 : /// is invoked on them.
684 2 : pub(crate) fn schedule_gc_update(self: &Arc<Self>, gc_layers: &[Layer]) -> anyhow::Result<()> {
685 2 : let mut guard = self.upload_queue.lock().unwrap();
686 2 : let upload_queue = guard.initialized_mut()?;
687 :
688 : // Just forget the return value; after uploading the next index_part.json, we can consider
689 : // the layer files as "dangling". This is fine: in the worst case, we create work for the
690 : // scrubber.
691 :
692 2 : let names = gc_layers.iter().map(|x| x.layer_desc().filename());
693 2 :
694 2 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
695 2 :
696 2 : self.launch_queued_tasks(upload_queue);
697 2 :
698 2 : Ok(())
699 2 : }
700 :
701 : /// Update the remote index file, removing the to-be-deleted files from the index,
702 : /// allowing scheduling of actual deletions later.
703 52 : fn schedule_unlinking_of_layers_from_index_part0<I>(
704 52 : self: &Arc<Self>,
705 52 : upload_queue: &mut UploadQueueInitialized,
706 52 : names: I,
707 52 : ) -> Vec<(LayerFileName, LayerFileMetadata)>
708 52 : where
709 52 : I: IntoIterator<Item = LayerFileName>,
710 52 : {
711 52 : // Deleting layers doesn't affect the values stored in TimelineMetadata,
712 52 : // so we don't need to update it. Just serialize it.
713 52 : let metadata = upload_queue.latest_metadata.clone();
714 52 :
715 52 : // Decorate our list of names with each name's metadata, dropping
716 52 : // names that are unexpectedly missing from our metadata. This metadata
717 52 : // is later used when physically deleting layers, to construct key paths.
718 52 : let with_metadata: Vec<_> = names
719 52 : .into_iter()
720 446 : .filter_map(|name| {
721 446 : let meta = upload_queue.latest_files.remove(&name);
722 :
723 446 : if let Some(meta) = meta {
724 446 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
725 446 : Some((name, meta))
726 : } else {
727 : // This can only happen if we forgot to schedule the file upload
728 : // before scheduling the delete. Log it because it is a rare/strange
729 : // situation, and in case something is misbehaving, we'd like to know which
730 : // layers experienced this.
731 0 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
732 0 : None
733 : }
734 446 : })
735 52 : .collect();
736 :
737 : #[cfg(feature = "testing")]
738 498 : for (name, metadata) in &with_metadata {
739 446 : let gen = metadata.generation;
740 446 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
741 0 : if unexpected == gen {
742 0 : tracing::error!("{name} was unlinked twice with same generation");
743 : } else {
744 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
745 : }
746 446 : }
747 : }
748 :
749 : // after unlinking files from the upload_queue.latest_files we must always schedule an
750 : // index_part update, because that needs to be uploaded before we can actually delete the
751 : // files.
752 52 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
753 46 : self.schedule_index_upload(upload_queue, metadata);
754 46 : }
755 :
756 52 : with_metadata
757 52 : }
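A minimal, std-only sketch of the unlinking bookkeeping above, assuming a plain `HashMap` for `latest_files`. The names `Queue`, `LayerMeta`, `unlink_layers` and the `index_uploads_scheduled` counter are hypothetical: the counter stands in for calling `schedule_index_upload`, and resetting the change count when an index upload is scheduled is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `LayerFileMetadata`.
#[derive(Debug, Clone, PartialEq)]
struct LayerMeta {
    generation: u32,
}

/// Hypothetical, heavily simplified stand-in for `UploadQueueInitialized`.
#[derive(Default)]
struct Queue {
    latest_files: HashMap<String, LayerMeta>,
    changes_since_index_upload: usize,
    index_uploads_scheduled: usize,
}

impl Queue {
    /// Remove `names` from `latest_files`, returning the metadata of the names
    /// that were actually present. Unknown names are silently skipped.
    fn unlink_layers<I>(&mut self, names: I) -> Vec<(String, LayerMeta)>
    where
        I: IntoIterator<Item = String>,
    {
        let unlinked: Vec<_> = names
            .into_iter()
            .filter_map(|name| {
                let meta = self.latest_files.remove(&name)?;
                self.changes_since_index_upload += 1;
                Some((name, meta))
            })
            .collect();

        // The index must be re-uploaded before any unlinked file may be deleted.
        if self.changes_since_index_upload > 0 {
            self.index_uploads_scheduled += 1;
            self.changes_since_index_upload = 0; // assumption: scheduling resets the count
        }
        unlinked
    }
}

fn main() {
    let mut q = Queue::default();
    q.latest_files.insert("layer-a".to_string(), LayerMeta { generation: 1 });
    let unlinked = q.unlink_layers(vec!["layer-a".to_string(), "never-uploaded".to_string()]);
    println!("{unlinked:?}, index uploads scheduled: {}", q.index_uploads_scheduled);
}
```

Returning the unlinked pairs instead of deleting immediately mirrors the split between unlinking and `schedule_deletion_of_unlinked`.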
758 :
759 : /// Schedules deletion for layer files which have previously been unlinked from the
760 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
761 450 : pub(crate) fn schedule_deletion_of_unlinked(
762 450 : self: &Arc<Self>,
763 450 : layers: Vec<(LayerFileName, LayerFileMetadata)>,
764 450 : ) -> anyhow::Result<()> {
765 450 : let mut guard = self.upload_queue.lock().unwrap();
766 450 : let upload_queue = guard.initialized_mut()?;
767 :
768 450 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
769 450 : self.launch_queued_tasks(upload_queue);
770 450 : Ok(())
771 450 : }
772 :
773 458 : fn schedule_deletion_of_unlinked0(
774 458 : self: &Arc<Self>,
775 458 : upload_queue: &mut UploadQueueInitialized,
776 458 : mut with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
777 458 : ) {
778 458 : // Filter out any layers which were not created by this tenant shard. These are
779 458 : // layers that originate from some ancestor shard after a split, and may still
780 458 : // be referenced by other shards. We are free to delete them locally and remove
781 458 : // them from our index (and would have already done so when we reach this point
782 458 : // in the code), but we may not delete them remotely.
783 458 : with_metadata.retain(|(name, meta)| {
784 452 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
785 452 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
786 452 : if !retain {
787 0 : tracing::debug!(
788 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
789 0 : meta.shard
790 0 : );
791 452 : }
792 452 : retain
793 458 : });
794 :
795 910 : for (name, meta) in &with_metadata {
796 452 : info!(
797 452 : "scheduling deletion of layer {}{} (shard {})",
798 452 : name,
799 452 : meta.generation.get_suffix(),
800 452 : meta.shard
801 452 : );
802 : }
803 :
804 : #[cfg(feature = "testing")]
805 910 : for (name, meta) in &with_metadata {
806 452 : let gen = meta.generation;
807 452 : match upload_queue.dangling_files.remove(name) {
808 446 : Some(same) if same == gen => { /* expected */ }
809 0 : Some(other) => {
810 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
811 : }
812 : None => {
813 6 : tracing::error!("{name} was unlinked but was not dangling");
814 : }
815 : }
816 : }
817 :
818 : // schedule the actual deletions
819 458 : if with_metadata.is_empty() {
820 : // avoid scheduling the op & bumping the metric
821 6 : return;
822 452 : }
823 452 : let op = UploadOp::Delete(Delete {
824 452 : layers: with_metadata,
825 452 : });
826 452 : self.metric_begin(&op);
827 452 : upload_queue.queued_operations.push_back(op);
828 458 : }
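The shard filter in `schedule_deletion_of_unlinked0` keeps a layer only if both its shard number and its shard count match the current shard. Here is a sketch under that reading. It uses a hypothetical `ShardIndex { number, count }` pair instead of the pageserver's shard types, and `retain_own_layers` is a hypothetical helper.

```rust
/// Hypothetical stand-in for a shard identity: (shard number, shard count).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShardIndex {
    number: u8,
    count: u8,
}

/// Keep only layers written by `own`. Layers inherited from an ancestor shard
/// (different number or count) may still be referenced by other shards, so they
/// must not be deleted remotely. Returns how many layers were skipped.
fn retain_own_layers(own: ShardIndex, layers: &mut Vec<(String, ShardIndex)>) -> usize {
    let before = layers.len();
    layers.retain(|(_, shard)| shard.number == own.number && shard.count == own.count);
    before - layers.len()
}

fn main() {
    let own = ShardIndex { number: 1, count: 4 };
    let mut layers = vec![
        ("written-here".to_string(), own),
        ("from-ancestor".to_string(), ShardIndex { number: 1, count: 2 }),
    ];
    let skipped = retain_own_layers(own, &mut layers);
    println!("skipped {skipped}, remaining {layers:?}");
}
```

As in the original, an empty list after filtering means no `Delete` op needs to be queued at all.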
829 :
830 : /// Schedules a compaction update to the remote `index_part.json`.
831 : ///
832 : /// `compacted_from` represents the L0 layers which have been compacted into the `compacted_to` L1 layers.
833 42 : pub(crate) fn schedule_compaction_update(
834 42 : self: &Arc<Self>,
835 42 : compacted_from: &[Layer],
836 42 : compacted_to: &[ResidentLayer],
837 42 : ) -> anyhow::Result<()> {
838 42 : let mut guard = self.upload_queue.lock().unwrap();
839 42 : let upload_queue = guard.initialized_mut()?;
840 :
841 284 : for layer in compacted_to {
842 242 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
843 242 : }
844 :
845 442 : let names = compacted_from.iter().map(|x| x.layer_desc().filename());
846 42 :
847 42 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
848 42 : self.launch_queued_tasks(upload_queue);
849 42 :
850 42 : Ok(())
851 42 : }
852 :
853 : /// Wait for all previously scheduled uploads/deletions to complete
854 102 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
855 102 : let mut receiver = {
856 102 : let mut guard = self.upload_queue.lock().unwrap();
857 102 : let upload_queue = guard.initialized_mut()?;
858 102 : self.schedule_barrier0(upload_queue)
859 102 : };
860 102 :
861 102 : if receiver.changed().await.is_err() {
862 0 : anyhow::bail!("wait_completion aborted because upload queue was stopped");
863 102 : }
864 102 :
865 102 : Ok(())
866 102 : }
867 :
868 6 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
869 6 : let mut guard = self.upload_queue.lock().unwrap();
870 6 : let upload_queue = guard.initialized_mut()?;
871 6 : self.schedule_barrier0(upload_queue);
872 6 : Ok(())
873 6 : }
874 :
875 108 : fn schedule_barrier0(
876 108 : self: &Arc<Self>,
877 108 : upload_queue: &mut UploadQueueInitialized,
878 108 : ) -> tokio::sync::watch::Receiver<()> {
879 108 : let (sender, receiver) = tokio::sync::watch::channel(());
880 108 : let barrier_op = UploadOp::Barrier(sender);
881 108 :
882 108 : upload_queue.queued_operations.push_back(barrier_op);
883 108 : // Don't count this kind of operation!
884 108 :
885 108 : // Launch the task immediately, if possible
886 108 : self.launch_queued_tasks(upload_queue);
887 108 :
888 108 : receiver
889 108 : }
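The barrier mechanism can be modelled as a queue in which a barrier becomes runnable only once nothing is in progress. When it runs, it signals its channel instead of spawning a task. This sketch swaps `std::sync::mpsc` in for `tokio::sync::watch` so it runs without an async runtime. `Queue`, `Op`, `launch` and `complete` are hypothetical simplifications of `UploadQueueInitialized`, `UploadOp`, `launch_queued_tasks` and task completion in `perform_upload_task`.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};

enum Op {
    Upload(&'static str),
    Barrier(Sender<()>),
}

struct Queue {
    queued: VecDeque<Op>,
    in_progress: Vec<&'static str>,
}

impl Queue {
    /// Queue a barrier and return the receiver that fires once all
    /// previously scheduled operations have completed.
    fn schedule_barrier(&mut self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.queued.push_back(Op::Barrier(tx));
        self.launch();
        rx
    }

    /// Start operations from the front of the queue while they are allowed to run.
    fn launch(&mut self) {
        while let Some(op) = self.queued.front() {
            let can_run = match op {
                Op::Upload(_) => true,
                Op::Barrier(_) => self.in_progress.is_empty(),
            };
            if !can_run {
                break;
            }
            match self.queued.pop_front().unwrap() {
                Op::Upload(name) => self.in_progress.push(name),
                // A barrier never becomes a task: signal the waiter and move on.
                Op::Barrier(tx) => {
                    let _ = tx.send(());
                }
            }
        }
    }

    /// Mark an upload as finished and launch whatever it unblocked.
    fn complete(&mut self, name: &str) {
        self.in_progress.retain(|n| *n != name);
        self.launch();
    }
}

fn main() {
    let mut q = Queue { queued: VecDeque::new(), in_progress: Vec::new() };
    q.queued.push_back(Op::Upload("layer-1"));
    q.launch();
    let rx = q.schedule_barrier();
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); // upload still running
    q.complete("layer-1");
    assert_eq!(rx.try_recv(), Ok(())); // barrier released
}
```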
890 :
891 : /// Wait for all previously scheduled operations to complete, and then stop.
892 : ///
893 : /// Not cancellation safe
894 6 : pub(crate) async fn shutdown(self: &Arc<Self>) {
895 6 : // On cancellation, the queue is left in an awkward state: it refuses new operations, but
896 6 : // a proper stop has yet to be called. After a cancel, the original or some later task must call
897 6 : // `stop` or `shutdown`.
898 6 : let sg = scopeguard::guard((), |_| {
899 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
900 6 : });
901 :
902 6 : let fut = {
903 6 : let mut guard = self.upload_queue.lock().unwrap();
904 6 : let upload_queue = match &mut *guard {
905 0 : UploadQueue::Stopped(_) => return,
906 : UploadQueue::Uninitialized => {
907 : // transition into Stopped state
908 0 : self.stop_impl(&mut guard);
909 0 : return;
910 : }
911 6 : UploadQueue::Initialized(ref mut init) => init,
912 6 : };
913 6 :
914 6 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
915 6 : // just don't add more of these as they would never complete.
916 6 : //
917 6 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
918 6 : // in every place we would not have to jump through this hoop, and this method could be
919 6 : // made cancellable.
920 6 : if !upload_queue.shutting_down {
921 6 : upload_queue.shutting_down = true;
922 6 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
923 6 : // like Barrier, this operation is not counted
924 6 :
925 6 : self.launch_queued_tasks(upload_queue);
926 6 : }
927 :
928 6 : upload_queue.shutdown_ready.clone().acquire_owned()
929 : };
930 :
931 6 : let res = fut.await;
932 :
933 6 : scopeguard::ScopeGuard::into_inner(sg);
934 6 :
935 6 : match res {
936 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
937 6 : Err(_closed) => {
938 6 : // expected
939 6 : }
940 6 : }
941 6 :
942 6 : self.stop();
943 6 : }
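`shutdown` arms a `scopeguard` that logs an error if the future is dropped before finishing. Once the wait completes, it disarms the guard with `ScopeGuard::into_inner`. Below is the same idea written with only the standard library, using a hypothetical `Guard` type in place of the `scopeguard` crate.

```rust
use std::cell::Cell;

/// A minimal drop guard: runs `on_drop` when dropped, unless `disarm` is called first.
struct Guard<F: FnOnce()> {
    on_drop: Option<F>,
}

impl<F: FnOnce()> Guard<F> {
    fn new(on_drop: F) -> Self {
        Guard { on_drop: Some(on_drop) }
    }

    /// Consume the guard without running the callback (cf. `ScopeGuard::into_inner`).
    fn disarm(mut self) {
        self.on_drop = None;
    }
}

impl<F: FnOnce()> Drop for Guard<F> {
    fn drop(&mut self) {
        if let Some(f) = self.on_drop.take() {
            f();
        }
    }
}

fn main() {
    let cancelled = Cell::new(false);
    {
        // Simulates a shutdown future dropped mid-way: the guard fires.
        let _guard = Guard::new(|| cancelled.set(true));
    }
    println!("cancellation detected: {}", cancelled.get());
}
```

In an async function, dropping the future drops every live local, so a guard still armed at that point reliably signals cancellation.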
944 :
945 : /// Set the deleted_at field in the remote index file.
946 : ///
947 : /// This fails if the upload queue has not been `stop()`ed.
948 : ///
949 : /// The caller is responsible for calling `stop()` AND for waiting
950 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
951 : /// Check method [`RemoteTimelineClient::stop`] for details.
952 0 : #[instrument(skip_all)]
953 : pub(crate) async fn persist_index_part_with_deleted_flag(
954 : self: &Arc<Self>,
955 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
956 : let index_part_with_deleted_at = {
957 : let mut locked = self.upload_queue.lock().unwrap();
958 :
959 : // We must be in the stopped state because otherwise
960 : // an in-progress index part upload could overwrite the file
961 : // without the deleted_at flag that we are going to set below
962 : let stopped = locked.stopped_mut()?;
963 :
964 : match stopped.deleted_at {
965 : SetDeletedFlagProgress::NotRunning => (), // proceed
966 : SetDeletedFlagProgress::InProgress(at) => {
967 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
968 : }
969 : SetDeletedFlagProgress::Successful(at) => {
970 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
971 : }
972 : };
973 : let deleted_at = Utc::now().naive_utc();
974 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
975 :
976 : let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
977 : .context("IndexPart serialize")?;
978 : index_part.deleted_at = Some(deleted_at);
979 : index_part
980 : };
981 :
982 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
983 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
984 0 : let stopped = locked
985 0 : .stopped_mut()
986 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
987 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
988 0 : });
989 :
990 0 : pausable_failpoint!("persist_deleted_index_part");
991 :
992 : backoff::retry(
993 0 : || {
994 0 : upload::upload_index_part(
995 0 : &self.storage_impl,
996 0 : &self.tenant_shard_id,
997 0 : &self.timeline_id,
998 0 : self.generation,
999 0 : &index_part_with_deleted_at,
1000 0 : &self.cancel,
1001 0 : )
1002 0 : },
1003 0 : |_e| false,
1004 : 1,
1005 : // have just a couple of attempts:
1006 : // when executed as part of timeline deletion, this happens in the context of an API call;
1007 : // when executed as part of tenant deletion, this happens in the background
1008 : 2,
1009 : "persist_index_part_with_deleted_flag",
1010 : &self.cancel,
1011 : )
1012 : .await
1013 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1014 0 : .and_then(|x| x)?;
1015 :
1016 : // all good, disarm the guard and mark as success
1017 : ScopeGuard::into_inner(undo_deleted_at);
1018 : {
1019 : let mut locked = self.upload_queue.lock().unwrap();
1020 :
1021 : let stopped = locked
1022 : .stopped_mut()
1023 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1024 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1025 : index_part_with_deleted_at
1026 : .deleted_at
1027 : .expect("we set it above"),
1028 : );
1029 : }
1030 :
1031 : Ok(())
1032 : }
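`persist_index_part_with_deleted_flag` drives a small state machine over `SetDeletedFlagProgress`. It refuses to start when an attempt is already in progress or has already succeeded. If the upload fails or the future is dropped, the undo guard resets `InProgress` back to `NotRunning`. The sketch below covers only these transitions and uses a `u64` timestamp instead of `NaiveDateTime`. `DeletedFlag`, `BeginError`, `begin` and `finish` are hypothetical names.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum DeletedFlag {
    NotRunning,
    InProgress(u64),
    Successful(u64),
}

#[derive(Debug, PartialEq)]
enum BeginError {
    AlreadyInProgress(u64),
    AlreadyDeleted(u64),
}

/// Try to start persisting the flag at time `now`.
fn begin(state: &mut DeletedFlag, now: u64) -> Result<u64, BeginError> {
    match *state {
        DeletedFlag::NotRunning => {
            *state = DeletedFlag::InProgress(now);
            Ok(now)
        }
        DeletedFlag::InProgress(at) => Err(BeginError::AlreadyInProgress(at)),
        DeletedFlag::Successful(at) => Err(BeginError::AlreadyDeleted(at)),
    }
}

/// Record the outcome: success is sticky, failure undoes the attempt so it can be retried.
fn finish(state: &mut DeletedFlag, upload_ok: bool) {
    *state = match (*state, upload_ok) {
        (DeletedFlag::InProgress(at), true) => DeletedFlag::Successful(at),
        (DeletedFlag::InProgress(_), false) => DeletedFlag::NotRunning,
        (other, _) => other,
    };
}

fn main() {
    let mut state = DeletedFlag::NotRunning;
    let started = begin(&mut state, 100);
    finish(&mut state, true);
    println!("{started:?} -> {state:?}");
}
```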
1033 :
1034 0 : pub(crate) async fn preserve_initdb_archive(
1035 0 : self: &Arc<Self>,
1036 0 : tenant_id: &TenantId,
1037 0 : timeline_id: &TimelineId,
1038 0 : cancel: &CancellationToken,
1039 0 : ) -> anyhow::Result<()> {
1040 0 : backoff::retry(
1041 0 : || async {
1042 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1043 0 : .await
1044 0 : },
1045 0 : TimeoutOrCancel::caused_by_cancel,
1046 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1047 0 : FAILED_REMOTE_OP_RETRIES,
1048 0 : "preserve_initdb_tar_zst",
1049 0 : &cancel.clone(),
1050 0 : )
1051 0 : .await
1052 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1053 0 : .and_then(|x| x)
1054 0 : .context("backing up initdb archive")?;
1055 0 : Ok(())
1056 0 : }
1057 :
1058 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1059 0 : match tokio::time::timeout(
1060 0 : DELETION_QUEUE_FLUSH_TIMEOUT,
1061 0 : self.deletion_queue_client.flush_immediate(),
1062 0 : )
1063 0 : .await
1064 : {
1065 0 : Ok(result) => result,
1066 0 : Err(_timeout) => {
1067 0 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1068 0 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1069 0 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1070 0 : tracing::warn!(
1071 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1072 0 : );
1073 0 : Ok(())
1074 : }
1075 : }
1076 0 : }
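`flush_deletion_queue` treats a timeout as success because the flush is best-effort. The sketch below is a blocking analogue that uses `Receiver::recv_timeout` from the standard library in place of `tokio::time::timeout`. `FlushError` and `wait_flush` are hypothetical, and mapping a disconnected channel to an error is an assumption of this sketch.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum FlushError {
    ShuttingDown,
}

/// Wait for a flush acknowledgement. A timeout is logged and treated as
/// success, because flushing is best-effort rather than mandatory.
fn wait_flush(ack: &Receiver<Result<(), FlushError>>, timeout: Duration) -> Result<(), FlushError> {
    match ack.recv_timeout(timeout) {
        Ok(result) => result,
        Err(RecvTimeoutError::Timeout) => {
            eprintln!("timed out waiting for flush, acking anyway");
            Ok(())
        }
        // Assumption: a vanished flusher means the system is shutting down.
        Err(RecvTimeoutError::Disconnected) => Err(FlushError::ShuttingDown),
    }
}

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move || {
        let _ = tx.send(Ok(()));
    });
    println!("{:?}", wait_flush(&rx, Duration::from_secs(1)));
}
```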
1077 :
1078 : /// Prerequisites: UploadQueue should be in the stopped state and deleted_at should be successfully set.
1079 : /// The function schedules deletion of all layer files, then lists the prefix to see if we leaked something,
1080 : /// deletes any leaked files, and finally deletes the index file.
1081 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
1082 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1083 :
1084 0 : let layers: Vec<RemotePath> = {
1085 0 : let mut locked = self.upload_queue.lock().unwrap();
1086 0 : let stopped = locked.stopped_mut()?;
1087 :
1088 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1089 0 : anyhow::bail!("deleted_at is not set")
1090 0 : }
1091 0 :
1092 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1093 :
1094 0 : stopped
1095 0 : .upload_queue_for_deletion
1096 0 : .latest_files
1097 0 : .drain()
1098 0 : .map(|(file_name, meta)| {
1099 0 : remote_layer_path(
1100 0 : &self.tenant_shard_id.tenant_id,
1101 0 : &self.timeline_id,
1102 0 : meta.shard,
1103 0 : &file_name,
1104 0 : meta.generation,
1105 0 : )
1106 0 : })
1107 0 : .collect()
1108 0 : };
1109 0 :
1110 0 : let layer_deletion_count = layers.len();
1111 0 : self.deletion_queue_client.push_immediate(layers).await?;
1112 :
1113 : // Delete the initdb.tar.zst, which is not always present, but deletion attempts of
1114 : // nonexistent objects are not considered errors.
1115 0 : let initdb_path =
1116 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1117 0 : self.deletion_queue_client
1118 0 : .push_immediate(vec![initdb_path])
1119 0 : .await?;
1120 :
1121 : // Do not delete the index part yet; it is needed for a possible retry. If we removed it first
1122 : // and the retry arrived at a different pageserver, there would be no trace of it on remote storage
1123 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1124 0 :
1125 0 : // Execute all pending deletions, so that when we proceed to do a list_files below, we don't
1126 0 : // take on the burden of listing all the layers that we already know we should delete.
1127 0 : self.flush_deletion_queue().await?;
1128 :
1129 0 : let cancel = shutdown_token();
1130 :
1131 0 : let remaining = download_retry(
1132 0 : || async {
1133 0 : self.storage_impl
1134 0 : .list_files(Some(&timeline_storage_path), None, &cancel)
1135 0 : .await
1136 0 : },
1137 0 : "list remaining files",
1138 0 : &cancel,
1139 0 : )
1140 0 : .await
1141 0 : .context("list files remaining files")?;
1142 :
1143 : // We will delete the current index_part object last, since it acts as a deletion
1144 : // marker via its deleted_at attribute
1145 0 : let latest_index = remaining
1146 0 : .iter()
1147 0 : .filter(|p| {
1148 0 : p.object_name()
1149 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1150 0 : .unwrap_or(false)
1151 0 : })
1152 0 : .filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
1153 0 : .max_by_key(|i| i.1)
1154 0 : .map(|i| i.0.clone())
1155 0 : .unwrap_or(
1156 0 : // No generation-suffixed indices, assume we are dealing with
1157 0 : // a legacy index.
1158 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1159 0 : );
1160 0 :
1161 0 : let remaining_layers: Vec<RemotePath> = remaining
1162 0 : .into_iter()
1163 0 : .filter(|p| {
1164 0 : if p == &latest_index {
1165 0 : return false;
1166 0 : }
1167 0 : if p.object_name() == Some(INITDB_PRESERVED_PATH) {
1168 0 : return false;
1169 0 : }
1170 0 : true
1171 0 : })
1172 0 : .inspect(|path| {
1173 0 : if let Some(name) = path.object_name() {
1174 0 : info!(%name, "deleting a file not referenced from index_part.json");
1175 : } else {
1176 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1177 : }
1178 0 : })
1179 0 : .collect();
1180 0 :
1181 0 : let not_referenced_count = remaining_layers.len();
1182 0 : if !remaining_layers.is_empty() {
1183 0 : self.deletion_queue_client
1184 0 : .push_immediate(remaining_layers)
1185 0 : .await?;
1186 0 : }
1187 :
1188 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1189 0 : Err(anyhow::anyhow!(
1190 0 : "failpoint: timeline-delete-before-index-delete"
1191 0 : ))?
1192 0 : });
1193 :
1194 0 : debug!("enqueuing index part deletion");
1195 0 : self.deletion_queue_client
1196 0 : .push_immediate([latest_index].to_vec())
1197 0 : .await?;
1198 :
1199 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
1200 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1201 0 : self.flush_deletion_queue().await?;
1202 :
1203 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1204 0 : Err(anyhow::anyhow!(
1205 0 : "failpoint: timeline-delete-after-index-delete"
1206 0 : ))?
1207 0 : });
1208 :
1209 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1210 :
1211 0 : Ok(())
1212 0 : }
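To find the index that is deleted last, `delete_all` takes the listed objects whose names start with `IndexPart::FILE_NAME`. It parses a generation from each, keeps the one with the highest generation, and falls back to the unsuffixed legacy name. Below is a string-based sketch of that selection. It assumes the generation suffix is `-` followed by eight hex digits (for example `index_part.json-0000000a`). `parse_index_generation` and `latest_index` are hypothetical helpers, not `parse_remote_index_path`.

```rust
const INDEX_FILE_NAME: &str = "index_part.json";

/// Parse the generation from an object name such as `index_part.json-0000000a`.
/// Assumption: generations are encoded as `-` followed by 8 hex digits.
fn parse_index_generation(name: &str) -> Option<u32> {
    let suffix = name.strip_prefix(INDEX_FILE_NAME)?.strip_prefix('-')?;
    if suffix.len() != 8 {
        return None;
    }
    u32::from_str_radix(suffix, 16).ok()
}

/// Pick the index object with the highest generation, falling back to the
/// unsuffixed legacy name when no generation-suffixed index is present.
fn latest_index(names: &[&str]) -> String {
    names
        .iter()
        .filter_map(|n| parse_index_generation(n).map(|g| (*n, g)))
        .max_by_key(|(_, g)| *g)
        .map(|(n, _)| n.to_string())
        .unwrap_or_else(|| INDEX_FILE_NAME.to_string())
}

fn main() {
    let listed = ["layer-x", "index_part.json-00000002", "index_part.json-0000000a"];
    println!("delete last: {}", latest_index(&listed));
}
```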
1213 :
1214 : ///
1215 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1216 : /// the ordering constraints.
1217 : ///
1218 : /// The caller needs to already hold the `upload_queue` lock.
1219 4285 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1220 6614 : while let Some(next_op) = upload_queue.queued_operations.front() {
1221 : // Can we run this task now?
1222 3823 : let can_run_now = match next_op {
1223 : UploadOp::UploadLayer(_, _) => {
1224 : // Can always be scheduled.
1225 956 : true
1226 : }
1227 : UploadOp::UploadMetadata(_, _) => {
1228 : // These can only be performed after all the preceding operations
1229 : // have finished.
1230 2283 : upload_queue.inprogress_tasks.is_empty()
1231 : }
1232 : UploadOp::Delete(_) => {
1233 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
1234 368 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
1235 : }
1236 :
1237 : UploadOp::Barrier(_) | UploadOp::Shutdown => {
1238 216 : upload_queue.inprogress_tasks.is_empty()
1239 : }
1240 : };
1241 :
1242 : // If we cannot launch this task, don't look any further.
1243 : //
1244 : // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
1245 : // them now, but we don't try to do that currently. For example, if the frontmost task
1246 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
1247 : // could still start layer uploads that were scheduled later.
1248 3823 : if !can_run_now {
1249 1488 : break;
1250 2335 : }
1251 2335 :
1252 2335 : if let UploadOp::Shutdown = next_op {
1253 : // leave the op in the queue but do not start more tasks; it will be dropped when
1254 : // the stop is called.
1255 6 : upload_queue.shutdown_ready.close();
1256 6 : break;
1257 2329 : }
1258 2329 :
1259 2329 : // We can launch this task. Remove it from the queue first.
1260 2329 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
1261 2329 :
1262 2329 : debug!("starting op: {}", next_op);
1263 :
1264 : // Update the counters
1265 2329 : match next_op {
1266 956 : UploadOp::UploadLayer(_, _) => {
1267 956 : upload_queue.num_inprogress_layer_uploads += 1;
1268 956 : }
1269 935 : UploadOp::UploadMetadata(_, _) => {
1270 935 : upload_queue.num_inprogress_metadata_uploads += 1;
1271 935 : }
1272 330 : UploadOp::Delete(_) => {
1273 330 : upload_queue.num_inprogress_deletions += 1;
1274 330 : }
1275 108 : UploadOp::Barrier(sender) => {
1276 108 : sender.send_replace(());
1277 108 : continue;
1278 : }
1279 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1280 : };
1281 :
1282 : // Assign unique ID to this task
1283 2221 : upload_queue.task_counter += 1;
1284 2221 : let upload_task_id = upload_queue.task_counter;
1285 2221 :
1286 2221 : // Add it to the in-progress map
1287 2221 : let task = Arc::new(UploadTask {
1288 2221 : task_id: upload_task_id,
1289 2221 : op: next_op,
1290 2221 : retries: AtomicU32::new(0),
1291 2221 : });
1292 2221 : upload_queue
1293 2221 : .inprogress_tasks
1294 2221 : .insert(task.task_id, Arc::clone(&task));
1295 2221 :
1296 2221 : // Spawn task to perform the task
1297 2221 : let self_rc = Arc::clone(self);
1298 2221 : let tenant_shard_id = self.tenant_shard_id;
1299 2221 : let timeline_id = self.timeline_id;
1300 2221 : task_mgr::spawn(
1301 2221 : &self.runtime,
1302 2221 : TaskKind::RemoteUploadTask,
1303 2221 : Some(self.tenant_shard_id),
1304 2221 : Some(self.timeline_id),
1305 2221 : "remote upload",
1306 : false,
1307 2114 : async move {
1308 29634 : self_rc.perform_upload_task(task).await;
1309 2001 : Ok(())
1310 2001 : }
1311 2221 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1312 : );
1313 :
1314 : // Loop back to process next task
1315 : }
1316 4285 : }
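The ordering rules in `launch_queued_tasks` reduce to a predicate over the in-progress counters, applied strictly from the front of the queue. Below is a simplified sketch with hypothetical `OpKind`, `InProgress`, `can_run_now` and `launch_prefix`. `Shutdown` is left out, and task completion is not modelled.

```rust
#[derive(Debug, Clone, Copy)]
enum OpKind {
    UploadLayer,
    UploadMetadata,
    Delete,
    Barrier,
}

struct InProgress {
    total: usize,
    deletions: usize,
}

/// Mirror of the `can_run_now` match: layer uploads always run, index uploads
/// and barriers wait for an empty pipeline, deletions only wait for non-deletions.
fn can_run_now(op: OpKind, inp: &InProgress) -> bool {
    match op {
        OpKind::UploadLayer => true,
        OpKind::UploadMetadata | OpKind::Barrier => inp.total == 0,
        OpKind::Delete => inp.deletions == inp.total,
    }
}

/// Launch from the front of the queue until the first op that must wait;
/// later ops never "jump the queue". Returns how many ops were launched.
fn launch_prefix(queue: &[OpKind], inp: &mut InProgress) -> usize {
    let mut launched = 0;
    for &op in queue {
        if !can_run_now(op, inp) {
            break;
        }
        launched += 1;
        match op {
            OpKind::Barrier => {} // completes immediately, never occupies a slot
            OpKind::Delete => {
                inp.total += 1;
                inp.deletions += 1;
            }
            OpKind::UploadLayer | OpKind::UploadMetadata => inp.total += 1,
        }
    }
    launched
}

fn main() {
    let mut inp = InProgress { total: 0, deletions: 0 };
    let queue = [OpKind::UploadLayer, OpKind::UploadLayer, OpKind::UploadMetadata, OpKind::UploadLayer];
    // The index upload blocks, so the trailing layer upload is not started yet.
    println!("launched {} of {}", launch_prefix(&queue, &mut inp), queue.len());
}
```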
1317 :
1318 : ///
1319 : /// Perform an upload task.
1320 : ///
1321 : /// The task is in the `inprogress_tasks` list. This function will try to
1322 : /// execute it, retrying forever. On successful completion, the task is
1323 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
1324 : /// queue that were waiting for its completion are launched.
1325 : ///
1326 : /// The task can be shut down, however. That leads to stopping the whole
1327 : /// queue.
1328 : ///
1329 2114 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1330 2114 : let cancel = shutdown_token();
1331 : // Loop to retry until it completes.
1332 2114 : loop {
1333 2114 : // If we're requested to shut down, close up shop and exit.
1334 2114 : //
1335 2114 : // Note: We only check for the shutdown requests between retries, so
1336 2114 : // if a shutdown request arrives while we're busy uploading, in the
1337 2114 : // upload::upload_*() call below, we will not exit until it has
1338 2114 : // finished. We probably could cancel the upload by simply dropping
1339 2114 : // the Future, but we're not 100% sure if the remote storage library
1340 2114 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1341 2114 : // upload finishes or times out soon enough.
1342 2114 : if cancel.is_cancelled() {
1343 0 : info!("upload task cancelled by shutdown request");
1344 0 : self.stop();
1345 0 : return;
1346 2114 : }
1347 :
1348 2114 : let upload_result: anyhow::Result<()> = match &task.op {
1349 851 : UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
1350 851 : let path = layer.local_path();
1351 851 : upload::upload_timeline_layer(
1352 851 : self.conf,
1353 851 : &self.storage_impl,
1354 851 : path,
1355 851 : layer_metadata,
1356 851 : self.generation,
1357 851 : &self.cancel,
1358 851 : )
1359 851 : .measure_remote_op(
1360 851 : RemoteOpFileKind::Layer,
1361 851 : RemoteOpKind::Upload,
1362 851 : Arc::clone(&self.metrics),
1363 851 : )
1364 25582 : .await
1365 : }
1366 933 : UploadOp::UploadMetadata(ref index_part, _lsn) => {
1367 933 : let mention_having_future_layers = if cfg!(feature = "testing") {
1368 933 : index_part
1369 933 : .layer_metadata
1370 933 : .keys()
1371 4310 : .any(|x| x.is_in_future(*_lsn))
1372 : } else {
1373 0 : false
1374 : };
1375 :
1376 933 : let res = upload::upload_index_part(
1377 933 : &self.storage_impl,
1378 933 : &self.tenant_shard_id,
1379 933 : &self.timeline_id,
1380 933 : self.generation,
1381 933 : index_part,
1382 933 : &self.cancel,
1383 933 : )
1384 933 : .measure_remote_op(
1385 933 : RemoteOpFileKind::Index,
1386 933 : RemoteOpKind::Upload,
1387 933 : Arc::clone(&self.metrics),
1388 933 : )
1389 3740 : .await;
1390 927 : if res.is_ok() {
1391 927 : self.update_remote_physical_size_gauge(Some(index_part));
1392 927 : if mention_having_future_layers {
1393 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1394 4 : tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
1395 923 : }
1396 0 : }
1397 927 : res
1398 : }
1399 330 : UploadOp::Delete(delete) => {
1400 330 : pausable_failpoint!("before-delete-layer-pausable");
1401 330 : self.deletion_queue_client
1402 330 : .push_layers(
1403 330 : self.tenant_shard_id,
1404 330 : self.timeline_id,
1405 330 : self.generation,
1406 330 : delete.layers.clone(),
1407 330 : )
1408 0 : .await
1409 330 : .map_err(|e| anyhow::anyhow!(e))
1410 : }
1411 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
1412 : // unreachable. Barrier and Shutdown operations are handled synchronously in
1413 : // launch_queued_tasks
1414 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
1415 0 : break;
1416 : }
1417 : };
1418 :
1419 0 : match upload_result {
1420 : Ok(()) => {
1421 2001 : break;
1422 : }
1423 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
1424 0 : // loop around to do the proper stopping
1425 0 : continue;
1426 : }
1427 0 : Err(e) => {
1428 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1429 0 :
1430 0 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1431 0 : // or other external reasons. Such issues are relatively regular, so log them
1432 0 : // at info level at first, and only WARN if the operation fails repeatedly.
1433 0 : //
1434 0 : // (See similar logic for downloads in `download::download_retry`)
1435 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1436 0 : info!(
1437 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1438 0 : task.op, retries, e
1439 0 : );
1440 : } else {
1441 0 : warn!(
1442 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1443 0 : task.op, retries, e
1444 0 : );
1445 : }
1446 :
1447 : // sleep until it's time to retry, or we're cancelled
1448 0 : exponential_backoff(
1449 0 : retries,
1450 0 : DEFAULT_BASE_BACKOFF_SECONDS,
1451 0 : DEFAULT_MAX_BACKOFF_SECONDS,
1452 0 : &cancel,
1453 0 : )
1454 0 : .await;
1455 : }
1456 : }
1457 : }
1458 :
1459 2001 : let retries = task.retries.load(Ordering::SeqCst);
1460 2001 : if retries > 0 {
1461 0 : info!(
1462 0 : "remote task {} completed successfully after {} retries",
1463 0 : task.op, retries
1464 0 : );
1465 : } else {
1466 2001 : debug!("remote task {} completed successfully", task.op);
1467 : }
1468 :
1469 : // The task has completed successfully. Remove it from the in-progress list.
1470 2001 : let lsn_update = {
1471 2001 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1472 2001 : let upload_queue = match upload_queue_guard.deref_mut() {
1473 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1474 0 : UploadQueue::Stopped(_stopped) => {
1475 0 : None
1476 : },
1477 2001 : UploadQueue::Initialized(qi) => { Some(qi) }
1478 : };
1479 :
1480 2001 : let upload_queue = match upload_queue {
1481 2001 : Some(upload_queue) => upload_queue,
1482 : None => {
1483 0 : info!("another concurrent task already stopped the queue");
1484 0 : return;
1485 : }
1486 : };
1487 :
1488 2001 : upload_queue.inprogress_tasks.remove(&task.task_id);
1489 :
1490 2001 : let lsn_update = match task.op {
1491 : UploadOp::UploadLayer(_, _) => {
1492 744 : upload_queue.num_inprogress_layer_uploads -= 1;
1493 744 : None
1494 : }
1495 927 : UploadOp::UploadMetadata(_, lsn) => {
1496 927 : upload_queue.num_inprogress_metadata_uploads -= 1;
1497 927 : // XXX monotonicity check?
1498 927 :
1499 927 : upload_queue.projected_remote_consistent_lsn = Some(lsn);
1500 927 : if self.generation.is_none() {
1501 : // Legacy mode: skip validating generation
1502 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
1503 0 : None
1504 : } else {
1505 927 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
1506 : }
1507 : }
1508 : UploadOp::Delete(_) => {
1509 330 : upload_queue.num_inprogress_deletions -= 1;
1510 330 : None
1511 : }
1512 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
1513 : };
1514 :
1515 : // Launch any queued tasks that were unblocked by this one.
1516 2001 : self.launch_queued_tasks(upload_queue);
1517 2001 : lsn_update
1518 : };
1519 :
1520 2001 : if let Some((lsn, slot)) = lsn_update {
1521 : // Updates to the remote_consistent_lsn we advertise to pageservers
1522 : // are all routed through the DeletionQueue, to enforce important
1523 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
1524 927 : self.deletion_queue_client
1525 927 : .update_remote_consistent_lsn(
1526 927 : self.tenant_shard_id,
1527 927 : self.timeline_id,
1528 927 : self.generation,
1529 927 : lsn,
1530 927 : slot,
1531 927 : )
1532 0 : .await;
1533 1074 : }
1534 :
1535 2001 : self.metric_end(&task.op);
1536 2001 : }
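`perform_upload_task` retries forever. It logs at `info` below `FAILED_UPLOAD_WARN_THRESHOLD`, logs at `warn` once that threshold is reached, and sleeps with `exponential_backoff` between attempts. The exact backoff formula is not shown in this listing, so the sketch below assumes a generic capped doubling delay. `backoff_delay` and `log_level` are hypothetical helpers.

```rust
use std::time::Duration;

/// Capped exponential backoff (assumed formula, not necessarily the one used by
/// `exponential_backoff`): no delay before the first retry, then
/// base * 2^(attempt - 1), never exceeding `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    if attempt == 0 {
        return Duration::ZERO;
    }
    let factor = 2u32.saturating_pow(attempt - 1);
    // On overflow of the multiplication, fall back to the cap.
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Log level for a failed attempt: quiet at first, louder once failures persist.
fn log_level(retries: u32, warn_threshold: u32) -> &'static str {
    if retries < warn_threshold {
        "info"
    } else {
        "warn"
    }
}

fn main() {
    let (base, max) = (Duration::from_millis(100), Duration::from_secs(1));
    for attempt in 0..6 {
        println!(
            "attempt {attempt}: level={} sleep={:?}",
            log_level(attempt, 3),
            backoff_delay(attempt, base, max)
        );
    }
}
```

Capping the delay keeps a long outage from pushing the next retry arbitrarily far out. The real task also aborts the sleep early when the cancellation token fires.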
1537 :
1538 4369 : fn metric_impl(
1539 4369 : &self,
1540 4369 : op: &UploadOp,
1541 4369 : ) -> Option<(
1542 4369 : RemoteOpFileKind,
1543 4369 : RemoteOpKind,
1544 4369 : RemoteTimelineClientMetricsCallTrackSize,
1545 4369 : )> {
1546 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
1547 4369 : let res = match op {
1548 1700 : UploadOp::UploadLayer(_, m) => (
1549 1700 : RemoteOpFileKind::Layer,
1550 1700 : RemoteOpKind::Upload,
1551 1700 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
1552 1700 : ),
1553 1881 : UploadOp::UploadMetadata(_, _) => (
1554 1881 : RemoteOpFileKind::Index,
1555 1881 : RemoteOpKind::Upload,
1556 1881 : DontTrackSize {
1557 1881 : reason: "metadata uploads are tiny",
1558 1881 : },
1559 1881 : ),
1560 782 : UploadOp::Delete(_delete) => (
1561 782 : RemoteOpFileKind::Layer,
1562 782 : RemoteOpKind::Delete,
1563 782 : DontTrackSize {
1564 782 : reason: "should we track deletes? positive or negative sign?",
1565 782 : },
1566 782 : ),
1567 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
1568 : // we do not account these
1569 6 : return None;
1570 : }
1571 : };
1572 4363 : Some(res)
1573 4369 : }
1574 :
1575 2362 : fn metric_begin(&self, op: &UploadOp) {
1576 2362 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
1577 2362 : Some(x) => x,
1578 0 : None => return,
1579 : };
1580 2362 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
1581 2362 : guard.will_decrement_manually(); // in metric_end(), see right below
1582 2362 : }
1583 :
1584 2007 : fn metric_end(&self, op: &UploadOp) {
1585 2007 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
1586 2001 : Some(x) => x,
1587 6 : None => return,
1588 : };
1589 2001 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
1590 2007 : }
1591 :
1592 : /// Close the upload queue for new operations and cancel queued operations.
1593 : ///
1594 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
1595 : ///
1596 : /// In-progress operations will still be running after this function returns.
1597 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
1598 : /// to wait for them to complete, after calling this function.
1599 14 : pub(crate) fn stop(&self) {
1600 14 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
1601 14 : // into the stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
1602 14 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
1603 14 : let mut guard = self.upload_queue.lock().unwrap();
1604 14 : self.stop_impl(&mut guard);
1605 14 : }
1606 :
1607 14 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
1608 14 : match &mut **guard {
1609 : UploadQueue::Uninitialized => {
1610 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
1611 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
1612 : }
1613 : UploadQueue::Stopped(_) => {
1614 : // nothing to do
1615 6 : info!("another concurrent task already shut down the queue");
1616 : }
1617 8 : UploadQueue::Initialized(initialized) => {
1618 8 : info!("shutting down upload queue");
1619 :
1620 : // Replace the queue with the Stopped state, taking ownership of the old
1621 : // Initialized queue. We will do some checks on it, and then drop it.
1622 8 : let qi = {
1623 : // Here we preserve a working version of the upload queue for possible use during deletions.
1624 : // In-place replacement of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut
1625 : // but for this use case it doesn't really make sense to bring in unsafe code for this single usage point.
1626 : // Deletion is not really perf-sensitive, so there shouldn't be any problem with cloning a fraction of it.
1627 8 : let upload_queue_for_deletion = UploadQueueInitialized {
1628 8 : task_counter: 0,
1629 8 : latest_files: initialized.latest_files.clone(),
1630 8 : latest_files_changes_since_metadata_upload_scheduled: 0,
1631 8 : latest_metadata: initialized.latest_metadata.clone(),
1632 8 : projected_remote_consistent_lsn: None,
1633 8 : visible_remote_consistent_lsn: initialized
1634 8 : .visible_remote_consistent_lsn
1635 8 : .clone(),
1636 8 : num_inprogress_layer_uploads: 0,
1637 8 : num_inprogress_metadata_uploads: 0,
1638 8 : num_inprogress_deletions: 0,
1639 8 : inprogress_tasks: HashMap::default(),
1640 8 : queued_operations: VecDeque::default(),
1641 8 : #[cfg(feature = "testing")]
1642 8 : dangling_files: HashMap::default(),
1643 8 : shutting_down: false,
1644 8 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
1645 8 : };
1646 8 :
1647 8 : let upload_queue = std::mem::replace(
1648 8 : &mut **guard,
1649 8 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
1650 8 : UploadQueueStoppedDeletable {
1651 8 : upload_queue_for_deletion,
1652 8 : deleted_at: SetDeletedFlagProgress::NotRunning,
1653 8 : },
1654 8 : )),
1655 8 : );
1656 8 : if let UploadQueue::Initialized(qi) = upload_queue {
1657 8 : qi
1658 : } else {
1659 0 : unreachable!("we checked in the match above that it is Initialized");
1660 : }
1661 : };
1662 :
1663 : // consistency check
1664 8 : assert_eq!(
1665 8 : qi.num_inprogress_layer_uploads
1666 8 : + qi.num_inprogress_metadata_uploads
1667 8 : + qi.num_inprogress_deletions,
1668 8 : qi.inprogress_tasks.len()
1669 8 : );
1670 :
1671 : // We don't need to do anything here for in-progress tasks. They will finish
1672 : // on their own, decrement the unfinished-task counter themselves, and observe
1673 : // that the queue is Stopped.
1674 8 : drop(qi.inprogress_tasks);
1675 :
1676 : // Tear down queued ops
1677 8 : for op in qi.queued_operations.into_iter() {
1678 6 : self.metric_end(&op);
1679 6 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
1680 6 : // which is exactly what we want to happen.
1681 6 : drop(op);
1682 6 : }
1683 : }
1684 : }
1685 14 : }
1686 : }
1687 :
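`stop_impl` above moves the `Initialized` queue out from under the mutex with `std::mem::replace`, leaving a `Stopped` state in its place. Only after that does it tear down the ops that were still queued. In-progress tasks are left alone and later observe the stopped queue. The sketch below isolates that take-and-replace pattern under simplifying assumptions: `Queue`, `stop` and the `String` ops are hypothetical stand-ins, not the pageserver's types. The sketch also omits the partial clone that the real code keeps for deletion.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical, simplified stand-in for `UploadQueue`.
enum Queue {
    Uninitialized,
    Initialized { queued: VecDeque<String> },
    Stopped,
}

/// Move the queue into `Stopped` and return the ops that were still queued,
/// so the caller can tear them down after the state transition.
fn stop(queue: &Mutex<Queue>) -> Vec<String> {
    let mut guard = queue.lock().unwrap();
    // Take ownership of the old state, leaving `Stopped` in its place.
    match std::mem::replace(&mut *guard, Queue::Stopped) {
        Queue::Initialized { queued } => queued.into_iter().collect(),
        // Never initialized, or another caller already stopped it: nothing queued.
        Queue::Uninitialized | Queue::Stopped => Vec::new(),
    }
}

fn main() {
    let queue = Mutex::new(Queue::Initialized {
        queued: VecDeque::from(vec!["upload a".to_string(), "upload b".to_string()]),
    });
    assert_eq!(stop(&queue), vec!["upload a", "upload b"]);
    assert!(matches!(*queue.lock().unwrap(), Queue::Stopped));
    // A second caller observes an already stopped queue.
    assert!(stop(&queue).is_empty());
}
```

Because the replacement happens in a single step under the lock, whichever caller takes the mutex first gets the queued ops. Any later caller sees `Stopped` and has nothing to tear down.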
1688 138 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1689 138 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
1690 138 : RemotePath::from_string(&path).expect("Failed to construct path")
1691 138 : }
1692 :
1693 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
1694 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
1695 0 : RemotePath::from_string(&path).expect("Failed to construct path")
1696 0 : }
1697 :
1698 30 : pub fn remote_timeline_path(
1699 30 : tenant_shard_id: &TenantShardId,
1700 30 : timeline_id: &TimelineId,
1701 30 : ) -> RemotePath {
1702 30 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
1703 30 : }
1704 :
1705 : /// Note that the shard component of a remote layer path is _not_ always the same
1706 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
1707 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
1708 18 : pub fn remote_layer_path(
1709 18 : tenant_id: &TenantId,
1710 18 : timeline_id: &TimelineId,
1711 18 : shard: ShardIndex,
1712 18 : layer_file_name: &LayerFileName,
1713 18 : generation: Generation,
1714 18 : ) -> RemotePath {
1715 18 : // Generation-aware key format
1716 18 : let path = format!(
1717 18 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
1718 18 : shard.get_suffix(),
1719 18 : layer_file_name.file_name(),
1720 18 : generation.get_suffix()
1721 18 : );
1722 18 :
1723 18 : RemotePath::from_string(&path).expect("Failed to construct path")
1724 18 : }
1725 :
1726 4 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
1727 4 : RemotePath::from_string(&format!(
1728 4 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
1729 4 : ))
1730 4 : .expect("Failed to construct path")
1731 4 : }
1732 :
1733 2 : pub fn remote_initdb_preserved_archive_path(
1734 2 : tenant_id: &TenantId,
1735 2 : timeline_id: &TimelineId,
1736 2 : ) -> RemotePath {
1737 2 : RemotePath::from_string(&format!(
1738 2 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
1739 2 : ))
1740 2 : .expect("Failed to construct path")
1741 2 : }
1742 :
1743 984 : pub fn remote_index_path(
1744 984 : tenant_shard_id: &TenantShardId,
1745 984 : timeline_id: &TimelineId,
1746 984 : generation: Generation,
1747 984 : ) -> RemotePath {
1748 984 : RemotePath::from_string(&format!(
1749 984 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1750 984 : IndexPart::FILE_NAME,
1751 984 : generation.get_suffix()
1752 984 : ))
1753 984 : .expect("Failed to construct path")
1754 984 : }
1755 :
1756 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1757 0 : RemotePath::from_string(&format!(
1758 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
1759 0 : ))
1760 0 : .expect("Failed to construct path")
1761 0 : }
1762 :
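The key helpers above build slash-separated object keys under `tenants/` and append the generation suffix to the file name. A minimal sketch of the index and layer key layouts follows, with two assumptions. First, `TIMELINES_SEGMENT_NAME` is taken to be `"timelines"`. Second, a generation is assumed to render as `-` plus eight lowercase hex digits, or as an empty string when there is no generation. `Gen`, `index_key` and `layer_key` are hypothetical names. The layer key takes the shard suffix as a separate argument because, as the doc comment on `remote_layer_path` notes, a layer's shard can differ from the caller's.

```rust
/// Hypothetical stand-in for `Generation`; `None` means "no generation".
#[derive(Clone, Copy)]
struct Gen(Option<u32>);

impl Gen {
    /// Assumed suffix format: "-" plus 8 lowercase hex digits, or "" without a generation.
    fn suffix(self) -> String {
        match self.0 {
            Some(g) => format!("-{g:08x}"),
            None => String::new(),
        }
    }
}

// Assumed value of `TIMELINES_SEGMENT_NAME`.
const TIMELINES: &str = "timelines";
const INDEX_FILE_NAME: &str = "index_part.json";

/// Index keys live under the caller's own tenant-shard id.
fn index_key(tenant_shard: &str, timeline: &str, generation: Gen) -> String {
    format!(
        "tenants/{tenant_shard}/{TIMELINES}/{timeline}/{INDEX_FILE_NAME}{}",
        generation.suffix()
    )
}

/// Layer keys take the shard suffix from the layer's own metadata (passed in verbatim here).
fn layer_key(tenant: &str, shard_suffix: &str, timeline: &str, layer: &str, generation: Gen) -> String {
    format!(
        "tenants/{tenant}{shard_suffix}/{TIMELINES}/{timeline}/{layer}{}",
        generation.suffix()
    )
}

fn main() {
    assert_eq!(
        index_key("t1", "tl1", Gen(Some(5))),
        "tenants/t1/timelines/tl1/index_part.json-00000005"
    );
    assert_eq!(index_key("t1", "tl1", Gen(None)), "tenants/t1/timelines/tl1/index_part.json");
    assert_eq!(
        layer_key("t1", "-0102", "tl1", "LAYER", Gen(Some(0x1a))),
        "tenants/t1-0102/timelines/tl1/LAYER-0000001a"
    );
}
```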
1763 : /// Given the key of an index, parse out the generation part of the name
1764 18 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
1765 18 : let file_name = match path.get_path().file_name() {
1766 18 : Some(f) => f,
1767 : None => {
1768 : // Unexpected: we should be seeing index_part.json paths only
1769 0 : tracing::warn!("Malformed index key {}", path);
1770 0 : return None;
1771 : }
1772 : };
1773 :
1774 18 : match file_name.split_once('-') {
1775 12 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
1776 6 : None => None,
1777 : }
1778 18 : }
1779 :
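`parse_remote_index_path` takes the file name of the key, splits it at the first `-`, and hands the remainder to `Generation::parse_suffix`. A name with no `-`, such as a generation-less `index_part.json`, yields `None`. Below is a standalone sketch of the same steps, assuming the suffix is a hexadecimal `u32`. `parse_index_generation` is a hypothetical name, and the real `parse_suffix` may validate the suffix differently.

```rust
/// Extract the generation from an index key's file name, e.g.
/// "tenants/t1/timelines/tl1/index_part.json-0000000a" yields Some(10).
fn parse_index_generation(key: &str) -> Option<u32> {
    // Last path component; `rsplit` always yields at least one item.
    let file_name = key.rsplit('/').next()?;
    // No '-' means a generation-less index_part.json.
    let (_, suffix) = file_name.split_once('-')?;
    // Assumption: the suffix is a hexadecimal u32.
    u32::from_str_radix(suffix, 16).ok()
}

fn main() {
    assert_eq!(
        parse_index_generation("tenants/t1/timelines/tl1/index_part.json-0000000a"),
        Some(10)
    );
    assert_eq!(parse_index_generation("tenants/t1/timelines/tl1/index_part.json"), None);
    assert_eq!(parse_index_generation("tenants/t1/timelines/tl1/index_part.json-zz"), None);
}
```

Only the last path component is split. A `-` elsewhere in the key, such as a shard suffix on the tenant directory, therefore does not confuse the parser.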
1780 : /// Files on the remote storage are stored with paths relative to the workdir.
1781 : /// That path includes both the tenant and timeline IDs, which makes the remote storage path unique.
1782 : ///
1783 : /// Errors if the provided path does not start with the pageserver's workdir.
1784 848 : pub fn remote_path(
1785 848 : conf: &PageServerConf,
1786 848 : local_path: &Utf8Path,
1787 848 : generation: Generation,
1788 848 : ) -> anyhow::Result<RemotePath> {
1789 848 : let stripped = local_path
1790 848 : .strip_prefix(&conf.workdir)
1791 848 : .context("Failed to strip workdir prefix")?;
1792 :
1793 848 : let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
1794 848 :
1795 848 : RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
1796 0 : format!(
1797 0 : "to resolve remote part of path {:?} for base {:?}",
1798 0 : local_path, conf.workdir
1799 0 : )
1800 848 : })
1801 848 : }
1802 :
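`remote_path` derives the remote key from a local file by stripping the workdir prefix and appending the generation suffix. The sketch below shows the same idea with `std::path::Path` instead of camino's `Utf8Path`, so it has to check for UTF-8 explicitly. `remote_key` and its `String` error type are hypothetical, and the generation suffix is passed in as an already formatted string.

```rust
use std::path::Path;

/// Derive a remote key from a local path: strip the workdir prefix,
/// then append the (already formatted) generation suffix.
fn remote_key(workdir: &Path, local: &Path, generation_suffix: &str) -> Result<String, String> {
    let stripped = local
        .strip_prefix(workdir)
        .map_err(|_| format!("{} is not under {}", local.display(), workdir.display()))?;
    // std paths need not be UTF-8 (camino's Utf8Path guarantees it), so check explicitly.
    let relative = stripped.to_str().ok_or("path is not valid UTF-8")?;
    Ok(format!("{relative}{generation_suffix}"))
}

fn main() {
    let workdir = Path::new("/data/pageserver");
    let local = Path::new("/data/pageserver/tenants/t1/timelines/tl1/LAYER");
    assert_eq!(
        remote_key(workdir, local, "-00000003").unwrap(),
        "tenants/t1/timelines/tl1/LAYER-00000003"
    );
    // Paths outside the workdir are rejected, like `remote_path`'s error case.
    assert!(remote_key(workdir, Path::new("/elsewhere/LAYER"), "").is_err());
}
```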
1803 : #[cfg(test)]
1804 : mod tests {
1805 : use super::*;
1806 : use crate::{
1807 : context::RequestContext,
1808 : tenant::{
1809 : harness::{TenantHarness, TIMELINE_ID},
1810 : Tenant, Timeline,
1811 : },
1812 : DEFAULT_PG_VERSION,
1813 : };
1814 :
1815 : use std::collections::HashSet;
1816 :
1817 8 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
1818 8 : format!("contents for {name}").into()
1819 8 : }
1820 :
1821 2 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
1822 2 : let metadata = TimelineMetadata::new(
1823 2 : disk_consistent_lsn,
1824 2 : None,
1825 2 : None,
1826 2 : Lsn(0),
1827 2 : Lsn(0),
1828 2 : Lsn(0),
1829 2 : // Any version will do
1830 2 : // but it should be consistent with the one in the tests
1831 2 : crate::DEFAULT_PG_VERSION,
1832 2 : );
1833 2 :
1834 2 : // go through serialize + deserialize to fix the header, including checksum
1835 2 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
1836 2 : }
1837 :
1838 2 : fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
1839 6 : let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
1840 2 : avec.sort();
1841 2 :
1842 2 : let mut bvec = b.to_vec();
1843 2 : bvec.sort_unstable();
1844 2 :
1845 2 : assert_eq!(avec, bvec);
1846 2 : }
1847 :
1848 4 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
1849 4 : let mut expected: Vec<String> = expected
1850 4 : .iter()
1851 16 : .map(|x| format!("{}{}", x, generation.get_suffix()))
1852 4 : .collect();
1853 4 : expected.sort();
1854 4 :
1855 4 : let mut found: Vec<String> = Vec::new();
1856 16 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
1857 16 : let entry_name = entry.file_name();
1858 16 : let fname = entry_name.to_str().unwrap();
1859 16 : found.push(String::from(fname));
1860 16 : }
1861 4 : found.sort();
1862 4 :
1863 4 : assert_eq!(found, expected);
1864 4 : }
1865 :
1866 : struct TestSetup {
1867 : harness: TenantHarness,
1868 : tenant: Arc<Tenant>,
1869 : timeline: Arc<Timeline>,
1870 : tenant_ctx: RequestContext,
1871 : }
1872 :
1873 : impl TestSetup {
1874 8 : async fn new(test_name: &str) -> anyhow::Result<Self> {
1875 8 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
1876 8 : let harness = TenantHarness::create(test_name)?;
1877 8 : let (tenant, ctx) = harness.load().await;
1878 :
1879 8 : let timeline = tenant
1880 8 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1881 24 : .await?;
1882 :
1883 8 : Ok(Self {
1884 8 : harness,
1885 8 : tenant,
1886 8 : timeline,
1887 8 : tenant_ctx: ctx,
1888 8 : })
1889 8 : }
1890 :
1891 : /// Construct a RemoteTimelineClient in an arbitrary generation
1892 10 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
1893 10 : Arc::new(RemoteTimelineClient {
1894 10 : conf: self.harness.conf,
1895 10 : runtime: tokio::runtime::Handle::current(),
1896 10 : tenant_shard_id: self.harness.tenant_shard_id,
1897 10 : timeline_id: TIMELINE_ID,
1898 10 : generation,
1899 10 : storage_impl: self.harness.remote_storage.clone(),
1900 10 : deletion_queue_client: self.harness.deletion_queue.new_client(),
1901 10 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
1902 10 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
1903 10 : &self.harness.tenant_shard_id,
1904 10 : &TIMELINE_ID,
1905 10 : )),
1906 10 : cancel: CancellationToken::new(),
1907 10 : })
1908 10 : }
1909 :
1910 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
1911 : /// and timeline_id are present.
1912 6 : fn span(&self) -> tracing::Span {
1913 6 : tracing::info_span!(
1914 : "test",
1915 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
1916 6 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
1917 : timeline_id = %TIMELINE_ID
1918 : )
1919 6 : }
1920 : }
1921 :
1922 : // Test scheduling
1923 : #[tokio::test]
1924 2 : async fn upload_scheduling() {
1925 2 : // Test outline:
1926 2 : //
1927 2 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
1928 2 : // Schedule upload of index. Check that it is queued
1929 2 : // let the layer file uploads finish. Check that the index-upload is now started
1930 2 : // let the index-upload finish.
1931 2 : //
1932 2 : // Download back the index.json. Check that the list of files is correct
1933 2 : //
1934 2 : // Schedule upload. Schedule deletion. Check that the deletion is queued
1935 2 : // let upload finish. Check that deletion is now started
1936 2 : // Schedule another deletion. Check that it's launched immediately.
1937 2 : // Schedule index upload. Check that it's queued
1938 2 :
1939 6 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
1940 2 : let span = test_setup.span();
1941 2 : let _guard = span.enter();
1942 2 :
1943 2 : let TestSetup {
1944 2 : harness,
1945 2 : tenant: _tenant,
1946 2 : timeline,
1947 2 : tenant_ctx: _tenant_ctx,
1948 2 : } = test_setup;
1949 2 :
1950 2 : let client = timeline.remote_client.as_ref().unwrap();
1951 2 :
1952 2 : // Download back the index.json, and check that the list of files is correct
1953 2 : let initial_index_part = match client
1954 2 : .download_index_file(&CancellationToken::new())
1955 8 : .await
1956 2 : .unwrap()
1957 2 : {
1958 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1959 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
1960 2 : };
1961 2 : let initial_layers = initial_index_part
1962 2 : .layer_metadata
1963 2 : .keys()
1964 2 : .map(|f| f.to_owned())
1965 2 : .collect::<HashSet<LayerFileName>>();
1966 2 : let initial_layer = {
1967 2 : assert!(initial_layers.len() == 1);
1968 2 : initial_layers.into_iter().next().unwrap()
1969 2 : };
1970 2 :
1971 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
1972 2 :
1973 2 : println!("workdir: {}", harness.conf.workdir);
1974 2 :
1975 2 : let remote_timeline_dir = harness
1976 2 : .remote_fs_dir
1977 2 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
1978 2 : println!("remote_timeline_dir: {remote_timeline_dir}");
1979 2 :
1980 2 : let generation = harness.generation;
1981 2 : let shard = harness.shard;
1982 2 :
1983 2 : // Create a couple of dummy files, schedule upload for them
1984 2 :
1985 2 : let layers = [
1986 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
1987 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
1988 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
1989 2 : ]
1990 2 : .into_iter()
1991 6 : .map(|(name, contents): (LayerFileName, Vec<u8>)| {
1992 6 : std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
1993 6 :
1994 6 : Layer::for_resident(
1995 6 : harness.conf,
1996 6 : &timeline,
1997 6 : name,
1998 6 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
1999 6 : )
2000 6 : }).collect::<Vec<_>>();
2001 2 :
2002 2 : client
2003 2 : .schedule_layer_file_upload(layers[0].clone())
2004 2 : .unwrap();
2005 2 : client
2006 2 : .schedule_layer_file_upload(layers[1].clone())
2007 2 : .unwrap();
2008 2 :
2009 2 : // Check that they are started immediately, not queued
2010 2 : //
2011 2 : // this works because we are running within block_on, so any futures are now queued up until
2012 2 : // our next await point.
2013 2 : {
2014 2 : let mut guard = client.upload_queue.lock().unwrap();
2015 2 : let upload_queue = guard.initialized_mut().unwrap();
2016 2 : assert!(upload_queue.queued_operations.is_empty());
2017 2 : assert!(upload_queue.inprogress_tasks.len() == 2);
2018 2 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
2019 2 :
2020 2 : // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
2021 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2022 2 : }
2023 2 :
2024 2 : // Schedule upload of index. Check that it is queued
2025 2 : let metadata = dummy_metadata(Lsn(0x20));
2026 2 : client
2027 2 : .schedule_index_upload_for_metadata_update(&metadata)
2028 2 : .unwrap();
2029 2 : {
2030 2 : let mut guard = client.upload_queue.lock().unwrap();
2031 2 : let upload_queue = guard.initialized_mut().unwrap();
2032 2 : assert!(upload_queue.queued_operations.len() == 1);
2033 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2034 2 : }
2035 2 :
2036 2 : // Wait for the uploads to finish
2037 2 : client.wait_completion().await.unwrap();
2038 2 : {
2039 2 : let mut guard = client.upload_queue.lock().unwrap();
2040 2 : let upload_queue = guard.initialized_mut().unwrap();
2041 2 :
2042 2 : assert!(upload_queue.queued_operations.is_empty());
2043 2 : assert!(upload_queue.inprogress_tasks.is_empty());
2044 2 : }
2045 2 :
2046 2 : // Download back the index.json, and check that the list of files is correct
2047 2 : let index_part = match client
2048 2 : .download_index_file(&CancellationToken::new())
2049 8 : .await
2050 2 : .unwrap()
2051 2 : {
2052 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2053 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2054 2 : };
2055 2 :
2056 2 : assert_file_list(
2057 2 : &index_part
2058 2 : .layer_metadata
2059 2 : .keys()
2060 6 : .map(|f| f.to_owned())
2061 2 : .collect(),
2062 2 : &[
2063 2 : &initial_layer.file_name(),
2064 2 : &layers[0].layer_desc().filename().file_name(),
2065 2 : &layers[1].layer_desc().filename().file_name(),
2066 2 : ],
2067 2 : );
2068 2 : assert_eq!(index_part.metadata, metadata);
2069 2 :
2070 2 : // Schedule upload and then a deletion. Check that the deletion is queued
2071 2 : client
2072 2 : .schedule_layer_file_upload(layers[2].clone())
2073 2 : .unwrap();
2074 2 :
2075 2 : // this is no longer consistent with how deletion works with Layer::drop, but in this test
2076 2 : // we keep using schedule_layer_file_deletion because we don't have a way to wait for the
2077 2 : // spawn_blocking started by the drop.
2078 2 : client
2079 2 : .schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
2080 2 : .unwrap();
2081 2 : {
2082 2 : let mut guard = client.upload_queue.lock().unwrap();
2083 2 : let upload_queue = guard.initialized_mut().unwrap();
2084 2 :
2085 2 : // Deletion schedules upload of the index file, and the file deletion itself
2086 2 : assert_eq!(upload_queue.queued_operations.len(), 2);
2087 2 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2088 2 : assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
2089 2 : assert_eq!(upload_queue.num_inprogress_deletions, 0);
2090 2 : assert_eq!(
2091 2 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2092 2 : 0
2093 2 : );
2094 2 : }
2095 2 : assert_remote_files(
2096 2 : &[
2097 2 : &initial_layer.file_name(),
2098 2 : &layers[0].layer_desc().filename().file_name(),
2099 2 : &layers[1].layer_desc().filename().file_name(),
2100 2 : "index_part.json",
2101 2 : ],
2102 2 : &remote_timeline_dir,
2103 2 : generation,
2104 2 : );
2105 2 :
2106 2 : // Finish them
2107 2 : client.wait_completion().await.unwrap();
2108 2 : harness.deletion_queue.pump().await;
2109 2 :
2110 2 : assert_remote_files(
2111 2 : &[
2112 2 : &initial_layer.file_name(),
2113 2 : &layers[1].layer_desc().filename().file_name(),
2114 2 : &layers[2].layer_desc().filename().file_name(),
2115 2 : "index_part.json",
2116 2 : ],
2117 2 : &remote_timeline_dir,
2118 2 : generation,
2119 2 : );
2120 2 : }
2121 :
2122 : #[tokio::test]
2123 2 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2124 2 : // Setup
2125 2 :
2126 2 : let TestSetup {
2127 2 : harness,
2128 2 : tenant: _tenant,
2129 2 : timeline,
2130 2 : ..
2131 6 : } = TestSetup::new("metrics").await.unwrap();
2132 2 : let client = timeline.remote_client.as_ref().unwrap();
2133 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2134 2 :
2135 2 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2136 2 : let content_1 = dummy_contents("foo");
2137 2 : std::fs::write(
2138 2 : timeline_path.join(layer_file_name_1.file_name()),
2139 2 : &content_1,
2140 2 : )
2141 2 : .unwrap();
2142 2 :
2143 2 : let layer_file_1 = Layer::for_resident(
2144 2 : harness.conf,
2145 2 : &timeline,
2146 2 : layer_file_name_1.clone(),
2147 2 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2148 2 : );
2149 2 :
2150 2 : #[derive(Debug, PartialEq, Clone, Copy)]
2151 2 : struct BytesStartedFinished {
2152 2 : started: Option<usize>,
2153 2 : finished: Option<usize>,
2154 2 : }
2155 2 : impl std::ops::Add for BytesStartedFinished {
2156 2 : type Output = Self;
2157 4 : fn add(self, rhs: Self) -> Self::Output {
2158 4 : Self {
2159 4 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
2160 4 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
2161 4 : }
2162 4 : }
2163 2 : }
2164 6 : let get_bytes_started_stopped = || {
2165 6 : let started = client
2166 6 : .metrics
2167 6 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2168 6 : .map(|v| v.try_into().unwrap());
2169 6 : let stopped = client
2170 6 : .metrics
2171 6 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2172 6 : .map(|v| v.try_into().unwrap());
2173 6 : BytesStartedFinished {
2174 6 : started,
2175 6 : finished: stopped,
2176 6 : }
2177 6 : };
2178 2 :
2179 2 : // Test
2180 2 : tracing::info!("now doing actual test");
2181 2 :
2182 2 : let actual_a = get_bytes_started_stopped();
2183 2 :
2184 2 : client
2185 2 : .schedule_layer_file_upload(layer_file_1.clone())
2186 2 : .unwrap();
2187 2 :
2188 2 : let actual_b = get_bytes_started_stopped();
2189 2 :
2190 2 : client.wait_completion().await.unwrap();
2191 2 :
2192 2 : let actual_c = get_bytes_started_stopped();
2193 2 :
2194 2 : // Validate
2195 2 :
2196 2 : let expected_b = actual_a
2197 2 : + BytesStartedFinished {
2198 2 : started: Some(content_1.len()),
2199 2 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
2200 2 : finished: Some(0),
2201 2 : };
2202 2 : assert_eq!(actual_b, expected_b);
2203 2 :
2204 2 : let expected_c = actual_a
2205 2 : + BytesStartedFinished {
2206 2 : started: Some(content_1.len()),
2207 2 : finished: Some(content_1.len()),
2208 2 : };
2209 2 : assert_eq!(actual_c, expected_c);
2210 2 : }
2211 :
2212 12 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
2213 12 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
2214 12 : let example_metadata = TimelineMetadata::example();
2215 12 : let example_index_part = IndexPart::new(
2216 12 : HashMap::new(),
2217 12 : example_metadata.disk_consistent_lsn(),
2218 12 : example_metadata,
2219 12 : );
2220 12 :
2221 12 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
2222 12 :
2223 12 : let index_path = test_state.harness.remote_fs_dir.join(
2224 12 : remote_index_path(
2225 12 : &test_state.harness.tenant_shard_id,
2226 12 : &TIMELINE_ID,
2227 12 : generation,
2228 12 : )
2229 12 : .get_path(),
2230 12 : );
2231 12 :
2232 12 : std::fs::create_dir_all(index_path.parent().unwrap())
2233 12 : .expect("creating test dir should work");
2234 12 :
2235 12 : eprintln!("Writing {index_path}");
2236 12 : std::fs::write(&index_path, index_part_bytes).unwrap();
2237 12 : example_index_part
2238 12 : }
2239 :
2240 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
2241 : /// index, the IndexPart returned is equal to `expected`
2242 10 : async fn assert_got_index_part(
2243 10 : test_state: &TestSetup,
2244 10 : get_generation: Generation,
2245 10 : expected: &IndexPart,
2246 10 : ) {
2247 10 : let client = test_state.build_client(get_generation);
2248 :
2249 10 : let download_r = client
2250 10 : .download_index_file(&CancellationToken::new())
2251 66 : .await
2252 10 : .expect("download should always succeed");
2253 10 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
2254 10 : match download_r {
2255 10 : MaybeDeletedIndexPart::IndexPart(index_part) => {
2256 10 : assert_eq!(&index_part, expected);
2257 : }
2258 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
2259 : }
2260 10 : }
2261 :
2262 : #[tokio::test]
2263 2 : async fn index_part_download_simple() -> anyhow::Result<()> {
2264 6 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
2265 2 : let span = test_state.span();
2266 2 : let _guard = span.enter();
2267 2 :
2268 2 : // Simple case: we are in generation N, load the index from generation N - 1
2269 2 : let generation_n = 5;
2270 2 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2271 2 :
2272 10 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
2273 2 :
2274 2 : Ok(())
2275 2 : }
2276 :
2277 : #[tokio::test]
2278 2 : async fn index_part_download_ordering() -> anyhow::Result<()> {
2279 2 : let test_state = TestSetup::new("index_part_download_ordering")
2280 6 : .await
2281 2 : .unwrap();
2282 2 :
2283 2 : let span = test_state.span();
2284 2 : let _guard = span.enter();
2285 2 :
2286 2 : // A generation-less IndexPart exists in the bucket; we should find it
2287 2 : let generation_n = 5;
2288 2 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
2289 16 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
2290 2 :
2291 2 : // If a more recent-than-none generation exists, we should prefer to load that
2292 2 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
2293 16 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2294 2 :
2295 2 : // If a more-recent-than-me generation exists, we should ignore it.
2296 2 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
2297 16 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2298 2 :
2299 2 : // If a directly previous generation exists, _and_ an index exists in my own
2300 2 : // generation, I should prefer my own generation.
2301 2 : let _injected_prev =
2302 2 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2303 2 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
2304 2 : assert_got_index_part(
2305 2 : &test_state,
2306 2 : Generation::new(generation_n),
2307 2 : &injected_current,
2308 2 : )
2309 8 : .await;
2310 2 :
2311 2 : Ok(())
2312 2 : }
2313 : }
|
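Taken together, the assertions in `index_part_download_ordering` describe a selection rule. The client loads the index with the highest generation that is not newer than its own, and a generation-less index ranks below any numbered one. The sketch below encodes just that rule over a list of available generations. `pick_index` is hypothetical; the real download path has to discover candidates in remote storage rather than receive them as an in-memory slice.

```rust
/// Pick which index generation to load, given the generations present remotely.
/// `None` stands for a generation-less index; `Option`'s ordering puts it below
/// every `Some(_)`, so it is only chosen when nothing better exists.
fn pick_index(available: &[Option<u32>], mine: u32) -> Option<Option<u32>> {
    available
        .iter()
        .copied()
        // Ignore indices written by a newer generation than ours.
        .filter(|g| g.map_or(true, |v| v <= mine))
        .max()
}

fn main() {
    let mine = 5;
    assert_eq!(pick_index(&[None], mine), Some(None));
    assert_eq!(pick_index(&[None, Some(1)], mine), Some(Some(1)));
    assert_eq!(pick_index(&[None, Some(1), Some(10)], mine), Some(Some(1)));
    assert_eq!(pick_index(&[None, Some(1), Some(10), Some(4), Some(5)], mine), Some(Some(5)));
    assert_eq!(pick_index(&[], mine), None);
}
```

The four non-empty cases mirror the four steps of the test: a lone generation-less index, a newer-than-none generation, a too-new generation that is ignored, and the client's own generation winning over the directly previous one.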