//! This module manages synchronizing the local filesystem with remote storage.
//!
//! # Overview
//!
//! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
//!   It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
//!   when it's safe to do so.
//!
//! * The stand-alone function [`list_remote_timelines`] gets the list of timelines of a tenant.
//!
//! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
//! # APIs & How To Use Them
//!
//! There is a [`RemoteTimelineClient`] for each [`Timeline`][`crate::tenant::Timeline`] in the system,
//! unless the pageserver is configured without remote storage.
//!
//! We allocate the client instance in [`Timeline`][`crate::tenant::Timeline`], i.e.,
//! either in [`crate::tenant::mgr`] during startup or when creating a new
//! timeline.
//! However, the client does not become ready for use until we've initialized its upload queue:
//!
//! - For timelines that already have some state on the remote storage, we use
//!   [`RemoteTimelineClient::init_upload_queue`].
//! - For newly created timelines, we use
//!   [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
//!
//! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
//! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
//!
//! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
//! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
//!
//! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
//! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
//! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
//! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
//!
//! Internally, these functions create [`UploadOp`]s and put them in a queue.
//!
//! There are also APIs for downloading files.
//! These are not part of the aforementioned queuing and will not be discussed
//! further here, except in the section covering tenant attach.
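//!
//! For illustration, here is a minimal sketch of the typical call sequence
//! (hypothetical caller code; error handling and the surrounding `Timeline`
//! plumbing are omitted):
//!
//! ```ignore
//! // Timeline with pre-existing remote state: initialize from the remote index.
//! client.init_upload_queue(&index_part)?;
//!
//! // After flushing a new layer file to local disk, mirror it to remote storage ...
//! client.schedule_layer_file_upload(&layer_file_name, &layer_metadata)?;
//! // ... and then publish it by scheduling an updated index file upload.
//! client.schedule_index_upload_for_file_changes()?;
//! ```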
//!
//! # Remote Storage Structure & [`IndexPart`] Index File
//!
//! The "directory structure" in the remote storage mirrors the local directory structure, with paths
//! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
//! However, instead of keeping the `metadata` file remotely, we wrap it with more
//! data in an "index file", a.k.a. [`IndexPart`], which contains the list of **all** remote
//! files for a given timeline.
//! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
//!
//! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
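//!
//! As a small sketch of what that means (`layer_metadata` is the map of layer
//! files kept inside [`IndexPart`]):
//!
//! ```ignore
//! // An object in the timeline's remote prefix is part of the logical remote
//! // state only if the index references it; anything else is a leaked object.
//! let is_part_of_remote_state = index_part.layer_metadata.contains_key(&layer_file_name);
//! ```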
//!
//! # Consistency
//!
//! To have a consistent remote structure, it's important that uploads and
//! deletions are performed in the right order. For example, the index file
//! contains a list of layer files, so it must not be uploaded until all the
//! layer files that are in its list have been successfully uploaded.
//!
//! The contract between the client and its user is that the user is responsible
//! for scheduling operations in an order that keeps the remote consistent as
//! described above.
//! From the user's perspective, the operations are executed sequentially.
//! Internally, the client knows which operations can be performed in parallel,
//! and which operations act like a "barrier" that requires preceding operations
//! to finish. The calling code just needs to call the schedule-functions in the
//! correct order, and the client will parallelize the operations in a way that
//! is safe.
//!
//! The caller should be careful with deletion, though: it must not delete
//! local files that have been scheduled for upload but have not yet finished uploading.
//! Otherwise the upload will fail. To wait for an upload to finish, use
//! the `wait_completion` function (more on that later).
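//!
//! A minimal sketch of that rule (hypothetical caller code; the real layer
//! eviction path involves more bookkeeping than this):
//!
//! ```ignore
//! client.schedule_layer_file_upload(&layer_file_name, &layer_metadata)?;
//! // Wait for the upload queue to drain before removing the local copy;
//! // deleting the file while the upload is still pending would make it fail.
//! client.wait_completion().await?;
//! std::fs::remove_file(&local_layer_path)?;
//! ```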
//!
//! All of this relies on the following invariants:
//!
//! - We rely on read-after-write consistency in the remote storage.
//! - Layer files are immutable.
//!
//! NB: The pageserver assumes that it has exclusive write access to the tenant in remote
//! storage. Different tenants can be attached to different pageservers, but if the
//! same tenant is attached to two pageservers at the same time, they will overwrite
//! each other's index file updates, and confusion will ensue. There's no interlock or
//! mechanism in the pageserver to detect that; we rely on the control plane to ensure
//! that it doesn't happen.
//!
//! ## Implementation Note
//!
//! The *actual* remote state lags behind the *desired* remote state while
//! there are in-flight operations.
//! We keep track of the desired remote state in
//! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
//! It is initialized based on the [`IndexPart`] that was passed during init
//! and updated with every `schedule_*` function call.
//! All this is necessary to compute the future [`IndexPart`]s
//! when scheduling an operation while other operations that also affect the
//! remote [`IndexPart`] are in flight.
//!
//! # Retries & Error Handling
//!
//! The client retries operations indefinitely, using exponential back-off.
//! There is no way to force a retry, i.e., to interrupt the back-off.
//! Such a mechanism could be built easily.
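//!
//! As a sketch, the retry loop in this module has roughly the following shape
//! (the real loop lives in `perform_upload_task` and additionally checks for
//! shutdown requests between attempts; `do_remote_op` is a hypothetical stand-in):
//!
//! ```ignore
//! let mut retries = 0;
//! loop {
//!     match do_remote_op().await {
//!         Ok(()) => break,
//!         Err(_e) => {
//!             // Back off exponentially, capped at DEFAULT_MAX_BACKOFF_SECONDS.
//!             exponential_backoff(
//!                 retries,
//!                 DEFAULT_BASE_BACKOFF_SECONDS,
//!                 DEFAULT_MAX_BACKOFF_SECONDS,
//!                 &shutdown_token(),
//!             )
//!             .await;
//!             retries += 1;
//!         }
//!     }
//! }
//! ```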
//!
//! # Cancellation
//!
//! The operations execute as plain [`task_mgr`] tasks, scoped to
//! the client's tenant and timeline.
//! Dropping the client will drop queued operations but not the operations
//! that are already executing. These will complete unless the `task_mgr` tasks
//! are cancelled using `task_mgr` APIs, e.g., during pageserver shutdown,
//! timeline delete, or tenant detach.
//!
//! # Completion
//!
//! Once an operation has completed, we update
//! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately,
//! and submit a request through the DeletionQueue to update
//! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
//! validated that our generation is not stale. It is this visible value
//! that is advertised to safekeepers as a signal that they can
//! delete the WAL up to that LSN.
//!
//! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
//! for all pending operations to complete. It does not prevent more
//! operations from getting scheduled.
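//!
//! A minimal sketch of how the two values relate (hypothetical caller code;
//! both accessors exist on [`RemoteTimelineClient`]):
//!
//! ```ignore
//! // Moves forward as soon as the corresponding index upload finishes:
//! let projected: Option<Lsn> = client.remote_consistent_lsn_projected();
//! // Moves forward only after the deletion queue has validated our generation,
//! // so it may lag behind the projected value:
//! let visible: Option<Lsn> = client.remote_consistent_lsn_visible();
//! ```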
//!
//! # Crash Consistency
//!
//! We do not persist the upload queue state.
//! If we drop the client, or crash, all unfinished operations are lost.
//!
//! To recover, the following steps need to be taken (see the sketch after this list):
//! - Retrieve the current remote [`IndexPart`]. This gives us a
//!   consistent remote state, assuming the user scheduled the operations in
//!   the correct order.
//! - Initialize the upload queue with that [`IndexPart`].
//! - Reschedule all lost operations by comparing the local filesystem state
//!   and remote state as per [`IndexPart`]. This is done in
//!   [`Tenant::timeline_init_and_sync`].
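//!
//! As a sketch, the recovery sequence looks roughly like this (simplified:
//! `download_index_file` actually returns a [`MaybeDeletedIndexPart`] that the
//! caller must unwrap first):
//!
//! ```ignore
//! let index_part = client.download_index_file().await?; // consistent remote state
//! client.init_upload_queue(&index_part)?;               // resume from that state
//! // Compare the local timeline directory against `index_part` and re-schedule
//! // uploads for anything that exists only locally (Tenant::timeline_init_and_sync).
//! ```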
//!
//! Note that if we crash during file deletion, between the index update
//! that removes the file from the list of files and the deletion of the remote file,
//! the file is leaked in the remote storage. Similarly, if a new file is created
//! and uploaded, but the pageserver dies permanently before updating the
//! remote index file, the new file is leaked in remote storage. We accept and
//! tolerate that for now.
//! Note further that we cannot easily fix this by scheduling deletes for every
//! file that is present only on the remote, because we cannot distinguish the
//! following two cases:
//! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
//!   but crashed before it finished remotely.
//! - (2) We never had the file locally because we haven't on-demand downloaded
//!   it yet.
//!
//! # Downloads
//!
//! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
//! downloading files from the remote storage. Downloads are performed immediately
//! against the `RemoteStorage`, independently of the upload queue.
//!
//! When we attach a tenant, we perform the following steps (a sketch follows this list):
//! - Create the `Tenant` object in `TenantState::Attaching` state.
//! - List timelines that are present in remote storage, and for each:
//!   - download their remote [`IndexPart`]s
//!   - create a `Timeline` struct and a `RemoteTimelineClient`
//!   - initialize the client's upload queue with its `IndexPart`
//!   - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
//!     for layers that are referenced by `IndexPart` but not present locally
//!   - schedule uploads for layers that are only present locally
//!   - if the remote `IndexPart`'s metadata was newer than the metadata in
//!     the local filesystem, write the remote metadata to the local filesystem
//! - After the above is done for each timeline, open the tenant for business by
//!   transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
//!   This starts the timelines' WAL-receivers and the tenant's GC & compaction loops.
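//!
//! A condensed sketch of the per-timeline part of that procedure (hypothetical
//! glue code; the exact signatures live in the tenant attach path):
//!
//! ```ignore
//! for timeline_id in list_remote_timelines(&storage, &tenant_id).await? {
//!     let client = /* RemoteTimelineClient for this timeline */;
//!     match client.download_index_file().await? {
//!         MaybeDeletedIndexPart::IndexPart(index_part) => {
//!             client.init_upload_queue(&index_part)?;
//!             // reconcile local layer files against `index_part` ...
//!         }
//!         MaybeDeletedIndexPart::Deleted(_) => { /* resume timeline deletion */ }
//!     }
//! }
//! ```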
//!
//! We keep track of the fact that a tenant is in `Attaching` state in a marker
//! file on the local disk. This is critical because, when we restart the pageserver,
//! we do not want to do the `List timelines` step for each tenant that has already
//! been successfully attached (for performance & cost reasons).
//! Instead, for a tenant without the attach marker file, we assume that the
//! local state is in sync with or ahead of the remote state. This includes the list
//! of all of the tenant's timelines, which is particularly critical to be up-to-date:
//! if there's a timeline on the remote that the pageserver doesn't know about,
//! the GC will not consider its branch point, leading to data loss.
//! So, for a tenant with the attach marker file, we know that we have not yet
//! persisted all of the remote timelines' metadata files locally. To exclude the
//! risk above, we re-run the procedure for such tenants.
//!
//! # Operating Without Remote Storage
//!
//! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
//! not created and the uploads are skipped.
//! Theoretically, it should be OK to remove and re-add the remote storage configuration to
//! the pageserver config at any time, since it doesn't make a difference to
//! [`Timeline::load_layer_map`].
//! Of course, the remote timeline dir must not change while we have de-configured
//! remote storage, i.e., the pageserver must remain the owner of the given prefix
//! in remote storage.
//! But note that we don't test any of this right now.
//!
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map

mod download;
pub mod index;
mod upload;

use anyhow::Context;
use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use utils::backoff::{
    self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};

use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use std::ops::DerefMut;
use tracing::{debug, error, info, instrument, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;

use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{
    MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
    RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
    REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
};
use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::upload_queue::Delete;
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
    config::PageServerConf,
    task_mgr,
    task_mgr::TaskKind,
    task_mgr::BACKGROUND_RUNTIME,
    tenant::metadata::TimelineMetadata,
    tenant::upload_queue::{
        UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
    },
};

use utils::id::{TenantId, TimelineId};

use self::index::IndexPart;

use super::storage_layer::LayerFileName;
use super::upload_queue::SetDeletedFlagProgress;
use super::Generation;

// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a download fails, we log it at info-level, and retry.
// But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
// level instead, as repeated failures can mean a more serious problem. If it
// fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;

// Similarly, log failed uploads and deletions at WARN level, after this many
// retries. Uploads and deletions are retried forever, though.
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;

pub enum MaybeDeletedIndexPart {
    IndexPart(IndexPart),
    Deleted(IndexPart),
}

/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
#[derive(Debug, thiserror::Error)]
pub enum StopError {
    /// Returned if the upload queue was never initialized.
    /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
    #[error("queue is not initialized")]
    QueueUninitialized,
}

#[derive(Debug, thiserror::Error)]
pub enum PersistIndexPartWithDeletedFlagError {
    #[error("another task is already setting the deleted_flag, started at {0:?}")]
    AlreadyInProgress(NaiveDateTime),
    #[error("the deleted_flag was already set, value is {0:?}")]
    AlreadyDeleted(NaiveDateTime),
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

/// A client for accessing a timeline's data in remote storage.
///
/// This takes care of managing the number of connections, and balancing them
/// across tenants. This also handles retries of failed uploads.
///
/// Upload and delete requests are ordered so that before a deletion is
/// performed, we wait for all preceding uploads to finish. This ensures
/// that if you perform a compaction operation that reshuffles data in layer
/// files, we don't have a transient state where the old files have already been
/// deleted, but the new files have not yet been uploaded.
///
/// Similarly, this enforces an order between index-file uploads and layer
/// uploads. Before an index-file upload is performed, all preceding layer
/// uploads must be finished.
///
/// This also maintains a list of remote files, and automatically includes that
/// in the index part file, whenever timeline metadata is uploaded.
///
/// Downloads are not queued, they are performed immediately.
pub struct RemoteTimelineClient {
    conf: &'static PageServerConf,

    runtime: tokio::runtime::Handle,

    tenant_id: TenantId,
    timeline_id: TimelineId,
    generation: Generation,

    upload_queue: Mutex<UploadQueue>,

    metrics: Arc<RemoteTimelineClientMetrics>,

    storage_impl: GenericRemoteStorage,

    deletion_queue_client: DeletionQueueClient,
}

impl RemoteTimelineClient {
    ///
    /// Create a remote storage client for the given timeline.
    ///
    /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
    /// by calling `init_upload_queue`.
    ///
    pub fn new(
        remote_storage: GenericRemoteStorage,
        deletion_queue_client: DeletionQueueClient,
        conf: &'static PageServerConf,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        generation: Generation,
    ) -> RemoteTimelineClient {
        RemoteTimelineClient {
            conf,
            runtime: if cfg!(test) {
                // remote_timeline_client.rs tests rely on current-thread runtime
                tokio::runtime::Handle::current()
            } else {
                BACKGROUND_RUNTIME.handle().clone()
            },
            tenant_id,
            timeline_id,
            generation,
            storage_impl: remote_storage,
            deletion_queue_client,
            upload_queue: Mutex::new(UploadQueue::Uninitialized),
            metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
        }
    }

    /// Initialize the upload queue for a remote storage that already received
    /// an index file upload, i.e., it's not empty.
    /// The given `index_part` must be the one on the remote.
    pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
        let mut upload_queue = self.upload_queue.lock().unwrap();
        upload_queue.initialize_with_current_remote_index_part(index_part)?;
        self.update_remote_physical_size_gauge(Some(index_part));
        info!(
            "initialized upload queue from remote index with {} layer files",
            index_part.layer_metadata.len()
        );
        Ok(())
    }

    /// Initialize the upload queue for the case where the remote storage is empty,
    /// i.e., it doesn't have an `IndexPart`.
    pub fn init_upload_queue_for_empty_remote(
        &self,
        local_metadata: &TimelineMetadata,
    ) -> anyhow::Result<()> {
        let mut upload_queue = self.upload_queue.lock().unwrap();
        upload_queue.initialize_empty_remote(local_metadata)?;
        self.update_remote_physical_size_gauge(None);
        info!("initialized upload queue as empty");
        Ok(())
    }

    /// Initialize the queue in stopped state. Used in the startup path
    /// to continue a deletion operation interrupted by pageserver crash or restart.
    pub fn init_upload_queue_stopped_to_continue_deletion(
        &self,
        index_part: &IndexPart,
    ) -> anyhow::Result<()> {
        // FIXME: consider newtype for DeletedIndexPart.
        let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
            "bug: it is the responsibility of the caller to provide an index part from MaybeDeletedIndexPart::Deleted"
        ))?;

        {
            let mut upload_queue = self.upload_queue.lock().unwrap();
            upload_queue.initialize_with_current_remote_index_part(index_part)?;
            self.update_remote_physical_size_gauge(Some(index_part));
        }
        // stop() also locks the upload queue; without dropping the guard above, this would deadlock
        self.stop().expect("initialized line above");

        let mut upload_queue = self.upload_queue.lock().unwrap();

        upload_queue
            .stopped_mut()
            .expect("stopped above")
            .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);

        Ok(())
    }

    pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
        match &mut *self.upload_queue.lock().unwrap() {
            UploadQueue::Uninitialized => None,
            UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
            UploadQueue::Stopped(q) => q
                .upload_queue_for_deletion
                .get_last_remote_consistent_lsn_projected(),
        }
    }

    pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
        match &mut *self.upload_queue.lock().unwrap() {
            UploadQueue::Uninitialized => None,
            UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
            UploadQueue::Stopped(q) => Some(
                q.upload_queue_for_deletion
                    .get_last_remote_consistent_lsn_visible(),
            ),
        }
    }

    fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
        let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
            current_remote_index_part
                .layer_metadata
                .values()
                // If we don't have the file size for the layer, don't account for it in the metric.
                .map(|ilmd| ilmd.file_size)
                .sum()
        } else {
            0
        };
        self.metrics.remote_physical_size_set(size);
    }

    pub fn get_remote_physical_size(&self) -> u64 {
        self.metrics.remote_physical_size_get()
    }

    //
    // Download operations.
    //
    // These don't use the per-timeline queue. They do use the global semaphore in
    // S3Bucket, to limit the total number of concurrent operations, though.
    //

    /// Download the index file.
    pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
        let _unfinished_gauge_guard = self.metrics.call_begin(
            &RemoteOpFileKind::Index,
            &RemoteOpKind::Download,
            crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
                reason: "no need for a downloads gauge",
            },
        );

        let index_part = download::download_index_part(
            &self.storage_impl,
            &self.tenant_id,
            &self.timeline_id,
            self.generation,
        )
        .measure_remote_op(
            self.tenant_id,
            self.timeline_id,
            RemoteOpFileKind::Index,
            RemoteOpKind::Download,
            Arc::clone(&self.metrics),
        )
        .await?;

        if index_part.deleted_at.is_some() {
            Ok(MaybeDeletedIndexPart::Deleted(index_part))
        } else {
            Ok(MaybeDeletedIndexPart::IndexPart(index_part))
        }
    }

    /// Download a layer file from remote storage into the local filesystem.
    ///
    /// `layer_metadata` is the metadata from the remote index file.
    ///
    /// On success, returns the size of the downloaded file.
    pub async fn download_layer_file(
        &self,
        layer_file_name: &LayerFileName,
        layer_metadata: &LayerFileMetadata,
    ) -> anyhow::Result<u64> {
        let downloaded_size = {
            let _unfinished_gauge_guard = self.metrics.call_begin(
                &RemoteOpFileKind::Layer,
                &RemoteOpKind::Download,
                crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
                    reason: "no need for a downloads gauge",
                },
            );
            download::download_layer_file(
                self.conf,
                &self.storage_impl,
                self.tenant_id,
                self.timeline_id,
                layer_file_name,
                layer_metadata,
            )
            .measure_remote_op(
                self.tenant_id,
                self.timeline_id,
                RemoteOpFileKind::Layer,
                RemoteOpKind::Download,
                Arc::clone(&self.metrics),
            )
            .await?
        };

        REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
        REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);

        Ok(downloaded_size)
    }

    //
    // Upload operations.
    //

    ///
    /// Launch an index-file upload operation in the background, with
    /// updated metadata.
    ///
    /// The upload will be added to the queue immediately, but it
    /// won't be performed until all previously scheduled layer file
    /// upload operations have completed successfully. This is to
    /// ensure that when the index file claims that layers X, Y and Z
    /// exist in remote storage, they really do. To wait for the upload
    /// to complete, use `wait_completion`.
    ///
    /// If there were any changes to the list of files, i.e. if any
    /// layer file uploads were scheduled since the last index file
    /// upload, those will be included too.
    pub fn schedule_index_upload_for_metadata_update(
        self: &Arc<Self>,
        metadata: &TimelineMetadata,
    ) -> anyhow::Result<()> {
        let mut guard = self.upload_queue.lock().unwrap();
        let upload_queue = guard.initialized_mut()?;

        // As documented in the struct definition, it's ok for latest_metadata to be
        // ahead of what's _actually_ on the remote during index upload.
        upload_queue.latest_metadata = metadata.clone();

        self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());

        Ok(())
    }

    ///
    /// Launch an index-file upload operation in the background, if necessary.
    ///
    /// Use this function to schedule the update of the index file after
    /// scheduling file uploads or deletions. If no file uploads or deletions
    /// have been scheduled since the last index file upload, this does
    /// nothing.
    ///
    /// Like schedule_index_upload_for_metadata_update(), this merely adds
    /// the upload to the upload queue and returns quickly.
    pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
        let mut guard = self.upload_queue.lock().unwrap();
        let upload_queue = guard.initialized_mut()?;

        if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
            self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
        }

        Ok(())
    }

    /// Launch an index-file upload operation in the background (internal function).
    fn schedule_index_upload(
        self: &Arc<Self>,
        upload_queue: &mut UploadQueueInitialized,
        metadata: TimelineMetadata,
    ) {
        info!(
            "scheduling metadata upload with {} files ({} changed)",
            upload_queue.latest_files.len(),
            upload_queue.latest_files_changes_since_metadata_upload_scheduled,
        );

        let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();

        let index_part = IndexPart::new(
            upload_queue.latest_files.clone(),
            disk_consistent_lsn,
            metadata,
        );
        let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
        self.calls_unfinished_metric_begin(&op);
        upload_queue.queued_operations.push_back(op);
        upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;

        // Launch the task immediately, if possible
        self.launch_queued_tasks(upload_queue);
    }

    ///
    /// Launch an upload operation in the background.
    ///
    pub fn schedule_layer_file_upload(
        self: &Arc<Self>,
        layer_file_name: &LayerFileName,
        layer_metadata: &LayerFileMetadata,
    ) -> anyhow::Result<()> {
        let mut guard = self.upload_queue.lock().unwrap();
        let upload_queue = guard.initialized_mut()?;

        upload_queue
            .latest_files
            .insert(layer_file_name.clone(), layer_metadata.clone());
        upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;

        let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
        self.calls_unfinished_metric_begin(&op);
        upload_queue.queued_operations.push_back(op);

        info!("scheduled layer file upload {layer_file_name}");

        // Launch the task immediately, if possible
        self.launch_queued_tasks(upload_queue);
        Ok(())
    }

    /// Launch a delete operation in the background.
    ///
    /// The operation does not modify local state but assumes the local files have already been
    /// deleted, and is used to mirror those changes to the remote.
    ///
    /// Note: This schedules an index file upload before the deletions. The
    /// deletion won't actually be performed until all previously scheduled
    /// upload operations, and the index file upload, have completed
    /// successfully.
    pub fn schedule_layer_file_deletion(
        self: &Arc<Self>,
        names: Vec<LayerFileName>,
    ) -> anyhow::Result<()> {
        let mut guard = self.upload_queue.lock().unwrap();
        let upload_queue = guard.initialized_mut()?;

        // Deleting layers doesn't affect the values stored in TimelineMetadata,
        // so we don't need to update it. Just serialize it.
        let metadata = upload_queue.latest_metadata.clone();

        // Update the remote index file, removing the to-be-deleted files from the index,
        // before deleting the actual files.
        //
        // Once we start removing files from upload_queue.latest_files, there's
        // no going back! Otherwise, some of the files would already be removed
        // from latest_files, but not yet scheduled for deletion. Use a closure
        // to syntactically forbid ? or bail! calls here.
        let no_bail_here = || {
            // Decorate our list of names with each name's generation, dropping
            // names that are unexpectedly missing from our metadata.
            let with_generations: Vec<_> = names
                .into_iter()
                .filter_map(|name| {
                    // Remove from latest_files, learning the file's remote generation in the process
                    let meta = upload_queue.latest_files.remove(&name);

                    if let Some(meta) = meta {
                        upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
                        Some((name, meta.generation))
                    } else {
                        // This can only happen if we forgot to schedule the file upload
                        // before scheduling the delete. Log it because it is a rare/strange
                        // situation, and in case something is misbehaving, we'd like to know which
                        // layers experienced this.
                        info!(
                            "Deleting layer {name} not found in latest_files list, never uploaded?"
                        );
                        None
                    }
                })
                .collect();

            if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
                self.schedule_index_upload(upload_queue, metadata);
            }

            for (name, gen) in &with_generations {
                info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
            }

            // schedule the actual deletions
            let op = UploadOp::Delete(Delete {
                layers: with_generations,
            });
            self.calls_unfinished_metric_begin(&op);
            upload_queue.queued_operations.push_back(op);

            // Launch the tasks immediately, if possible
            self.launch_queued_tasks(upload_queue);
        };
        no_bail_here();
        Ok(())
    }

    ///
    /// Wait for all previously scheduled uploads/deletions to complete.
    ///
    pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
        let mut receiver = {
            let mut guard = self.upload_queue.lock().unwrap();
            let upload_queue = guard.initialized_mut()?;
            self.schedule_barrier(upload_queue)
        };

        if receiver.changed().await.is_err() {
            anyhow::bail!("wait_completion aborted because upload queue was stopped");
        }
        Ok(())
    }

    fn schedule_barrier(
        self: &Arc<Self>,
        upload_queue: &mut UploadQueueInitialized,
    ) -> tokio::sync::watch::Receiver<()> {
        let (sender, receiver) = tokio::sync::watch::channel(());
        let barrier_op = UploadOp::Barrier(sender);

        upload_queue.queued_operations.push_back(barrier_op);
        // Don't count this kind of operation!

        // Launch the task immediately, if possible
        self.launch_queued_tasks(upload_queue);

        receiver
    }

    /// Set the deleted_at field in the remote index file.
    ///
    /// This fails if the upload queue has not been `stop()`ed.
    ///
    /// The caller is responsible for calling `stop()` AND for waiting
    /// for any ongoing upload tasks to finish after `stop()` has succeeded.
    /// Check method [`RemoteTimelineClient::stop`] for details.
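    ///
    /// A minimal sketch of the intended call order (hypothetical caller code):
    ///
    /// ```ignore
    /// client.stop()?;
    /// // Wait for in-progress upload tasks to drain before touching the index:
    /// task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
    /// client.persist_index_part_with_deleted_flag().await?;
    /// ```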
767 846 : #[instrument(skip_all)]
768 : pub(crate) async fn persist_index_part_with_deleted_flag(
769 : self: &Arc<Self>,
770 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
771 : let index_part_with_deleted_at = {
772 : let mut locked = self.upload_queue.lock().unwrap();
773 :
774 : // We must be in stopped state because otherwise
775 : // we can have inprogress index part upload that can overwrite the file
776 : // with missing is_deleted flag that we going to set below
777 : let stopped = locked.stopped_mut()?;
778 :
779 : match stopped.deleted_at {
780 : SetDeletedFlagProgress::NotRunning => (), // proceed
781 : SetDeletedFlagProgress::InProgress(at) => {
782 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
783 : }
784 : SetDeletedFlagProgress::Successful(at) => {
785 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
786 : }
787 : };
788 : let deleted_at = Utc::now().naive_utc();
789 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
790 :
791 : let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
792 : .context("IndexPart serialize")?;
793 : index_part.deleted_at = Some(deleted_at);
794 : index_part
795 : };
796 :
797 UBC 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
798 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
799 0 : let stopped = locked
800 0 : .stopped_mut()
801 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
802 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
803 0 : });
804 :
805 CBC 195 : pausable_failpoint!("persist_deleted_index_part");
806 :
807 : backoff::retry(
808 256 : || {
809 256 : upload::upload_index_part(
810 256 : &self.storage_impl,
811 256 : &self.tenant_id,
812 256 : &self.timeline_id,
813 256 : self.generation,
814 256 : &index_part_with_deleted_at,
815 256 : )
816 256 : },
817 61 : |_e| false,
818 : 1,
819 : // have just a couple of attempts
820 : // when executed as part of timeline deletion this happens in context of api call
821 : // when executed as part of tenant deletion this happens in the background
822 : 2,
823 : "persist_index_part_with_deleted_flag",
824 : // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
825 UBC 0 : backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
826 : )
827 : .await?;
828 :
829 : // all good, disarm the guard and mark as success
830 : ScopeGuard::into_inner(undo_deleted_at);
831 : {
832 : let mut locked = self.upload_queue.lock().unwrap();
833 :
834 : let stopped = locked
835 : .stopped_mut()
836 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
837 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
838 : index_part_with_deleted_at
839 : .deleted_at
840 : .expect("we set it above"),
841 : );
842 : }
843 :
844 : Ok(())
845 : }
846 :
847 : /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set.
848 : /// The function deletes layer files one by one, then lists the prefix to see if we leaked something
849 : /// deletes leaked files if any and proceeds with deletion of index file at the end.
850 CBC 214 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
851 214 : debug_assert_current_span_has_tenant_and_timeline_id();
852 :
853 214 : let layers: Vec<RemotePath> = {
854 214 : let mut locked = self.upload_queue.lock().unwrap();
855 214 : let stopped = locked.stopped_mut()?;
856 :
857 214 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
858 UBC 0 : anyhow::bail!("deleted_at is not set")
859 CBC 214 : }
860 :
861 214 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
862 :
863 214 : stopped
864 214 : .upload_queue_for_deletion
865 214 : .latest_files
866 214 : .drain()
867 4663 : .map(|(file_name, meta)| {
868 4663 : remote_layer_path(
869 4663 : &self.tenant_id,
870 4663 : &self.timeline_id,
871 4663 : &file_name,
872 4663 : meta.generation,
873 4663 : )
874 4663 : })
875 214 : .collect()
876 214 : };
877 214 :
878 214 : let layer_deletion_count = layers.len();
879 214 : self.deletion_queue_client.push_immediate(layers).await?;
880 :
881 : // Do not delete index part yet, it is needed for possible retry. If we remove it first
882 : // and retry will arrive to different pageserver there wont be any traces of it on remote storage
883 214 : let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
884 214 :
885 214 : // Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
886 214 : // taking the burden of listing all the layers that we already know we should delete.
887 214 : self.deletion_queue_client.flush_immediate().await?;
888 :
889 214 : let remaining = backoff::retry(
890 278 : || async {
891 278 : self.storage_impl
892 278 : .list_files(Some(&timeline_storage_path))
893 824 : .await
894 278 : },
895 214 : |_e| false,
896 214 : FAILED_DOWNLOAD_WARN_THRESHOLD,
897 214 : FAILED_REMOTE_OP_RETRIES,
898 214 : "list_prefixes",
899 214 : backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
900 214 : )
901 824 : .await
902 214 : .context("list prefixes")?;
903 :
904 : // We will delete the current index_part object last, since it acts as a deletion
905 : // marker via its deleted_at attribute
906 214 : let latest_index = remaining
907 214 : .iter()
908 1296 : .filter(|p| {
909 1296 : p.object_name()
910 1296 : .map(|n| n.starts_with(IndexPart::FILE_NAME))
911 1296 : .unwrap_or(false)
912 1296 : })
913 214 : .filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
914 214 : .max_by_key(|i| i.1)
915 214 : .map(|i| i.0.clone())
916 214 : .unwrap_or(
917 214 : // No generation-suffixed indices, assume we are dealing with
918 214 : // a legacy index.
919 214 : remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
920 214 : );
921 214 :
922 214 : let remaining_layers: Vec<RemotePath> = remaining
923 214 : .into_iter()
924 1296 : .filter(|p| p!= &latest_index)
925 214 : .inspect(|path| {
926 1090 : if let Some(name) = path.object_name() {
927 1090 : info!(%name, "deleting a file not referenced from index_part.json");
928 : } else {
929 UBC 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
930 : }
931 CBC 1090 : })
932 214 : .collect();
933 214 :
934 214 : let not_referenced_count = remaining_layers.len();
935 214 : if !remaining_layers.is_empty() {
936 52 : self.deletion_queue_client
937 52 : .push_immediate(remaining_layers)
938 UBC 0 : .await?;
939 CBC 162 : }
940 :
941 214 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
942 12 : Err(anyhow::anyhow!(
943 12 : "failpoint: timeline-delete-before-index-delete"
944 12 : ))?
945 214 : });
946 :
947 UBC 0 : debug!("enqueuing index part deletion");
948 CBC 202 : self.deletion_queue_client
949 202 : .push_immediate([latest_index].to_vec())
950 UBC 0 : .await?;
951 :
952 : // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait
953 : // for a flush to a persistent deletion list so that we may be sure deletion will occur.
954 CBC 202 : self.deletion_queue_client.flush_immediate().await?;
955 :
956 202 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
957 4 : Err(anyhow::anyhow!(
958 4 : "failpoint: timeline-delete-after-index-delete"
959 4 : ))?
960 202 : });
961 :
962 198 : info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
963 :
964 198 : Ok(())
965 214 : }
966 :
967 : ///
968 : /// Pick next tasks from the queue, and start as many of them as possible without violating
969 : /// the ordering constraints.
970 : ///
971 : /// The caller needs to already hold the `upload_queue` lock.
972 51336 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
973 78876 : while let Some(next_op) = upload_queue.queued_operations.front() {
974 : // Can we run this task now?
975 52694 : let can_run_now = match next_op {
976 : UploadOp::UploadLayer(_, _) => {
977 : // Can always be scheduled.
978 18593 : true
979 : }
980 : UploadOp::UploadMetadata(_, _) => {
981 : // These can only be performed after all the preceding operations
982 : // have finished.
983 27392 : upload_queue.inprogress_tasks.is_empty()
984 : }
985 : UploadOp::Delete(_) => {
986 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
987 891 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
988 : }
989 :
990 5818 : UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
991 : };
992 :
993 : // If we cannot launch this task, don't look any further.
994 : //
995 : // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch
996 : // them now, but we don't try to do that currently. For example, if the frontmost task
997 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
998 : // could still start layer uploads that were scheduled later.
999 52694 : if !can_run_now {
1000 25154 : break;
1001 27540 : }
1002 27540 :
1003 27540 : // We can launch this task. Remove it from the queue first.
1004 27540 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
1005 27540 :
1006 27540 : debug!("starting op: {}", next_op);
1007 :
1008 : // Update the counters
1009 27540 : match next_op {
1010 18593 : UploadOp::UploadLayer(_, _) => {
1011 18593 : upload_queue.num_inprogress_layer_uploads += 1;
1012 18593 : }
1013 6000 : UploadOp::UploadMetadata(_, _) => {
1014 6000 : upload_queue.num_inprogress_metadata_uploads += 1;
1015 6000 : }
1016 588 : UploadOp::Delete(_) => {
1017 588 : upload_queue.num_inprogress_deletions += 1;
1018 588 : }
1019 2359 : UploadOp::Barrier(sender) => {
1020 2359 : sender.send_replace(());
1021 2359 : continue;
1022 : }
1023 : };
1024 :
1025 : // Assign unique ID to this task
1026 25181 : upload_queue.task_counter += 1;
1027 25181 : let upload_task_id = upload_queue.task_counter;
1028 25181 :
1029 25181 : // Add it to the in-progress map
1030 25181 : let task = Arc::new(UploadTask {
1031 25181 : task_id: upload_task_id,
1032 25181 : op: next_op,
1033 25181 : retries: AtomicU32::new(0),
1034 25181 : });
1035 25181 : upload_queue
1036 25181 : .inprogress_tasks
1037 25181 : .insert(task.task_id, Arc::clone(&task));
1038 25181 :
1039 25181 : // Spawn task to perform the task
1040 25181 : let self_rc = Arc::clone(self);
1041 25181 : let tenant_id = self.tenant_id;
1042 25181 : let timeline_id = self.timeline_id;
1043 25181 : task_mgr::spawn(
1044 25181 : &self.runtime,
1045 25181 : TaskKind::RemoteUploadTask,
1046 25181 : Some(self.tenant_id),
1047 25181 : Some(self.timeline_id),
1048 25181 : "remote upload",
1049 : false,
1050 25173 : async move {
1051 3273479 : self_rc.perform_upload_task(task).await;
1052 25091 : Ok(())
1053 25091 : }
1054 25181 : .instrument(info_span!(parent: None, "remote_upload", %tenant_id, %timeline_id, %upload_task_id)),
1055 : );
1056 :
1057 : // Loop back to process next task
1058 : }
1059 51336 : }
1060 :
1061 : ///
1062 : /// Perform an upload task.
1063 : ///
1064 : /// The task is in the `inprogress_tasks` list. This function will try to
1065 : /// execute it, retrying forever. On successful completion, the task is
1066 : /// removed it from the `inprogress_tasks` list, and any next task(s) in the
1067 : /// queue that were waiting by the completion are launched.
1068 : ///
1069 : /// The task can be shut down, however. That leads to stopping the whole
1070 : /// queue.
1071 : ///
1072 25173 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1073 : // Loop to retry until it completes.
1074 27817 : loop {
1075 27817 : // If we're requested to shut down, close up shop and exit.
1076 27817 : //
1077 27817 : // Note: We only check for the shutdown requests between retries, so
1078 27817 : // if a shutdown request arrives while we're busy uploading, in the
1079 27817 : // upload::upload:*() call below, we will wait not exit until it has
1080 27817 : // finished. We probably could cancel the upload by simply dropping
1081 27817 : // the Future, but we're not 100% sure if the remote storage library
1082 27817 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1083 27817 : // upload finishes or times out soon enough.
1084 27817 : if task_mgr::is_shutdown_requested() {
1085 136 : info!("upload task cancelled by shutdown request");
1086 136 : match self.stop() {
1087 136 : Ok(()) => {}
1088 : Err(StopError::QueueUninitialized) => {
1089 UBC 0 : unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
1090 : }
1091 : }
1092 CBC 136 : return;
1093 27681 : }
1094 :
1095 27681 : let upload_result: anyhow::Result<()> = match &task.op {
1096 20377 : UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
1097 20377 : let path = self
1098 20377 : .conf
1099 20377 : .timeline_path(&self.tenant_id, &self.timeline_id)
1100 20377 : .join(layer_file_name.file_name());
1101 20377 :
1102 20377 : upload::upload_timeline_layer(
1103 20377 : self.conf,
1104 20377 : &self.storage_impl,
1105 20377 : &path,
1106 20377 : layer_metadata,
1107 20377 : self.generation,
1108 20377 : )
1109 20377 : .measure_remote_op(
1110 20377 : self.tenant_id,
1111 20377 : self.timeline_id,
1112 20377 : RemoteOpFileKind::Layer,
1113 20377 : RemoteOpKind::Upload,
1114 20377 : Arc::clone(&self.metrics),
1115 20377 : )
1116 3246491 : .await
1117 : }
1118 6718 : UploadOp::UploadMetadata(ref index_part, _lsn) => {
1119 6718 : let mention_having_future_layers = if cfg!(feature = "testing") {
1120 6718 : index_part
1121 6718 : .layer_metadata
1122 6718 : .keys()
1123 415960 : .any(|x| x.is_in_future(*_lsn))
1124 : } else {
1125 UBC 0 : false
1126 : };
1127 :
1128 CBC 6718 : let res = upload::upload_index_part(
1129 6718 : &self.storage_impl,
1130 6718 : &self.tenant_id,
1131 6718 : &self.timeline_id,
1132 6718 : self.generation,
1133 6718 : index_part,
1134 6718 : )
1135 6718 : .measure_remote_op(
1136 6718 : self.tenant_id,
1137 6718 : self.timeline_id,
1138 6718 : RemoteOpFileKind::Index,
1139 6718 : RemoteOpKind::Upload,
1140 6718 : Arc::clone(&self.metrics),
1141 6718 : )
1142 26464 : .await;
1143 6713 : if res.is_ok() {
1144 5988 : self.update_remote_physical_size_gauge(Some(index_part));
1145 5988 : if mention_having_future_layers {
1146 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1147 12 : tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
1148 5976 : }
1149 725 : }
1150 6713 : res
1151 : }
1152 586 : UploadOp::Delete(delete) => self
1153 586 : .deletion_queue_client
1154 586 : .push_layers(
1155 586 : self.tenant_id,
1156 586 : self.timeline_id,
1157 586 : self.generation,
1158 586 : delete.layers.clone(),
1159 586 : )
1160 518 : .await
1161 586 : .map_err(|e| anyhow::anyhow!(e)),
1162 : UploadOp::Barrier(_) => {
1163 : // unreachable. Barrier operations are handled synchronously in
1164 : // launch_queued_tasks
1165 UBC 0 : warn!("unexpected Barrier operation in perform_upload_task");
1166 0 : break;
1167 : }
1168 : };
1169 :
1170 CBC 27600 : match upload_result {
1171 : Ok(()) => {
1172 24955 : break;
1173 : }
1174 2645 : Err(e) => {
1175 2645 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1176 2645 :
1177 2645 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1178 2645 : // or other external reasons. Such issues are relatively regular, so log them
1179 2645 : // at info level at first, and only WARN if the operation fails repeatedly.
1180 2645 : //
1181 2645 : // (See similar logic for downloads in `download::download_retry`)
1182 2645 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1183 2644 : info!(
1184 2644 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1185 2644 : task.op, retries, e
1186 2644 : );
1187 : } else {
1188 1 : warn!(
1189 1 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1190 1 : task.op, retries, e
1191 1 : );
1192 : }
1193 :
1194 : // sleep until it's time to retry, or we're cancelled
1195 2645 : exponential_backoff(
1196 2645 : retries,
1197 2645 : DEFAULT_BASE_BACKOFF_SECONDS,
1198 2645 : DEFAULT_MAX_BACKOFF_SECONDS,
1199 2645 : &shutdown_token(),
1200 2645 : )
1201 6 : .await;
1202 : }
1203 : }
1204 : }
1205 :
1206 24955 : let retries = task.retries.load(Ordering::SeqCst);
1207 24955 : if retries > 0 {
1208 2501 : info!(
1209 2501 : "remote task {} completed successfully after {} retries",
1210 2501 : task.op, retries
1211 2501 : );
1212 : } else {
1213 UBC 0 : debug!("remote task {} completed successfully", task.op);
1214 : }
1215 :
1216 : // The task has completed successfully. Remove it from the in-progress list.
1217 CBC 23673 : let lsn_update = {
1218 24955 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1219 24955 : let upload_queue = match upload_queue_guard.deref_mut() {
1220 UBC 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1221 CBC 1282 : UploadQueue::Stopped(_stopped) => {
1222 1282 : None
1223 : },
1224 23673 : UploadQueue::Initialized(qi) => { Some(qi) }
1225 : };
1226 :
1227 24955 : let upload_queue = match upload_queue {
1228 23673 : Some(upload_queue) => upload_queue,
1229 : None => {
1230 1282 : info!("another concurrent task already stopped the queue");
1231 1282 : return;
1232 : }
1233 : };
1234 :
1235 23673 : upload_queue.inprogress_tasks.remove(&task.task_id);
1236 :
1237 23673 : let lsn_update = match task.op {
1238 : UploadOp::UploadLayer(_, _) => {
1239 17123 : upload_queue.num_inprogress_layer_uploads -= 1;
1240 17123 : None
1241 : }
1242 5975 : UploadOp::UploadMetadata(_, lsn) => {
1243 5975 : upload_queue.num_inprogress_metadata_uploads -= 1;
1244 5975 : // XXX monotonicity check?
1245 5975 :
1246 5975 : upload_queue.projected_remote_consistent_lsn = Some(lsn);
1247 5975 : if self.generation.is_none() {
1248 : // Legacy mode: skip validating generation
1249 5547 : upload_queue.visible_remote_consistent_lsn.store(lsn);
1250 5547 : None
1251 : } else {
1252 428 : Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
1253 : }
1254 : }
1255 : UploadOp::Delete(_) => {
1256 575 : upload_queue.num_inprogress_deletions -= 1;
1257 575 : None
1258 : }
1259 UBC 0 : UploadOp::Barrier(_) => unreachable!(),
1260 : };
1261 :
1262 : // Launch any queued tasks that were unblocked by this one.
1263 CBC 23673 : self.launch_queued_tasks(upload_queue);
1264 23673 : lsn_update
1265 : };
1266 :
1267 23673 : if let Some((lsn, slot)) = lsn_update {
1268 : // Updates to the remote_consistent_lsn we advertise to pageservers
1269 : // are all routed through the DeletionQueue, to enforce important
1270 : // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
1271 428 : self.deletion_queue_client
1272 428 : .update_remote_consistent_lsn(
1273 428 : self.tenant_id,
1274 428 : self.timeline_id,
1275 428 : self.generation,
1276 428 : lsn,
1277 428 : slot,
1278 428 : )
1279 UBC 0 : .await;
1280 CBC 23245 : }
1281 :
1282 23673 : self.calls_unfinished_metric_end(&task.op);
1283 25091 : }
1284 :
1285 49074 : fn calls_unfinished_metric_impl(
1286 49074 : &self,
1287 49074 : op: &UploadOp,
1288 49074 : ) -> Option<(
1289 49074 : RemoteOpFileKind,
1290 49074 : RemoteOpKind,
1291 49074 : RemoteTimelineClientMetricsCallTrackSize,
1292 49074 : )> {
1293 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
1294 49074 : let res = match op {
1295 35730 : UploadOp::UploadLayer(_, m) => (
1296 35730 : RemoteOpFileKind::Layer,
1297 35730 : RemoteOpKind::Upload,
1298 35730 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
1299 35730 : ),
1300 12080 : UploadOp::UploadMetadata(_, _) => (
1301 12080 : RemoteOpFileKind::Index,
1302 12080 : RemoteOpKind::Upload,
1303 12080 : DontTrackSize {
1304 12080 : reason: "metadata uploads are tiny",
1305 12080 : },
1306 12080 : ),
1307 1263 : UploadOp::Delete(_delete) => (
1308 1263 : RemoteOpFileKind::Layer,
1309 1263 : RemoteOpKind::Delete,
1310 1263 : DontTrackSize {
1311 1263 : reason: "should we track deletes? positive or negative sign?",
1312 1263 : },
1313 1263 : ),
1314 : UploadOp::Barrier(_) => {
1315 : // we do not account these
1316 1 : return None;
1317 : }
1318 : };
1319 49073 : Some(res)
1320 49074 : }
1321 :
1322 25298 : fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
1323 25298 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1324 25298 : Some(x) => x,
1325 UBC 0 : None => return,
1326 : };
1327 CBC 25298 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
1328 25298 : guard.will_decrement_manually(); // in unfinished_ops_metric_end()
1329 25298 : }
1330 :
1331 23776 : fn calls_unfinished_metric_end(&self, op: &UploadOp) {
1332 23776 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1333 23775 : Some(x) => x,
1334 1 : None => return,
1335 : };
1336 23775 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
1337 23776 : }
1338 :
1339 : /// Close the upload queue for new operations and cancel queued operations.
1340 : /// In-progress operations will still be running after this function returns.
1341 : /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
1342 : /// to wait for them to complete, after calling this function.
1343 382 : pub fn stop(&self) -> Result<(), StopError> {
1344 382 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
1345 382 : // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
1346 382 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
1347 382 : let mut guard = self.upload_queue.lock().unwrap();
1348 382 : match &mut *guard {
1349 UBC 0 : UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
1350 : UploadQueue::Stopped(_) => {
1351 : // nothing to do
1352 CBC 160 : info!("another concurrent task already shut down the queue");
1353 160 : Ok(())
1354 : }
1355 222 : UploadQueue::Initialized(initialized) => {
1356 222 : info!("shutting down upload queue");
1357 :
1358 : // Replace the queue with the Stopped state, taking ownership of the old
1359 : // Initialized queue. We will do some checks on it, and then drop it.
1360 222 : let qi = {
1361 : // Here we preserve a working version of the upload queue for possible use during deletions.
1362 : // An in-place replacement of Initialized with Stopped could be done with the help of https://github.com/Sgeo/take_mut,
1363 : // but for this use case it doesn't really make sense to bring in unsafe code for this single call site.
1364 : // Deletion is not really perf-sensitive, so there shouldn't be any problem with cloning a fraction of it.
1365 222 : let upload_queue_for_deletion = UploadQueueInitialized {
1366 222 : task_counter: 0,
1367 222 : latest_files: initialized.latest_files.clone(),
1368 222 : latest_files_changes_since_metadata_upload_scheduled: 0,
1369 222 : latest_metadata: initialized.latest_metadata.clone(),
1370 222 : projected_remote_consistent_lsn: None,
1371 222 : visible_remote_consistent_lsn: initialized
1372 222 : .visible_remote_consistent_lsn
1373 222 : .clone(),
1374 222 : num_inprogress_layer_uploads: 0,
1375 222 : num_inprogress_metadata_uploads: 0,
1376 222 : num_inprogress_deletions: 0,
1377 222 : inprogress_tasks: HashMap::default(),
1378 222 : queued_operations: VecDeque::default(),
1379 222 : };
1380 222 :
1381 222 : let upload_queue = std::mem::replace(
1382 222 : &mut *guard,
1383 222 : UploadQueue::Stopped(UploadQueueStopped {
1384 222 : upload_queue_for_deletion,
1385 222 : deleted_at: SetDeletedFlagProgress::NotRunning,
1386 222 : }),
1387 222 : );
1388 222 : if let UploadQueue::Initialized(qi) = upload_queue {
1389 222 : qi
1390 : } else {
1391 UBC 0 : unreachable!("we checked in the match above that it is Initialized");
1392 : }
1393 : };
1394 :
1395 : // consistency check
1396 CBC 222 : assert_eq!(
1397 222 : qi.num_inprogress_layer_uploads
1398 222 : + qi.num_inprogress_metadata_uploads
1399 222 : + qi.num_inprogress_deletions,
1400 222 : qi.inprogress_tasks.len()
1401 222 : );
1402 :
1403 : // We don't need to do anything here for in-progress tasks. They will finish
1404 : // on their own, decrement the unfinished-task counter themselves, and observe
1405 : // that the queue is Stopped.
1406 222 : drop(qi.inprogress_tasks);
1407 :
1408 : // Tear down queued ops
1409 222 : for op in qi.queued_operations.into_iter() {
1410 103 : self.calls_unfinished_metric_end(&op);
1411 103 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
1412 103 : // which is exactly what we want to happen.
1413 103 : drop(op);
1414 103 : }
1415 :
1416 : // We're done.
1417 222 : drop(guard);
1418 222 : Ok(())
1419 : }
1420 : }
1421 382 : }
1422 :
1423 591 : pub(crate) fn get_layer_metadata(
1424 591 : &self,
1425 591 : name: &LayerFileName,
1426 591 : ) -> anyhow::Result<Option<LayerFileMetadata>> {
1427 591 : self.upload_queue.lock().unwrap().get_layer_metadata(name)
1428 591 : }
1429 : }
1430 :
1431 328 : pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
1432 328 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
1433 328 : RemotePath::from_string(&path).expect("Failed to construct path")
1434 328 : }
1435 :
1436 280 : pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
1437 280 : remote_timelines_path(tenant_id).join(Utf8Path::new(&timeline_id.to_string()))
1438 280 : }
1439 :
1440 11553 : pub fn remote_layer_path(
1441 11553 : tenant_id: &TenantId,
1442 11553 : timeline_id: &TimelineId,
1443 11553 : layer_file_name: &LayerFileName,
1444 11553 : generation: Generation,
1445 11553 : ) -> RemotePath {
1446 11553 : // Generation-aware key format
1447 11553 : let path = format!(
1448 11553 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1449 11553 : layer_file_name.file_name(),
1450 11553 : generation.get_suffix()
1451 11553 : );
1452 11553 :
1453 11553 : RemotePath::from_string(&path).expect("Failed to construct path")
1454 11553 : }
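: // For illustration (hypothetical ids and generation; the exact suffix encoding
: // is whatever `Generation::get_suffix` produces): a layer uploaded in some
: // non-zero generation lands at a key like
: //   tenants/<tenant_id>/timelines/<timeline_id>/<layer file name>-00000005
: // whereas a layer from before generations were introduced carries no suffix.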
1455 :
1456 7573 : pub fn remote_index_path(
1457 7573 : tenant_id: &TenantId,
1458 7573 : timeline_id: &TimelineId,
1459 7573 : generation: Generation,
1460 7573 : ) -> RemotePath {
1461 7573 : RemotePath::from_string(&format!(
1462 7573 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1463 7573 : IndexPart::FILE_NAME,
1464 7573 : generation.get_suffix()
1465 7573 : ))
1466 7573 : .expect("Failed to construct path")
1467 7573 : }
1468 :
1469 : /// Given the key of an index, parse out the generation part of the name
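: /// For example (hedged; the exact encoding is whatever `Generation::get_suffix`
: /// produces and `Generation::parse_suffix` accepts): a key ending in
: /// `index_part.json-000000a1` splits on the first `-` and the suffix parses
: /// into a `Generation`, while a legacy `index_part.json` with no suffix
: /// yields `None`.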
1470 221 : pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
1471 221 : let file_name = match path.get_path().file_name() {
1472 221 : Some(f) => f,
1473 : None => {
1474 : // Unexpected: we should be seeing index_part.json paths only
1475 UBC 0 : tracing::warn!("Malformed index key {}", path);
1476 0 : return None;
1477 : }
1478 : };
1479 :
1480 CBC 221 : match file_name.split_once('-') {
1481 11 : Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
1482 210 : None => None,
1483 : }
1484 221 : }
1485 :
1486 : /// Files on the remote storage are stored with paths relative to the workdir.
1487 : /// Such a path includes both the tenant and timeline ids, which makes the remote storage path unique.
1488 : ///
1489 : /// Errors if the provided path does not start with the pageserver's workdir.
1490 20372 : pub fn remote_path(
1491 20372 : conf: &PageServerConf,
1492 20372 : local_path: &Utf8Path,
1493 20372 : generation: Generation,
1494 20372 : ) -> anyhow::Result<RemotePath> {
1495 20372 : let stripped = local_path
1496 20372 : .strip_prefix(&conf.workdir)
1497 20372 : .context("Failed to strip workdir prefix")?;
1498 :
1499 20372 : let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
1500 20372 :
1501 20372 : RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
1502 UBC 0 : format!(
1503 0 : "to resolve remote part of path {:?} for base {:?}",
1504 0 : local_path, conf.workdir
1505 0 : )
1506 CBC 20372 : })
1507 20372 : }
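: // A sketch of the mapping (workdir and file name are hypothetical): with
: // `conf.workdir = "/data/.neon"` and local path
: // `/data/.neon/tenants/<tenant_id>/timelines/<timeline_id>/somefile`, the
: // stripped part is `tenants/<tenant_id>/timelines/<timeline_id>/somefile`
: // and the returned `RemotePath` is that plus `generation.get_suffix()`.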
1508 :
1509 : #[cfg(test)]
1510 : mod tests {
1511 : use super::*;
1512 : use crate::{
1513 : context::RequestContext,
1514 : tenant::{
1515 : harness::{TenantHarness, TIMELINE_ID},
1516 : Generation, Tenant, Timeline,
1517 : },
1518 : DEFAULT_PG_VERSION,
1519 : };
1520 :
1521 : use std::collections::HashSet;
1522 : use utils::lsn::Lsn;
1523 :
1524 4 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
1525 4 : format!("contents for {name}").into()
1526 4 : }
1527 :
1528 1 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
1529 1 : let metadata = TimelineMetadata::new(
1530 1 : disk_consistent_lsn,
1531 1 : None,
1532 1 : None,
1533 1 : Lsn(0),
1534 1 : Lsn(0),
1535 1 : Lsn(0),
1536 1 : // Any version will do
1537 1 : // but it should be consistent with the one in the tests
1538 1 : crate::DEFAULT_PG_VERSION,
1539 1 : );
1540 1 :
1541 1 : // go through serialize + deserialize to fix the header, including checksum
1542 1 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
1543 1 : }
1544 :
1545 1 : fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
1546 3 : let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
1547 1 : avec.sort();
1548 1 :
1549 1 : let mut bvec = b.to_vec();
1550 1 : bvec.sort_unstable();
1551 1 :
1552 1 : assert_eq!(avec, bvec);
1553 1 : }
1554 :
1555 2 : fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
1556 2 : let mut expected: Vec<String> = expected
1557 2 : .iter()
1558 8 : .map(|x| format!("{}{}", x, generation.get_suffix()))
1559 2 : .collect();
1560 2 : expected.sort();
1561 2 :
1562 2 : let mut found: Vec<String> = Vec::new();
1563 8 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
1564 8 : let entry_name = entry.file_name();
1565 8 : let fname = entry_name.to_str().unwrap();
1566 8 : found.push(String::from(fname));
1567 8 : }
1568 2 : found.sort();
1569 2 :
1570 2 : assert_eq!(found, expected);
1571 2 : }
1572 :
1573 : struct TestSetup {
1574 : harness: TenantHarness,
1575 : tenant: Arc<Tenant>,
1576 : timeline: Arc<Timeline>,
1577 : tenant_ctx: RequestContext,
1578 : }
1579 :
1580 : impl TestSetup {
1581 4 : async fn new(test_name: &str) -> anyhow::Result<Self> {
1582 4 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
1583 4 : let harness = TenantHarness::create(test_name)?;
1584 4 : let (tenant, ctx) = harness.load().await;
1585 :
1586 4 : let timeline = tenant
1587 4 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1588 8 : .await?;
1589 :
1590 4 : Ok(Self {
1591 4 : harness,
1592 4 : tenant,
1593 4 : timeline,
1594 4 : tenant_ctx: ctx,
1595 4 : })
1596 4 : }
1597 :
1598 : /// Construct a RemoteTimelineClient in an arbitrary generation
1599 5 : fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
1600 5 : Arc::new(RemoteTimelineClient {
1601 5 : conf: self.harness.conf,
1602 5 : runtime: tokio::runtime::Handle::current(),
1603 5 : tenant_id: self.harness.tenant_id,
1604 5 : timeline_id: TIMELINE_ID,
1605 5 : generation,
1606 5 : storage_impl: self.harness.remote_storage.clone(),
1607 5 : deletion_queue_client: self.harness.deletion_queue.new_client(),
1608 5 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
1609 5 : metrics: Arc::new(RemoteTimelineClientMetrics::new(
1610 5 : &self.harness.tenant_id,
1611 5 : &TIMELINE_ID,
1612 5 : )),
1613 5 : })
1614 5 : }
1615 :
1616 : /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
1617 : /// and timeline_id are present.
1618 3 : fn span(&self) -> tracing::Span {
1619 3 : tracing::info_span!(
1620 3 : "test",
1621 3 : tenant_id = %self.harness.tenant_id,
1622 3 : timeline_id = %TIMELINE_ID
1623 3 : )
1624 3 : }
1625 : }
1626 :
1627 : // Test scheduling
1628 1 : #[tokio::test]
1629 1 : async fn upload_scheduling() {
1630 : // Test outline:
1631 : //
1632 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
1633 : // Schedule upload of index. Check that it is queued
1634 : // let the layer file uploads finish. Check that the index-upload is now started
1635 : // let the index-upload finish.
1636 : //
1637 : // Download back the index.json. Check that the list of files is correct
1638 : //
1639 : // Schedule upload. Schedule deletion. Check that the deletion is queued
1640 : // let upload finish. Check that deletion is now started
1641 : // Schedule another deletion. Check that it's launched immediately.
1642 : // Schedule index upload. Check that it's queued
1643 :
1644 3 : let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
1645 1 : let span = test_setup.span();
1646 1 : let _guard = span.enter();
1647 1 :
1648 1 : let TestSetup {
1649 1 : harness,
1650 1 : tenant: _tenant,
1651 1 : timeline,
1652 1 : tenant_ctx: _tenant_ctx,
1653 1 : } = test_setup;
1654 1 :
1655 1 : let client = timeline.remote_client.as_ref().unwrap();
1656 :
1657 : // Download back the index.json, and check that the list of files is correct
1658 3 : let initial_index_part = match client.download_index_file().await.unwrap() {
1659 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1660 UBC 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
1661 : };
1662 CBC 1 : let initial_layers = initial_index_part
1663 1 : .layer_metadata
1664 1 : .keys()
1665 1 : .map(|f| f.to_owned())
1666 1 : .collect::<HashSet<LayerFileName>>();
1667 1 : let initial_layer = {
1668 1 : assert!(initial_layers.len() == 1);
1669 1 : initial_layers.into_iter().next().unwrap()
1670 1 : };
1671 1 :
1672 1 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
1673 1 :
1674 1 : println!("workdir: {}", harness.conf.workdir);
1675 1 :
1676 1 : let remote_timeline_dir = harness
1677 1 : .remote_fs_dir
1678 1 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
1679 1 : println!("remote_timeline_dir: {remote_timeline_dir}");
1680 1 :
1681 1 : let generation = harness.generation;
1682 1 :
1683 1 : // Create a couple of dummy files, schedule upload for them
1684 1 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
1685 1 : let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
1686 1 : let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
1687 1 : let content_1 = dummy_contents("foo");
1688 1 : let content_2 = dummy_contents("bar");
1689 1 : let content_3 = dummy_contents("baz");
1690 :
1691 3 : for (filename, content) in [
1692 1 : (&layer_file_name_1, &content_1),
1693 1 : (&layer_file_name_2, &content_2),
1694 1 : (&layer_file_name_3, &content_3),
1695 3 : ] {
1696 3 : std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
1697 3 : }
1698 :
1699 1 : client
1700 1 : .schedule_layer_file_upload(
1701 1 : &layer_file_name_1,
1702 1 : &LayerFileMetadata::new(content_1.len() as u64, generation),
1703 1 : )
1704 1 : .unwrap();
1705 1 : client
1706 1 : .schedule_layer_file_upload(
1707 1 : &layer_file_name_2,
1708 1 : &LayerFileMetadata::new(content_2.len() as u64, generation),
1709 1 : )
1710 1 : .unwrap();
1711 1 :
1712 1 : // Check that they are started immediately, not queued
1713 1 : //
1714 1 : // this works because we are running within block_on, so any futures are now queued up until
1715 1 : // our next await point.
1716 1 : {
1717 1 : let mut guard = client.upload_queue.lock().unwrap();
1718 1 : let upload_queue = guard.initialized_mut().unwrap();
1719 1 : assert!(upload_queue.queued_operations.is_empty());
1720 1 : assert!(upload_queue.inprogress_tasks.len() == 2);
1721 1 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
1722 :
1723 : // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
1724 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
1725 : }
1726 :
1727 : // Schedule upload of index. Check that it is queued
1728 1 : let metadata = dummy_metadata(Lsn(0x20));
1729 1 : client
1730 1 : .schedule_index_upload_for_metadata_update(&metadata)
1731 1 : .unwrap();
1732 1 : {
1733 1 : let mut guard = client.upload_queue.lock().unwrap();
1734 1 : let upload_queue = guard.initialized_mut().unwrap();
1735 1 : assert!(upload_queue.queued_operations.len() == 1);
1736 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
1737 : }
1738 :
1739 : // Wait for the uploads to finish
1740 1 : client.wait_completion().await.unwrap();
1741 1 : {
1742 1 : let mut guard = client.upload_queue.lock().unwrap();
1743 1 : let upload_queue = guard.initialized_mut().unwrap();
1744 1 :
1745 1 : assert!(upload_queue.queued_operations.is_empty());
1746 1 : assert!(upload_queue.inprogress_tasks.is_empty());
1747 : }
1748 :
1749 : // Download back the index.json, and check that the list of files is correct
1750 3 : let index_part = match client.download_index_file().await.unwrap() {
1751 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1752 UBC 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
1753 : };
1754 :
1755 CBC 1 : assert_file_list(
1756 1 : &index_part
1757 1 : .layer_metadata
1758 1 : .keys()
1759 3 : .map(|f| f.to_owned())
1760 1 : .collect(),
1761 1 : &[
1762 1 : &initial_layer.file_name(),
1763 1 : &layer_file_name_1.file_name(),
1764 1 : &layer_file_name_2.file_name(),
1765 1 : ],
1766 1 : );
1767 1 : assert_eq!(index_part.metadata, metadata);
1768 :
1769 : // Schedule upload and then a deletion. Check that the deletion is queued
1770 1 : client
1771 1 : .schedule_layer_file_upload(
1772 1 : &layer_file_name_3,
1773 1 : &LayerFileMetadata::new(content_3.len() as u64, generation),
1774 1 : )
1775 1 : .unwrap();
1776 1 : client
1777 1 : .schedule_layer_file_deletion([layer_file_name_1.clone()].to_vec())
1778 1 : .unwrap();
1779 1 : {
1780 1 : let mut guard = client.upload_queue.lock().unwrap();
1781 1 : let upload_queue = guard.initialized_mut().unwrap();
1782 1 :
1783 1 : // Deletion schedules upload of the index file, and the file deletion itself
1784 1 : assert!(upload_queue.queued_operations.len() == 2);
1785 1 : assert!(upload_queue.inprogress_tasks.len() == 1);
1786 1 : assert!(upload_queue.num_inprogress_layer_uploads == 1);
1787 1 : assert!(upload_queue.num_inprogress_deletions == 0);
1788 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
1789 : }
1790 1 : assert_remote_files(
1791 1 : &[
1792 1 : &initial_layer.file_name(),
1793 1 : &layer_file_name_1.file_name(),
1794 1 : &layer_file_name_2.file_name(),
1795 1 : "index_part.json",
1796 1 : ],
1797 1 : &remote_timeline_dir,
1798 1 : generation,
1799 1 : );
1800 1 :
1801 1 : // Finish them
1802 1 : client.wait_completion().await.unwrap();
1803 1 : harness.deletion_queue.pump().await;
1804 :
1805 1 : assert_remote_files(
1806 1 : &[
1807 1 : &initial_layer.file_name(),
1808 1 : &layer_file_name_2.file_name(),
1809 1 : &layer_file_name_3.file_name(),
1810 1 : "index_part.json",
1811 1 : ],
1812 1 : &remote_timeline_dir,
1813 1 : generation,
1814 1 : );
1815 : }
1816 :
1817 1 : #[tokio::test]
1818 1 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
1819 : // Setup
1820 :
1821 : let TestSetup {
1822 1 : harness,
1823 1 : tenant: _tenant,
1824 1 : timeline,
1825 : ..
1826 3 : } = TestSetup::new("metrics").await.unwrap();
1827 1 : let client = timeline.remote_client.as_ref().unwrap();
1828 1 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
1829 1 :
1830 1 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
1831 1 : let content_1 = dummy_contents("foo");
1832 1 : std::fs::write(
1833 1 : timeline_path.join(layer_file_name_1.file_name()),
1834 1 : &content_1,
1835 1 : )
1836 1 : .unwrap();
1837 1 :
1838 2 : #[derive(Debug, PartialEq, Clone, Copy)]
1839 1 : struct BytesStartedFinished {
1840 1 : started: Option<usize>,
1841 1 : finished: Option<usize>,
1842 1 : }
1843 1 : impl std::ops::Add for BytesStartedFinished {
1844 1 : type Output = Self;
1845 2 : fn add(self, rhs: Self) -> Self::Output {
1846 2 : Self {
1847 2 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
1848 2 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
1849 2 : }
1850 2 : }
1851 1 : }
1852 3 : let get_bytes_started_stopped = || {
1853 3 : let started = client
1854 3 : .metrics
1855 3 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
1856 3 : .map(|v| v.try_into().unwrap());
1857 3 : let stopped = client
1858 3 : .metrics
1859 3 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
1860 3 : .map(|v| v.try_into().unwrap());
1861 3 : BytesStartedFinished {
1862 3 : started,
1863 3 : finished: stopped,
1864 3 : }
1865 3 : };
1866 :
1867 : // Test
1868 1 : tracing::info!("now doing actual test");
1869 :
1870 1 : let actual_a = get_bytes_started_stopped();
1871 1 :
1872 1 : client
1873 1 : .schedule_layer_file_upload(
1874 1 : &layer_file_name_1,
1875 1 : &LayerFileMetadata::new(content_1.len() as u64, harness.generation),
1876 1 : )
1877 1 : .unwrap();
1878 1 :
1879 1 : let actual_b = get_bytes_started_stopped();
1880 1 :
1881 1 : client.wait_completion().await.unwrap();
1882 1 :
1883 1 : let actual_c = get_bytes_started_stopped();
1884 1 :
1885 1 : // Validate
1886 1 :
1887 1 : let expected_b = actual_a
1888 1 : + BytesStartedFinished {
1889 1 : started: Some(content_1.len()),
1890 1 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
1891 1 : finished: Some(0),
1892 1 : };
1893 1 : assert_eq!(actual_b, expected_b);
1894 :
1895 1 : let expected_c = actual_a
1896 1 : + BytesStartedFinished {
1897 1 : started: Some(content_1.len()),
1898 1 : finished: Some(content_1.len()),
1899 1 : };
1900 1 : assert_eq!(actual_c, expected_c);
1901 : }
1902 :
1903 6 : async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
1904 6 : // An empty IndexPart, just sufficient to ensure deserialization will succeed
1905 6 : let example_metadata = TimelineMetadata::example();
1906 6 : let example_index_part = IndexPart::new(
1907 6 : HashMap::new(),
1908 6 : example_metadata.disk_consistent_lsn(),
1909 6 : example_metadata,
1910 6 : );
1911 6 :
1912 6 : let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
1913 6 :
1914 6 : let timeline_path = test_state.harness.timeline_path(&TIMELINE_ID);
1915 6 : let remote_timeline_dir = test_state.harness.remote_fs_dir.join(
1916 6 : timeline_path
1917 6 : .strip_prefix(&test_state.harness.conf.workdir)
1918 6 : .unwrap(),
1919 6 : );
1920 6 :
1921 6 : std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
1922 6 :
1923 6 : let index_path = test_state.harness.remote_fs_dir.join(
1924 6 : remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
1925 6 : );
1926 6 : eprintln!("Writing {index_path}");
1927 6 : std::fs::write(&index_path, index_part_bytes).unwrap();
1928 6 : example_index_part
1929 6 : }
1930 :
1931 : /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
1932 : /// index, the IndexPart returned is equal to `expected`
1933 5 : async fn assert_got_index_part(
1934 5 : test_state: &TestSetup,
1935 5 : get_generation: Generation,
1936 5 : expected: &IndexPart,
1937 5 : ) {
1938 5 : let client = test_state.build_client(get_generation);
1939 :
1940 5 : let download_r = client
1941 5 : .download_index_file()
1942 18 : .await
1943 5 : .expect("download should always succeed");
1944 5 : assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
1945 5 : match download_r {
1946 5 : MaybeDeletedIndexPart::IndexPart(index_part) => {
1947 5 : assert_eq!(&index_part, expected);
1948 : }
1949 UBC 0 : MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
1950 : }
1951 CBC 5 : }
1952 :
1953 1 : #[tokio::test]
1954 1 : async fn index_part_download_simple() -> anyhow::Result<()> {
1955 3 : let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
1956 1 : let span = test_state.span();
1957 1 : let _guard = span.enter();
1958 1 :
1959 1 : // Simple case: we are in generation N, load the index from generation N - 1
1960 1 : let generation_n = 5;
1961 1 : let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
1962 :
1963 3 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
1964 :
1965 1 : Ok(())
1966 : }
1967 :
1968 1 : #[tokio::test]
1969 1 : async fn index_part_download_ordering() -> anyhow::Result<()> {
1970 1 : let test_state = TestSetup::new("index_part_download_ordering")
1971 3 : .await
1972 1 : .unwrap();
1973 1 :
1974 1 : let span = test_state.span();
1975 1 : let _guard = span.enter();
1976 1 :
1977 1 : // A generation-less IndexPart exists in the bucket; we should find it
1978 1 : let generation_n = 5;
1979 1 : let injected_none = inject_index_part(&test_state, Generation::none()).await;
1980 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
1981 :
1982 : // If a more-recent-than-none generation exists, we should prefer to load that
1983 1 : let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
1984 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
1985 :
1986 : // If a more-recent-than-me generation exists, we should ignore it.
1987 1 : let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
1988 4 : assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
1989 :
1990 : // If a directly previous generation exists, _and_ an index exists in my own
1991 : // generation, I should prefer my own generation.
1992 1 : let _injected_prev =
1993 1 : inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
1994 1 : let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
1995 1 : assert_got_index_part(
1996 1 : &test_state,
1997 1 : Generation::new(generation_n),
1998 1 : &injected_current,
1999 1 : )
2000 3 : .await;
2001 :
2002 1 : Ok(())
2003 : }
2004 : }