Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * Stand-alone function, [`list_remote_timelines`], to get list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
40 : //!
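The queueing behaviour described above can be sketched in a few lines of plain Rust. This is a hypothetical, std-only model, not the real `UploadQueue`. `ToyQueue`, `ToyOp` and the `u64` file-size payload are illustrative assumptions. It shows three rules the `schedule_*` functions follow:

- A layer upload records the file in the desired state and bumps a change counter.
- An index upload snapshots the desired state, and the "file changes" variant is a no-op when nothing changed.
- A deletion first unlinks the file and schedules an index upload, then queues the delete itself.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical stand-in for `UploadOp`; the real type carries layers,
/// metadata and a boxed `IndexPart`.
#[derive(Debug, Clone, PartialEq)]
enum ToyOp {
    UploadLayer(String),
    /// Snapshot of the layer list that this index file will claim.
    UploadIndex(Vec<String>),
    Delete(Vec<String>),
}

#[derive(Default)]
struct ToyQueue {
    /// Desired remote state: layer name -> file size (hypothetical payload).
    latest_files: BTreeMap<String, u64>,
    /// Layer-list changes since the last index upload was scheduled.
    changes_since_index_scheduled: usize,
    queued: VecDeque<ToyOp>,
}

impl ToyQueue {
    fn schedule_layer_upload(&mut self, name: &str, size: u64) {
        self.latest_files.insert(name.to_string(), size);
        self.changes_since_index_scheduled += 1;
        self.queued.push_back(ToyOp::UploadLayer(name.to_string()));
    }

    fn schedule_index_upload(&mut self) {
        let snapshot: Vec<String> = self.latest_files.keys().cloned().collect();
        self.queued.push_back(ToyOp::UploadIndex(snapshot));
        self.changes_since_index_scheduled = 0;
    }

    /// Only schedules an index upload if the layer list changed.
    fn schedule_index_upload_for_file_changes(&mut self) {
        if self.changes_since_index_scheduled > 0 {
            self.schedule_index_upload();
        }
    }

    /// Unlink from the index first, then delete, so the remote index never
    /// references an object that is already gone.
    fn schedule_layer_deletion(&mut self, names: &[&str]) {
        for name in names {
            if self.latest_files.remove(*name).is_some() {
                self.changes_since_index_scheduled += 1;
            }
        }
        self.schedule_index_upload_for_file_changes();
        let names: Vec<String> = names.iter().map(|n| n.to_string()).collect();
        self.queued.push_back(ToyOp::Delete(names));
    }
}
```

Calling `schedule_index_upload_for_file_changes` twice in a row queues only one index upload. A deletion always appears in the queue after the index upload that stops referencing the file.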
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
55 : //!
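Below is a minimal sketch of the two ideas in this section. It assumes an index can be reduced to a set of layer names; the real `IndexPart` also carries per-layer metadata and the timeline metadata. `remote_layer_key` and `ToyIndexPart` are hypothetical names.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: the remote key for a layer file, following the
/// `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>` layout.
fn remote_layer_key(tenant_id: &str, timeline_id: &str, layer: &str) -> String {
    format!("tenants/{}/timelines/{}/{}", tenant_id, timeline_id, layer)
}

/// Hypothetical, reduced index: only the set of referenced layer names.
struct ToyIndexPart {
    layers: BTreeSet<String>,
}

impl ToyIndexPart {
    /// The index, not a bucket listing, defines the remote state: an object
    /// that exists in the bucket but is absent from the index does not count.
    fn is_part_of_remote_state(&self, layer: &str) -> bool {
        self.layers.contains(layer)
    }
}
```

Answering "is this file part of the remote state?" is then a lookup in one downloaded object, with no `LIST` request against the bucket.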
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible for
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that requires preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
72 : //!
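To make the "barrier" idea concrete, here is a simplified, hypothetical scheduling rule. The actual conditions live in the upload queue code and may differ in detail. The sketch assumes three things:

- Layer uploads never need to wait, because an index only references layers scheduled before it.
- An index upload waits for every in-flight operation.
- Deletions wait for everything except other deletions.

Operations are launched in FIFO order, and launching stops at the first operation that has to wait, so nothing overtakes a barrier.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified operation kinds.
#[derive(Debug, Clone, Copy, PartialEq)]
enum OpKind {
    UploadLayer,
    UploadIndex,
    Delete,
}

/// Simplified rule (an assumption): may `next` start while `in_progress` run?
fn can_launch(next: OpKind, in_progress: &[OpKind]) -> bool {
    match next {
        // No index references this layer yet, so uploading it early is safe.
        OpKind::UploadLayer => true,
        // Barrier: the index may reference anything scheduled before it.
        OpKind::UploadIndex => in_progress.is_empty(),
        // Wait for preceding uploads; overlapping with other deletions is fine.
        OpKind::Delete => in_progress.iter().all(|k| *k == OpKind::Delete),
    }
}

/// Launch queued operations in FIFO order, stopping at the first one that
/// must wait, so later operations never overtake a barrier.
fn launch_ready(queue: &mut VecDeque<OpKind>, in_progress: &mut Vec<OpKind>) {
    while let Some(&next) = queue.front() {
        if !can_launch(next, in_progress.as_slice()) {
            break;
        }
        queue.pop_front();
        in_progress.push(next);
    }
}
```

With two layer uploads followed by an index upload, both layer uploads start at once. The index upload starts only after they have finished.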
73 : //! The caller should be careful with deletion, though. They should not delete
74 : //! local files that have been scheduled for upload but not yet finished uploading.
75 : //! Otherwise the upload will fail. To wait for an upload to finish, use
76 : //! [`RemoteTimelineClient::wait_completion`] (more on that later).
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after-write consistency in the remote storage.
81 : //! - Layer files are immutable.
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism in the pageserver to detect that; we rely on the control plane to ensure
88 : //! that it doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in
95 : //! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
96 : //! It is initialized based on the [`IndexPart`] that was passed during init
97 : //! and updated with every `schedule_*` function call.
98 : //! All this is necessary to compute the future [`IndexPart`]s
99 : //! when scheduling an operation while other operations that also affect the
100 : //! remote [`IndexPart`] are in flight.
101 : //!
102 : //! # Retries & Error Handling
103 : //!
104 : //! The client retries operations indefinitely, using exponential back-off.
105 : //! There is no way to force a retry, i.e., interrupt the back-off.
106 : //! This could be built easily.
107 : //!
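Here is a sketch of retry-forever with exponential back-off. Several details are assumptions for illustration:

- the doubling formula;
- the zero delay before the first attempt;
- the `base`/`max` parameters.

The real helpers in `utils::backoff` (and the `DEFAULT_BASE_BACKOFF_SECONDS` / `DEFAULT_MAX_BACKOFF_SECONDS` constants) may compute delays differently. Sleeping is injected as a closure so the loop can be exercised without actually waiting.

```rust
use std::time::Duration;

/// Hypothetical schedule: no wait before the first attempt, then the delay
/// doubles per failed attempt and is capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    if attempt == 0 {
        return Duration::ZERO;
    }
    // 2^(attempt - 1), saturating so large attempt counts cannot overflow.
    let factor = 1u32.checked_shl(attempt - 1).unwrap_or(u32::MAX);
    base.saturating_mul(factor).min(max)
}

/// Retry `op` until it succeeds, calling `sleep` with the back-off delay
/// before each attempt. Returns the value and the number of failed attempts.
fn retry_forever<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
    base: Duration,
    max: Duration,
) -> (T, u32) {
    let mut failures = 0;
    loop {
        sleep(backoff_delay(failures, base, max));
        match op() {
            Ok(v) => return (v, failures),
            Err(_) => failures += 1,
        }
    }
}
```

There is deliberately no attempt limit here, matching the "retries indefinitely" behaviour described above. In async code, `sleep` would be replaced by an awaited timer.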
108 : //! # Cancellation
109 : //!
110 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
111 : //! the client's tenant and timeline.
112 : //! Dropping the client will drop queued operations but not executing operations.
113 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
114 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
115 : //!
116 : //! # Completion
117 : //!
118 : //! Once an operation has completed, we update
119 : //! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately,
120 : //! and submit a request through the DeletionQueue to update
121 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
122 : //! validated that our generation is not stale. It is this visible value
123 : //! that is advertised to safekeepers as a signal that they can
124 : //! delete the WAL up to that LSN.
125 : //!
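The two-stage update can be modelled as follows. This is a hypothetical sketch: `u64` replaces `Lsn`, and a boolean replaces the deletion queue's generation check. Only a validated LSN ever becomes visible, so a pageserver with a stale generation never tells safekeepers to drop WAL.

```rust
/// Hypothetical model of the two consistent-LSN values; `u64` stands in for `Lsn`.
#[derive(Debug, Default)]
struct ConsistentLsns {
    /// Advanced as soon as an upload completes.
    projected: Option<u64>,
    /// Advanced only after the generation has been validated.
    visible: Option<u64>,
}

impl ConsistentLsns {
    /// An upload covering `lsn` finished; returns the LSN to submit for validation.
    fn on_upload_complete(&mut self, lsn: u64) -> u64 {
        let new = self.projected.map_or(lsn, |p| p.max(lsn));
        self.projected = Some(new);
        new
    }

    /// The deletion queue reported whether our generation is still current.
    fn on_validated(&mut self, lsn: u64, generation_ok: bool) {
        if generation_ok {
            self.visible = Some(self.visible.map_or(lsn, |v| v.max(lsn)));
        }
    }
}
```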
126 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
127 : //! for all pending operations to complete. It does not prevent more
128 : //! operations from getting scheduled.
129 : //!
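A barrier-style wait can be sketched with `std::sync::mpsc`, standing in for the async channel the real client uses (an assumption). Scheduling a barrier returns a receiver, and the barrier fires only after every entry queued before it has been processed. Entries scheduled after the barrier are still accepted, which matches the note that waiting does not prevent further scheduling. `BarrierQueue` and `Entry` are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical queue entries: regular work, or a barrier that signals once
/// everything queued before it has been processed.
enum Entry {
    Work(&'static str),
    Barrier(Sender<()>),
}

#[derive(Default)]
struct BarrierQueue {
    entries: VecDeque<Entry>,
    done: Vec<&'static str>,
}

impl BarrierQueue {
    fn schedule(&mut self, name: &'static str) {
        self.entries.push_back(Entry::Work(name));
    }

    /// Returns a receiver that fires when the barrier is reached.
    fn schedule_barrier(&mut self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.entries.push_back(Entry::Barrier(tx));
        rx
    }

    /// Process one entry in FIFO order.
    fn step(&mut self) {
        match self.entries.pop_front() {
            Some(Entry::Work(name)) => self.done.push(name),
            // The waiter may have gone away; ignore the send error in that case.
            Some(Entry::Barrier(tx)) => {
                let _ = tx.send(());
            }
            None => {}
        }
    }
}
```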
130 : //! # Crash Consistency
131 : //!
132 : //! We do not persist the upload queue state.
133 : //! If we drop the client, or crash, all unfinished operations are lost.
134 : //!
135 : //! To recover, the following steps need to be taken:
136 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
137 : //! consistent remote state, assuming the user scheduled the operations in
138 : //! the correct order.
139 : //! - Initialize the upload queue with that [`IndexPart`].
140 : //! - Reschedule all lost operations by comparing the local filesystem state
141 : //! and remote state as per [`IndexPart`]. This is done in
142 : //! [`Tenant::timeline_init_and_sync`].
143 : //!
144 : //! Note that if we crash during file deletion between the index update
145 : //! that removes the file from the list of files, and deleting the remote file,
146 : //! the file is leaked in the remote storage. Similarly, if a new file is created
147 : //! and uploaded, but the pageserver dies permanently before updating the
148 : //! remote index file, the new file is leaked in remote storage. We accept and
149 : //! tolerate that for now.
150 : //! Note further that we cannot easily fix this by scheduling deletes for every
151 : //! file that is present only on the remote, because we cannot distinguish the
152 : //! following two cases:
153 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
154 : //! but crashed before it finished remotely.
155 : //! - (2) We never had the file locally because we haven't on-demand downloaded
156 : //! it yet.
157 : //!
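The reconciliation step, and why remote-only files are left alone, can be sketched with two sets of layer names. `reconcile` and `Reconciliation` are hypothetical. The real logic in `Tenant::timeline_init_and_sync` works on layer metadata rather than bare names.

```rust
use std::collections::BTreeSet;

/// Result of comparing local files with the remote index (hypothetical).
#[derive(Debug, PartialEq)]
struct Reconciliation {
    /// Present only locally: scheduling an upload is always safe.
    to_upload: Vec<String>,
    /// Present only remotely: ambiguous (case 1 or case 2 above), so no
    /// deletion is scheduled for these.
    remote_only: Vec<String>,
}

fn reconcile(local: &BTreeSet<String>, remote_index: &BTreeSet<String>) -> Reconciliation {
    Reconciliation {
        to_upload: local.difference(remote_index).cloned().collect(),
        remote_only: remote_index.difference(local).cloned().collect(),
    }
}
```

Only the `to_upload` side turns into scheduled operations. Acting on `remote_only` would risk deleting a layer that simply hasn't been downloaded on demand yet.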
158 : //! # Downloads
159 : //!
160 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
161 : //! downloading files from the remote storage. Downloads are performed immediately
162 : //! against the `RemoteStorage`, independently of the upload queue.
163 : //!
164 : //! When we attach a tenant, we perform the following steps:
165 : //! - create `Tenant` object in `TenantState::Attaching` state
166 : //! - List timelines that are present in remote storage, and for each:
167 : //!   - download their remote [`IndexPart`]s
168 : //!   - create `Timeline` struct and a `RemoteTimelineClient`
169 : //!   - initialize the client's upload queue with its `IndexPart`
170 : //!   - schedule uploads for layers that are only present locally.
171 : //! - After the above is done for each timeline, open the tenant for business by
172 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
173 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
174 : //!
175 : //! # Operating Without Remote Storage
176 : //!
177 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
178 : //! not created and the uploads are skipped.
179 : //!
180 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
181 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
182 :
183 : pub(crate) mod download;
184 : pub mod index;
185 : pub(crate) mod upload;
186 :
187 : use anyhow::Context;
188 : use camino::Utf8Path;
189 : use chrono::{NaiveDateTime, Utc};
190 :
191 : pub(crate) use download::download_initdb_tar_zst;
192 : use pageserver_api::models::AuxFilePolicy;
193 : use pageserver_api::shard::{ShardIndex, TenantShardId};
194 : use scopeguard::ScopeGuard;
195 : use tokio_util::sync::CancellationToken;
196 : pub(crate) use upload::upload_initdb_dir;
197 : use utils::backoff::{
198 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
199 : };
200 :
201 : use std::collections::{HashMap, VecDeque};
202 : use std::sync::atomic::{AtomicU32, Ordering};
203 : use std::sync::{Arc, Mutex};
204 : use std::time::Duration;
205 :
206 : use remote_storage::{
207 : DownloadError, GenericRemoteStorage, ListingMode, RemotePath, TimeoutOrCancel,
208 : };
209 : use std::ops::DerefMut;
210 : use tracing::{debug, error, info, instrument, warn};
211 : use tracing::{info_span, Instrument};
212 : use utils::lsn::Lsn;
213 :
214 : use crate::context::RequestContext;
215 : use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
216 : use crate::metrics::{
217 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
218 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
219 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
220 : };
221 : use crate::task_mgr::shutdown_token;
222 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
223 : use crate::tenant::remote_timeline_client::download::download_retry;
224 : use crate::tenant::storage_layer::AsLayerDesc;
225 : use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
226 : use crate::tenant::TIMELINES_SEGMENT_NAME;
227 : use crate::{
228 : config::PageServerConf,
229 : task_mgr,
230 : task_mgr::TaskKind,
231 : task_mgr::BACKGROUND_RUNTIME,
232 : tenant::metadata::TimelineMetadata,
233 : tenant::upload_queue::{
234 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
235 : },
236 : TENANT_HEATMAP_BASENAME,
237 : };
238 :
239 : use utils::id::{TenantId, TimelineId};
240 :
241 : use self::index::IndexPart;
242 :
243 : use super::metadata::MetadataUpdate;
244 : use super::storage_layer::{Layer, LayerName, ResidentLayer};
245 : use super::upload_queue::SetDeletedFlagProgress;
246 : use super::Generation;
247 :
248 : pub(crate) use download::{
249 : download_index_part, is_temp_download_file, list_remote_tenant_shards, list_remote_timelines,
250 : };
251 : pub(crate) use index::LayerFileMetadata;
252 :
253 : // Occasional network issues and such can cause remote operations to fail, and
254 : // that's expected. If a download fails, we log it at info-level, and retry.
255 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
256 : // level instead, as repeated failures can mean a more serious problem. If it
257 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
258 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
259 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
260 :
261 : // Similarly log failed uploads and deletions at WARN level, after this many
262 : // retries. Uploads and deletions are retried forever, though.
263 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
264 :
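The logging and give-up policy stated in the comments above can be expressed as two small functions. This is a hypothetical sketch, and the constants are copied from above. `Level`, `Kind` and the exact boundaries are an interpretation of the comments, not the retry helper actually used:

- warn from the threshold-th failure on;
- give up after more than `FAILED_REMOTE_OP_RETRIES` failures.

```rust
const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
const FAILED_REMOTE_OP_RETRIES: u32 = 10;
const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;

#[derive(Debug, PartialEq)]
enum Level {
    Info,
    Warn,
}

#[derive(Clone, Copy)]
enum Kind {
    Download,
    UploadOrDelete,
}

/// Log level for a failure after `failed_attempts` failures so far.
fn level_for_failure(kind: Kind, failed_attempts: u32) -> Level {
    let threshold = match kind {
        Kind::Download => FAILED_DOWNLOAD_WARN_THRESHOLD,
        Kind::UploadOrDelete => FAILED_UPLOAD_WARN_THRESHOLD,
    };
    if failed_attempts < threshold {
        Level::Info
    } else {
        Level::Warn
    }
}

/// Downloads eventually give up; uploads and deletions are retried forever.
fn should_give_up(kind: Kind, failed_attempts: u32) -> bool {
    match kind {
        Kind::Download => failed_attempts > FAILED_REMOTE_OP_RETRIES,
        Kind::UploadOrDelete => false,
    }
}
```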
265 : pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
266 :
267 : pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
268 :
269 : /// Default buffer size when interfacing with [`tokio::fs::File`].
270 : pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
271 :
272 : /// Doing non-essential flushes of deletion queue is subject to this timeout, after
273 : /// which we warn and skip.
274 : const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
275 :
276 : pub enum MaybeDeletedIndexPart {
277 : IndexPart(IndexPart),
278 : Deleted(IndexPart),
279 : }
280 :
281 0 : #[derive(Debug, thiserror::Error)]
282 : pub enum PersistIndexPartWithDeletedFlagError {
283 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
284 : AlreadyInProgress(NaiveDateTime),
285 : #[error("the deleted_flag was already set, value is {0:?}")]
286 : AlreadyDeleted(NaiveDateTime),
287 : #[error(transparent)]
288 : Other(#[from] anyhow::Error),
289 : }
290 :
291 : /// A client for accessing a timeline's data in remote storage.
292 : ///
293 : /// This takes care of managing the number of connections, and balancing them
294 : /// across tenants. This also handles retries of failed uploads.
295 : ///
296 : /// Upload and delete requests are ordered so that before a deletion is
297 : /// performed, we wait for all preceding uploads to finish. This ensures
298 : /// that if you perform a compaction operation that reshuffles data in layer
299 : /// files, we don't have a transient state where the old files have already been
300 : /// deleted, but new files have not yet been uploaded.
301 : ///
302 : /// Similarly, this enforces an order between index-file uploads, and layer
303 : /// uploads. Before an index-file upload is performed, all preceding layer
304 : /// uploads must be finished.
305 : ///
306 : /// This also maintains a list of remote files, and automatically includes that
307 : /// in the index part file, whenever timeline metadata is uploaded.
308 : ///
309 : /// Downloads are not queued, they are performed immediately.
310 : pub struct RemoteTimelineClient {
311 : conf: &'static PageServerConf,
312 :
313 : runtime: tokio::runtime::Handle,
314 :
315 : tenant_shard_id: TenantShardId,
316 : timeline_id: TimelineId,
317 : generation: Generation,
318 :
319 : upload_queue: Mutex<UploadQueue>,
320 :
321 : pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
322 :
323 : storage_impl: GenericRemoteStorage,
324 :
325 : deletion_queue_client: DeletionQueueClient,
326 :
327 : cancel: CancellationToken,
328 : }
329 :
330 : impl RemoteTimelineClient {
331 : ///
332 : /// Create a remote storage client for given timeline
333 : ///
334 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
335 : /// by calling [`Self::init_upload_queue`] or [`Self::init_upload_queue_for_empty_remote`].
336 : ///
337 352 : pub fn new(
338 352 : remote_storage: GenericRemoteStorage,
339 352 : deletion_queue_client: DeletionQueueClient,
340 352 : conf: &'static PageServerConf,
341 352 : tenant_shard_id: TenantShardId,
342 352 : timeline_id: TimelineId,
343 352 : generation: Generation,
344 352 : ) -> RemoteTimelineClient {
345 352 : RemoteTimelineClient {
346 352 : conf,
347 352 : runtime: if cfg!(test) {
348 : // remote_timeline_client.rs tests rely on current-thread runtime
349 352 : tokio::runtime::Handle::current()
350 : } else {
351 0 : BACKGROUND_RUNTIME.handle().clone()
352 : },
353 352 : tenant_shard_id,
354 352 : timeline_id,
355 352 : generation,
356 352 : storage_impl: remote_storage,
357 352 : deletion_queue_client,
358 352 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
359 352 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
360 352 : &tenant_shard_id,
361 352 : &timeline_id,
362 352 : )),
363 352 : cancel: CancellationToken::new(),
364 352 : }
365 352 : }
366 :
367 : /// Initialize the upload queue for a remote storage that already received
368 : /// an index file upload, i.e., it's not empty.
369 : /// The given `index_part` must be the one on the remote.
370 6 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
371 6 : let mut upload_queue = self.upload_queue.lock().unwrap();
372 6 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
373 6 : self.update_remote_physical_size_gauge(Some(index_part));
374 6 : info!(
375 0 : "initialized upload queue from remote index with {} layer files",
376 0 : index_part.layer_metadata.len()
377 : );
378 6 : Ok(())
379 6 : }
380 :
381 : /// Initialize the upload queue for the case where the remote storage is empty,
382 : /// i.e., it doesn't have an `IndexPart`.
383 346 : pub fn init_upload_queue_for_empty_remote(
384 346 : &self,
385 346 : local_metadata: &TimelineMetadata,
386 346 : ) -> anyhow::Result<()> {
387 346 : let mut upload_queue = self.upload_queue.lock().unwrap();
388 346 : upload_queue.initialize_empty_remote(local_metadata)?;
389 346 : self.update_remote_physical_size_gauge(None);
390 346 : info!("initialized upload queue as empty");
391 346 : Ok(())
392 346 : }
393 :
394 : /// Initialize the queue in stopped state. Used in startup path
395 : /// to continue deletion operation interrupted by pageserver crash or restart.
396 0 : pub fn init_upload_queue_stopped_to_continue_deletion(
397 0 : &self,
398 0 : index_part: &IndexPart,
399 0 : ) -> anyhow::Result<()> {
400 : // FIXME: consider newtype for DeletedIndexPart.
401 0 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
402 0 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
403 0 : ))?;
404 :
405 0 : let mut upload_queue = self.upload_queue.lock().unwrap();
406 0 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
407 0 : self.update_remote_physical_size_gauge(Some(index_part));
408 0 : self.stop_impl(&mut upload_queue);
409 0 :
410 0 : upload_queue
411 0 : .stopped_mut()
412 0 : .expect("stopped above")
413 0 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
414 0 :
415 0 : Ok(())
416 0 : }
417 :
418 0 : pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
419 0 : match &mut *self.upload_queue.lock().unwrap() {
420 0 : UploadQueue::Uninitialized => None,
421 0 : UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
422 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
423 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
424 0 : .upload_queue_for_deletion
425 0 : .get_last_remote_consistent_lsn_projected(),
426 : }
427 0 : }
428 :
429 0 : pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
430 0 : match &mut *self.upload_queue.lock().unwrap() {
431 0 : UploadQueue::Uninitialized => None,
432 0 : UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
433 0 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
434 0 : UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
435 0 : q.upload_queue_for_deletion
436 0 : .get_last_remote_consistent_lsn_visible(),
437 0 : ),
438 : }
439 0 : }
440 :
441 : /// Returns true if this timeline was previously detached at this Lsn and the remote timeline
442 : /// client is currently initialized.
443 0 : pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
444 0 : // technically this is a dirty read, but given how timeline detach ancestor is implemented
445 0 : // via tenant restart, the lineage has always been uploaded.
446 0 : self.upload_queue
447 0 : .lock()
448 0 : .unwrap()
449 0 : .initialized_mut()
450 0 : .map(|uq| uq.latest_lineage.is_previous_ancestor_lsn(lsn))
451 0 : .unwrap_or(false)
452 0 : }
453 :
454 1614 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
455 1614 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
456 1268 : current_remote_index_part
457 1268 : .layer_metadata
458 1268 : .values()
459 1268 : // If we don't have the file size for the layer, don't account for it in the metric.
460 14012 : .map(|ilmd| ilmd.file_size)
461 1268 : .sum()
462 : } else {
463 346 : 0
464 : };
465 1614 : self.metrics.remote_physical_size_gauge.set(size);
466 1614 : }
467 :
468 2 : pub fn get_remote_physical_size(&self) -> u64 {
469 2 : self.metrics.remote_physical_size_gauge.get()
470 2 : }
471 :
472 : //
473 : // Download operations.
474 : //
475 : // These don't use the per-timeline queue. They do use the global semaphore in
476 : // S3Bucket, to limit the total number of concurrent operations, though.
477 : //
478 :
479 : /// Download index file
480 20 : pub async fn download_index_file(
481 20 : &self,
482 20 : cancel: &CancellationToken,
483 20 : ) -> Result<MaybeDeletedIndexPart, DownloadError> {
484 20 : let _unfinished_gauge_guard = self.metrics.call_begin(
485 20 : &RemoteOpFileKind::Index,
486 20 : &RemoteOpKind::Download,
487 20 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
488 20 : reason: "no need for a downloads gauge",
489 20 : },
490 20 : );
491 :
492 20 : let (index_part, _index_generation) = download::download_index_part(
493 20 : &self.storage_impl,
494 20 : &self.tenant_shard_id,
495 20 : &self.timeline_id,
496 20 : self.generation,
497 20 : cancel,
498 20 : )
499 20 : .measure_remote_op(
500 20 : RemoteOpFileKind::Index,
501 20 : RemoteOpKind::Download,
502 20 : Arc::clone(&self.metrics),
503 20 : )
504 100 : .await?;
505 :
506 20 : if index_part.deleted_at.is_some() {
507 0 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
508 : } else {
509 20 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
510 : }
511 20 : }
512 :
513 : /// Download a (layer) file from `path`, into local filesystem.
514 : ///
515 : /// 'layer_metadata' is the metadata from the remote index file.
516 : ///
517 : /// On success, returns the size of the downloaded file.
518 6 : pub async fn download_layer_file(
519 6 : &self,
520 6 : layer_file_name: &LayerName,
521 6 : layer_metadata: &LayerFileMetadata,
522 6 : local_path: &Utf8Path,
523 6 : cancel: &CancellationToken,
524 6 : ctx: &RequestContext,
525 6 : ) -> anyhow::Result<u64> {
526 6 : let downloaded_size = {
527 6 : let _unfinished_gauge_guard = self.metrics.call_begin(
528 6 : &RemoteOpFileKind::Layer,
529 6 : &RemoteOpKind::Download,
530 6 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
531 6 : reason: "no need for a downloads gauge",
532 6 : },
533 6 : );
534 6 : download::download_layer_file(
535 6 : self.conf,
536 6 : &self.storage_impl,
537 6 : self.tenant_shard_id,
538 6 : self.timeline_id,
539 6 : layer_file_name,
540 6 : layer_metadata,
541 6 : local_path,
542 6 : cancel,
543 6 : ctx,
544 6 : )
545 6 : .measure_remote_op(
546 6 : RemoteOpFileKind::Layer,
547 6 : RemoteOpKind::Download,
548 6 : Arc::clone(&self.metrics),
549 6 : )
550 87 : .await?
551 : };
552 :
553 6 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
554 6 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
555 6 :
556 6 : Ok(downloaded_size)
557 6 : }
558 :
559 : //
560 : // Upload operations.
561 : //
562 :
563 : /// Launch an index-file upload operation in the background, with
564 : /// fully updated metadata.
565 : ///
566 : /// This should only be used to upload initial metadata to remote storage.
567 : ///
568 : /// The upload will be added to the queue immediately, but it
569 : /// won't be performed until all previously scheduled layer file
570 : /// upload operations have completed successfully. This is to
571 : /// ensure that when the index file claims that layers X, Y and Z
572 : /// exist in remote storage, they really do. To wait for the upload
573 : /// to complete, use `wait_completion`.
574 : ///
575 : /// If there were any changes to the list of files, i.e. if any
576 : /// layer file uploads were scheduled, since the last index file
577 : /// upload, those will be included too.
578 226 : pub fn schedule_index_upload_for_full_metadata_update(
579 226 : self: &Arc<Self>,
580 226 : metadata: &TimelineMetadata,
581 226 : ) -> anyhow::Result<()> {
582 226 : let mut guard = self.upload_queue.lock().unwrap();
583 226 : let upload_queue = guard.initialized_mut()?;
584 :
585 : // As documented in the struct definition, it's ok for latest_metadata to be
586 : // ahead of what's _actually_ on the remote during index upload.
587 226 : upload_queue.latest_metadata = metadata.clone();
588 226 :
589 226 : self.schedule_index_upload(upload_queue);
590 226 :
591 226 : Ok(())
592 226 : }
593 :
594 : /// Launch an index-file upload operation in the background, with only parts of the metadata
595 : /// updated.
596 : ///
597 : /// This is the regular way of updating metadata on layer flushes or Gc.
598 : ///
599 : /// Using this lighter update mechanism allows for reparenting and detaching without changes to
600 : /// `index_part.json`, while being more clear on what values update regularly.
601 1014 : pub(crate) fn schedule_index_upload_for_metadata_update(
602 1014 : self: &Arc<Self>,
603 1014 : update: &MetadataUpdate,
604 1014 : ) -> anyhow::Result<()> {
605 1014 : let mut guard = self.upload_queue.lock().unwrap();
606 1014 : let upload_queue = guard.initialized_mut()?;
607 :
608 1014 : upload_queue.latest_metadata.apply(update);
609 1014 :
610 1014 : self.schedule_index_upload(upload_queue);
611 1014 :
612 1014 : Ok(())
613 1014 : }
614 :
615 : /// Launch an index-file upload operation in the background, with only aux_file_policy flag updated.
616 10 : pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
617 10 : self: &Arc<Self>,
618 10 : last_aux_file_policy: Option<AuxFilePolicy>,
619 10 : ) -> anyhow::Result<()> {
620 10 : let mut guard = self.upload_queue.lock().unwrap();
621 10 : let upload_queue = guard.initialized_mut()?;
622 10 : upload_queue.last_aux_file_policy = last_aux_file_policy;
623 10 : self.schedule_index_upload(upload_queue);
624 10 : Ok(())
625 10 : }
626 : ///
627 : /// Launch an index-file upload operation in the background, if necessary.
628 : ///
629 : /// Use this function to schedule the update of the index file after
630 : /// scheduling file uploads or deletions. If no file uploads or deletions
631 : /// have been scheduled since the last index file upload, this does
632 : /// nothing.
633 : ///
634 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
635 : /// the upload to the upload queue and returns quickly.
636 372 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
637 372 : let mut guard = self.upload_queue.lock().unwrap();
638 372 : let upload_queue = guard.initialized_mut()?;
639 :
640 372 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
641 16 : self.schedule_index_upload(upload_queue);
642 356 : }
643 :
644 372 : Ok(())
645 372 : }
646 :
647 : /// Launch an index-file upload operation in the background (internal function)
648 1302 : fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
649 1302 : let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
650 1302 :
651 1302 : info!(
652 0 : "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
653 0 : upload_queue.latest_files.len(),
654 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
655 : );
656 :
657 1302 : let index_part = IndexPart::from(&*upload_queue);
658 1302 : let op = UploadOp::UploadMetadata(Box::new(index_part), disk_consistent_lsn);
659 1302 : self.metric_begin(&op);
660 1302 : upload_queue.queued_operations.push_back(op);
661 1302 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
662 1302 :
663 1302 : // Launch the task immediately, if possible
664 1302 : self.launch_queued_tasks(upload_queue);
665 1302 : }
666 :
667 0 : pub(crate) async fn schedule_reparenting_and_wait(
668 0 : self: &Arc<Self>,
669 0 : new_parent: &TimelineId,
670 0 : ) -> anyhow::Result<()> {
671 : // FIXME: because of how Timeline::schedule_uploads works when called from layer flushing
672 : // and reads the in-memory part we cannot do the detaching like this
673 0 : let receiver = {
674 0 : let mut guard = self.upload_queue.lock().unwrap();
675 0 : let upload_queue = guard.initialized_mut()?;
676 :
677 0 : let Some(prev) = upload_queue.latest_metadata.ancestor_timeline() else {
678 0 : return Err(anyhow::anyhow!(
679 0 : "cannot reparent without a current ancestor"
680 0 : ));
681 : };
682 :
683 0 : upload_queue.latest_metadata.reparent(new_parent);
684 0 : upload_queue.latest_lineage.record_previous_ancestor(&prev);
685 0 :
686 0 : self.schedule_index_upload(upload_queue);
687 0 :
688 0 : self.schedule_barrier0(upload_queue)
689 0 : };
690 0 :
691 0 : Self::wait_completion0(receiver).await
692 0 : }
693 :
694 : /// Schedules uploading a new version of `index_part.json` that adds the given layers
695 : /// and detaches from the ancestor, then waits for the upload to complete.
696 : ///
697 : /// This is used with `Timeline::detach_ancestor` functionality.
698 0 : pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
699 0 : self: &Arc<Self>,
700 0 : layers: &[Layer],
701 0 : adopted: (TimelineId, Lsn),
702 0 : ) -> anyhow::Result<()> {
703 0 : let barrier = {
704 0 : let mut guard = self.upload_queue.lock().unwrap();
705 0 : let upload_queue = guard.initialized_mut()?;
706 :
707 0 : upload_queue.latest_metadata.detach_from_ancestor(&adopted);
708 0 : upload_queue.latest_lineage.record_detaching(&adopted);
709 :
710 0 : for layer in layers {
711 0 : upload_queue
712 0 : .latest_files
713 0 : .insert(layer.layer_desc().layer_name(), layer.metadata());
714 0 : }
715 :
716 0 : self.schedule_index_upload(upload_queue);
717 0 :
718 0 : let barrier = self.schedule_barrier0(upload_queue);
719 0 : self.launch_queued_tasks(upload_queue);
720 0 : barrier
721 0 : };
722 0 :
723 0 : Self::wait_completion0(barrier).await
724 0 : }
725 :
726 : /// Launch an upload operation in the background; the file is added to be included in next
727 : /// `index_part.json` upload.
728 1062 : pub(crate) fn schedule_layer_file_upload(
729 1062 : self: &Arc<Self>,
730 1062 : layer: ResidentLayer,
731 1062 : ) -> anyhow::Result<()> {
732 1062 : let mut guard = self.upload_queue.lock().unwrap();
733 1062 : let upload_queue = guard.initialized_mut()?;
734 :
735 1062 : self.schedule_layer_file_upload0(upload_queue, layer);
736 1062 : self.launch_queued_tasks(upload_queue);
737 1062 : Ok(())
738 1062 : }
739 :
740 1290 : fn schedule_layer_file_upload0(
741 1290 : self: &Arc<Self>,
742 1290 : upload_queue: &mut UploadQueueInitialized,
743 1290 : layer: ResidentLayer,
744 1290 : ) {
745 1290 : let metadata = layer.metadata();
746 1290 :
747 1290 : upload_queue
748 1290 : .latest_files
749 1290 : .insert(layer.layer_desc().layer_name(), metadata.clone());
750 1290 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
751 1290 :
752 1290 : info!(
753 : gen=?metadata.generation,
754 : shard=?metadata.shard,
755 0 : "scheduled layer file upload {layer}",
756 : );
757 :
758 1290 : let op = UploadOp::UploadLayer(layer, metadata);
759 1290 : self.metric_begin(&op);
760 1290 : upload_queue.queued_operations.push_back(op);
761 1290 : }
762 :
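Under the queue lock, the scheduling step above does three things. It records the layer in `latest_files` so the next `index_part.json` references it, it bumps the change counter, and it appends an `UploadLayer` op. The sketch below models only that bookkeeping with std collections. `LayerMeta`, `Op` and `QueueModel` are hypothetical simplifications, not the crate's types.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for `LayerFileMetadata`.
#[derive(Clone, Debug, PartialEq)]
pub struct LayerMeta {
    pub file_size: u64,
    pub generation: u32,
}

/// Hypothetical stand-in for `UploadOp` (only the variant used here).
#[derive(Debug, PartialEq)]
pub enum Op {
    UploadLayer(String, LayerMeta),
}

/// Hypothetical model of the queue fields touched by `schedule_layer_file_upload0`.
#[derive(Default)]
pub struct QueueModel {
    pub latest_files: HashMap<String, LayerMeta>,
    pub changes_since_index_scheduled: usize,
    pub queued_operations: VecDeque<Op>,
}

impl QueueModel {
    /// Record the layer as part of the next index and enqueue its upload.
    pub fn schedule_layer_upload(&mut self, name: &str, meta: LayerMeta) {
        // The next index_part.json will reference this layer.
        self.latest_files.insert(name.to_string(), meta.clone());
        // The in-memory index now differs from the last scheduled index upload.
        self.changes_since_index_scheduled += 1;
        // The upload itself is launched later, in FIFO order.
        self.queued_operations
            .push_back(Op::UploadLayer(name.to_string(), meta));
    }
}
```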
763 : /// Launch a delete operation in the background.
764 : ///
765 : /// The operation does not modify local filesystem state.
766 : ///
767 : /// Note: This schedules an index file upload before the deletions. The
768 : /// deletion won't actually be performed until all previously scheduled
769 : /// upload operations, and the index file upload, have completed
770 : /// successfully.
771 8 : pub fn schedule_layer_file_deletion(
772 8 : self: &Arc<Self>,
773 8 : names: &[LayerName],
774 8 : ) -> anyhow::Result<()> {
775 8 : let mut guard = self.upload_queue.lock().unwrap();
776 8 : let upload_queue = guard.initialized_mut()?;
777 :
778 8 : let with_metadata =
779 8 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
780 8 :
781 8 : self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
782 8 :
783 8 : // Launch the tasks immediately, if possible
784 8 : self.launch_queued_tasks(upload_queue);
785 8 : Ok(())
786 8 : }
787 :
788 : /// Unlinks the layer files from `index_part.json` but does not yet schedule deletion for the
789 : /// layer files, leaving them dangling.
790 : ///
791 : /// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
792 : /// is invoked on them.
793 6 : pub(crate) fn schedule_gc_update(self: &Arc<Self>, gc_layers: &[Layer]) -> anyhow::Result<()> {
794 6 : let mut guard = self.upload_queue.lock().unwrap();
795 6 : let upload_queue = guard.initialized_mut()?;
796 :
797 : // Just forget the return value: after the next index_part.json is uploaded, the layer
798 : // files are considered "dangling". This is fine; in the worst case we create work for the
799 : // scrubber.
800 :
801 8 : let names = gc_layers.iter().map(|x| x.layer_desc().layer_name());
802 6 :
803 6 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
804 6 :
805 6 : self.launch_queued_tasks(upload_queue);
806 6 :
807 6 : Ok(())
808 6 : }
809 :
810 : /// Update the remote index file, removing the to-be-deleted files from the index,
811 : /// allowing scheduling of actual deletions later.
812 42 : fn schedule_unlinking_of_layers_from_index_part0<I>(
813 42 : self: &Arc<Self>,
814 42 : upload_queue: &mut UploadQueueInitialized,
815 42 : names: I,
816 42 : ) -> Vec<(LayerName, LayerFileMetadata)>
817 42 : where
818 42 : I: IntoIterator<Item = LayerName>,
819 42 : {
820 42 : // Decorate our list of names with each name's metadata, dropping
821 42 : // names that are unexpectedly missing from our metadata. This metadata
822 42 : // is later used when physically deleting layers, to construct key paths.
823 42 : let with_metadata: Vec<_> = names
824 42 : .into_iter()
825 332 : .filter_map(|name| {
826 332 : let meta = upload_queue.latest_files.remove(&name);
827 :
828 332 : if let Some(meta) = meta {
829 332 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
830 332 : Some((name, meta))
831 : } else {
832 : // This can only happen if we forgot to schedule the file upload
833 : // before scheduling the delete. Log it because it is a rare/strange
834 : // situation, and in case something is misbehaving, we'd like to know which
835 : // layers experienced this.
836 0 : info!("Deleting layer {name} not found in latest_files list, never uploaded?");
837 0 : None
838 : }
839 332 : })
840 42 : .collect();
841 :
842 : #[cfg(feature = "testing")]
843 374 : for (name, metadata) in &with_metadata {
844 332 : let gen = metadata.generation;
845 332 : if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
846 0 : if unexpected == gen {
847 0 : tracing::error!("{name} was unlinked twice with same generation");
848 : } else {
849 0 : tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
850 : }
851 332 : }
852 : }
853 :
854 : // After unlinking files from upload_queue.latest_files we must always schedule an
855 : // index_part update, because it needs to be uploaded before we can actually delete the
856 : // files.
857 42 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
858 36 : self.schedule_index_upload(upload_queue);
859 36 : }
860 :
861 42 : with_metadata
862 42 : }
863 :
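The unlinking helper pairs each removed name with its metadata. Whenever the index has diverged, it schedules an index upload so the unlinked layers disappear from `index_part.json` before any deletion runs. The following std-only sketch models that flow. `UnlinkModel` is hypothetical, and it assumes, which this section does not show, that scheduling an index upload resets the change counter.

```rust
use std::collections::HashMap;

/// Hypothetical model: layer name -> generation, plus counters.
pub struct UnlinkModel {
    pub latest_files: HashMap<String, u32>,
    pub changes_since_index_scheduled: usize,
    pub index_uploads_scheduled: usize,
}

impl UnlinkModel {
    /// Remove names from the index; return (name, generation) pairs for later deletion.
    pub fn unlink<I>(&mut self, names: I) -> Vec<(String, u32)>
    where
        I: IntoIterator<Item = String>,
    {
        let with_metadata: Vec<(String, u32)> = names
            .into_iter()
            .filter_map(|name| match self.latest_files.remove(&name) {
                Some(gen) => {
                    self.changes_since_index_scheduled += 1;
                    Some((name, gen))
                }
                // Never uploaded: nothing to unlink, so the name is dropped.
                None => None,
            })
            .collect();

        // The index must be re-uploaded before any unlinked layer may be deleted.
        if self.changes_since_index_scheduled > 0 {
            self.schedule_index_upload();
        }
        with_metadata
    }

    fn schedule_index_upload(&mut self) {
        // Assumption of this model: scheduling the index upload resets the counter.
        self.index_uploads_scheduled += 1;
        self.changes_since_index_scheduled = 0;
    }
}
```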
864 : /// Schedules deletion for layer files which have previously been unlinked from the
865 : /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
866 335 : pub(crate) fn schedule_deletion_of_unlinked(
867 335 : self: &Arc<Self>,
868 335 : layers: Vec<(LayerName, LayerFileMetadata)>,
869 335 : ) -> anyhow::Result<()> {
870 335 : let mut guard = self.upload_queue.lock().unwrap();
871 335 : let upload_queue = guard.initialized_mut()?;
872 :
873 335 : self.schedule_deletion_of_unlinked0(upload_queue, layers);
874 335 : self.launch_queued_tasks(upload_queue);
875 335 : Ok(())
876 335 : }
877 :
878 343 : fn schedule_deletion_of_unlinked0(
879 343 : self: &Arc<Self>,
880 343 : upload_queue: &mut UploadQueueInitialized,
881 343 : mut with_metadata: Vec<(LayerName, LayerFileMetadata)>,
882 343 : ) {
883 343 : // Filter out any layers which were not created by this tenant shard. These are
884 343 : // layers that originate from some ancestor shard after a split, and may still
885 343 : // be referenced by other shards. We are free to delete them locally and remove
886 343 : // them from our index (and would have already done so when we reach this point
887 343 : // in the code), but we may not delete them remotely.
888 343 : with_metadata.retain(|(name, meta)| {
889 337 : let retain = meta.shard.shard_number == self.tenant_shard_id.shard_number
890 337 : && meta.shard.shard_count == self.tenant_shard_id.shard_count;
891 337 : if !retain {
892 0 : tracing::debug!(
893 0 : "Skipping deletion of ancestor-shard layer {name}, from shard {}",
894 : meta.shard
895 : );
896 337 : }
897 337 : retain
898 343 : });
899 :
900 680 : for (name, meta) in &with_metadata {
901 337 : info!(
902 0 : "scheduling deletion of layer {}{} (shard {})",
903 0 : name,
904 0 : meta.generation.get_suffix(),
905 : meta.shard
906 : );
907 : }
908 :
909 : #[cfg(feature = "testing")]
910 680 : for (name, meta) in &with_metadata {
911 337 : let gen = meta.generation;
912 337 : match upload_queue.dangling_files.remove(name) {
913 331 : Some(same) if same == gen => { /* expected */ }
914 0 : Some(other) => {
915 0 : tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
916 : }
917 : None => {
918 6 : tracing::error!("{name} was unlinked but was not dangling");
919 : }
920 : }
921 : }
922 :
923 : // schedule the actual deletions
924 343 : if with_metadata.is_empty() {
925 : // avoid scheduling the op & bumping the metric
926 6 : return;
927 337 : }
928 337 : let op = UploadOp::Delete(Delete {
929 337 : layers: with_metadata,
930 337 : });
931 337 : self.metric_begin(&op);
932 337 : upload_queue.queued_operations.push_back(op);
933 343 : }
934 :
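The shard filter in `schedule_deletion_of_unlinked0` keeps a layer for remote deletion only when both its shard number and shard count match the current tenant shard. Below is a minimal sketch, with a hypothetical `ShardIndex` standing in for the real shard identity:

```rust
/// Hypothetical stand-in for the shard identity stored in layer metadata.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ShardIndex {
    pub shard_number: u8,
    pub shard_count: u8,
}

/// Keep only layers written by `own`; layers inherited from an ancestor shard
/// after a split may still be referenced by sibling shards.
pub fn deletable_layers(own: ShardIndex, layers: Vec<(String, ShardIndex)>) -> Vec<String> {
    layers
        .into_iter()
        .filter(|(_, shard)| {
            shard.shard_number == own.shard_number && shard.shard_count == own.shard_count
        })
        .map(|(name, _)| name)
        .collect()
}
```

Comparing both fields matters. For example, shard 0 of a 4-way split and the unsharded shard 0 of 1 share a shard number but are different shards.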
935 : /// Schedules a compaction update to the remote `index_part.json`.
936 : ///
937 : /// `compacted_from` are the L0 layers that have been compacted into the `compacted_to` L1 layers.
938 28 : pub(crate) fn schedule_compaction_update(
939 28 : self: &Arc<Self>,
940 28 : compacted_from: &[Layer],
941 28 : compacted_to: &[ResidentLayer],
942 28 : ) -> anyhow::Result<()> {
943 28 : let mut guard = self.upload_queue.lock().unwrap();
944 28 : let upload_queue = guard.initialized_mut()?;
945 :
946 256 : for layer in compacted_to {
947 228 : self.schedule_layer_file_upload0(upload_queue, layer.clone());
948 228 : }
949 :
950 322 : let names = compacted_from.iter().map(|x| x.layer_desc().layer_name());
951 28 :
952 28 : self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
953 28 : self.launch_queued_tasks(upload_queue);
954 28 :
955 28 : Ok(())
956 28 : }
957 :
958 : /// Wait for all previously scheduled uploads/deletions to complete
959 122 : pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
960 122 : let receiver = {
961 122 : let mut guard = self.upload_queue.lock().unwrap();
962 122 : let upload_queue = guard.initialized_mut()?;
963 122 : self.schedule_barrier0(upload_queue)
964 122 : };
965 122 :
966 122 : Self::wait_completion0(receiver).await
967 122 : }
968 :
969 122 : async fn wait_completion0(
970 122 : mut receiver: tokio::sync::watch::Receiver<()>,
971 122 : ) -> anyhow::Result<()> {
972 122 : if receiver.changed().await.is_err() {
973 0 : anyhow::bail!("wait_completion aborted because upload queue was stopped");
974 122 : }
975 122 :
976 122 : Ok(())
977 122 : }
978 :
979 6 : pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
980 6 : let mut guard = self.upload_queue.lock().unwrap();
981 6 : let upload_queue = guard.initialized_mut()?;
982 6 : self.schedule_barrier0(upload_queue);
983 6 : Ok(())
984 6 : }
985 :
986 128 : fn schedule_barrier0(
987 128 : self: &Arc<Self>,
988 128 : upload_queue: &mut UploadQueueInitialized,
989 128 : ) -> tokio::sync::watch::Receiver<()> {
990 128 : let (sender, receiver) = tokio::sync::watch::channel(());
991 128 : let barrier_op = UploadOp::Barrier(sender);
992 128 :
993 128 : upload_queue.queued_operations.push_back(barrier_op);
994 128 : // Don't count this kind of operation!
995 128 :
996 128 : // Launch the task immediately, if possible
997 128 : self.launch_queued_tasks(upload_queue);
998 128 :
999 128 : receiver
1000 128 : }
1001 :
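A barrier is just a queued op that carries a channel sender. `launch_queued_tasks` fires it once nothing is in progress, and a waiter sees an error if the op is dropped instead, as in `wait_completion0`. The sketch below models this with `std::sync::mpsc` in place of `tokio::sync::watch`. `Op`, `Queue` and `stop` are hypothetical simplifications.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, simplified queue: a barrier is an op carrying a sender.
pub enum Op {
    Work(&'static str),
    Barrier(Sender<()>),
}

pub struct Queue {
    pub queued: VecDeque<Op>,
    pub inprogress: usize,
}

impl Queue {
    /// Enqueue a barrier and return the receiver the caller waits on.
    pub fn schedule_barrier(&mut self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.queued.push_back(Op::Barrier(tx));
        rx
    }

    /// Launch what we can; a barrier fires only once nothing is in progress.
    pub fn launch(&mut self) {
        while let Some(front) = self.queued.front() {
            if let Op::Barrier(_) = front {
                if self.inprogress > 0 {
                    break;
                }
            }
            match self.queued.pop_front().unwrap() {
                Op::Work(_) => self.inprogress += 1,
                // Signal completion; the sender is dropped right after.
                Op::Barrier(tx) => {
                    let _ = tx.send(());
                }
            }
        }
    }

    /// Dropping queued ops drops their senders, so waiters get an error.
    pub fn stop(&mut self) {
        self.queued.clear();
    }
}
```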
1002 : /// Wait for all previously scheduled operations to complete, and then stop.
1003 : ///
1004 : /// Not cancellation safe
1005 6 : pub(crate) async fn shutdown(self: &Arc<Self>) {
1006 6 : // On cancellation the queue is left in an awkward state: it refuses new operations, but
1007 6 : // a proper stop has yet to be called. After a cancellation, the original or some later task
1008 6 : // must call `stop` or `shutdown`.
1009 6 : let sg = scopeguard::guard((), |_| {
1010 0 : tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
1011 6 : });
1012 :
1013 6 : let fut = {
1014 6 : let mut guard = self.upload_queue.lock().unwrap();
1015 6 : let upload_queue = match &mut *guard {
1016 0 : UploadQueue::Stopped(_) => return,
1017 : UploadQueue::Uninitialized => {
1018 : // transition into Stopped state
1019 0 : self.stop_impl(&mut guard);
1020 0 : return;
1021 : }
1022 6 : UploadQueue::Initialized(ref mut init) => init,
1023 6 : };
1024 6 :
1025 6 : // if the queue is already stuck due to a shutdown operation which was cancelled, then
1026 6 : // just don't add more of these as they would never complete.
1027 6 : //
1028 6 : // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
1029 6 : // in every place we would not have to jump through this hoop, and this method could be
1030 6 : // made cancellable.
1031 6 : if !upload_queue.shutting_down {
1032 6 : upload_queue.shutting_down = true;
1033 6 : upload_queue.queued_operations.push_back(UploadOp::Shutdown);
1034 6 : // like Barrier, this operation is not counted
1035 6 :
1036 6 : self.launch_queued_tasks(upload_queue);
1037 6 : }
1038 :
1039 6 : upload_queue.shutdown_ready.clone().acquire_owned()
1040 : };
1041 :
1042 6 : let res = fut.await;
1043 :
1044 6 : scopeguard::ScopeGuard::into_inner(sg);
1045 6 :
1046 6 : match res {
1047 0 : Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
1048 6 : Err(_closed) => {
1049 6 : // expected
1050 6 : }
1051 6 : }
1052 6 :
1053 6 : self.stop();
1054 6 : }
1055 :
1056 : /// Set the deleted_at field in the remote index file.
1057 : ///
1058 : /// This fails if the upload queue has not been `stop()`ed.
1059 : ///
1060 : /// The caller is responsible for calling `stop()` AND for waiting
1061 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
1062 : /// Check method [`RemoteTimelineClient::stop`] for details.
1063 0 : #[instrument(skip_all)]
1064 : pub(crate) async fn persist_index_part_with_deleted_flag(
1065 : self: &Arc<Self>,
1066 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
1067 : let index_part_with_deleted_at = {
1068 : let mut locked = self.upload_queue.lock().unwrap();
1069 :
1070 : // We must be in the stopped state; otherwise an in-progress index part
1071 : // upload could overwrite the file with a version lacking the deleted_at
1072 : // flag that we are going to set below
1073 : let stopped = locked.stopped_mut()?;
1074 :
1075 : match stopped.deleted_at {
1076 : SetDeletedFlagProgress::NotRunning => (), // proceed
1077 : SetDeletedFlagProgress::InProgress(at) => {
1078 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
1079 : }
1080 : SetDeletedFlagProgress::Successful(at) => {
1081 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
1082 : }
1083 : };
1084 : let deleted_at = Utc::now().naive_utc();
1085 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
1086 :
1087 : let mut index_part = IndexPart::from(&stopped.upload_queue_for_deletion);
1088 : index_part.deleted_at = Some(deleted_at);
1089 : index_part
1090 : };
1091 :
1092 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
1093 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
1094 0 : let stopped = locked
1095 0 : .stopped_mut()
1096 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1097 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
1098 0 : });
1099 :
1100 : pausable_failpoint!("persist_deleted_index_part");
1101 :
1102 : backoff::retry(
1103 0 : || {
1104 0 : upload::upload_index_part(
1105 0 : &self.storage_impl,
1106 0 : &self.tenant_shard_id,
1107 0 : &self.timeline_id,
1108 0 : self.generation,
1109 0 : &index_part_with_deleted_at,
1110 0 : &self.cancel,
1111 0 : )
1112 0 : },
1113 0 : |_e| false,
1114 : 1,
1115 : // Make just a couple of attempts: when executed as part of timeline deletion,
1116 : // this happens in the context of an API call; when executed as part of tenant
1117 : // deletion, it happens in the background.
1118 : 2,
1119 : "persist_index_part_with_deleted_flag",
1120 : &self.cancel,
1121 : )
1122 : .await
1123 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1124 0 : .and_then(|x| x)?;
1125 :
1126 : // all good, disarm the guard and mark as success
1127 : ScopeGuard::into_inner(undo_deleted_at);
1128 : {
1129 : let mut locked = self.upload_queue.lock().unwrap();
1130 :
1131 : let stopped = locked
1132 : .stopped_mut()
1133 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
1134 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
1135 : index_part_with_deleted_at
1136 : .deleted_at
1137 : .expect("we set it above"),
1138 : );
1139 : }
1140 :
1141 : Ok(())
1142 : }
1143 :
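`persist_index_part_with_deleted_flag` first marks the flag `InProgress` and arms a guard that resets it to `NotRunning` if anything fails. It disarms the guard only after the upload succeeds. A std-only sketch of that pattern follows. `UndoGuard` imitates `scopeguard::guard` and `ScopeGuard::into_inner`. `DeletedFlag`, `persist_deleted_flag` and the `Cell`-based state are hypothetical, since the real code keeps the state behind the upload queue mutex.

```rust
use std::cell::Cell;

/// Runs `undo` when dropped, unless `disarm` was called first.
pub struct UndoGuard<F: FnOnce()> {
    undo: Option<F>,
}

impl<F: FnOnce()> UndoGuard<F> {
    pub fn new(undo: F) -> Self {
        UndoGuard { undo: Some(undo) }
    }

    /// Success path: forget the undo action.
    pub fn disarm(mut self) {
        self.undo = None;
    }
}

impl<F: FnOnce()> Drop for UndoGuard<F> {
    fn drop(&mut self) {
        if let Some(undo) = self.undo.take() {
            undo();
        }
    }
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum DeletedFlag {
    NotRunning,
    InProgress,
    Successful,
}

/// Hypothetical model of the flag transitions around the index upload.
pub fn persist_deleted_flag(
    state: &Cell<DeletedFlag>,
    upload: impl FnOnce() -> Result<(), String>,
) -> Result<(), String> {
    state.set(DeletedFlag::InProgress);
    let guard = UndoGuard::new(|| state.set(DeletedFlag::NotRunning));
    // On error, `guard` is dropped here and resets the flag.
    upload()?;
    guard.disarm();
    state.set(DeletedFlag::Successful);
    Ok(())
}
```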
1144 0 : pub(crate) fn is_deleting(&self) -> bool {
1145 0 : let mut locked = self.upload_queue.lock().unwrap();
1146 0 : locked.stopped_mut().is_ok()
1147 0 : }
1148 :
1149 0 : pub(crate) async fn preserve_initdb_archive(
1150 0 : self: &Arc<Self>,
1151 0 : tenant_id: &TenantId,
1152 0 : timeline_id: &TimelineId,
1153 0 : cancel: &CancellationToken,
1154 0 : ) -> anyhow::Result<()> {
1155 0 : backoff::retry(
1156 0 : || async {
1157 0 : upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
1158 0 : .await
1159 0 : },
1160 0 : TimeoutOrCancel::caused_by_cancel,
1161 0 : FAILED_DOWNLOAD_WARN_THRESHOLD,
1162 0 : FAILED_REMOTE_OP_RETRIES,
1163 0 : "preserve_initdb_tar_zst",
1164 0 : &cancel.clone(),
1165 0 : )
1166 0 : .await
1167 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1168 0 : .and_then(|x| x)
1169 0 : .context("backing up initdb archive")?;
1170 0 : Ok(())
1171 0 : }
1172 :
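The remote operations above are wrapped in `backoff::retry(op, predicate, warn_threshold, max_retries, description, cancel)`. Judging from the calls, the second argument decides whether an error is permanent, for example `TimeoutOrCancel::caused_by_cancel` or `|_e| false`. The sketch below is a hypothetical synchronous helper that only shows how such arguments can interact: it stops on a permanent error or when retries run out, and it warns once past a threshold. It is not the real helper, and it leaves out backoff delays and cancellation.

```rust
use std::fmt::Debug;

/// Hypothetical retry loop: up to `max_retries` retries after the first attempt.
pub fn retry<T, E, Op, P>(mut op: Op, is_permanent: P, warn_threshold: u32, max_retries: u32) -> Result<T, E>
where
    Op: FnMut() -> Result<T, E>,
    P: Fn(&E) -> bool,
    E: Debug,
{
    let mut attempt: u32 = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            // Permanent errors, or exhausted retries, end the loop immediately.
            Err(e) if is_permanent(&e) || attempt >= max_retries => return Err(e),
            Err(e) => {
                if attempt >= warn_threshold {
                    eprintln!("attempt {attempt} failed, retrying: {e:?}");
                }
                attempt += 1;
                // A real implementation would sleep with exponential backoff here.
            }
        }
    }
}
```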
1173 : /// Uploads the given layer **without** adding it to be part of a future `index_part.json` upload.
1174 : ///
1175 : /// This is not normally needed.
1176 0 : pub(crate) async fn upload_layer_file(
1177 0 : self: &Arc<Self>,
1178 0 : uploaded: &ResidentLayer,
1179 0 : cancel: &CancellationToken,
1180 0 : ) -> anyhow::Result<()> {
1181 0 : let remote_path = remote_layer_path(
1182 0 : &self.tenant_shard_id.tenant_id,
1183 0 : &self.timeline_id,
1184 0 : self.tenant_shard_id.to_index(),
1185 0 : &uploaded.layer_desc().layer_name(),
1186 0 : uploaded.metadata().generation,
1187 0 : );
1188 0 :
1189 0 : backoff::retry(
1190 0 : || async {
1191 0 : upload::upload_timeline_layer(
1192 0 : &self.storage_impl,
1193 0 : uploaded.local_path(),
1194 0 : &remote_path,
1195 0 : uploaded.metadata().file_size(),
1196 0 : cancel,
1197 0 : )
1198 0 : .await
1199 0 : },
1200 0 : TimeoutOrCancel::caused_by_cancel,
1201 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1202 0 : FAILED_REMOTE_OP_RETRIES,
1203 0 : "upload a layer without adding it to latest files",
1204 0 : cancel,
1205 0 : )
1206 0 : .await
1207 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1208 0 : .and_then(|x| x)
1209 0 : .context("upload a layer without adding it to latest files")
1210 0 : }
1211 :
1212 : /// Copies the existing remote layer `adopted` to the remote path of `adopted_as`. The copy is
1213 : /// not added to any future `index_part.json` upload.
1214 0 : pub(crate) async fn copy_timeline_layer(
1215 0 : self: &Arc<Self>,
1216 0 : adopted: &Layer,
1217 0 : adopted_as: &Layer,
1218 0 : cancel: &CancellationToken,
1219 0 : ) -> anyhow::Result<()> {
1220 0 : let source_remote_path = remote_layer_path(
1221 0 : &self.tenant_shard_id.tenant_id,
1222 0 : &adopted
1223 0 : .get_timeline_id()
1224 0 : .expect("Source timeline should be alive"),
1225 0 : self.tenant_shard_id.to_index(),
1226 0 : &adopted.layer_desc().layer_name(),
1227 0 : adopted.metadata().generation,
1228 0 : );
1229 0 :
1230 0 : let target_remote_path = remote_layer_path(
1231 0 : &self.tenant_shard_id.tenant_id,
1232 0 : &self.timeline_id,
1233 0 : self.tenant_shard_id.to_index(),
1234 0 : &adopted_as.layer_desc().layer_name(),
1235 0 : adopted_as.metadata().generation,
1236 0 : );
1237 0 :
1238 0 : backoff::retry(
1239 0 : || async {
1240 0 : upload::copy_timeline_layer(
1241 0 : &self.storage_impl,
1242 0 : &source_remote_path,
1243 0 : &target_remote_path,
1244 0 : cancel,
1245 0 : )
1246 0 : .await
1247 0 : },
1248 0 : TimeoutOrCancel::caused_by_cancel,
1249 0 : FAILED_UPLOAD_WARN_THRESHOLD,
1250 0 : FAILED_REMOTE_OP_RETRIES,
1251 0 : "copy timeline layer",
1252 0 : cancel,
1253 0 : )
1254 0 : .await
1255 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
1256 0 : .and_then(|x| x)
1257 0 : .context("remote copy timeline layer")
1258 0 : }
1259 :
1260 0 : async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
1261 0 : match tokio::time::timeout(
1262 0 : DELETION_QUEUE_FLUSH_TIMEOUT,
1263 0 : self.deletion_queue_client.flush_immediate(),
1264 0 : )
1265 0 : .await
1266 : {
1267 0 : Ok(result) => result,
1268 0 : Err(_timeout) => {
1269 0 : // Flushing remote deletions is not mandatory: we flush here to make the system easier to test, and
1270 0 : // to ensure that _usually_ objects are really gone after a DELETE is acked. However, in case of deletion
1271 0 : // queue issues (https://github.com/neondatabase/neon/issues/6440), we don't want to wait indefinitely here.
1272 0 : tracing::warn!(
1273 0 : "Timed out waiting for deletion queue flush, acking deletion anyway"
1274 : );
1275 0 : Ok(())
1276 : }
1277 : }
1278 0 : }
1279 :
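`flush_deletion_queue` bounds the wait with a timeout and treats a timeout as success, but it still propagates a real flush error. Below is a std-only sketch of the same policy, using `Receiver::recv_timeout` in place of `tokio::time::timeout`. The `done` channel is a hypothetical stand-in for the deletion queue's flush acknowledgement. Treating a disconnected channel as an error is this sketch's own choice.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Wait for a flush result; a timeout is logged and acked as success.
pub fn flush_with_timeout(done: &Receiver<Result<(), String>>, timeout: Duration) -> Result<(), String> {
    match done.recv_timeout(timeout) {
        // The flush finished in time: pass its outcome through unchanged.
        Ok(result) => result,
        Err(RecvTimeoutError::Timeout) => {
            eprintln!("Timed out waiting for deletion queue flush, acking deletion anyway");
            Ok(())
        }
        // The worker went away without answering (sketch's choice: an error).
        Err(RecvTimeoutError::Disconnected) => Err("deletion queue shut down".to_string()),
    }
}
```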
1280 : /// Prerequisites: the UploadQueue must be in the stopped state, and deleted_at must be successfully set.
1281 : /// The function deletes the layer files one by one, then lists the prefix to find anything we leaked,
1282 : /// deletes the leaked files (if any), and finally deletes the index file.
1283 0 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
1284 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1285 :
1286 0 : let layers: Vec<RemotePath> = {
1287 0 : let mut locked = self.upload_queue.lock().unwrap();
1288 0 : let stopped = locked.stopped_mut()?;
1289 :
1290 0 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
1291 0 : anyhow::bail!("deleted_at is not set")
1292 0 : }
1293 0 :
1294 0 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
1295 :
1296 0 : stopped
1297 0 : .upload_queue_for_deletion
1298 0 : .latest_files
1299 0 : .drain()
1300 0 : .map(|(file_name, meta)| {
1301 0 : remote_layer_path(
1302 0 : &self.tenant_shard_id.tenant_id,
1303 0 : &self.timeline_id,
1304 0 : meta.shard,
1305 0 : &file_name,
1306 0 : meta.generation,
1307 0 : )
1308 0 : })
1309 0 : .collect()
1310 0 : };
1311 0 :
1312 0 : let layer_deletion_count = layers.len();
1313 0 : self.deletion_queue_client.push_immediate(layers).await?;
1314 :
1315 : // Delete the initdb.tar.zst, which is not always present; attempts to delete
1316 : // nonexistent objects are not considered errors.
1317 0 : let initdb_path =
1318 0 : remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
1319 0 : self.deletion_queue_client
1320 0 : .push_immediate(vec![initdb_path])
1321 0 : .await?;
1322 :
1323 : // Do not delete the index part yet: it is needed for a possible retry. If we removed it first and
1324 : // the retry arrived at a different pageserver, there would be no trace of the timeline left on remote storage.
1325 0 : let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
1326 0 :
1327 0 : // Execute all pending deletions, so that when we proceed to do a listing below, we aren't
1328 0 : // taking the burden of listing all the layers that we already know we should delete.
1329 0 : self.flush_deletion_queue().await?;
1330 :
1331 0 : let cancel = shutdown_token();
1332 :
1333 0 : let remaining = download_retry(
1334 0 : || async {
1335 0 : self.storage_impl
1336 0 : .list(
1337 0 : Some(&timeline_storage_path),
1338 0 : ListingMode::NoDelimiter,
1339 0 : None,
1340 0 : &cancel,
1341 0 : )
1342 0 : .await
1343 0 : },
1344 0 : "list remaining files",
1345 0 : &cancel,
1346 0 : )
1347 0 : .await
1348 0 : .context("list files remaining files")?
1349 : .keys;
1350 :
1351 : // We will delete the current index_part object last, since it acts as a deletion
1352 : // marker via its deleted_at attribute
1353 0 : let latest_index = remaining
1354 0 : .iter()
1355 0 : .filter(|p| {
1356 0 : p.object_name()
1357 0 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
1358 0 : .unwrap_or(false)
1359 0 : })
1360 0 : .filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
1361 0 : .max_by_key(|i| i.1)
1362 0 : .map(|i| i.0.clone())
1363 0 : .unwrap_or(
1364 0 : // No generation-suffixed indices, assume we are dealing with
1365 0 : // a legacy index.
1366 0 : remote_index_path(&self.tenant_shard_id, &self.timeline_id, Generation::none()),
1367 0 : );
1368 0 :
1369 0 : let remaining_layers: Vec<RemotePath> = remaining
1370 0 : .into_iter()
1371 0 : .filter(|p| {
1372 0 : if p == &latest_index {
1373 0 : return false;
1374 0 : }
1375 0 : if p.object_name() == Some(INITDB_PRESERVED_PATH) {
1376 0 : return false;
1377 0 : }
1378 0 : true
1379 0 : })
1380 0 : .inspect(|path| {
1381 0 : if let Some(name) = path.object_name() {
1382 0 : info!(%name, "deleting a file not referenced from index_part.json");
1383 : } else {
1384 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
1385 : }
1386 0 : })
1387 0 : .collect();
1388 0 :
1389 0 : let not_referenced_count = remaining_layers.len();
1390 0 : if !remaining_layers.is_empty() {
1391 0 : self.deletion_queue_client
1392 0 : .push_immediate(remaining_layers)
1393 0 : .await?;
1394 0 : }
1395 :
1396 0 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
1397 0 : Err(anyhow::anyhow!(
1398 0 : "failpoint: timeline-delete-before-index-delete"
1399 0 : ))?
1400 0 : });
1401 :
1402 0 : debug!("enqueuing index part deletion");
1403 0 : self.deletion_queue_client
1404 0 : .push_immediate([latest_index].to_vec())
1405 0 : .await?;
1406 :
1407 : // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
1408 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
1409 0 : self.flush_deletion_queue().await?;
1410 :
1411 0 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
1412 0 : Err(anyhow::anyhow!(
1413 0 : "failpoint: timeline-delete-after-index-delete"
1414 0 : ))?
1415 0 : });
1416 :
1417 0 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
1418 :
1419 0 : Ok(())
1420 0 : }
1421 :
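`delete_all` deletes the newest generation-suffixed index last, because its `deleted_at` field acts as the deletion marker. It never deletes the preserved initdb archive. The sketch below models that selection over plain object names. The `index_part.json-<hex>` suffix format parsed by the hypothetical `index_generation` is an assumption, and so is passing the preserved archive's name in as a string.

```rust
/// Hypothetical parser: assumes names like `index_part.json-0000000a`.
fn index_generation(name: &str) -> Option<u32> {
    let suffix = name.strip_prefix("index_part.json-")?;
    u32::from_str_radix(suffix, 16).ok()
}

/// Split a listing into (index to delete last, objects to delete now).
/// Falls back to the legacy, un-suffixed index name when no suffixed one exists.
pub fn split_listing(names: &[&str], preserved: &str) -> (String, Vec<String>) {
    let latest_index = names
        .iter()
        .filter_map(|n| index_generation(n).map(|g| (*n, g)))
        .max_by_key(|(_, g)| *g)
        .map(|(n, _)| n.to_string())
        .unwrap_or_else(|| "index_part.json".to_string());

    // Everything else goes, except the latest index and the preserved archive.
    let rest = names
        .iter()
        .filter(|n| **n != latest_index && **n != preserved)
        .map(|n| n.to_string())
        .collect();
    (latest_index, rest)
}
```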
1422 : ///
1423 : /// Pick next tasks from the queue, and start as many of them as possible without violating
1424 : /// the ordering constraints.
1425 : ///
1426 : /// The caller needs to already hold the `upload_queue` lock.
1427 5528 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
1428 8457 : while let Some(next_op) = upload_queue.queued_operations.front() {
1429 : // Can we run this task now?
1430 4788 : let can_run_now = match next_op {
1431 : UploadOp::UploadLayer(..) => {
1432 : // Can always be scheduled.
1433 1288 : true
1434 : }
1435 : UploadOp::UploadMetadata(_, _) => {
1436 : // These can only be performed after all the preceding operations
1437 : // have finished.
1438 2977 : upload_queue.inprogress_tasks.is_empty()
1439 : }
1440 : UploadOp::Delete(_) => {
1441 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
1442 267 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
1443 : }
1444 :
1445 : UploadOp::Barrier(_) | UploadOp::Shutdown => {
1446 256 : upload_queue.inprogress_tasks.is_empty()
1447 : }
1448 : };
1449 :
1450 : // If we cannot launch this task, don't look any further.
1451 : //
1452 : // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
1453 : // them now, but we don't try to do that currently. For example, if the frontmost task
1454 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
1455 : // could still start layer uploads that were scheduled later.
1456 4788 : if !can_run_now {
1457 1853 : break;
1458 2935 : }
1459 2935 :
1460 2935 : if let UploadOp::Shutdown = next_op {
1461 : // Leave the op in the queue but do not start more tasks; it will be dropped when
1462 : // `stop` is called.
1463 6 : upload_queue.shutdown_ready.close();
1464 6 : break;
1465 2929 : }
1466 2929 :
1467 2929 : // We can launch this task. Remove it from the queue first.
1468 2929 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
1469 2929 :
1470 2929 : debug!("starting op: {}", next_op);
1471 :
1472 : // Update the counters
1473 2929 : match next_op {
1474 1288 : UploadOp::UploadLayer(_, _) => {
1475 1288 : upload_queue.num_inprogress_layer_uploads += 1;
1476 1288 : }
1477 1277 : UploadOp::UploadMetadata(_, _) => {
1478 1277 : upload_queue.num_inprogress_metadata_uploads += 1;
1479 1277 : }
1480 236 : UploadOp::Delete(_) => {
1481 236 : upload_queue.num_inprogress_deletions += 1;
1482 236 : }
1483 128 : UploadOp::Barrier(sender) => {
1484 128 : sender.send_replace(());
1485 128 : continue;
1486 : }
1487 0 : UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
1488 : };
1489 :
1490 : // Assign unique ID to this task
1491 2801 : upload_queue.task_counter += 1;
1492 2801 : let upload_task_id = upload_queue.task_counter;
1493 2801 :
1494 2801 : // Add it to the in-progress map
1495 2801 : let task = Arc::new(UploadTask {
1496 2801 : task_id: upload_task_id,
1497 2801 : op: next_op,
1498 2801 : retries: AtomicU32::new(0),
1499 2801 : });
1500 2801 : upload_queue
1501 2801 : .inprogress_tasks
1502 2801 : .insert(task.task_id, Arc::clone(&task));
1503 2801 :
1504 2801 : // Spawn a task to perform the operation
1505 2801 : let self_rc = Arc::clone(self);
1506 2801 : let tenant_shard_id = self.tenant_shard_id;
1507 2801 : let timeline_id = self.timeline_id;
1508 2801 : task_mgr::spawn(
1509 2801 : &self.runtime,
1510 2801 : TaskKind::RemoteUploadTask,
1511 2801 : Some(self.tenant_shard_id),
1512 2801 : Some(self.timeline_id),
1513 2801 : "remote upload",
1514 : false,
1515 2687 : async move {
1516 34932 : self_rc.perform_upload_task(task).await;
1517 2653 : Ok(())
1518 2653 : }
1519 2801 : .instrument(info_span!(parent: None, "remote_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, %upload_task_id)),
1520 : );
1521 :
1522 : // Loop back to process next task
1523 : }
1524 5528 : }
1525 :
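The admission rules in `launch_queued_tasks` come down to three cases:

- Layer uploads always start.
- Index uploads and barriers wait for an empty in-progress set.
- Deletions may run only alongside other deletions.

The first op that cannot start blocks everything behind it. Below is a simplified sketch of those rules. It omits `Shutdown` and the task spawning, and `OpKind` and `State` are hypothetical types:

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OpKind {
    UploadLayer,
    UploadMetadata,
    Delete,
    Barrier,
}

#[derive(Default)]
pub struct State {
    pub queued: VecDeque<OpKind>,
    pub inprogress: Vec<OpKind>,
}

impl State {
    fn can_run_now(&self, op: OpKind) -> bool {
        let deletions = self.inprogress.iter().filter(|k| **k == OpKind::Delete).count();
        match op {
            OpKind::UploadLayer => true,
            OpKind::UploadMetadata | OpKind::Barrier => self.inprogress.is_empty(),
            // Concurrent deletions are fine; any other in-progress op blocks.
            OpKind::Delete => deletions == self.inprogress.len(),
        }
    }

    /// Start ops strictly in FIFO order, stopping at the first one that must wait.
    pub fn launch(&mut self) -> Vec<OpKind> {
        let mut started = Vec::new();
        while let Some(&op) = self.queued.front() {
            if !self.can_run_now(op) {
                break;
            }
            self.queued.pop_front();
            // Barriers complete immediately and never occupy an in-progress slot.
            if op != OpKind::Barrier {
                self.inprogress.push(op);
            }
            started.push(op);
        }
        started
    }
}
```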
1526 : ///
1527 : /// Perform an upload task.
1528 : ///
1529 : /// The task is in the `inprogress_tasks` list. This function will try to
1530 : /// execute it, retrying forever. On successful completion, the task is
1531 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
1532 : /// queue that were waiting for its completion are launched.
1533 : ///
1534 : /// The task can be shut down, however. That leads to stopping the whole
1535 : /// queue.
1536 : ///
1537 2687 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1538 2687 : let cancel = shutdown_token();
1539 : // Loop to retry until it completes.
1540 2687 : loop {
1541 2687 : // If we're requested to shut down, close up shop and exit.
1542 2687 : //
1543 2687 : // Note: We only check for the shutdown requests between retries, so
1544 2687 : // if a shutdown request arrives while we're busy uploading in the
1545 2687 : // upload::upload_*() calls below, we will not exit until it has
1546 2687 : // finished. We probably could cancel the upload by simply dropping
1547 2687 : // the Future, but we're not 100% sure if the remote storage library
1548 2687 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1549 2687 : // upload finishes or times out soon enough.
1550 2687 : if cancel.is_cancelled() {
1551 0 : info!("upload task cancelled by shutdown request");
1552 0 : self.stop();
1553 0 : return;
1554 2687 : }
1555 :
1556 2687 : let upload_result: anyhow::Result<()> = match &task.op {
1557 1184 : UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
1558 1184 : let local_path = layer.local_path();
1559 1184 :
1560 1184 : // We should only be uploading layers created during this `Tenant`'s lifetime, so
1561 1184 : // the metadata in the upload should always match our current generation.
1562 1184 : assert_eq!(layer_metadata.generation, self.generation);
1563 :
1564 1184 : let remote_path = remote_layer_path(
1565 1184 : &self.tenant_shard_id.tenant_id,
1566 1184 : &self.timeline_id,
1567 1184 : layer_metadata.shard,
1568 1184 : &layer.layer_desc().layer_name(),
1569 1184 : layer_metadata.generation,
1570 1184 : );
1571 1184 :
1572 1184 : upload::upload_timeline_layer(
1573 1184 : &self.storage_impl,
1574 1184 : local_path,
1575 1184 : &remote_path,
1576 1184 : layer_metadata.file_size(),
1577 1184 : &self.cancel,
1578 1184 : )
1579 1184 : .measure_remote_op(
1580 1184 : RemoteOpFileKind::Layer,
1581 1184 : RemoteOpKind::Upload,
1582 1184 : Arc::clone(&self.metrics),
1583 1184 : )
1584 30012 : .await
1585 : }
1586 1267 : UploadOp::UploadMetadata(ref index_part, _lsn) => {
1587 1267 : let mention_having_future_layers = if cfg!(feature = "testing") {
1588 1267 : index_part
1589 1267 : .layer_metadata
1590 1267 : .keys()
1591 13993 : .any(|x| x.is_in_future(*_lsn))
1592 : } else {
1593 0 : false
1594 : };
1595 :
1596 1267 : let res = upload::upload_index_part(
1597 1267 : &self.storage_impl,
1598 1267 : &self.tenant_shard_id,
1599 1267 : &self.timeline_id,
1600 1267 : self.generation,
1601 1267 : index_part,
1602 1267 : &self.cancel,
1603 1267 : )
1604 1267 : .measure_remote_op(
1605 1267 : RemoteOpFileKind::Index,
1606 1267 : RemoteOpKind::Upload,
1607 1267 : Arc::clone(&self.metrics),
1608 1267 : )
1609 4711 : .await;
1610 1262 : if res.is_ok() {
1611 1262 : self.update_remote_physical_size_gauge(Some(index_part));
1612 1262 : if mention_having_future_layers {
1613 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1614 4 : tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
1615 1258 : }
1616 0 : }
1617 1262 : res
1618 : }
1619 236 : UploadOp::Delete(delete) => {
1620 : pausable_failpoint!("before-delete-layer-pausable");
1621 236 : self.deletion_queue_client
1622 236 : .push_layers(
1623 236 : self.tenant_shard_id,
1624 236 : self.timeline_id,
1625 236 : self.generation,
1626 236 : delete.layers.clone(),
1627 236 : )
1628 0 : .await
1629 236 : .map_err(|e| anyhow::anyhow!(e))
1630 : }
1631 0 : unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
1632 : // unreachable: Barrier and Shutdown operations are handled synchronously in
1633 : // launch_queued_tasks
1634 0 : warn!("unexpected {unexpected:?} operation in perform_upload_task");
1635 0 : break;
1636 : }
1637 : };
1638 :
1639 0 : match upload_result {
1640 : Ok(()) => {
1641 2653 : break;
1642 : }
1643 0 : Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
1644 0 : // loop around to do the proper stopping
1645 0 : continue;
1646 : }
1647 0 : Err(e) => {
1648 0 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1649 0 :
1650 0 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1651 0 : // or other external reasons. Such issues are relatively regular, so log them
1652 0 : // at info level at first, and only WARN if the operation fails repeatedly.
1653 0 : //
1654 0 : // (See similar logic for downloads in `download::download_retry`)
1655 0 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1656 0 : info!(
1657 0 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1658 0 : task.op, retries, e
1659 : );
1660 : } else {
1661 0 : warn!(
1662 0 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1663 0 : task.op, retries, e
1664 : );
1665 : }
1666 :
1667 : // sleep until it's time to retry, or we're cancelled
1668 0 : exponential_backoff(
1669 0 : retries,
1670 0 : DEFAULT_BASE_BACKOFF_SECONDS,
1671 0 : DEFAULT_MAX_BACKOFF_SECONDS,
1672 0 : &cancel,
1673 0 : )
1674 0 : .await;
1675 : }
1676 : }
1677 : }
1678 :
1679 2653 : let retries = task.retries.load(Ordering::SeqCst);
1680 2653 : if retries > 0 {
1681 0 : info!(
1682 0 : "remote task {} completed successfully after {} retries",
1683 0 : task.op, retries
1684 : );
1685 : } else {
1686 2653 : debug!("remote task {} completed successfully", task.op);
1687 : }
1688 :
1689 : // The task has completed successfully. Remove it from the in-progress list.
1690 2653 : let lsn_update = {
1691 2653 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1692 2653 : let upload_queue = match upload_queue_guard.deref_mut() {
1693 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1694 0 : UploadQueue::Stopped(_stopped) => {
1695 0 : None
1696 : },
1697 2653 : UploadQueue::Initialized(qi) => { Some(qi) }
1698 : };
1699 :
1700 2653 : let upload_queue = match upload_queue {
1701 2653 : Some(upload_queue) => upload_queue,
1702 : None => {
1703 0 : info!("another concurrent task already stopped the queue");
1704 0 : return;
1705 : }
1706 : };
1707 :
1708 2653 : upload_queue.inprogress_tasks.remove(&task.task_id);
1709 :
1710 2653 : let lsn_update = match task.op {
1711 : UploadOp::UploadLayer(_, _) => {
1712 1155 : upload_queue.num_inprogress_layer_uploads -= 1;
1713 1155 : None
1714 : }
1715 1262 : UploadOp::UploadMetadata(_, lsn) => {
1716 1262 : upload_queue.num_inprogress_metadata_uploads -= 1;
1717 1262 : // XXX monotonicity check?
1718 1262 :
1719 1262 : upload_queue.projected_remote_consistent_lsn = Some(lsn);
1720 1262 : if self.generation.is_none() {
1721 : // Legacy mode: skip validating generation
1722 0 : upload_queue.visible_remote_consistent_lsn.store(lsn);
1723 0 : None
1724 : } else {
1725 1262 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
1726 : }
1727 : }
1728 : UploadOp::Delete(_) => {
1729 236 : upload_queue.num_inprogress_deletions -= 1;
1730 236 : None
1731 : }
1732 0 : UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
1733 : };
1734 :
1735 : // Launch any queued tasks that were unblocked by this one.
1736 2653 : self.launch_queued_tasks(upload_queue);
1737 2653 : lsn_update
1738 : };
1739 :
1740 2653 : if let Some((lsn, slot)) = lsn_update {
1741 : // Updates to the remote_consistent_lsn we advertise to pageservers
1742 : // are all routed through the DeletionQueue, to enforce important
1743 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
1744 1262 : self.deletion_queue_client
1745 1262 : .update_remote_consistent_lsn(
1746 1262 : self.tenant_shard_id,
1747 1262 : self.timeline_id,
1748 1262 : self.generation,
1749 1262 : lsn,
1750 1262 : slot,
1751 1262 : )
1752 0 : .await;
1753 1391 : }
1754 :
1755 2653 : self.metric_end(&task.op);
1756 2653 : }
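The retry policy in `perform_upload_task` works as follows. It checks for cancellation, then attempts the operation. On failure it logs at info level below `FAILED_UPLOAD_WARN_THRESHOLD` and at warn level from then on, then backs off. The first retry is immediate because the pre-increment retry count (0) is passed to the backoff.

The sketch below models this synchronously. It is a minimal, std-only sketch, not the crate's implementation. The constant values, the doubling backoff formula, and the helper names `backoff_duration`, `log_level_for` and `retry_until_ok` are all hypothetical. None of them describe the real `exponential_backoff`.

```rust
use std::time::Duration;

// Hypothetical constants; the crate's real values and backoff formula may differ.
const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
const BASE_BACKOFF_SECONDS: f64 = 0.1;
const MAX_BACKOFF_SECONDS: f64 = 3.0;

/// Capped exponential backoff (assumed formula): no wait before the first
/// retry, then `base * 2^(retries - 1)` seconds, never more than `max`.
fn backoff_duration(retries: u32, base: f64, max: f64) -> Duration {
    if retries == 0 {
        return Duration::ZERO;
    }
    let secs = (base * 2f64.powi(retries as i32 - 1)).min(max);
    Duration::from_secs_f64(secs)
}

#[derive(Debug, PartialEq)]
enum LogLevel {
    Info,
    Warn,
}

/// Early failures are expected (rate limits, network blips), so only
/// repeated failures are escalated to warn.
fn log_level_for(retries: u32) -> LogLevel {
    if retries < FAILED_UPLOAD_WARN_THRESHOLD {
        LogLevel::Info
    } else {
        LogLevel::Warn
    }
}

/// Retries `op` until it succeeds (returning the number of failed attempts)
/// or `is_cancelled` reports shutdown (returning `None`).
fn retry_until_ok<F, C>(mut op: F, is_cancelled: C) -> Option<u32>
where
    F: FnMut() -> Result<(), String>,
    C: Fn() -> bool,
{
    let mut retries: u32 = 0;
    loop {
        // As in the real loop, cancellation is only checked between attempts.
        if is_cancelled() {
            return None;
        }
        match op() {
            Ok(()) => return Some(retries),
            Err(e) => {
                eprintln!("[{:?}] attempt {} failed, will retry: {}", log_level_for(retries), retries, e);
                let delay = backoff_duration(retries, BASE_BACKOFF_SECONDS, MAX_BACKOFF_SECONDS);
                // A real async task would await a cancellable sleep here;
                // std::thread::sleep keeps this sketch self-contained.
                std::thread::sleep(delay);
                retries += 1;
            }
        }
    }
}
```

Two usage examples:

- An operation that fails twice and then succeeds makes `retry_until_ok` return `Some(2)`.
- An already-cancelled token returns `None` without attempting anything. This matches the early `self.stop(); return;` branch.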
1757 :
1758 5588 : fn metric_impl(
1759 5588 : &self,
1760 5588 : op: &UploadOp,
1761 5588 : ) -> Option<(
1762 5588 : RemoteOpFileKind,
1763 5588 : RemoteOpKind,
1764 5588 : RemoteTimelineClientMetricsCallTrackSize,
1765 5588 : )> {
1766 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
1767 5588 : let res = match op {
1768 2445 : UploadOp::UploadLayer(_, m) => (
1769 2445 : RemoteOpFileKind::Layer,
1770 2445 : RemoteOpKind::Upload,
1771 2445 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
1772 2445 : ),
1773 2564 : UploadOp::UploadMetadata(_, _) => (
1774 2564 : RemoteOpFileKind::Index,
1775 2564 : RemoteOpKind::Upload,
1776 2564 : DontTrackSize {
1777 2564 : reason: "metadata uploads are tiny",
1778 2564 : },
1779 2564 : ),
1780 573 : UploadOp::Delete(_delete) => (
1781 573 : RemoteOpFileKind::Layer,
1782 573 : RemoteOpKind::Delete,
1783 573 : DontTrackSize {
1784 573 : reason: "should we track deletes? positive or negative sign?",
1785 573 : },
1786 573 : ),
1787 : UploadOp::Barrier(..) | UploadOp::Shutdown => {
1788 : // we do not account these
1789 6 : return None;
1790 : }
1791 : };
1792 5582 : Some(res)
1793 5588 : }
1794 :
1795 2929 : fn metric_begin(&self, op: &UploadOp) {
1796 2929 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
1797 2929 : Some(x) => x,
1798 0 : None => return,
1799 : };
1800 2929 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
1801 2929 : guard.will_decrement_manually(); // in metric_end(), see right below
1802 2929 : }
1803 :
1804 2659 : fn metric_end(&self, op: &UploadOp) {
1805 2659 : let (file_kind, op_kind, track_bytes) = match self.metric_impl(op) {
1806 2653 : Some(x) => x,
1807 6 : None => return,
1808 : };
1809 2653 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
1810 2659 : }
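`metric_begin` and `metric_end` must be called in pairs. `stop_impl` calls `metric_end` on queued ops that never ran, which suggests `metric_begin` is called when an op is queued. If so, the in-flight gauge returns to zero whether an op completes or is torn down.

Below is a simplified model of that bookkeeping. The types `Op`, `FileKind`, `OpKind` and `Inflight` are hypothetical stand-ins, not the crate's metrics API.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum FileKind {
    Layer,
    Index,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum OpKind {
    Upload,
    Delete,
}

// Hypothetical, reduced stand-in for UploadOp.
enum Op {
    UploadLayer { size: u64 },
    UploadMetadata,
    Delete,
    Barrier,
    Shutdown,
}

/// Same shape as metric_impl: `None` means "not accounted".
fn classify(op: &Op) -> Option<(FileKind, OpKind, Option<u64>)> {
    match op {
        Op::UploadLayer { size } => Some((FileKind::Layer, OpKind::Upload, Some(*size))),
        Op::UploadMetadata => Some((FileKind::Index, OpKind::Upload, None)),
        Op::Delete => Some((FileKind::Layer, OpKind::Delete, None)),
        Op::Barrier | Op::Shutdown => None,
    }
}

#[derive(Default)]
struct Inflight {
    calls: HashMap<(FileKind, OpKind), i64>,
    bytes_started: u64,
    bytes_finished: u64,
}

impl Inflight {
    /// Counterpart of metric_begin: count the call and its bytes as started.
    fn begin(&mut self, op: &Op) {
        if let Some((f, k, bytes)) = classify(op) {
            *self.calls.entry((f, k)).or_insert(0) += 1;
            self.bytes_started += bytes.unwrap_or(0);
        }
    }

    /// Counterpart of metric_end: must run exactly once per begin().
    fn end(&mut self, op: &Op) {
        if let Some((f, k, bytes)) = classify(op) {
            *self.calls.entry((f, k)).or_insert(0) -= 1;
            self.bytes_finished += bytes.unwrap_or(0);
        }
    }

    fn unfinished(&self, f: FileKind, k: OpKind) -> i64 {
        self.calls.get(&(f, k)).copied().unwrap_or(0)
    }
}
```

`Barrier` and `Shutdown` classify as `None`, so beginning or ending them is a no-op. That is why `metric_end` can be called unconditionally on every torn-down op.

`bytes_started - bytes_finished` gives the bytes still in flight. This is the quantity that `bytes_unfinished_gauge_for_layer_file_uploads` checks.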
1811 :
1812 : /// Close the upload queue for new operations and cancel queued operations.
1813 : ///
1814 : /// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
1815 : ///
1816 : /// In-progress operations will still be running after this function returns.
1817 : /// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
1818 : /// to wait for them to complete, after calling this function.
1819 14 : pub(crate) fn stop(&self) {
1820 14 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
1821 14 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
1822 14 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
1823 14 : let mut guard = self.upload_queue.lock().unwrap();
1824 14 : self.stop_impl(&mut guard);
1825 14 : }
1826 :
1827 14 : fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
1828 14 : match &mut **guard {
1829 : UploadQueue::Uninitialized => {
1830 0 : info!("UploadQueue is in state Uninitialized, nothing to do");
1831 0 : **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
1832 : }
1833 : UploadQueue::Stopped(_) => {
1834 : // nothing to do
1835 6 : info!("another concurrent task already shut down the queue");
1836 : }
1837 8 : UploadQueue::Initialized(initialized) => {
1838 8 : info!("shutting down upload queue");
1839 :
1840 : // Replace the queue with the Stopped state, taking ownership of the old
1841 : // Initialized queue. We will do some checks on it, and then drop it.
1842 8 : let qi = {
1843 : // Here we preserve a working version of the upload queue for possible use during deletions.
1844 : // An in-place replacement of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
1845 : // but it doesn't really make sense to bring in unsafe code for this single use.
1846 : // Deletion is not really perf-sensitive, so cloning a fraction of the queue shouldn't be a problem.
1847 8 : let upload_queue_for_deletion = UploadQueueInitialized {
1848 8 : task_counter: 0,
1849 8 : latest_files: initialized.latest_files.clone(),
1850 8 : latest_files_changes_since_metadata_upload_scheduled: 0,
1851 8 : latest_metadata: initialized.latest_metadata.clone(),
1852 8 : latest_lineage: initialized.latest_lineage.clone(),
1853 8 : projected_remote_consistent_lsn: None,
1854 8 : visible_remote_consistent_lsn: initialized
1855 8 : .visible_remote_consistent_lsn
1856 8 : .clone(),
1857 8 : num_inprogress_layer_uploads: 0,
1858 8 : num_inprogress_metadata_uploads: 0,
1859 8 : num_inprogress_deletions: 0,
1860 8 : inprogress_tasks: HashMap::default(),
1861 8 : queued_operations: VecDeque::default(),
1862 8 : #[cfg(feature = "testing")]
1863 8 : dangling_files: HashMap::default(),
1864 8 : shutting_down: false,
1865 8 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
1866 8 : last_aux_file_policy: initialized.last_aux_file_policy,
1867 8 : };
1868 8 :
1869 8 : let upload_queue = std::mem::replace(
1870 8 : &mut **guard,
1871 8 : UploadQueue::Stopped(UploadQueueStopped::Deletable(
1872 8 : UploadQueueStoppedDeletable {
1873 8 : upload_queue_for_deletion,
1874 8 : deleted_at: SetDeletedFlagProgress::NotRunning,
1875 8 : },
1876 8 : )),
1877 8 : );
1878 8 : if let UploadQueue::Initialized(qi) = upload_queue {
1879 8 : qi
1880 : } else {
1881 0 : unreachable!("we checked in the match above that it is Initialized");
1882 : }
1883 : };
1884 :
1885 : // consistency check
1886 8 : assert_eq!(
1887 8 : qi.num_inprogress_layer_uploads
1888 8 : + qi.num_inprogress_metadata_uploads
1889 8 : + qi.num_inprogress_deletions,
1890 8 : qi.inprogress_tasks.len()
1891 8 : );
1892 :
1893 : // We don't need to do anything here for in-progress tasks. They will finish
1894 : // on their own, decrement the unfinished-task counter themselves, and observe
1895 : // that the queue is Stopped.
1896 8 : drop(qi.inprogress_tasks);
1897 :
1898 : // Tear down queued ops
1899 8 : for op in qi.queued_operations.into_iter() {
1900 6 : self.metric_end(&op);
1901 6 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
1902 6 : // which is exactly what we want to happen.
1903 6 : drop(op);
1904 6 : }
1905 : }
1906 : }
1907 14 : }
1908 : }
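`stop_impl` uses `std::mem::replace` to swap the `Initialized` state for `Stopped` while taking ownership of the old queue. It clones only what deletion needs later.

Below is a reduced sketch of that pattern. The types `Queue`, `Active` and `Stopped` are hypothetical stand-ins for `UploadQueue` and its variants.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical, heavily simplified queue states.
#[derive(Debug)]
enum Queue {
    Uninitialized,
    Initialized(Active),
    Stopped(Stopped),
}

#[derive(Debug)]
struct Active {
    latest_files: Vec<String>,
    queued_operations: VecDeque<String>,
}

#[derive(Debug)]
struct Stopped {
    // Only the state that later deletion needs is kept.
    latest_files: Option<Vec<String>>,
}

/// Transitions the queue to Stopped and returns the queued ops that were
/// torn down. A second call observes Stopped and does nothing.
fn stop(queue: &Mutex<Queue>) -> Vec<String> {
    let mut guard = queue.lock().unwrap();
    match &mut *guard {
        Queue::Uninitialized => {
            *guard = Queue::Stopped(Stopped { latest_files: None });
            Vec::new()
        }
        // Another caller already stopped the queue.
        Queue::Stopped(_) => Vec::new(),
        Queue::Initialized(active) => {
            // Clone the small part we keep, then move the old state out.
            let kept = active.latest_files.clone();
            let old = std::mem::replace(
                &mut *guard,
                Queue::Stopped(Stopped { latest_files: Some(kept) }),
            );
            let Queue::Initialized(old) = old else {
                unreachable!("we matched Initialized above");
            };
            old.queued_operations.into_iter().collect()
        }
    }
}
```

Calling `stop` twice is harmless, because the second call finds `Stopped` and returns immediately. Moving the old state out, rather than rewriting it in place, avoids the `take_mut`-style unsafe code mentioned in `stop_impl`.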
1909 :
1910 0 : pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1911 0 : let path = format!("tenants/{tenant_shard_id}");
1912 0 : RemotePath::from_string(&path).expect("Failed to construct path")
1913 0 : }
1914 :
1915 160 : pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1916 160 : let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
1917 160 : RemotePath::from_string(&path).expect("Failed to construct path")
1918 160 : }
1919 :
1920 0 : fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
1921 0 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
1922 0 : RemotePath::from_string(&path).expect("Failed to construct path")
1923 0 : }
1924 :
1925 30 : pub fn remote_timeline_path(
1926 30 : tenant_shard_id: &TenantShardId,
1927 30 : timeline_id: &TimelineId,
1928 30 : ) -> RemotePath {
1929 30 : remote_timelines_path(tenant_shard_id).join(Utf8Path::new(&timeline_id.to_string()))
1930 30 : }
1931 :
1932 : /// Note that the shard component of a remote layer path is _not_ always the same
1933 : /// as in the TenantShardId of the caller: tenants may reference layers from a different
1934 : /// ShardIndex. Use the ShardIndex from the layer's metadata.
1935 1202 : pub fn remote_layer_path(
1936 1202 : tenant_id: &TenantId,
1937 1202 : timeline_id: &TimelineId,
1938 1202 : shard: ShardIndex,
1939 1202 : layer_file_name: &LayerName,
1940 1202 : generation: Generation,
1941 1202 : ) -> RemotePath {
1942 1202 : // Generation-aware key format
1943 1202 : let path = format!(
1944 1202 : "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
1945 1202 : shard.get_suffix(),
1946 1202 : layer_file_name,
1947 1202 : generation.get_suffix()
1948 1202 : );
1949 1202 :
1950 1202 : RemotePath::from_string(&path).expect("Failed to construct path")
1951 1202 : }
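The object keys built by `remote_layer_path` concatenate the tenant ID, an optional shard suffix, and an optional generation suffix. The sketch below assumes formats that this file does not show:

- `-{number:02x}{count:02x}` for a sharded `ShardIndex`, and an empty suffix for unsharded tenants.
- `-{generation:08x}` for a valid generation.
- `timelines` as the value of `TIMELINES_SEGMENT_NAME`.

The `ShardIndex` struct and the helper functions are hypothetical simplifications.

```rust
/// Hypothetical model of a shard index; `count == 0` means unsharded.
#[derive(Clone, Copy)]
struct ShardIndex {
    number: u8,
    count: u8,
}

impl ShardIndex {
    /// Assumed suffix format; unsharded tenants get no suffix.
    fn suffix(&self) -> String {
        if self.count == 0 {
            String::new()
        } else {
            format!("-{:02x}{:02x}", self.number, self.count)
        }
    }
}

/// `None` models a legacy object written without a generation (assumed format).
fn generation_suffix(generation: Option<u32>) -> String {
    match generation {
        Some(g) => format!("-{g:08x}"),
        None => String::new(),
    }
}

const TIMELINES_SEGMENT_NAME: &str = "timelines"; // assumed value

/// Mirrors the layout of remote_layer_path's format string.
fn layer_path(
    tenant_id: &str,
    timeline_id: &str,
    shard: ShardIndex,
    layer_name: &str,
    generation: Option<u32>,
) -> String {
    format!(
        "tenants/{tenant_id}{}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{layer_name}{}",
        shard.suffix(),
        generation_suffix(generation)
    )
}
```

As the doc comment on `remote_layer_path` warns, the shard passed in should come from the layer's own metadata. A layer may have been written by a different `ShardIndex`, so using the caller's shard could point at the wrong key.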
1952 :
1953 4 : pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
1954 4 : RemotePath::from_string(&format!(
1955 4 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
1956 4 : ))
1957 4 : .expect("Failed to construct path")
1958 4 : }
1959 :
1960 2 : pub fn remote_initdb_preserved_archive_path(
1961 2 : tenant_id: &TenantId,
1962 2 : timeline_id: &TimelineId,
1963 2 : ) -> RemotePath {
1964 2 : RemotePath::from_string(&format!(
1965 2 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
1966 2 : ))
1967 2 : .expect("Failed to construct path")
1968 2 : }
1969 :
1970 1317 : pub fn remote_index_path(
1971 1317 : tenant_shard_id: &TenantShardId,
1972 1317 : timeline_id: &TimelineId,
1973 1317 : generation: Generation,
1974 1317 : ) -> RemotePath {
1975 1317 : RemotePath::from_string(&format!(
1976 1317 : "tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1977 1317 : IndexPart::FILE_NAME,
1978 1317 : generation.get_suffix()
1979 1317 : ))
1980 1317 : .expect("Failed to construct path")
1981 1317 : }
1982 :
1983 0 : pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
1984 0 : RemotePath::from_string(&format!(
1985 0 : "tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
1986 0 : ))
1987 0 : .expect("Failed to construct path")
1988 0 : }
1989 :
1990 : /// Given the key of an index, parse out the generation part of the name
1991 18 : pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
1992 18 : let file_name = match path.get_path().file_name() {
1993 18 : Some(f) => f,
1994 : None => {
1995 : // Unexpected: we should be seeing index_part.json paths only
1996 0 : tracing::warn!("Malformed index key {}", path);
1997 0 : return None;
1998 : }
1999 : };
2000 :
2001 18 : match file_name.split_once('-') {
2002 12 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
2003 6 : None => None,
2004 : }
2005 18 : }
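`parse_remote_index_path` recovers the generation from an index key by splitting the file name at the first `-`. A key without one, such as a legacy pre-generation `index_part.json`, yields `None`.

Below is a std-only sketch. It assumes the generation suffix is exactly eight hex digits, which is an assumption about `Generation::parse_suffix` (not shown here). The names `index_generation` and `parse_generation_suffix` are hypothetical.

```rust
/// Assumed suffix format: exactly eight hex digits.
fn parse_generation_suffix(s: &str) -> Option<u32> {
    if s.len() != 8 {
        return None;
    }
    u32::from_str_radix(s, 16).ok()
}

/// Takes a remote key such as "tenants/.../index_part.json-0000000a" and
/// returns the generation encoded in its file name, if any.
fn index_generation(key: &str) -> Option<u32> {
    // Last path component; an empty string if the key ends with '/'.
    let file_name = key.rsplit('/').next()?;
    // Split at the first '-': index_part.json itself contains none.
    let (_, suffix) = file_name.split_once('-')?;
    parse_generation_suffix(suffix)
}
```

Splitting at the first `-` is safe for index keys because `index_part.json` contains no `-`. The same approach would not work on layer file names, which do.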
2006 :
2007 : #[cfg(test)]
2008 : mod tests {
2009 : use super::*;
2010 : use crate::{
2011 : context::RequestContext,
2012 : tenant::{
2013 : harness::{TenantHarness, TIMELINE_ID},
2014 : storage_layer::layer::local_layer_path,
2015 : Tenant, Timeline,
2016 : },
2017 : DEFAULT_PG_VERSION,
2018 : };
2019 :
2020 : use std::collections::HashSet;
2021 :
2022 8 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
2023 8 : format!("contents for {name}").into()
2024 8 : }
2025 :
2026 2 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
2027 2 : let metadata = TimelineMetadata::new(
2028 2 : disk_consistent_lsn,
2029 2 : None,
2030 2 : None,
2031 2 : Lsn(0),
2032 2 : Lsn(0),
2033 2 : Lsn(0),
2034 2 : // Any version will do
2035 2 : // but it should be consistent with the one in the tests
2036 2 : crate::DEFAULT_PG_VERSION,
2037 2 : );
2038 2 :
2039 2 : // go through serialize + deserialize to fix the header, including checksum
2040 2 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
2041 2 : }
2042 :
2043 2 : fn assert_file_list(a: &HashSet<LayerName>, b: &[&str]) {
2044 6 : let mut avec: Vec<String> = a.iter().map(|x| x.to_string()).collect();
2045 2 : avec.sort();
2046 2 :
2047 2 : let mut bvec = b.to_vec();
2048 2 : bvec.sort_unstable();
2049 2 :
2050 2 : assert_eq!(avec, bvec);
2051 2 : }
2052 :
2053 4 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
2054 4 : let mut expected: Vec<String> = expected
2055 4 : .iter()
2056 16 : .map(|x| format!("{}{}", x, generation.get_suffix()))
2057 4 : .collect();
2058 4 : expected.sort();
2059 4 :
2060 4 : let mut found: Vec<String> = Vec::new();
2061 16 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
2062 16 : let entry_name = entry.file_name();
2063 16 : let fname = entry_name.to_str().unwrap();
2064 16 : found.push(String::from(fname));
2065 16 : }
2066 4 : found.sort();
2067 4 :
2068 4 : assert_eq!(found, expected);
2069 4 : }
2070 :
2071 : struct TestSetup {
2072 : harness: TenantHarness,
2073 : tenant: Arc<Tenant>,
2074 : timeline: Arc<Timeline>,
2075 : tenant_ctx: RequestContext,
2076 : }
2077 :
2078 : impl TestSetup {
2079 8 : async fn new(test_name: &str) -> anyhow::Result<Self> {
2080 8 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
2081 8 : let harness = TenantHarness::create(test_name)?;
2082 32 : let (tenant, ctx) = harness.load().await;
2083 :
2084 8 : let timeline = tenant
2085 8 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
2086 24 : .await?;
2087 :
2088 8 : Ok(Self {
2089 8 : harness,
2090 8 : tenant,
2091 8 : timeline,
2092 8 : tenant_ctx: ctx,
2093 8 : })
2094 8 : }
2095 :
2096 : /// Construct a RemoteTimelineClient in an arbitrary generation
2097 10 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
2098 10 : Arc::new(RemoteTimelineClient {
2099 10 : conf: self.harness.conf,
2100 10 : runtime: tokio::runtime::Handle::current(),
2101 10 : tenant_shard_id: self.harness.tenant_shard_id,
2102 10 : timeline_id: TIMELINE_ID,
2103 10 : generation,
2104 10 : storage_impl: self.harness.remote_storage.clone(),
2105 10 : deletion_queue_client: self.harness.deletion_queue.new_client(),
2106 10 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
2107 10 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
2108 10 : &self.harness.tenant_shard_id,
2109 10 : &TIMELINE_ID,
2110 10 : )),
2111 10 : cancel: CancellationToken::new(),
2112 10 : })
2113 10 : }
2114 :
2115 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
2116 : /// and timeline_id are present.
2117 6 : fn span(&self) -> tracing::Span {
2118 6 : tracing::info_span!(
2119 : "test",
2120 : tenant_id = %self.harness.tenant_shard_id.tenant_id,
2121 0 : shard_id = %self.harness.tenant_shard_id.shard_slug(),
2122 : timeline_id = %TIMELINE_ID
2123 : )
2124 6 : }
2125 : }
2126 :
2127 : // Test scheduling
2128 : #[tokio::test]
2129 2 : async fn upload_scheduling() {
2130 2 : // Test outline:
2131 2 : //
2132 2 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
2133 2 : // Schedule upload of index. Check that it is queued
2134 2 : // let the layer file uploads finish. Check that the index-upload is now started
2135 2 : // let the index-upload finish.
2136 2 : //
2137 2 : // Download back the index_part.json. Check that the list of files is correct
2138 2 : //
2139 2 : // Schedule upload. Schedule deletion. Check that the deletion is queued
2140 2 : // let upload finish. Check that deletion is now started
2141 2 : // Schedule another deletion. Check that it's launched immediately.
2142 2 : // Schedule index upload. Check that it's queued
2143 2 :
2144 14 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
2145 2 : let span = test_setup.span();
2146 2 : let _guard = span.enter();
2147 2 :
2148 2 : let TestSetup {
2149 2 : harness,
2150 2 : tenant: _tenant,
2151 2 : timeline,
2152 2 : tenant_ctx: _tenant_ctx,
2153 2 : } = test_setup;
2154 2 :
2155 2 : let client = &timeline.remote_client;
2156 2 :
2157 2 : // Download back the index_part.json, and check that the list of files is correct
2158 2 : let initial_index_part = match client
2159 2 : .download_index_file(&CancellationToken::new())
2160 8 : .await
2161 2 : .unwrap()
2162 2 : {
2163 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2164 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2165 2 : };
2166 2 : let initial_layers = initial_index_part
2167 2 : .layer_metadata
2168 2 : .keys()
2169 2 : .map(|f| f.to_owned())
2170 2 : .collect::<HashSet<LayerName>>();
2171 2 : let initial_layer = {
2172 2 : assert!(initial_layers.len() == 1);
2173 2 : initial_layers.into_iter().next().unwrap()
2174 2 : };
2175 2 :
2176 2 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
2177 2 :
2178 2 : println!("workdir: {}", harness.conf.workdir);
2179 2 :
2180 2 : let remote_timeline_dir = harness
2181 2 : .remote_fs_dir
2182 2 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
2183 2 : println!("remote_timeline_dir: {remote_timeline_dir}");
2184 2 :
2185 2 : let generation = harness.generation;
2186 2 : let shard = harness.shard;
2187 2 :
2188 2 : // Create a couple of dummy files, schedule upload for them
2189 2 :
2190 2 : let layers = [
2191 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
2192 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
2193 2 : ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
2194 2 : ]
2195 2 : .into_iter()
2196 6 : .map(|(name, contents): (LayerName, Vec<u8>)| {
2197 6 :
2198 6 : let local_path = local_layer_path(
2199 6 : harness.conf,
2200 6 : &timeline.tenant_shard_id,
2201 6 : &timeline.timeline_id,
2202 6 : &name,
2203 6 : &generation,
2204 6 : );
2205 6 : std::fs::write(&local_path, &contents).unwrap();
2206 6 :
2207 6 : Layer::for_resident(
2208 6 : harness.conf,
2209 6 : &timeline,
2210 6 : local_path,
2211 6 : name,
2212 6 : LayerFileMetadata::new(contents.len() as u64, generation, shard),
2213 6 : )
2214 6 : }).collect::<Vec<_>>();
2215 2 :
2216 2 : client
2217 2 : .schedule_layer_file_upload(layers[0].clone())
2218 2 : .unwrap();
2219 2 : client
2220 2 : .schedule_layer_file_upload(layers[1].clone())
2221 2 : .unwrap();
2222 2 :
2223 2 : // Check that they are started immediately, not queued
2224 2 : //
2225 2 : // this works because we're running within block_on, so any futures are now queued up until
2226 2 : // our next await point.
2227 2 : {
2228 2 : let mut guard = client.upload_queue.lock().unwrap();
2229 2 : let upload_queue = guard.initialized_mut().unwrap();
2230 2 : assert!(upload_queue.queued_operations.is_empty());
2231 2 : assert!(upload_queue.inprogress_tasks.len() == 2);
2232 2 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
2233 2 :
2234 2 : // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
2235 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
2236 2 : }
2237 2 :
2238 2 : // Schedule upload of index. Check that it is queued
2239 2 : let metadata = dummy_metadata(Lsn(0x20));
2240 2 : client
2241 2 : .schedule_index_upload_for_full_metadata_update(&metadata)
2242 2 : .unwrap();
2243 2 : {
2244 2 : let mut guard = client.upload_queue.lock().unwrap();
2245 2 : let upload_queue = guard.initialized_mut().unwrap();
2246 2 : assert!(upload_queue.queued_operations.len() == 1);
2247 2 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
2248 2 : }
2249 2 :
2250 2 : // Wait for the uploads to finish
2251 2 : client.wait_completion().await.unwrap();
2252 2 : {
2253 2 : let mut guard = client.upload_queue.lock().unwrap();
2254 2 : let upload_queue = guard.initialized_mut().unwrap();
2255 2 :
2256 2 : assert!(upload_queue.queued_operations.is_empty());
2257 2 : assert!(upload_queue.inprogress_tasks.is_empty());
2258 2 : }
2259 2 :
2260 2 : // Download back the index_part.json, and check that the list of files is correct
2261 2 : let index_part = match client
2262 2 : .download_index_file(&CancellationToken::new())
2263 8 : .await
2264 2 : .unwrap()
2265 2 : {
2266 2 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
2267 2 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
2268 2 : };
2269 2 :
2270 2 : assert_file_list(
2271 2 : &index_part
2272 2 : .layer_metadata
2273 2 : .keys()
2274 6 : .map(|f| f.to_owned())
2275 2 : .collect(),
2276 2 : &[
2277 2 : &initial_layer.to_string(),
2278 2 : &layers[0].layer_desc().layer_name().to_string(),
2279 2 : &layers[1].layer_desc().layer_name().to_string(),
2280 2 : ],
2281 2 : );
2282 2 : assert_eq!(index_part.metadata, metadata);
2283 2 :
2284 2 : // Schedule upload and then a deletion. Check that the deletion is queued
2285 2 : client
2286 2 : .schedule_layer_file_upload(layers[2].clone())
2287 2 : .unwrap();
2288 2 :
2289 2 : // this is no longer consistent with how deletion works with Layer::drop, but in this test
2290 2 : // keep using schedule_layer_file_deletion because we don't have a way to wait for the
2291 2 : // spawn_blocking started by the drop.
2292 2 : client
2293 2 : .schedule_layer_file_deletion(&[layers[0].layer_desc().layer_name()])
2294 2 : .unwrap();
2295 2 : {
2296 2 : let mut guard = client.upload_queue.lock().unwrap();
2297 2 : let upload_queue = guard.initialized_mut().unwrap();
2298 2 :
2299 2 : // Deletion schedules upload of the index file, and the file deletion itself
2300 2 : assert_eq!(upload_queue.queued_operations.len(), 2);
2301 2 : assert_eq!(upload_queue.inprogress_tasks.len(), 1);
2302 2 : assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
2303 2 : assert_eq!(upload_queue.num_inprogress_deletions, 0);
2304 2 : assert_eq!(
2305 2 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
2306 2 : 0
2307 2 : );
2308 2 : }
2309 2 : assert_remote_files(
2310 2 : &[
2311 2 : &initial_layer.to_string(),
2312 2 : &layers[0].layer_desc().layer_name().to_string(),
2313 2 : &layers[1].layer_desc().layer_name().to_string(),
2314 2 : "index_part.json",
2315 2 : ],
2316 2 : &remote_timeline_dir,
2317 2 : generation,
2318 2 : );
2319 2 :
2320 2 : // Finish them
2321 2 : client.wait_completion().await.unwrap();
2322 2 : harness.deletion_queue.pump().await;
2323 2 :
2324 2 : assert_remote_files(
2325 2 : &[
2326 2 : &initial_layer.to_string(),
2327 2 : &layers[1].layer_desc().layer_name().to_string(),
2328 2 : &layers[2].layer_desc().layer_name().to_string(),
2329 2 : "index_part.json",
2330 2 : ],
2331 2 : &remote_timeline_dir,
2332 2 : generation,
2333 2 : );
2334 2 : }
2335 :
2336 : #[tokio::test]
2337 2 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
2338 2 : // Setup
2339 2 :
2340 2 : let TestSetup {
2341 2 : harness,
2342 2 : tenant: _tenant,
2343 2 : timeline,
2344 2 : ..
2345 14 : } = TestSetup::new("metrics").await.unwrap();
2346 2 : let client = &timeline.remote_client;
2347 2 :
2348 2 : let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
2349 2 : let local_path = local_layer_path(
2350 2 : harness.conf,
2351 2 : &timeline.tenant_shard_id,
2352 2 : &timeline.timeline_id,
2353 2 : &layer_file_name_1,
2354 2 : &harness.generation,
2355 2 : );
2356 2 : let content_1 = dummy_contents("foo");
2357 2 : std::fs::write(&local_path, &content_1).unwrap();
2358 2 :
2359 2 : let layer_file_1 = Layer::for_resident(
2360 2 : harness.conf,
2361 2 : &timeline,
2362 2 : local_path,
2363 2 : layer_file_name_1.clone(),
2364 2 : LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
2365 2 : );
2366 2 :
2367 2 : #[derive(Debug, PartialEq, Clone, Copy)]
2368 2 : struct BytesStartedFinished {
2369 2 : started: Option<usize>,
2370 2 : finished: Option<usize>,
2371 2 : }
2372 2 : impl std::ops::Add for BytesStartedFinished {
2373 2 : type Output = Self;
2374 4 : fn add(self, rhs: Self) -> Self::Output {
2375 4 : Self {
2376 4 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
2377 4 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
2378 4 : }
2379 4 : }
2380 2 : }
2381 6 : let get_bytes_started_stopped = || {
2382 6 : let started = client
2383 6 : .metrics
2384 6 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2385 6 : .map(|v| v.try_into().unwrap());
2386 6 : let stopped = client
2387 6 : .metrics
2388 6 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
2389 6 : .map(|v| v.try_into().unwrap());
2390 6 : BytesStartedFinished {
2391 6 : started,
2392 6 : finished: stopped,
2393 6 : }
2394 6 : };
2395 2 :
2396 2 : // Test
2397 2 : tracing::info!("now doing actual test");
2398 2 :
2399 2 : let actual_a = get_bytes_started_stopped();
2400 2 :
2401 2 : client
2402 2 : .schedule_layer_file_upload(layer_file_1.clone())
2403 2 : .unwrap();
2404 2 :
2405 2 : let actual_b = get_bytes_started_stopped();
2406 2 :
2407 2 : client.wait_completion().await.unwrap();
2408 2 :
2409 2 : let actual_c = get_bytes_started_stopped();
2410 2 :
2411 2 : // Validate
2412 2 :
2413 2 : let expected_b = actual_a
2414 2 : + BytesStartedFinished {
2415 2 : started: Some(content_1.len()),
2416 2 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
2417 2 : finished: Some(0),
2418 2 : };
2419 2 : assert_eq!(actual_b, expected_b);
2420 2 :
2421 2 : let expected_c = actual_a
2422 2 : + BytesStartedFinished {
2423 2 : started: Some(content_1.len()),
2424 2 : finished: Some(content_1.len()),
2425 2 : };
2426 2 : assert_eq!(actual_c, expected_c);
2427 2 : }
2428 :
2429 12 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
2430 12 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
2431 12 : let example_index_part = IndexPart::example();
2432 12 :
2433 12 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
2434 12 :
2435 12 : let index_path = test_state.harness.remote_fs_dir.join(
2436 12 : remote_index_path(
2437 12 : &test_state.harness.tenant_shard_id,
2438 12 : &TIMELINE_ID,
2439 12 : generation,
2440 12 : )
2441 12 : .get_path(),
2442 12 : );
2443 12 :
2444 12 : std::fs::create_dir_all(index_path.parent().unwrap())
2445 12 : .expect("creating test dir should work");
2446 12 :
2447 12 : eprintln!("Writing {index_path}");
2448 12 : std::fs::write(&index_path, index_part_bytes).unwrap();
2449 12 : example_index_part
2450 12 : }
2451 :
2452 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
2453 : /// index, the IndexPart returned is equal to `expected`.
2454 10 : async fn assert_got_index_part(
2455 10 : test_state: &TestSetup,
2456 10 : get_generation: Generation,
2457 10 : expected: &IndexPart,
2458 10 : ) {
2459 10 : let client = test_state.build_client(get_generation);
2460 :
2461 10 : let download_r = client
2462 10 : .download_index_file(&CancellationToken::new())
2463 60 : .await
2464 10 : .expect("download should always succeed");
2465 10 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
2466 10 : match download_r {
2467 10 : MaybeDeletedIndexPart::IndexPart(index_part) => {
2468 10 : assert_eq!(&index_part, expected);
2469 : }
2470 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
2471 : }
2472 10 : }
2473 :
2474 : #[tokio::test]
2475 2 : async fn index_part_download_simple() -> anyhow::Result<()> {
2476 14 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
2477 2 : let span = test_state.span();
2478 2 : let _guard = span.enter();
2479 2 :
2480 2 : // Simple case: we are in generation N, load the index from generation N - 1
2481 2 : let generation_n = 5;
2482 2 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2483 2 :
2484 10 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
2485 2 :
2486 2 : Ok(())
2487 2 : }
2488 :
2489 : #[tokio::test]
2490 2 : async fn index_part_download_ordering() -> anyhow::Result<()> {
2491 2 : let test_state = TestSetup::new("index_part_download_ordering")
2492 14 : .await
2493 2 : .unwrap();
2494 2 :
2495 2 : let span = test_state.span();
2496 2 : let _guard = span.enter();
2497 2 :
2498 2 : // A generation-less IndexPart exists in the bucket; we should find it.
2499 2 : let generation_n = 5;
2500 2 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
2501 14 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
2502 2 :
2503 2 : // If a more-recent-than-none generation exists, we should prefer to load that.
2504 2 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
2505 14 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2506 2 :
2507 2 : // If a more-recent-than-me generation exists, we should ignore it.
2508 2 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
2509 14 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
2510 2 :
2511 2 : // If a directly previous generation exists, _and_ an index exists in my own
2512 2 : // generation, I should prefer my own generation.
2513 2 : let _injected_prev =
2514 2 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
2515 2 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
2516 2 : assert_got_index_part(
2517 2 : &test_state,
2518 2 : Generation::new(generation_n),
2519 2 : &injected_current,
2520 2 : )
2521 8 : .await;
2522 2 :
2523 2 : Ok(())
2524 2 : }
2525 : }
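The upload-metrics test above samples a pair of byte counters before scheduling, after scheduling, and after `wait_completion`, and compares each sample with a baseline plus an expected delta. The sketch below models that pattern under stated assumptions. `UploadMetrics`, `on_upload_scheduled`, `on_upload_finished`, `sample` and `add_opt` are hypothetical names, not pageserver APIs. The `Add` rule here (a missing baseline series counts as zero) is one plausible choice, not the crate's actual `BytesStartedFinished` implementation. It shows why the test insists on the `finished` series being created eagerly: right after scheduling, a sample already reads `Some(0)` instead of `None`.

```rust
use std::collections::HashMap;
use std::ops::Add;

/// Hypothetical stand-in for per-timeline upload metrics: each named counter
/// comes into existence the first time it is incremented, like a labelled
/// Prometheus counter.
#[derive(Default)]
struct UploadMetrics {
    counters: HashMap<&'static str, u64>,
}

impl UploadMetrics {
    /// Current value of a counter, or `None` if it was never created.
    fn value(&self, name: &str) -> Option<u64> {
        self.counters.get(name).copied()
    }

    fn inc(&mut self, name: &'static str, by: u64) {
        *self.counters.entry(name).or_insert(0) += by;
    }

    /// Scheduling bumps `bytes_started` and eagerly creates `bytes_finished`
    /// at 0, so the very first sample already contains both series.
    fn on_upload_scheduled(&mut self, len: u64) {
        self.inc("bytes_started", len);
        self.inc("bytes_finished", 0);
    }

    fn on_upload_finished(&mut self, len: u64) {
        self.inc("bytes_finished", len);
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BytesStartedFinished {
    started: Option<u64>,
    finished: Option<u64>,
}

/// Adds an expected delta to a baseline sample. A missing baseline series
/// counts as 0; the sum stays `None` only if neither side has a value.
fn add_opt(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (None, None) => None,
        (a, b) => Some(a.unwrap_or(0) + b.unwrap_or(0)),
    }
}

impl Add for BytesStartedFinished {
    type Output = Self;

    fn add(self, rhs: Self) -> Self {
        Self {
            started: add_opt(self.started, rhs.started),
            finished: add_opt(self.finished, rhs.finished),
        }
    }
}

/// Reads both counters at one point in time.
fn sample(m: &UploadMetrics) -> BytesStartedFinished {
    BytesStartedFinished {
        started: m.value("bytes_started"),
        finished: m.value("bytes_finished"),
    }
}
```

Sampling with `sample` before scheduling, after `on_upload_scheduled`, and after `on_upload_finished` plays the roles of `actual_a`, `actual_b` and `actual_c` in the test.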
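`inject_index_part` serializes an `IndexPart` and writes it under the key that `remote_index_path` returns for a given generation, so several index objects for one timeline can coexist in the bucket. A minimal sketch of such a generation-suffixed key scheme follows. The `Gen` type, the `tenants/<tenant>/timelines/<timeline>/index_part.json` layout and the `-{:08x}` suffix for numbered generations are assumptions made for illustration; the real `remote_index_path` and `Generation` may format keys differently.

```rust
/// Hypothetical generation type: `None` models a legacy, generation-less
/// object; `Some(n)` a numbered generation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Gen(Option<u32>);

impl Gen {
    /// ASSUMPTION: numbered generations append `-{:08x}` to the object key,
    /// while generation-less objects get no suffix.
    fn suffix(self) -> String {
        match self.0 {
            None => String::new(),
            Some(n) => format!("-{:08x}", n),
        }
    }
}

/// ASSUMPTION: illustrative key layout for a timeline's index object.
fn index_key(tenant: &str, timeline: &str, generation: Gen) -> String {
    format!(
        "tenants/{}/timelines/{}/index_part.json{}",
        tenant,
        timeline,
        generation.suffix()
    )
}
```

Because every generation gets a distinct key, a test can place indices from no generation and from generations 1, 4, 5 and 10 side by side and then check which one a client loads.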
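`index_part_download_ordering` pins down which index a client in generation N should end up with: the highest generation that is not newer than N, where a generation-less index ranks below every numbered one. The hypothetical `pick_index_generation` below encodes only that selection rule over a list of available generations, with `None` standing for the generation-less index; it relies on `Option<u32>` ordering `None` before any `Some`. The real client may probe specific keys (for example its own or the previous generation) before falling back to a listing, so treat this as a model of the expected result, not of the download path.

```rust
/// Hypothetical model of index selection for a client in generation `mine`.
/// `None` stands for a generation-less (legacy) index. `Option`'s ordering
/// puts `None` below every `Some`, so `max()` prefers any numbered generation
/// over the legacy one. Returns `None` if no usable index exists.
fn pick_index_generation(available: &[Option<u32>], mine: u32) -> Option<Option<u32>> {
    available
        .iter()
        .copied()
        // Ignore indices written by generations newer than our own.
        .filter(|g| match g {
            None => true,
            Some(n) => *n <= mine,
        })
        .max()
}
```

With `mine = 5`, the four scenarios in the test map to inputs `[None]`, `[None, Some(1)]`, `[None, Some(1), Some(10)]` and `[None, Some(1), Some(10), Some(4), Some(5)]`, which select the generation-less index, generation 1, generation 1 again, and generation 5 respectively.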