Line data : Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], to get the list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
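//! As a rough usage sketch (not a compilable doctest; `remote_storage`, `deletion_queue_client`,
//! `conf`, `index_part`, `new_layer` and the various IDs are assumed to already be in scope):
//!
//! ```ignore
//! // Create the client. It cannot be used until its upload queue is initialized.
//! let client = Arc::new(RemoteTimelineClient::new(
//!     remote_storage,
//!     deletion_queue_client,
//!     conf,
//!     tenant_shard_id,
//!     timeline_id,
//!     generation,
//! ));
//!
//! // The timeline already has remote state: initialize from its IndexPart.
//! client.init_upload_queue(&index_part)?;
//!
//! // After writing a new layer file locally, schedule its upload and then an
//! // index upload that references it.
//! client.schedule_layer_file_upload(new_layer)?;
//! client.schedule_index_upload_for_file_changes()?;
//!
//! // Optionally, wait until everything scheduled so far has reached remote storage.
//! client.wait_completion().await?;
//! ```
//!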
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
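//!
//! As a heavily simplified illustration (field names and values are indicative only,
//! not the exact serialized schema), an `index_part.json` carries roughly:
//!
//! ```ignore
//! {
//!     "layer_metadata": {
//!         "<layer file name>": { "file_size": 26738688, ... },
//!         ...
//!     },
//!     "disk_consistent_lsn": "0/16960E8",
//!     "deleted_at": null
//! }
//! ```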
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between client and its user is that the user is responsible for
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that require preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
72 : //!
73 : //! The caller should be careful with deletion, though. They should not delete
74 : //! local files that have been scheduled for upload but not yet finished uploading.
75 : //! Otherwise the upload will fail. To wait for an upload to finish, use
76 : //! the 'wait_completion' function (more on that later.)
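//!
//! For example, a caller that wants to remove the local copy of a layer it has just
//! scheduled for upload should wait first (a sketch; `layer` and `local_path` are
//! illustrative names):
//!
//! ```ignore
//! client.schedule_layer_file_upload(layer)?;
//! client.schedule_index_upload_for_file_changes()?;
//! // Block until the scheduled operations have actually completed ...
//! client.wait_completion().await?;
//! // ... only then is it safe to remove the local file.
//! tokio::fs::remove_file(local_path).await?;
//! ```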
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after-write consistency in the remote storage.
81 : //! - Layer files are immutable.
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism in the pageserver to detect that; we rely on the control plane to ensure
88 : //! that it doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in
95 : //! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
96 : //! It is initialized based on the [`IndexPart`] that was passed during init
97 : //! and updated with every `schedule_*` function call.
98 : //! All this is necessary to compute the future [`IndexPart`]s
99 : //! when scheduling an operation while other operations that also affect the
100 : //! remote [`IndexPart`] are in flight.
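//!
//! Conceptually, every scheduled index upload snapshots that desired state; this
//! roughly mirrors what `schedule_index_upload` further down in this module does:
//!
//! ```ignore
//! let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
//! let index_part = IndexPart::new(
//!     upload_queue.latest_files.clone(),
//!     disk_consistent_lsn,
//!     metadata,
//! );
//! upload_queue
//!     .queued_operations
//!     .push_back(UploadOp::UploadMetadata(index_part, disk_consistent_lsn));
//! ```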
101 : //!
102 : //! # Retries & Error Handling
103 : //!
104 : //! The client retries operations indefinitely, using exponential back-off.
105 : //! There is no way to force a retry, i.e., interrupt the back-off.
106 : //! This could be built easily.
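//!
//! The back-off itself is the usual capped exponential scheme; a purely illustrative
//! sketch (the real parameters come from `utils::backoff`):
//!
//! ```ignore
//! // attempt 0, 1, 2, ... : the delay grows exponentially up to a maximum
//! let delay_secs = (base_secs * 2f64.powi(attempt)).min(max_secs);
//! ```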
107 : //!
108 : //! # Cancellation
109 : //!
110 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
111 : //! the client's tenant and timeline.
112 : //! Dropping the client will drop queued operations but not operations that are already executing.
113 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
114 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
115 : //!
116 : //! # Completion
117 : //!
118 : //! Once an operation has completed, we update
119 : //! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately,
120 : //! and submit a request through the DeletionQueue to update
121 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
122 : //! validated that our generation is not stale. It is this visible value
123 : //! that is advertised to safekeepers as a signal that they can
124 : //! delete the WAL up to that LSN.
125 : //!
126 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
127 : //! for all pending operations to complete. It does not prevent more
128 : //! operations from getting scheduled.
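//!
//! Both values can be inspected through the client (a minimal sketch):
//!
//! ```ignore
//! let projected = client.remote_consistent_lsn_projected(); // may not be validated yet
//! let visible = client.remote_consistent_lsn_visible();     // safe to advertise to safekeepers
//! ```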
129 : //!
130 : //! # Crash Consistency
131 : //!
132 : //! We do not persist the upload queue state.
133 : //! If we drop the client, or crash, all unfinished operations are lost.
134 : //!
135 : //! To recover, the following steps need to be taken:
136 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
137 : //! consistent remote state, assuming the user scheduled the operations in
138 : //! the correct order.
139 : //! - Initialize the upload queue with that [`IndexPart`].
140 : //! - Reschedule all lost operations by comparing the local filesystem state
141 : //! and remote state as per [`IndexPart`]. This is done in
142 : //! [`Tenant::timeline_init_and_sync`].
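//!
//! At a pseudocode level, the rescheduling step looks roughly like this (the real
//! logic lives in [`Tenant::timeline_init_and_sync`]; `local_layers` is an
//! illustrative name):
//!
//! ```ignore
//! client.init_upload_queue(&index_part)?;
//! for layer in local_layers {
//!     if !index_part.layer_metadata.contains_key(&layer.layer_desc().filename()) {
//!         // Present locally but unknown to the remote index: the upload was lost.
//!         client.schedule_layer_file_upload(layer)?;
//!     }
//! }
//! client.schedule_index_upload_for_file_changes()?;
//! ```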
143 : //!
144 : //! Note that if we crash during file deletion, between the index update
145 : //! that removes the file from the list of files and the deletion of the remote file,
146 : //! the file is leaked in the remote storage. Similarly, if a new file is created
147 : //! and uploaded, but the pageserver dies permanently before updating the
148 : //! remote index file, the new file is leaked in remote storage. We accept and
149 : //! tolerate that for now.
150 : //! Note further that we cannot easily fix this by scheduling deletes for every
151 : //! file that is present only on the remote, because we cannot distinguish the
152 : //! following two cases:
153 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
154 : //! but crashed before it finished remotely.
155 : //! - (2) We never had the file locally because we haven't on-demand downloaded
156 : //! it yet.
157 : //!
158 : //! # Downloads
159 : //!
160 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
161 : //! downloading files from the remote storage. Downloads are performed immediately
162 : //! against the `RemoteStorage`, independently of the upload queue.
163 : //!
164 : //! When we attach a tenant, we perform the following steps:
165 : //! - create `Tenant` object in `TenantState::Attaching` state
166 : //! - List timelines that are present in remote storage, and for each:
167 : //! - download their remote [`IndexPart`]s
168 : //! - create `Timeline` struct and a `RemoteTimelineClient`
169 : //! - initialize the client's upload queue with its `IndexPart`
170 : //! - schedule uploads for layers that are only present locally.
171 : //! - After the above is done for each timeline, open the tenant for business by
172 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
173 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
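//!
//! A condensed sketch of that attach flow (signatures simplified; error handling and
//! most details omitted):
//!
//! ```ignore
//! for timeline_id in list_remote_timelines(/* storage, tenant */).await? {
//!     let client = RemoteTimelineClient::new(/* ... */);
//!     match client.download_index_file(cancel.clone()).await? {
//!         MaybeDeletedIndexPart::IndexPart(index_part) => {
//!             client.init_upload_queue(&index_part)?;
//!             // ... create the Timeline, schedule uploads for local-only layers ...
//!         }
//!         MaybeDeletedIndexPart::Deleted(index_part) => {
//!             // A previously started deletion needs to be resumed instead.
//!             client.init_upload_queue_stopped_to_continue_deletion(&index_part)?;
//!         }
//!     }
//! }
//! // Once every timeline is ready, transition the tenant from Attaching to Active.
//! ```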
174 : //!
175 : //! # Operating Without Remote Storage
176 : //!
177 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
178 : //! not created and the uploads are skipped.
179 : //!
180 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
181 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
182 :
183 : pub(crate) mod download;
184 : pub mod index;
185 : mod upload;
186 :
187 : use anyhow::Context;
188 : use camino::Utf8Path;
189 : use chrono::{NaiveDateTime, Utc};
190 :
191 : pub(crate) use download::download_initdb_tar_zst;
192 : use pageserver_api::shard::{ShardIndex, TenantShardId};
193 : use scopeguard::ScopeGuard;
194 : use tokio_util::sync::CancellationToken;
195 : pub(crate) use upload::upload_initdb_dir;
196 : use utils::backoff::{
197 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
198 : };
199 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
200 :
201 : use std::collections::{HashMap, VecDeque};
202 : use std::sync::atomic::{AtomicU32, Ordering};
203 : use std::sync::{Arc, Mutex};
204 : use std::time::Duration;
205 :
206 : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
207 : use std::ops::DerefMut;
208 : use tracing::{debug, error, info, instrument, warn};
209 : use tracing::{info_span, Instrument};
210 : use utils::lsn::Lsn;
211 :
212 : use crate::deletion_queue::DeletionQueueClient;
213 : use crate::metrics::{
214 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
215 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
216 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
217 : };
218 : use crate::task_mgr::shutdown_token;
219 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
220 : use crate::tenant::storage_layer::AsLayerDesc;
221 : use crate::tenant::upload_queue::Delete;
222 : use crate::tenant::TIMELINES_SEGMENT_NAME;
223 : use crate::{
224 : config::PageServerConf,
225 : task_mgr,
226 : task_mgr::TaskKind,
227 : task_mgr::BACKGROUND_RUNTIME,
228 : tenant::metadata::TimelineMetadata,
229 : tenant::upload_queue::{
230 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
231 : },
232 : TENANT_HEATMAP_BASENAME,
233 : };
234 :
235 : use utils::id::{TenantId, TimelineId};
236 :
237 : use self::index::IndexPart;
238 :
239 : use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
240 : use super::upload_queue::SetDeletedFlagProgress;
241 : use super::Generation;
242 :
243 : pub(crate) use download::{is_temp_download_file, list_remote_timelines};
244 : pub(crate) use index::LayerFileMetadata;
245 :
246 : // Occasional network issues and such can cause remote operations to fail, and
247 : // that's expected. If a download fails, we log it at info-level, and retry.
248 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
249 : // level instead, as repeated failures can mean a more serious problem. If it
250 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
251 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
252 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
253 :
254 : // Similarly log failed uploads and deletions at WARN level, after this many
255 : // retries. Uploads and deletions are retried forever, though.
256 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
257 :
258 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
259 :
260 : /// Default buffer size when interfacing with [`tokio::fs::File`].
261 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
262 :
263 : pub enum MaybeDeletedIndexPart {
264 : IndexPart(IndexPart),
265 : Deleted(IndexPart),
266 : }
267 :
268 : /// Errors that can arise when calling [`RemoteTimelineClient::stop`].
269 UBC 0 : #[derive(Debug, thiserror::Error)]
270 : pub enum StopError {
271 : /// Returned if the upload queue was never initialized.
272 : /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
273 : #[error("queue is not initialized")]
274 : QueueUninitialized,
275 : }
276 :
277 0 : #[derive(Debug, thiserror::Error)]
278 : pub enum PersistIndexPartWithDeletedFlagError {
279 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
280 : AlreadyInProgress(NaiveDateTime),
281 : #[error("the deleted_flag was already set, value is {0:?}")]
282 : AlreadyDeleted(NaiveDateTime),
283 : #[error(transparent)]
284 : Other(#[from] anyhow::Error),
285 : }
286 :
287 : /// A client for accessing a timeline's data in remote storage.
288 : ///
289 : /// This takes care of managing the number of connections, and balancing them
290 : /// across tenants. This also handles retries of failed uploads.
291 : ///
292 : /// Upload and delete requests are ordered so that before a deletion is
293 : /// performed, we wait for all preceding uploads to finish. This ensures
294 : /// that if you perform a compaction operation that reshuffles data in layer
295 : /// files, we don't have a transient state where the old files have already been
296 : /// deleted, but new files have not yet been uploaded.
297 : ///
298 : /// Similarly, this enforces an order between index-file uploads, and layer
299 : /// uploads. Before an index-file upload is performed, all preceding layer
300 : /// uploads must be finished.
301 : ///
302 : /// This also maintains a list of remote files, and automatically includes that
303 : /// in the index part file, whenever timeline metadata is uploaded.
304 : ///
305 : /// Downloads are not queued, they are performed immediately.
306 : pub struct RemoteTimelineClient {
307 : conf: &'static PageServerConf,
308 :
309 : runtime: tokio::runtime::Handle,
310 :
311 : tenant_shard_id: TenantShardId,
312 : timeline_id: TimelineId,
313 : generation: Generation,
314 :
315 : upload_queue: Mutex<UploadQueue>,
316 :
317 : metrics: Arc<RemoteTimelineClientMetrics>,
318 :
319 : storage_impl: GenericRemoteStorage,
320 :
321 : deletion_queue_client: DeletionQueueClient,
322 :
323 : cancel: CancellationToken,
324 : }
325 :
326 : /// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not
327 : /// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that.
328 : const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120);
329 : const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120);
330 :
331 : /// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to anyhow.
332 : ///
333 : /// This is a convenience for the various upload functions. In future
334 : /// the anyhow::Error result should be replaced with a more structured type that
335 : /// enables callers to avoid handling shutdown as an error.
336 CBC 28034 : async fn upload_cancellable<F>(cancel: &CancellationToken, future: F) -> anyhow::Result<()>
337 28034 : where
338 28034 : F: std::future::Future<Output = anyhow::Result<()>>,
339 28034 : {
340 821388 : match timeout_cancellable(UPLOAD_TIMEOUT, cancel, future).await {
341 25440 : Ok(Ok(())) => Ok(()),
342 2590 : Ok(Err(e)) => Err(e),
343 UBC 0 : Err(TimeoutCancellableError::Timeout) => Err(anyhow::anyhow!("Timeout")),
344 0 : Err(TimeoutCancellableError::Cancelled) => Err(anyhow::anyhow!("Shutting down")),
345 : }
346 CBC 28030 : }
347 : /// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to DownloadError.
348 12033 : async fn download_cancellable<F, R>(
349 12033 : cancel: &CancellationToken,
350 12033 : future: F,
351 12033 : ) -> Result<R, DownloadError>
352 12033 : where
353 12033 : F: std::future::Future<Output = Result<R, DownloadError>>,
354 12033 : {
355 32371 : match timeout_cancellable(DOWNLOAD_TIMEOUT, cancel, future).await {
356 11312 : Ok(Ok(r)) => Ok(r),
357 718 : Ok(Err(e)) => Err(e),
358 : Err(TimeoutCancellableError::Timeout) => {
359 UBC 0 : Err(DownloadError::Other(anyhow::anyhow!("Timed out")))
360 : }
361 CBC 3 : Err(TimeoutCancellableError::Cancelled) => Err(DownloadError::Cancelled),
362 : }
363 12033 : }
364 :
365 : impl RemoteTimelineClient {
366 : ///
367 : /// Create a remote storage client for given timeline
368 : ///
369 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
370 : /// by calling init_upload_queue.
371 : ///
372 1315 : pub fn new(
373 1315 : remote_storage: GenericRemoteStorage,
374 1315 : deletion_queue_client: DeletionQueueClient,
375 1315 : conf: &'static PageServerConf,
376 1315 : tenant_shard_id: TenantShardId,
377 1315 : timeline_id: TimelineId,
378 1315 : generation: Generation,
379 1315 : ) -> RemoteTimelineClient {
380 1315 : RemoteTimelineClient {
381 1315 : conf,
382 1315 : runtime: if cfg!(test) {
383 : // remote_timeline_client.rs tests rely on current-thread runtime
384 145 : tokio::runtime::Handle::current()
385 : } else {
386 1170 : BACKGROUND_RUNTIME.handle().clone()
387 : },
388 1315 : tenant_shard_id,
389 1315 : timeline_id,
390 1315 : generation,
391 1315 : storage_impl: remote_storage,
392 1315 : deletion_queue_client,
393 1315 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
394 1315 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
395 1315 : &tenant_shard_id,
396 1315 : &timeline_id,
397 1315 : )),
398 1315 : cancel: CancellationToken::new(),
399 1315 : }
400 1315 : }
401 :
402 : /// Initialize the upload queue for a remote storage that already received
403 : /// an index file upload, i.e., it's not empty.
404 : /// The given `index_part` must be the one on the remote.
405 357 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
406 357 : let mut upload_queue = self.upload_queue.lock().unwrap();
407 357 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
408 357 : self.update_remote_physical_size_gauge(Some(index_part));
409 357 : info!(
410 357 : "initialized upload queue from remote index with {} layer files",
411 357 : index_part.layer_metadata.len()
412 357 : );
413 357 : Ok(())
414 357 : }
415 :
416 : /// Initialize the upload queue for the case where the remote storage is empty,
417 : /// i.e., it doesn't have an `IndexPart`.
418 921 : pub fn init_upload_queue_for_empty_remote(
419 921 : &self,
420 921 : local_metadata: &TimelineMetadata,
421 921 : ) -> anyhow::Result<()> {
422 921 : let mut upload_queue = self.upload_queue.lock().unwrap();
423 921 : upload_queue.initialize_empty_remote(local_metadata)?;
424 921 : self.update_remote_physical_size_gauge(None);
425 921 : info!("initialized upload queue as empty");
426 921 : Ok(())
427 921 : }
428 :
429 : /// Initialize the queue in stopped state. Used in startup path
430 : /// to continue deletion operation interrupted by pageserver crash or restart.
431 12 : pub fn init_upload_queue_stopped_to_continue_deletion(
432 12 : &self,
433 12 : index_part: &IndexPart,
434 12 : ) -> anyhow::Result<()> {
435 : // FIXME: consider newtype for DeletedIndexPart.
436 12 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
437 12 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
438 12 : ))?;
439 :
440 : {
441 12 : let mut upload_queue = self.upload_queue.lock().unwrap();
442 12 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
443 12 : self.update_remote_physical_size_gauge(Some(index_part));
444 12 : }
445 12 : // this also locks the upload queue; without dropping the guard above it would deadlock
446 12 : self.stop().expect("initialized line above");
447 12 :
448 12 : let mut upload_queue = self.upload_queue.lock().unwrap();
449 12 :
450 12 : upload_queue
451 12 : .stopped_mut()
452 12 : .expect("stopped above")
453 12 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
454 12 :
455 12 : Ok(())
456 12 : }
457 :
458 2993 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
459 2993 : match &mut *self.upload_queue.lock().unwrap() {
460 UBC 0 : UploadQueue::Uninitialized => None,
461 CBC 2883 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
462 110 : UploadQueue::Stopped(q) => q
463 110 : .upload_queue_for_deletion
464 110 : .get_last_remote_consistent_lsn_projected(),
465 : }
466 2993 : }
467 :
468 568864 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
469 568864 : match &mut *self.upload_queue.lock().unwrap() {
470 UBC 0 : UploadQueue::Uninitialized => None,
471 CBC 568754 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
472 110 : UploadQueue::Stopped(q) => Some(
473 110 : q.upload_queue_for_deletion
474 110 : .get_last_remote_consistent_lsn_visible(),
475 110 : ),
476 : }
477 568864 : }
478 :
479 6403 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
480 6403 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
481 5482 : current_remote_index_part
482 5482 : .layer_metadata
483 5482 : .values()
484 5482 : // If we don't have the file size for the layer, don't account for it in the metric.
485 530439 : .map(|ilmd| ilmd.file_size)
486 5482 : .sum()
487 : } else {
488 921 : 0
489 : };
490 6403 : self.metrics.remote_physical_size_set(size);
491 6403 : }
492 :
493 18 : pub fn get_remote_physical_size(&self) -> u64 {
494 18 : self.metrics.remote_physical_size_get()
495 18 : }
496 :
497 : //
498 : // Download operations.
499 : //
500 : // These don't use the per-timeline queue. They do use the global semaphore in
501 : // S3Bucket, to limit the total number of concurrent operations, though.
502 : //
503 :
504 : /// Download index file
505 401 : pub async fn download_index_file(
506 401 : &self,
507 401 : cancel: CancellationToken,
508 401 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
509 401 : let _unfinished_gauge_guard = self.metrics.call_begin(
510 401 : &RemoteOpFileKind::Index,
511 401 : &RemoteOpKind::Download,
512 401 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
513 401 : reason: "no need for a downloads gauge",
514 401 : },
515 401 : );
516 :
517 401 : let index_part = download::download_index_part(
518 401 : &self.storage_impl,
519 401 : &self.tenant_shard_id,
520 401 : &self.timeline_id,
521 401 : self.generation,
522 401 : cancel,
523 401 : )
524 401 : .measure_remote_op(
525 401 : self.tenant_shard_id.tenant_id,
526 401 : self.timeline_id,
527 401 : RemoteOpFileKind::Index,
528 401 : RemoteOpKind::Download,
529 401 : Arc::clone(&self.metrics),
530 401 : )
531 3001 : .await?;
532 :
533 376 : if index_part.deleted_at.is_some() {
534 12 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
535 : } else {
536 364 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
537 : }
538 401 : }
539 :
540 : /// Download a (layer) file from `path`, into local filesystem.
541 : ///
542 : /// 'layer_metadata' is the metadata from the remote index file.
543 : ///
544 : /// On success, returns the size of the downloaded file.
545 9411 : pub async fn download_layer_file(
546 9411 : &self,
547 9411 : layer_file_name: &LayerFileName,
548 9411 : layer_metadata: &LayerFileMetadata,
549 9411 : cancel: &CancellationToken,
550 9411 : ) -> anyhow::Result<u64> {
551 9397 : let downloaded_size = {
552 9411 : let _unfinished_gauge_guard = self.metrics.call_begin(
553 9411 : &RemoteOpFileKind::Layer,
554 9411 : &RemoteOpKind::Download,
555 9411 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
556 9411 : reason: "no need for a downloads gauge",
557 9411 : },
558 9411 : );
559 9411 : download::download_layer_file(
560 9411 : self.conf,
561 9411 : &self.storage_impl,
562 9411 : self.tenant_shard_id,
563 9411 : self.timeline_id,
564 9411 : layer_file_name,
565 9411 : layer_metadata,
566 9411 : cancel,
567 9411 : )
568 9411 : .measure_remote_op(
569 9411 : self.tenant_shard_id.tenant_id,
570 9411 : self.timeline_id,
571 9411 : RemoteOpFileKind::Layer,
572 9411 : RemoteOpKind::Download,
573 9411 : Arc::clone(&self.metrics),
574 9411 : )
575 376829 : .await?
576 : };
577 :
578 9397 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
579 9397 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
580 9397 :
581 9397 : Ok(downloaded_size)
582 9408 : }
583 :
584 : //
585 : // Upload operations.
586 : //
587 :
588 : ///
589 : /// Launch an index-file upload operation in the background, with
590 : /// updated metadata.
591 : ///
592 : /// The upload will be added to the queue immediately, but it
593 : /// won't be performed until all previously scheduled layer file
594 : /// upload operations have completed successfully. This is to
595 : /// ensure that when the index file claims that layers X, Y and Z
596 : /// exist in remote storage, they really do. To wait for the upload
597 : /// to complete, use `wait_completion`.
598 : ///
599 : /// If there were any changes to the list of files, i.e. if any
600 : /// layer file uploads were scheduled, since the last index file
601 : /// upload, those will be included too.
602 4814 : pub fn schedule_index_upload_for_metadata_update(
603 4814 : self: &Arc<Self>,
604 4814 : metadata: &TimelineMetadata,
605 4814 : ) -> anyhow::Result<()> {
606 4814 : let mut guard = self.upload_queue.lock().unwrap();
607 4814 : let upload_queue = guard.initialized_mut()?;
608 :
609 : // As documented in the struct definition, it's ok for latest_metadata to be
610 : // ahead of what's _actually_ on the remote during index upload.
611 4814 : upload_queue.latest_metadata = metadata.clone();
612 4814 :
613 4814 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
614 4814 :
615 4814 : Ok(())
616 4814 : }
617 :
618 : ///
619 : /// Launch an index-file upload operation in the background, if necessary.
620 : ///
621 : /// Use this function to schedule the update of the index file after
622 : /// scheduling file uploads or deletions. If no file uploads or deletions
623 : /// have been scheduled since the last index file upload, this does
624 : /// nothing.
625 : ///
626 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
627 : /// the upload to the upload queue and returns quickly.
628 1721 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
629 1721 : let mut guard = self.upload_queue.lock().unwrap();
630 1721 : let upload_queue = guard.initialized_mut()?;
631 :
632 1721 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
633 90 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
634 1631 : }
635 :
636 1721 : Ok(())
637 1721 : }
638 :
639 : /// Launch an index-file upload operation in the background (internal function)
640 5212 : fn schedule_index_upload(
641 5212 : self: &Arc<Self>,
642 5212 : upload_queue: &mut UploadQueueInitialized,
643 5212 : metadata: TimelineMetadata,
644 5212 : ) {
645 5212 : info!(
646 5212 : "scheduling metadata upload with {} files ({} changed)",
647 5212 : upload_queue.latest_files.len(),
648 5212 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
649 5212 : );
650 :
651 5212 : let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
652 5212 :
653 5212 : let index_part = IndexPart::new(
654 5212 : upload_queue.latest_files.clone(),
655 5212 : disk_consistent_lsn,
656 5212 : metadata,
657 5212 : );
658 5212 : let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
659 5212 : self.calls_unfinished_metric_begin(&op);
660 5212 : upload_queue.queued_operations.push_back(op);
661 5212 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
662 5212 :
663 5212 : // Launch the task immediately, if possible
664 5212 : self.launch_queued_tasks(upload_queue);
665 5212 : }
666 :
667 : ///
668 : /// Launch an upload operation in the background.
669 : ///
670 10038 : pub(crate) fn schedule_layer_file_upload(
671 10038 : self: &Arc<Self>,
672 10038 : layer: ResidentLayer,
673 10038 : ) -> anyhow::Result<()> {
674 10038 : let mut guard = self.upload_queue.lock().unwrap();
675 10038 : let upload_queue = guard.initialized_mut()?;
676 :
677 10038 : self.schedule_layer_file_upload0(upload_queue, layer);
678 10038 : self.launch_queued_tasks(upload_queue);
679 10038 : Ok(())
680 10038 : }
681 :
682 20463 : fn schedule_layer_file_upload0(
683 20463 : self: &Arc<Self>,
684 20463 : upload_queue: &mut UploadQueueInitialized,
685 20463 : layer: ResidentLayer,
686 20463 : ) {
687 20463 : let metadata = layer.metadata();
688 20463 :
689 20463 : upload_queue
690 20463 : .latest_files
691 20463 : .insert(layer.layer_desc().filename(), metadata.clone());
692 20463 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
693 20463 :
694 20463 : info!("scheduled layer file upload {layer}");
695 20463 : let op = UploadOp::UploadLayer(layer, metadata);
696 20463 : self.calls_unfinished_metric_begin(&op);
697 20463 : upload_queue.queued_operations.push_back(op);
698 20463 : }
699 :
700 : /// Launch a delete operation in the background.
701 : ///
702 : /// The operation does not modify local filesystem state.
703 : ///
704 : /// Note: This schedules an index file upload before the deletions. The
705 : /// deletion won't actually be performed, until all previously scheduled
706 : /// upload operations, and the index file upload, have completed
707 : /// successfully.
708 358 : pub fn schedule_layer_file_deletion(
709 358 : self: &Arc<Self>,
710 358 : names: &[LayerFileName],
711 358 : ) -> anyhow::Result<()> {
712 358 : let mut guard = self.upload_queue.lock().unwrap();
713 358 : let upload_queue = guard.initialized_mut()?;
714 :
715 358 : let with_metadata =
716 358 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
717 358 :
718 358 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
719 358 :
720 358 : // Launch the tasks immediately, if possible
721 358 : self.launch_queued_tasks(upload_queue);
722 358 : Ok(())
723 358 : }
724 :
725 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
726 : /// layer files, leaving them dangling.
727 : ///
728 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
729 : /// is invoked on them.
730 24 : pub(crate) fn schedule_gc_update(self: &Arc<Self>, gc_layers: &[Layer]) -> anyhow::Result<()> {
731 24 : let mut guard = self.upload_queue.lock().unwrap();
732 24 : let upload_queue = guard.initialized_mut()?;
733 :
734 : // just forget the return value; after uploading the next index_part.json, we can consider
735 : // the layer files as "dangling". this is fine, at worst case we create work for the
736 : // scrubber.
737 :
738 1402 : let names = gc_layers.iter().map(|x| x.layer_desc().filename());
739 24 :
740 24 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
741 24 :
742 24 : self.launch_queued_tasks(upload_queue);
743 24 :
744 24 : Ok(())
745 24 : }
746 :
747 : /// Update the remote index file, removing the to-be-deleted files from the index,
748 : /// allowing scheduling of actual deletions later.
749 664 : fn schedule_unlinking_of_layers_from_index_part0<I>(
750 664 : self: &Arc<Self>,
751 664 : upload_queue: &mut UploadQueueInitialized,
752 664 : names: I,
753 664 : ) -> Vec<(LayerFileName, LayerFileMetadata)>
754 664 : where
755 664 : I: IntoIterator<Item = LayerFileName>,
756 664 : {
757 664 : // Deleting layers doesn't affect the values stored in TimelineMetadata,
758 664 : // so we don't need update it. Just serialize it.
759 664 : let metadata = upload_queue.latest_metadata.clone();
760 664 :
761 664 : // Decorate our list of names with each name's metadata, dropping
762 664 : // names that are unexpectedly missing from our metadata. This metadata
763 664 : // is later used when physically deleting layers, to construct key paths.
764 664 : let with_metadata: Vec<_> = names
765 664 : .into_iter()
766 5009 : .filter_map(|name| {
767 5009 : let meta = upload_queue.latest_files.remove(&name);
768 :
769 5009 : if let Some(meta) = meta {
770 5007 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
771 5007 : Some((name, meta))
772 : } else {
773 : // This can only happen if we forgot to schedule the file upload
774 : // before scheduling the delete. Log it because it is a rare/strange
775 : // situation, and in case something is misbehaving, we'd like to know which
776 : // layers experienced this.
777 2 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
778 2 : None
779 : }
780 5009 : })
781 664 : .collect();
782 :
783 : #[cfg(feature = "testing")]
784 5671 : for (name, metadata) in &with_metadata {
785 5007 : let gen = metadata.generation;
786 5007 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
787 UBC 0 : if unexpected == gen {
788 0 : tracing::error!("{name} was unlinked twice with same generation");
789 : } else {
790 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
791 : }
792 CBC 5007 : }
793 : }
794 :
795 : // after unlinking files from the upload_queue.latest_files we must always schedule an
796 : // index_part update, because that needs to be uploaded before we can actually delete the
797 : // files.
798 664 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
799 308 : self.schedule_index_upload(upload_queue, metadata);
800 356 : }
801 :
802 664 : with_metadata
803 664 : }
804 :
805 : /// Schedules deletion for layer files which have previously been unlinked from the
806 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
807 4305 : pub(crate) fn schedule_deletion_of_unlinked(
808 4305 : self: &Arc<Self>,
809 4305 : layers: Vec<(LayerFileName, LayerFileMetadata)>,
810 4305 : ) -> anyhow::Result<()> {
811 4305 : let mut guard = self.upload_queue.lock().unwrap();
812 4305 : let upload_queue = guard.initialized_mut()?;
813 :
814 4298 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
815 4298 : self.launch_queued_tasks(upload_queue);
816 4298 : Ok(())
817 4305 : }
818 :
819 4656 : fn schedule_deletion_of_unlinked0(
820 4656 : self: &Arc<Self>,
821 4656 : upload_queue: &mut UploadQueueInitialized,
822 4656 : mut with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
823 4656 : ) {
824 4656 : // Filter out any layers which were not created by this tenant shard. These are
825 4656 : // layers that originate from some ancestor shard after a split, and may still
826 4656 : // be referenced by other shards. We are free to delete them locally and remove
827 4656 : // them from our index (and would have already done so when we reach this point
828 4656 : // in the code), but we may not delete them remotely.
829 4656 : with_metadata.retain(|(name, meta)| {
830 4300 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
831 4300 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
832 4300 : if !retain {
833 UBC 0 : tracing::debug!(
834 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
835 0 : meta.shard
836 0 : );
837 CBC 4300 : }
838 4300 : retain
839 4656 : });
840 :
841 8956 : for (name, meta) in &with_metadata {
842 4300 : info!(
843 4300 : "scheduling deletion of layer {}{} (shard {})",
844 4300 : name,
845 4300 : meta.generation.get_suffix(),
846 4300 : meta.shard
847 4300 : );
848 : }
849 :
850 : #[cfg(feature = "testing")]
851 8956 : for (name, meta) in &with_metadata {
852 4300 : let gen = meta.generation;
853 4300 : match upload_queue.dangling_files.remove(name) {
854 4300 : Some(same) if same == gen => { /* expected */ }
855 UBC 0 : Some(other) => {
856 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
857 : }
858 : None => {
859 0 : tracing::error!("{name} was unlinked but was not dangling");
860 : }
861 : }
862 : }
863 :
864 : // schedule the actual deletions
865 CBC 4656 : let op = UploadOp::Delete(Delete {
866 4656 : layers: with_metadata,
867 4656 : });
868 4656 : self.calls_unfinished_metric_begin(&op);
869 4656 : upload_queue.queued_operations.push_back(op);
870 4656 : }
871 :
872 : /// Schedules a compaction update to the remote `index_part.json`.
873 : ///
874 : /// `compacted_from` represents the L0 layer names which have been compacted into the `compacted_to` L1 layers.
875 282 : pub(crate) fn schedule_compaction_update(
876 282 : self: &Arc<Self>,
877 282 : compacted_from: &[Layer],
878 282 : compacted_to: &[ResidentLayer],
879 282 : ) -> anyhow::Result<()> {
880 282 : let mut guard = self.upload_queue.lock().unwrap();
881 282 : let upload_queue = guard.initialized_mut()?;
882 :
883 10707 : for layer in compacted_to {
884 10425 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
885 10425 : }
886 :
887 3603 : let names = compacted_from.iter().map(|x| x.layer_desc().filename());
888 282 :
889 282 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
890 282 : self.launch_queued_tasks(upload_queue);
891 282 :
892 282 : Ok(())
893 282 : }
894 :
895 : /// Wait for all previously scheduled uploads/deletions to complete
896 1121 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
897 1121 : let mut receiver = {
898 1121 : let mut guard = self.upload_queue.lock().unwrap();
899 1121 : let upload_queue = guard.initialized_mut()?;
900 1121 : self.schedule_barrier0(upload_queue)
901 1121 : };
902 1121 :
903 1121 : if receiver.changed().await.is_err() {
904 UBC 0 : anyhow::bail!("wait_completion aborted because upload queue was stopped");
905 CBC 1117 : }
906 1117 :
907 1117 : Ok(())
908 1117 : }
909 :
910 357 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
911 357 : let mut guard = self.upload_queue.lock().unwrap();
912 357 : let upload_queue = guard.initialized_mut()?;
913 357 : self.schedule_barrier0(upload_queue);
914 357 : Ok(())
915 357 : }
916 :
917 1478 : fn schedule_barrier0(
918 1478 : self: &Arc<Self>,
919 1478 : upload_queue: &mut UploadQueueInitialized,
920 1478 : ) -> tokio::sync::watch::Receiver<()> {
921 1478 : let (sender, receiver) = tokio::sync::watch::channel(());
922 1478 : let barrier_op = UploadOp::Barrier(sender);
923 1478 :
924 1478 : upload_queue.queued_operations.push_back(barrier_op);
925 1478 : // Don't count this kind of operation!
926 1478 :
927 1478 : // Launch the task immediately, if possible
928 1478 : self.launch_queued_tasks(upload_queue);
929 1478 :
930 1478 : receiver
931 1478 : }
932 :
933 : /// Wait for all previously scheduled operations to complete, and then stop.
934 : ///
935 : /// Not cancellation safe
936 201 : pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> {
937 201 : // On cancellation the queue is left in an awkward state of refusing new operations, but
938 201 : // a proper stop is yet to be called. On cancel, the original or some later task must call
939 201 : // `stop` or `shutdown`.
940 201 : let sg = scopeguard::guard((), |_| {
941 UBC 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
942 CBC 201 : });
943 :
944 201 : let fut = {
945 201 : let mut guard = self.upload_queue.lock().unwrap();
946 201 : let upload_queue = match &mut *guard {
947 UBC 0 : UploadQueue::Stopped(_) => return Ok(()),
948 0 : UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized),
949 CBC 201 : UploadQueue::Initialized(ref mut init) => init,
950 201 : };
951 201 :
952 201 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
953 201 : // just don't add more of these as they would never complete.
954 201 : //
955 201 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
956 201 : // in every place we would not have to jump through this hoop, and this method could be
957 201 : // made cancellable.
958 201 : if !upload_queue.shutting_down {
959 201 : upload_queue.shutting_down = true;
960 201 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
961 201 : // this operation is not counted similar to Barrier
962 201 :
963 201 : self.launch_queued_tasks(upload_queue);
964 201 : }
965 :
966 201 : upload_queue.shutdown_ready.clone().acquire_owned()
967 : };
968 :
969 201 : let res = fut.await;
970 :
971 201 : scopeguard::ScopeGuard::into_inner(sg);
972 201 :
973 201 : match res {
974 UBC 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
975 CBC 201 : Err(_closed) => {
976 201 : // expected
977 201 : }
978 201 : }
979 201 :
980 201 : self.stop()
981 201 : }
982 :
983 : /// Set the deleted_at field in the remote index file.
984 : ///
985 : /// This fails if the upload queue has not been `stop()`ed.
986 : ///
987 : /// The caller is responsible for calling `stop()` AND for waiting
988 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
989 : /// Check method [`RemoteTimelineClient::stop`] for details.
990 UBC 0 : #[instrument(skip_all)]
991 : pub(crate) async fn persist_index_part_with_deleted_flag(
992 : self: &Arc<Self>,
993 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
994 : let index_part_with_deleted_at = {
995 : let mut locked = self.upload_queue.lock().unwrap();
996 :
997 : // We must be in stopped state because otherwise
998 : // we can have an in-progress index part upload that can overwrite the file
999 : // with a missing deleted flag that we are going to set below
1000 : let stopped = locked.stopped_mut()?;
1001 :
1002 : match stopped.deleted_at {
1003 : SetDeletedFlagProgress::NotRunning => (), // proceed
1004 : SetDeletedFlagProgress::InProgress(at) => {
1005 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1006 : }
1007 : SetDeletedFlagProgress::Successful(at) => {
1008 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1009 : }
1010 : };
1011 : let deleted_at = Utc::now().naive_utc();
1012 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1013 :
1014 : let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
1015 : .context("IndexPart serialize")?;
1016 : index_part.deleted_at = Some(deleted_at);
1017 : index_part
1018 : };
1019 :
1020 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1021 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1022 0 : let stopped = locked
1023 0 : .stopped_mut()
1024 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1025 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1026 0 : });
1027 :
1028 CBC 159 : pausable_failpoint!("persist_deleted_index_part");
1029 :
1030 : backoff::retry(
1031 214 : || {
1032 214 : upload::upload_index_part(
1033 214 : &self.storage_impl,
1034 214 : &self.tenant_shard_id,
1035 214 : &self.timeline_id,
1036 214 : self.generation,
1037 214 : &index_part_with_deleted_at,
1038 214 : &self.cancel,
1039 214 : )
1040 214 : },
1041 55 : |_e| false,
1042 : 1,
1043 : // have just a couple of attempts
1044 : // when executed as part of timeline deletion this happens in context of api call
1045 : // when executed as part of tenant deletion this happens in the background
1046 : 2,
1047 : "persist_index_part_with_deleted_flag",
1048 UBC 0 : backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
1049 : )
1050 : .await?;
1051 :
1052 : // all good, disarm the guard and mark as success
1053 : ScopeGuard::into_inner(undo_deleted_at);
1054 : {
1055 : let mut locked = self.upload_queue.lock().unwrap();
1056 :
1057 : let stopped = locked
1058 : .stopped_mut()
1059 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1060 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1061 : index_part_with_deleted_at
1062 : .deleted_at
1063 : .expect("we set it above"),
1064 : );
1065 : }
1066 :
1067 : Ok(())
1068 : }
1069 :
1070 : /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfully set.
1071 : /// The function deletes layer files one by one, then lists the prefix to see if we leaked anything;
1072 : /// it deletes any leaked files and proceeds with deletion of the index file at the end.
1073 CBC 171 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
1074 171 : debug_assert_current_span_has_tenant_and_timeline_id();
1075 :
1076 171 : let layers: Vec<RemotePath> = {
1077 171 : let mut locked = self.upload_queue.lock().unwrap();
1078 171 : let stopped = locked.stopped_mut()?;
1079 :
1080 171 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1081 UBC 0 : anyhow::bail!("deleted_at is not set")
1082 CBC 171 : }
1083 :
1084 171 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1085 :
1086 171 : stopped
1087 171 : .upload_queue_for_deletion
1088 171 : .latest_files
1089 171 : .drain()
1090 4497 : .map(|(file_name, meta)| {
1091 4497 : remote_layer_path(
1092 4497 : &self.tenant_shard_id.tenant_id,
1093 4497 : &self.timeline_id,
1094 4497 : meta.shard,
1095 4497 : &file_name,
1096 4497 : meta.generation,
1097 4497 : )
1098 4497 : })
1099 171 : .collect()
1100 171 : };
1101 171 :
1102 171 : let layer_deletion_count = layers.len();
1103 171 : self.deletion_queue_client.push_immediate(layers).await?;
1104 :
1105 : // Do not delete the index part yet; it is needed for a possible retry. If we removed it first
1106 : // and the retry arrived at a different pageserver, there would be no trace of it in remote storage
1107 171 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1108 171 :
1109 171 : // Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
1110 171 : // taking the burden of listing all the layers that we already know we should delete.
1111 171 : self.deletion_queue_client.flush_immediate().await?;
1112 :
1113 171 : let remaining = backoff::retry(
1114 229 : || async {
1115 229 : self.storage_impl
1116 229 : .list_files(Some(&timeline_storage_path))
1117 746 : .await
1118 229 : },
1119 171 : |_e| false,
1120 171 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1121 171 : FAILED_REMOTE_OP_RETRIES,
1122 171 : "list_prefixes",
1123 171 : backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
1124 171 : )
1125 746 : .await
1126 171 : .context("list prefixes")?;
1127 :
1128 : // We will delete the current index_part object last, since it acts as a deletion
1129 : // marker via its deleted_at attribute
1130 171 : let latest_index = remaining
1131 171 : .iter()
1132 2141 : .filter(|p| {
1133 2141 : p.object_name()
1134 2141 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1135 2141 : .unwrap_or(false)
1136 2141 : })
1137 202 : .filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
1138 202 : .max_by_key(|i| i.1)
1139 171 : .map(|i| i.0.clone())
1140 171 : .unwrap_or(
1141 171 : // No generation-suffixed indices, assume we are dealing with
1142 171 : // a legacy index.
1143 171 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1144 171 : );
1145 171 :
1146 171 : let remaining_layers: Vec<RemotePath> = remaining
1147 171 : .into_iter()
1148 2141 : .filter(|p| {
1149 2141 : if p == &latest_index {
1150 166 : return false;
1151 1975 : }
1152 1975 : if let Some(name) = p.object_name() {
1153 1975 : if name == INITDB_PATH {
1154 154 : return false;
1155 1821 : }
1156 UBC 0 : }
1157 CBC 1821 : true
1158 2141 : })
1159 1821 : .inspect(|path| {
1160 1821 : if let Some(name) = path.object_name() {
1161 1821 : info!(%name, "deleting a file not referenced from index_part.json");
1162 : } else {
1163 UBC 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1164 : }
1165 CBC 1821 : })
1166 171 : .collect();
1167 171 :
1168 171 : let not_referenced_count = remaining_layers.len();
1169 171 : if !remaining_layers.is_empty() {
1170 98 : self.deletion_queue_client
1171 98 : .push_immediate(remaining_layers)
1172 UBC 0 : .await?;
1173 CBC 73 : }
1174 :
1175 171 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1176 8 : Err(anyhow::anyhow!(
1177 8 : "failpoint: timeline-delete-before-index-delete"
1178 8 : ))?
1179 171 : });
1180 :
1181 UBC 0 : debug!("enqueuing index part deletion");
1182 CBC 163 : self.deletion_queue_client
1183 163 : .push_immediate([latest_index].to_vec())
1184 UBC 0 : .await?;
1185 :
1186 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
1187 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1188 CBC 163 : self.deletion_queue_client.flush_immediate().await?;
1189 :
1190 163 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1191 2 : Err(anyhow::anyhow!(
1192 2 : "failpoint: timeline-delete-after-index-delete"
1193 2 : ))?
1194 163 : });
1195 :
1196 161 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1197 :
1198 161 : Ok(())
1199 171 : }
1200 :
1201 : ///
1202 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1203 : /// the ordering constraints.
1204 : ///
1205 : /// The caller needs to already hold the `upload_queue` lock.
1206 47154 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1207 76924 : while let Some(next_op) = upload_queue.queued_operations.front() {
1208 : // Can we run this task now?
1209 56357 : let can_run_now = match next_op {
1210 : UploadOp::UploadLayer(_, _) => {
1211 : // Can always be scheduled.
1212 19921 : true
1213 : }
1214 : UploadOp::UploadMetadata(_, _) => {
1215 : // These can only be performed after all the preceding operations
1216 : // have finished.
1217 29672 : upload_queue.inprogress_tasks.is_empty()
1218 : }
1219 : UploadOp::Delete(_) => {
1220 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
1221 3553 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
1222 : }
1223 :
1224 : UploadOp::Barrier(_) | UploadOp::Shutdown => {
1225 3211 : upload_queue.inprogress_tasks.is_empty()
1226 : }
1227 : };
1228 :
1229 : // If we cannot launch this task, don't look any further.
1230 : //
1231 : // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch
1232 : // them now, but we don't try to do that currently. For example, if the frontmost task
1233 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
1234 : // could still start layer uploads that were scheduled later.
1235 56357 : if !can_run_now {
1236 26386 : break;
1237 29971 : }
1238 29971 :
1239 29971 : if let UploadOp::Shutdown = next_op {
1240 : // leave the op in the queue but do not start more tasks; it will be dropped when
1241 : // the stop is called.
1242 201 : upload_queue.shutdown_ready.close();
1243 201 : break;
1244 29770 : }
1245 29770 :
1246 29770 : // We can launch this task. Remove it from the queue first.
1247 29770 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
1248 29770 :
1249 29770 : debug!("starting op: {}", next_op);
1250 :
1251 : // Update the counters
1252 29770 : match next_op {
1253 19921 : UploadOp::UploadLayer(_, _) => {
1254 19921 : upload_queue.num_inprogress_layer_uploads += 1;
1255 19921 : }
1256 5125 : UploadOp::UploadMetadata(_, _) => {
1257 5125 : upload_queue.num_inprogress_metadata_uploads += 1;
1258 5125 : }
1259 3252 : UploadOp::Delete(_) => {
1260 3252 : upload_queue.num_inprogress_deletions += 1;
1261 3252 : }
1262 1472 : UploadOp::Barrier(sender) => {
1263 1472 : sender.send_replace(());
1264 1472 : continue;
1265 : }
1266 UBC 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1267 : };
1268 :
1269 : // Assign unique ID to this task
1270 CBC 28298 : upload_queue.task_counter += 1;
1271 28298 : let upload_task_id = upload_queue.task_counter;
1272 28298 :
1273 28298 : // Add it to the in-progress map
1274 28298 : let task = Arc::new(UploadTask {
1275 28298 : task_id: upload_task_id,
1276 28298 : op: next_op,
1277 28298 : retries: AtomicU32::new(0),
1278 28298 : });
1279 28298 : upload_queue
1280 28298 : .inprogress_tasks
1281 28298 : .insert(task.task_id, Arc::clone(&task));
1282 28298 :
1283 28298 : // Spawn task to perform the task
1284 28298 : let self_rc = Arc::clone(self);
1285 28298 : let tenant_shard_id = self.tenant_shard_id;
1286 28298 : let timeline_id = self.timeline_id;
1287 28298 : task_mgr::spawn(
1288 28298 : &self.runtime,
1289 28298 : TaskKind::RemoteUploadTask,
1290 28298 : Some(self.tenant_shard_id),
1291 28298 : Some(self.timeline_id),
1292 28298 : "remote upload",
1293 : false,
1294 28289 : async move {
1295 869890 : self_rc.perform_upload_task(task).await;
1296 28280 : Ok(())
1297 28280 : }
1298 28298 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1299 : );
1300 :
1301 : // Loop back to process next task
1302 : }
1303 47154 : }
1304 :
1305 : ///
1306 : /// Perform an upload task.
1307 : ///
1308 : /// The task is in the `inprogress_tasks` list. This function will try to
1309 : /// execute it, retrying forever. On successful completion, the task is
1310 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
1311 : /// queue that were waiting by the completion are launched.
1312 : ///
1313 : /// The task can be shut down, however. That leads to stopping the whole
1314 : /// queue.
1315 : ///
1316 28289 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1317 : // Loop to retry until it completes.
1318 30765 : loop {
1319 30765 : // If we're requested to shut down, close up shop and exit.
1320 30765 : //
1321 30765 : // Note: We only check for shutdown requests between retries, so
1322 30765 : // if a shutdown request arrives while we're busy uploading, in the
1323 30765 : // upload::upload_*() call below, we will wait until it has finished
1324 30765 : // rather than exit immediately. We probably could cancel the upload by simply dropping
1325 30765 : // the Future, but we're not 100% sure if the remote storage library
1326 30765 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1327 30765 : // upload finishes or times out soon enough.
1328 30765 : if task_mgr::is_shutdown_requested() {
1329 271 : info!("upload task cancelled by shutdown request");
1330 271 : match self.stop() {
1331 271 : Ok(()) => {}
1332 : Err(StopError::QueueUninitialized) => {
1333 UBC 0 : unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
1334 : }
1335 : }
1336 CBC 271 : return;
1337 30494 : }
1338 :
1339 30494 : let upload_result: anyhow::Result<()> = match &task.op {
1340 21499 : UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
1341 21499 : let path = layer.local_path();
1342 21499 : upload::upload_timeline_layer(
1343 21499 : self.conf,
1344 21499 : &self.storage_impl,
1345 21499 : path,
1346 21499 : layer_metadata,
1347 21499 : self.generation,
1348 21499 : &self.cancel,
1349 21499 : )
1350 21499 : .measure_remote_op(
1351 21499 : self.tenant_shard_id.tenant_id,
1352 21499 : self.timeline_id,
1353 21499 : RemoteOpFileKind::Layer,
1354 21499 : RemoteOpKind::Upload,
1355 21499 : Arc::clone(&self.metrics),
1356 21499 : )
1357 847886 : .await
1358 : }
1359 5745 : UploadOp::UploadMetadata(ref index_part, _lsn) => {
1360 5745 : let mention_having_future_layers = if cfg!(feature = "testing") {
1361 5745 : index_part
1362 5745 : .layer_metadata
1363 5745 : .keys()
1364 476256 : .any(|x| x.is_in_future(*_lsn))
1365 : } else {
1366 UBC 0 : false
1367 : };
1368 :
1369 CBC 5745 : let res = upload::upload_index_part(
1370 5745 : &self.storage_impl,
1371 5745 : &self.tenant_shard_id,
1372 5745 : &self.timeline_id,
1373 5745 : self.generation,
1374 5745 : index_part,
1375 5745 : &self.cancel,
1376 5745 : )
1377 5745 : .measure_remote_op(
1378 5745 : self.tenant_shard_id.tenant_id,
1379 5745 : self.timeline_id,
1380 5745 : RemoteOpFileKind::Index,
1381 5745 : RemoteOpKind::Upload,
1382 5745 : Arc::clone(&self.metrics),
1383 5745 : )
1384 18806 : .await;
1385 5741 : if res.is_ok() {
1386 5113 : self.update_remote_physical_size_gauge(Some(index_part));
1387 5113 : if mention_having_future_layers {
1388 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1389 15 : tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
1390 5098 : }
1391 628 : }
1392 5741 : res
1393 : }
1394 3250 : UploadOp::Delete(delete) => {
1395 3250 : pausable_failpoint!("before-delete-layer-pausable");
1396 3250 : self.deletion_queue_client
1397 3250 : .push_layers(
1398 3250 : self.tenant_shard_id,
1399 3250 : self.timeline_id,
1400 3250 : self.generation,
1401 3250 : delete.layers.clone(),
1402 3250 : )
1403 42 : .await
1404 3250 : .map_err(|e| anyhow::anyhow!(e))
1405 : }
1406 UBC 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
1407 : // unreachable. Barrier operations are handled synchronously in
1408 : // launch_queued_tasks
1409 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
1410 0 : break;
1411 : }
1412 : };
1413 :
1414 CBC 30486 : match upload_result {
1415 : Ok(()) => {
1416 28009 : break;
1417 : }
1418 2477 : Err(e) => {
1419 2477 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1420 2477 :
1421 2477 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1422 2477 : // or other external reasons. Such issues are relatively regular, so log them
1423 2477 : // at info level at first, and only WARN if the operation fails repeatedly.
1424 2477 : //
1425 2477 : // (See similar logic for downloads in `download::download_retry`)
1426 2477 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1427 2476 : info!(
1428 2476 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1429 2476 : task.op, retries, e
1430 2476 : );
1431 : } else {
1432 1 : warn!(
1433 1 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1434 1 : task.op, retries, e
1435 1 : );
1436 : }
1437 :
1438 : // sleep until it's time to retry, or we're cancelled
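     :                     // (Assumption, based on the arguments passed below: `exponential_backoff`
     :                     // sleeps roughly DEFAULT_BASE_BACKOFF_SECONDS * 2^retries seconds, capped
     :                     // at DEFAULT_MAX_BACKOFF_SECONDS, and returns early if the shutdown token fires.)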
1439 2477 : exponential_backoff(
1440 2477 : retries,
1441 2477 : DEFAULT_BASE_BACKOFF_SECONDS,
1442 2477 : DEFAULT_MAX_BACKOFF_SECONDS,
1443 2477 : &shutdown_token(),
1444 2477 : )
1445 5 : .await;
1446 : }
1447 : }
1448 : }
1449 :
1450 28009 : let retries = task.retries.load(Ordering::SeqCst);
1451 28009 : if retries > 0 {
1452 2199 : info!(
1453 2199 : "remote task {} completed successfully after {} retries",
1454 2199 : task.op, retries
1455 2199 : );
1456 : } else {
1457 UBC 0 : debug!("remote task {} completed successfully", task.op);
1458 : }
1459 :
1460 : // The task has completed successfully. Remove it from the in-progress list.
1461 CBC 25263 : let lsn_update = {
1462 28009 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1463 28009 : let upload_queue = match upload_queue_guard.deref_mut() {
1464 UBC 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1465 CBC 2746 : UploadQueue::Stopped(_stopped) => {
1466 2746 : None
1467 : },
1468 25263 : UploadQueue::Initialized(qi) => { Some(qi) }
1469 : };
1470 :
1471 28009 : let upload_queue = match upload_queue {
1472 25263 : Some(upload_queue) => upload_queue,
1473 : None => {
1474 2746 : info!("another concurrent task already stopped the queue");
1475 2746 : return;
1476 : }
1477 : };
1478 :
1479 25263 : upload_queue.inprogress_tasks.remove(&task.task_id);
1480 :
1481 25263 : let lsn_update = match task.op {
1482 : UploadOp::UploadLayer(_, _) => {
1483 16902 : upload_queue.num_inprogress_layer_uploads -= 1;
1484 16902 : None
1485 : }
1486 5111 : UploadOp::UploadMetadata(_, lsn) => {
1487 5111 : upload_queue.num_inprogress_metadata_uploads -= 1;
1488 5111 : // XXX monotonicity check?
1489 5111 :
1490 5111 : upload_queue.projected_remote_consistent_lsn = Some(lsn);
1491 5111 : if self.generation.is_none() {
1492 : // Legacy mode: skip validating generation
1493 16 : upload_queue.visible_remote_consistent_lsn.store(lsn);
1494 16 : None
1495 : } else {
1496 5095 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
1497 : }
1498 : }
1499 : UploadOp::Delete(_) => {
1500 3250 : upload_queue.num_inprogress_deletions -= 1;
1501 3250 : None
1502 : }
1503 UBC 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
1504 : };
1505 :
1506 : // Launch any queued tasks that were unblocked by this one.
1507 CBC 25263 : self.launch_queued_tasks(upload_queue);
1508 25263 : lsn_update
1509 : };
1510 :
1511 25263 : if let Some((lsn, slot)) = lsn_update {
1512 : // Updates to the remote_consistent_lsn we advertise to pageservers
1513 : // are all routed through the DeletionQueue, to enforce important
1514 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
1515 5095 : self.deletion_queue_client
1516 5095 : .update_remote_consistent_lsn(
1517 5095 : self.tenant_shard_id,
1518 5095 : self.timeline_id,
1519 5095 : self.generation,
1520 5095 : lsn,
1521 5095 : slot,
1522 5095 : )
1523 UBC 0 : .await;
1524 CBC 20168 : }
1525 :
1526 25263 : self.calls_unfinished_metric_end(&task.op);
1527 28280 : }
1528 :
1529 57796 : fn calls_unfinished_metric_impl(
1530 57796 : &self,
1531 57796 : op: &UploadOp,
1532 57796 : ) -> Option<(
1533 57796 : RemoteOpFileKind,
1534 57796 : RemoteOpKind,
1535 57796 : RemoteTimelineClientMetricsCallTrackSize,
1536 57796 : )> {
1537 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
1538 57796 : let res = match op {
1539 37907 : UploadOp::UploadLayer(_, m) => (
1540 37907 : RemoteOpFileKind::Layer,
1541 37907 : RemoteOpKind::Upload,
1542 37907 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
1543 37907 : ),
1544 10401 : UploadOp::UploadMetadata(_, _) => (
1545 10401 : RemoteOpFileKind::Index,
1546 10401 : RemoteOpKind::Upload,
1547 10401 : DontTrackSize {
1548 10401 : reason: "metadata uploads are tiny",
1549 10401 : },
1550 10401 : ),
1551 9287 : UploadOp::Delete(_delete) => (
1552 9287 : RemoteOpFileKind::Layer,
1553 9287 : RemoteOpKind::Delete,
1554 9287 : DontTrackSize {
1555 9287 : reason: "should we track deletes? positive or negative sign?",
1556 9287 : },
1557 9287 : ),
1558 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
1559 : // we do not account these
1560 201 : return None;
1561 : }
1562 : };
1563 57595 : Some(res)
1564 57796 : }
1565 :
1566 30331 : fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
1567 30331 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1568 30331 : Some(x) => x,
1569 UBC 0 : None => return,
1570 : };
1571 CBC 30331 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
1572 30331 : guard.will_decrement_manually(); // in unfinished_ops_metric_end()
1573 30331 : }
1574 :
1575 27465 : fn calls_unfinished_metric_end(&self, op: &UploadOp) {
1576 27465 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1577 27264 : Some(x) => x,
1578 201 : None => return,
1579 : };
1580 27264 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
1581 27465 : }
1582 :
1583 : /// Close the upload queue for new operations and cancel queued operations.
1584 : ///
1585 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
1586 : ///
1587 : /// In-progress operations will still be running after this function returns.
1588 :     /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(timeline_id))`
1589 : /// to wait for them to complete, after calling this function.
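     :     ///
     :     /// Queued operations that have not yet started are dropped here: their
     :     /// unfinished-operation metrics are closed, and dropping a queued `Barrier`
     :     /// makes the corresponding `wait_completion()` caller return an error. A
     :     /// snapshot of the queue state is preserved for later use during deletion.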
1590 1163 : pub(crate) fn stop(&self) -> Result<(), StopError> {
1591 1163 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
1592 1163 :         // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
1593 1163 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
1594 1163 : let mut guard = self.upload_queue.lock().unwrap();
1595 1163 : match &mut *guard {
1596 UBC 0 : UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
1597 : UploadQueue::Stopped(_) => {
1598 : // nothing to do
1599 CBC 655 : info!("another concurrent task already shut down the queue");
1600 655 : Ok(())
1601 : }
1602 508 : UploadQueue::Initialized(initialized) => {
1603 508 : info!("shutting down upload queue");
1604 :
1605 : // Replace the queue with the Stopped state, taking ownership of the old
1606 : // Initialized queue. We will do some checks on it, and then drop it.
1607 508 : let qi = {
1608 :                     // Here we preserve a working version of the upload queue for possible use during deletions.
1609 :                     // An in-place replacement of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
1610 :                     // but for this use case it doesn't really make sense to bring in unsafe code just for this one usage point.
1611 :                     // Deletion is not really perf-sensitive, so there shouldn't be any problems with cloning a fraction of it.
1612 508 : let upload_queue_for_deletion = UploadQueueInitialized {
1613 508 : task_counter: 0,
1614 508 : latest_files: initialized.latest_files.clone(),
1615 508 : latest_files_changes_since_metadata_upload_scheduled: 0,
1616 508 : latest_metadata: initialized.latest_metadata.clone(),
1617 508 : projected_remote_consistent_lsn: None,
1618 508 : visible_remote_consistent_lsn: initialized
1619 508 : .visible_remote_consistent_lsn
1620 508 : .clone(),
1621 508 : num_inprogress_layer_uploads: 0,
1622 508 : num_inprogress_metadata_uploads: 0,
1623 508 : num_inprogress_deletions: 0,
1624 508 : inprogress_tasks: HashMap::default(),
1625 508 : queued_operations: VecDeque::default(),
1626 508 : #[cfg(feature = "testing")]
1627 508 : dangling_files: HashMap::default(),
1628 508 : shutting_down: false,
1629 508 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
1630 508 : };
1631 508 :
1632 508 : let upload_queue = std::mem::replace(
1633 508 : &mut *guard,
1634 508 : UploadQueue::Stopped(UploadQueueStopped {
1635 508 : upload_queue_for_deletion,
1636 508 : deleted_at: SetDeletedFlagProgress::NotRunning,
1637 508 : }),
1638 508 : );
1639 508 : if let UploadQueue::Initialized(qi) = upload_queue {
1640 508 : qi
1641 : } else {
1642 UBC 0 : unreachable!("we checked in the match above that it is Initialized");
1643 : }
1644 : };
1645 :
1646 : // consistency check
1647 CBC 508 : assert_eq!(
1648 508 : qi.num_inprogress_layer_uploads
1649 508 : + qi.num_inprogress_metadata_uploads
1650 508 : + qi.num_inprogress_deletions,
1651 508 : qi.inprogress_tasks.len()
1652 508 : );
1653 :
1654 : // We don't need to do anything here for in-progress tasks. They will finish
1655 : // on their own, decrement the unfinished-task counter themselves, and observe
1656 : // that the queue is Stopped.
1657 508 : drop(qi.inprogress_tasks);
1658 :
1659 : // Tear down queued ops
1660 2202 : for op in qi.queued_operations.into_iter() {
1661 2202 : self.calls_unfinished_metric_end(&op);
1662 2202 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
1663 2202 : // which is exactly what we want to happen.
1664 2202 : drop(op);
1665 2202 : }
1666 :
1667 : // We're done.
1668 508 : drop(guard);
1669 508 : Ok(())
1670 : }
1671 : }
1672 1163 : }
1673 :
1674 6 : pub(crate) fn get_layers_metadata(
1675 6 : &self,
1676 6 : layers: Vec<LayerFileName>,
1677 6 : ) -> anyhow::Result<Vec<Option<LayerFileMetadata>>> {
1678 6 : let q = self.upload_queue.lock().unwrap();
1679 6 : let q = match &*q {
1680 : UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
1681 UBC 0 : anyhow::bail!("queue is in state {}", q.as_str())
1682 : }
1683 CBC 6 : UploadQueue::Initialized(inner) => inner,
1684 6 : };
1685 6 :
1686 2453 : let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned());
1687 6 :
1688 6 : Ok(decorated.collect())
1689 6 : }
1690 : }
1691 :
1692 3323 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1693 3323 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
1694 3323 : RemotePath::from_string(&path).expect("Failed to construct path")
1695 3323 : }
1696 :
1697 3015 : pub fn remote_timeline_path(
1698 3015 : tenant_shard_id: &TenantShardId,
1699 3015 : timeline_id: &TimelineId,
1700 3015 : ) -> RemotePath {
1701 3015 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
1702 3015 : }
1703 :
1704 : /// Note that the shard component of a remote layer path is _not_ always the same
1705 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
1706 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
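     : ///
     : /// The resulting key has the shape (illustrative; suffix encodings are whatever
     : /// `ShardIndex::get_suffix` and `Generation::get_suffix` produce):
     : /// `tenants/<tenant_id><shard suffix>/timelines/<timeline_id>/<layer file name><generation suffix>`.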
1707 17953 : pub fn remote_layer_path(
1708 17953 : tenant_id: &TenantId,
1709 17953 : timeline_id: &TimelineId,
1710 17953 : shard: ShardIndex,
1711 17953 : layer_file_name: &LayerFileName,
1712 17953 : generation: Generation,
1713 17953 : ) -> RemotePath {
1714 17953 : // Generation-aware key format
1715 17953 : let path = format!(
1716 17953 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
1717 17953 : shard.get_suffix(),
1718 17953 : layer_file_name.file_name(),
1719 17953 : generation.get_suffix()
1720 17953 : );
1721 17953 :
1722 17953 : RemotePath::from_string(&path).expect("Failed to construct path")
1723 17953 : }
1724 :
1725 593 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
1726 593 : RemotePath::from_string(&format!(
1727 593 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
1728 593 : ))
1729 593 : .expect("Failed to construct path")
1730 593 : }
1731 :
1732 7238 : pub fn remote_index_path(
1733 7238 : tenant_shard_id: &TenantShardId,
1734 7238 : timeline_id: &TimelineId,
1735 7238 : generation: Generation,
1736 7238 : ) -> RemotePath {
1737 7238 : RemotePath::from_string(&format!(
1738 7238 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1739 7238 : IndexPart::FILE_NAME,
1740 7238 : generation.get_suffix()
1741 7238 : ))
1742 7238 : .expect("Failed to construct path")
1743 7238 : }
1744 :
1745 13 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1746 13 : RemotePath::from_string(&format!(
1747 13 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
1748 13 : ))
1749 13 : .expect("Failed to construct path")
1750 13 : }
1751 :
1752 : /// Given the key of an index, parse out the generation part of the name
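     : ///
     : /// For example, a key ending in `index_part.json-<suffix>` (illustrative; the exact
     : /// suffix encoding is whatever `Generation::get_suffix` produces) yields the generation
     : /// parsed from `<suffix>`, while a bare `index_part.json` with no `-` yields `None`.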
1753 576 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
1754 576 : let file_name = match path.get_path().file_name() {
1755 576 : Some(f) => f,
1756 : None => {
1757 : // Unexpected: we should be seeing index_part.json paths only
1758 UBC 0 : tracing::warn!("Malformed index key {}", path);
1759 0 : return None;
1760 : }
1761 : };
1762 :
1763 CBC 576 : match file_name.split_once('-') {
1764 572 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
1765 4 : None => None,
1766 : }
1767 576 : }
1768 :
1769 : /// Files on the remote storage are stored with paths relative to the workdir.
1770 : /// That path includes both the tenant and timeline ids, which makes the remote storage path unique.
1771 : ///
1772 : /// Errors if the path provided does not start from pageserver's workdir.
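     : ///
     : /// For example (illustrative paths), with `conf.workdir` = `/data/pageserver` and
     : /// `local_path` = `/data/pageserver/tenants/<tenant>/timelines/<timeline>/<layer file>`,
     : /// the returned remote path is `tenants/<tenant>/timelines/<timeline>/<layer file><generation suffix>`.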
1773 21493 : pub fn remote_path(
1774 21493 : conf: &PageServerConf,
1775 21493 : local_path: &Utf8Path,
1776 21493 : generation: Generation,
1777 21493 : ) -> anyhow::Result<RemotePath> {
1778 21493 : let stripped = local_path
1779 21493 : .strip_prefix(&conf.workdir)
1780 21493 : .context("Failed to strip workdir prefix")?;
1781 :
1782 21493 : let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
1783 21493 :
1784 21493 : RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
1785 UBC 0 : format!(
1786 0 : "to resolve remote part of path {:?} for base {:?}",
1787 0 : local_path, conf.workdir
1788 0 : )
1789 CBC 21493 : })
1790 21493 : }
1791 :
1792 : #[cfg(test)]
1793 : mod tests {
1794 : use super::*;
1795 : use crate::{
1796 : context::RequestContext,
1797 : tenant::{
1798 : harness::{TenantHarness, TIMELINE_ID},
1799 : storage_layer::Layer,
1800 : Generation, Tenant, Timeline,
1801 : },
1802 : DEFAULT_PG_VERSION,
1803 : };
1804 :
1805 : use std::collections::HashSet;
1806 : use utils::lsn::Lsn;
1807 :
1808 4 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
1809 4 : format!("contents for {name}").into()
1810 4 : }
1811 :
1812 1 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
1813 1 : let metadata = TimelineMetadata::new(
1814 1 : disk_consistent_lsn,
1815 1 : None,
1816 1 : None,
1817 1 : Lsn(0),
1818 1 : Lsn(0),
1819 1 : Lsn(0),
1820 1 : // Any version will do
1821 1 : // but it should be consistent with the one in the tests
1822 1 : crate::DEFAULT_PG_VERSION,
1823 1 : );
1824 1 :
1825 1 : // go through serialize + deserialize to fix the header, including checksum
1826 1 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
1827 1 : }
1828 :
1829 1 : fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
1830 3 : let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
1831 1 : avec.sort();
1832 1 :
1833 1 : let mut bvec = b.to_vec();
1834 1 : bvec.sort_unstable();
1835 1 :
1836 1 : assert_eq!(avec, bvec);
1837 1 : }
1838 :
1839 2 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
1840 2 : let mut expected: Vec<String> = expected
1841 2 : .iter()
1842 8 : .map(|x| format!("{}{}", x, generation.get_suffix()))
1843 2 : .collect();
1844 2 : expected.sort();
1845 2 :
1846 2 : let mut found: Vec<String> = Vec::new();
1847 8 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
1848 8 : let entry_name = entry.file_name();
1849 8 : let fname = entry_name.to_str().unwrap();
1850 8 : found.push(String::from(fname));
1851 8 : }
1852 2 : found.sort();
1853 2 :
1854 2 : assert_eq!(found, expected);
1855 2 : }
1856 :
1857 : struct TestSetup {
1858 : harness: TenantHarness,
1859 : tenant: Arc<Tenant>,
1860 : timeline: Arc<Timeline>,
1861 : tenant_ctx: RequestContext,
1862 : }
1863 :
1864 : impl TestSetup {
1865 4 : async fn new(test_name: &str) -> anyhow::Result<Self> {
1866 4 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
1867 4 : let harness = TenantHarness::create(test_name)?;
1868 4 : let (tenant, ctx) = harness.load().await;
1869 :
1870 4 : let timeline = tenant
1871 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1872 4 : .await?;
1873 :
1874 4 : Ok(Self {
1875 4 : harness,
1876 4 : tenant,
1877 4 : timeline,
1878 4 : tenant_ctx: ctx,
1879 4 : })
1880 4 : }
1881 :
1882 : /// Construct a RemoteTimelineClient in an arbitrary generation
1883 5 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
1884 5 : Arc::new(RemoteTimelineClient {
1885 5 : conf: self.harness.conf,
1886 5 : runtime: tokio::runtime::Handle::current(),
1887 5 : tenant_shard_id: self.harness.tenant_shard_id,
1888 5 : timeline_id: TIMELINE_ID,
1889 5 : generation,
1890 5 : storage_impl: self.harness.remote_storage.clone(),
1891 5 : deletion_queue_client: self.harness.deletion_queue.new_client(),
1892 5 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
1893 5 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
1894 5 : &self.harness.tenant_shard_id,
1895 5 : &TIMELINE_ID,
1896 5 : )),
1897 5 : cancel: CancellationToken::new(),
1898 5 : })
1899 5 : }
1900 :
1901 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
1902 : /// and timeline_id are present.
1903 3 : fn span(&self) -> tracing::Span {
1904 3 : tracing::info_span!(
1905 3 : "test",
1906 3 : tenant_id = %self.harness.tenant_id,
1907 3 : timeline_id = %TIMELINE_ID
1908 3 : )
1909 3 : }
1910 : }
1911 :
1912 : // Test scheduling
1913 1 : #[tokio::test]
1914 1 : async fn upload_scheduling() {
1915 : // Test outline:
1916 : //
1917 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
1918 : // Schedule upload of index. Check that it is queued
1919 : // let the layer file uploads finish. Check that the index-upload is now started
1920 : // let the index-upload finish.
1921 : //
1922 : // Download back the index.json. Check that the list of files is correct
1923 : //
1924 : // Schedule upload. Schedule deletion. Check that the deletion is queued
1925 : // let upload finish. Check that deletion is now started
1926 : // Schedule another deletion. Check that it's launched immediately.
1927 : // Schedule index upload. Check that it's queued
1928 :
1929 2 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
1930 1 : let span = test_setup.span();
1931 1 : let _guard = span.enter();
1932 1 :
1933 1 : let TestSetup {
1934 1 : harness,
1935 1 : tenant: _tenant,
1936 1 : timeline,
1937 1 : tenant_ctx: _tenant_ctx,
1938 1 : } = test_setup;
1939 1 :
1940 1 : let client = timeline.remote_client.as_ref().unwrap();
1941 :
1942 : // Download back the index.json, and check that the list of files is correct
1943 1 : let initial_index_part = match client
1944 1 : .download_index_file(CancellationToken::new())
1945 3 : .await
1946 1 : .unwrap()
1947 : {
1948 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1949 UBC 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
1950 : };
1951 CBC 1 : let initial_layers = initial_index_part
1952 1 : .layer_metadata
1953 1 : .keys()
1954 1 : .map(|f| f.to_owned())
1955 1 : .collect::<HashSet<LayerFileName>>();
1956 1 : let initial_layer = {
1957 1 : assert!(initial_layers.len() == 1);
1958 1 : initial_layers.into_iter().next().unwrap()
1959 1 : };
1960 1 :
1961 1 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
1962 1 :
1963 1 : println!("workdir: {}", harness.conf.workdir);
1964 1 :
1965 1 : let remote_timeline_dir = harness
1966 1 : .remote_fs_dir
1967 1 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
1968 1 : println!("remote_timeline_dir: {remote_timeline_dir}");
1969 1 :
1970 1 : let generation = harness.generation;
1971 1 : let shard = harness.shard;
1972 1 :
1973 1 : // Create a couple of dummy files, schedule upload for them
1974 1 :
1975 1 : let layers = [
1976 1 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
1977 1 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
1978 1 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
1979 1 : ]
1980 1 : .into_iter()
1981 3 : .map(|(name, contents): (LayerFileName, Vec<u8>)| {
1982 3 : std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
1983 3 :
1984 3 : Layer::for_resident(
1985 3 : harness.conf,
1986 3 : &timeline,
1987 3 : name,
1988 3 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
1989 3 : )
1990 3 : }).collect::<Vec<_>>();
1991 1 :
1992 1 : client
1993 1 : .schedule_layer_file_upload(layers[0].clone())
1994 1 : .unwrap();
1995 1 : client
1996 1 : .schedule_layer_file_upload(layers[1].clone())
1997 1 : .unwrap();
1998 1 :
1999 1 : // Check that they are started immediately, not queued
2000 1 : //
2001 1 : // this works because we running within block_on, so any futures are now queued up until
2002 1 : // our next await point.
2003 1 : {
2004 1 : let mut guard = client.upload_queue.lock().unwrap();
2005 1 : let upload_queue = guard.initialized_mut().unwrap();
2006 1 : assert!(upload_queue.queued_operations.is_empty());
2007 1 : assert!(upload_queue.inprogress_tasks.len() == 2);
2008 1 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
2009 :
2010 : // also check that `latest_file_changes` was updated
2011 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2012 : }
2013 :
2014 : // Schedule upload of index. Check that it is queued
2015 1 : let metadata = dummy_metadata(Lsn(0x20));
2016 1 : client
2017 1 : .schedule_index_upload_for_metadata_update(&metadata)
2018 1 : .unwrap();
2019 1 : {
2020 1 : let mut guard = client.upload_queue.lock().unwrap();
2021 1 : let upload_queue = guard.initialized_mut().unwrap();
2022 1 : assert!(upload_queue.queued_operations.len() == 1);
2023 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2024 : }
2025 :
2026 : // Wait for the uploads to finish
2027 1 : client.wait_completion().await.unwrap();
2028 1 : {
2029 1 : let mut guard = client.upload_queue.lock().unwrap();
2030 1 : let upload_queue = guard.initialized_mut().unwrap();
2031 :
2032 1 : assert!(upload_queue.queued_operations.is_empty());
2033 1 : assert!(upload_queue.inprogress_tasks.is_empty());
2034 : }
2035 :
2036 : // Download back the index.json, and check that the list of files is correct
2037 1 : let index_part = match client
2038 1 : .download_index_file(CancellationToken::new())
2039 3 : .await
2040 1 : .unwrap()
2041 : {
2042 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2043 UBC 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2044 : };
2045 :
2046 CBC 1 : assert_file_list(
2047 1 : &index_part
2048 1 : .layer_metadata
2049 1 : .keys()
2050 3 : .map(|f| f.to_owned())
2051 1 : .collect(),
2052 1 : &[
2053 1 : &initial_layer.file_name(),
2054 1 : &layers[0].layer_desc().filename().file_name(),
2055 1 : &layers[1].layer_desc().filename().file_name(),
2056 1 : ],
2057 1 : );
2058 1 : assert_eq!(index_part.metadata, metadata);
2059 :
2060 : // Schedule upload and then a deletion. Check that the deletion is queued
2061 1 : client
2062 1 : .schedule_layer_file_upload(layers[2].clone())
2063 1 : .unwrap();
2064 1 :
2065 1 : // this is no longer consistent with how deletion works with Layer::drop, but in this test
2066 1 :         // we keep using schedule_layer_file_deletion because we don't have a way to wait for the
2067 1 : // spawn_blocking started by the drop.
2068 1 : client
2069 1 : .schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
2070 1 : .unwrap();
2071 1 : {
2072 1 : let mut guard = client.upload_queue.lock().unwrap();
2073 1 : let upload_queue = guard.initialized_mut().unwrap();
2074 1 :
2075 1 : // Deletion schedules upload of the index file, and the file deletion itself
2076 1 : assert_eq!(upload_queue.queued_operations.len(), 2);
2077 1 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2078 1 : assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
2079 1 : assert_eq!(upload_queue.num_inprogress_deletions, 0);
2080 1 : assert_eq!(
2081 1 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2082 1 : 0
2083 1 : );
2084 : }
2085 1 : assert_remote_files(
2086 1 : &[
2087 1 : &initial_layer.file_name(),
2088 1 : &layers[0].layer_desc().filename().file_name(),
2089 1 : &layers[1].layer_desc().filename().file_name(),
2090 1 : "index_part.json",
2091 1 : ],
2092 1 : &remote_timeline_dir,
2093 1 : generation,
2094 1 : );
2095 1 :
2096 1 : // Finish them
2097 1 : client.wait_completion().await.unwrap();
2098 1 : harness.deletion_queue.pump().await;
2099 :
2100 1 : assert_remote_files(
2101 1 : &[
2102 1 : &initial_layer.file_name(),
2103 1 : &layers[1].layer_desc().filename().file_name(),
2104 1 : &layers[2].layer_desc().filename().file_name(),
2105 1 : "index_part.json",
2106 1 : ],
2107 1 : &remote_timeline_dir,
2108 1 : generation,
2109 1 : );
2110 : }
2111 :
2112 1 : #[tokio::test]
2113 1 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2114 : // Setup
2115 :
2116 : let TestSetup {
2117 1 : harness,
2118 1 : tenant: _tenant,
2119 1 : timeline,
2120 : ..
2121 2 : } = TestSetup::new("metrics").await.unwrap();
2122 1 : let client = timeline.remote_client.as_ref().unwrap();
2123 1 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2124 1 :
2125 1 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2126 1 : let content_1 = dummy_contents("foo");
2127 1 : std::fs::write(
2128 1 : timeline_path.join(layer_file_name_1.file_name()),
2129 1 : &content_1,
2130 1 : )
2131 1 : .unwrap();
2132 1 :
2133 1 : let layer_file_1 = Layer::for_resident(
2134 1 : harness.conf,
2135 1 : &timeline,
2136 1 : layer_file_name_1.clone(),
2137 1 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2138 1 : );
2139 1 :
2140 2 : #[derive(Debug, PartialEq, Clone, Copy)]
2141 1 : struct BytesStartedFinished {
2142 1 : started: Option<usize>,
2143 1 : finished: Option<usize>,
2144 1 : }
2145 1 : impl std::ops::Add for BytesStartedFinished {
2146 1 : type Output = Self;
2147 2 : fn add(self, rhs: Self) -> Self::Output {
2148 2 : Self {
2149 2 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
2150 2 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
2151 2 : }
2152 2 : }
2153 1 : }
2154 3 : let get_bytes_started_stopped = || {
2155 3 : let started = client
2156 3 : .metrics
2157 3 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2158 3 : .map(|v| v.try_into().unwrap());
2159 3 : let stopped = client
2160 3 : .metrics
2161 3 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2162 3 : .map(|v| v.try_into().unwrap());
2163 3 : BytesStartedFinished {
2164 3 : started,
2165 3 : finished: stopped,
2166 3 : }
2167 3 : };
2168 :
2169 : // Test
2170 1 : tracing::info!("now doing actual test");
2171 :
2172 1 : let actual_a = get_bytes_started_stopped();
2173 1 :
2174 1 : client
2175 1 : .schedule_layer_file_upload(layer_file_1.clone())
2176 1 : .unwrap();
2177 1 :
2178 1 : let actual_b = get_bytes_started_stopped();
2179 1 :
2180 1 : client.wait_completion().await.unwrap();
2181 1 :
2182 1 : let actual_c = get_bytes_started_stopped();
2183 1 :
2184 1 : // Validate
2185 1 :
2186 1 : let expected_b = actual_a
2187 1 : + BytesStartedFinished {
2188 1 : started: Some(content_1.len()),
2189 1 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
2190 1 : finished: Some(0),
2191 1 : };
2192 1 : assert_eq!(actual_b, expected_b);
2193 :
2194 1 : let expected_c = actual_a
2195 1 : + BytesStartedFinished {
2196 1 : started: Some(content_1.len()),
2197 1 : finished: Some(content_1.len()),
2198 1 : };
2199 1 : assert_eq!(actual_c, expected_c);
2200 : }
2201 :
2202 6 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
2203 6 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
2204 6 : let example_metadata = TimelineMetadata::example();
2205 6 : let example_index_part = IndexPart::new(
2206 6 : HashMap::new(),
2207 6 : example_metadata.disk_consistent_lsn(),
2208 6 : example_metadata,
2209 6 : );
2210 6 :
2211 6 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
2212 6 :
2213 6 : let index_path = test_state.harness.remote_fs_dir.join(
2214 6 : remote_index_path(
2215 6 : &test_state.harness.tenant_shard_id,
2216 6 : &TIMELINE_ID,
2217 6 : generation,
2218 6 : )
2219 6 : .get_path(),
2220 6 : );
2221 6 :
2222 6 : std::fs::create_dir_all(index_path.parent().unwrap())
2223 6 : .expect("creating test dir should work");
2224 6 :
2225 6 : eprintln!("Writing {index_path}");
2226 6 : std::fs::write(&index_path, index_part_bytes).unwrap();
2227 6 : example_index_part
2228 6 : }
2229 :
2230 :     /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
2231 : /// index, the IndexPart returned is equal to `expected`
2232 5 : async fn assert_got_index_part(
2233 5 : test_state: &TestSetup,
2234 5 : get_generation: Generation,
2235 5 : expected: &IndexPart,
2236 5 : ) {
2237 5 : let client = test_state.build_client(get_generation);
2238 :
2239 5 : let download_r = client
2240 5 : .download_index_file(CancellationToken::new())
2241 20 : .await
2242 5 : .expect("download should always succeed");
2243 5 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
2244 5 : match download_r {
2245 5 : MaybeDeletedIndexPart::IndexPart(index_part) => {
2246 5 : assert_eq!(&index_part, expected);
2247 : }
2248 UBC 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
2249 : }
2250 CBC 5 : }
2251 :
2252 1 : #[tokio::test]
2253 1 : async fn index_part_download_simple() -> anyhow::Result<()> {
2254 2 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
2255 1 : let span = test_state.span();
2256 1 : let _guard = span.enter();
2257 1 :
2258 1 : // Simple case: we are in generation N, load the index from generation N - 1
2259 1 : let generation_n = 5;
2260 1 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2261 :
2262 3 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
2263 :
2264 1 : Ok(())
2265 : }
2266 :
2267 1 : #[tokio::test]
2268 1 : async fn index_part_download_ordering() -> anyhow::Result<()> {
2269 1 : let test_state = TestSetup::new("index_part_download_ordering")
2270 2 : .await
2271 1 : .unwrap();
2272 1 :
2273 1 : let span = test_state.span();
2274 1 : let _guard = span.enter();
2275 1 :
2276 1 : // A generation-less IndexPart exists in the bucket, we should find it
2277 1 : let generation_n = 5;
2278 1 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
2279 5 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
2280 :
2281 : // If a more recent-than-none generation exists, we should prefer to load that
2282 1 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
2283 5 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2284 :
2285 : // If a more-recent-than-me generation exists, we should ignore it.
2286 1 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
2287 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2288 :
2289 : // If a directly previous generation exists, _and_ an index exists in my own
2290 : // generation, I should prefer my own generation.
2291 1 : let _injected_prev =
2292 1 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2293 1 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
2294 1 : assert_got_index_part(
2295 1 : &test_state,
2296 1 : Generation::new(generation_n),
2297 1 : &injected_current,
2298 1 : )
2299 3 : .await;
2300 :
2301 1 : Ok(())
2302 : }
2303 : }
|