Line data Source code
1 : //! This module manages synchronizing local FS with remote storage.
2 : //!
3 : //! # Overview
4 : //!
5 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
6 : //! It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
7 : //! when it's safe to do so.
8 : //!
9 : //! * A stand-alone function, [`list_remote_timelines`], to get the list of timelines of a tenant.
10 : //!
11 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
12 : //!
13 : //! # APIs & How To Use Them
14 : //!
15 : //! There is a [`RemoteTimelineClient`] for each [Timeline][`crate::tenant::Timeline`] in the system,
16 : //! unless the pageserver is configured without remote storage.
17 : //!
18 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
19 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
20 : //! timeline.
21 : //! However, the client does not become ready for use until we've initialized its upload queue:
22 : //!
23 : //! - For timelines that already have some state on the remote storage, we use
24 : //! [`RemoteTimelineClient::init_upload_queue`].
25 : //! - For newly created timelines, we use
26 : //! [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
27 : //!
28 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
29 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
30 : //!
31 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
32 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
33 : //!
34 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
35 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
36 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
37 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
38 : //!
39 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
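//!
//! As a rough sketch of the intended call sequence (assuming `client: Arc<RemoteTimelineClient>`,
//! a flushed layer's `layer_name: LayerFileName` with its `LayerFileMetadata`, and an updated
//! `TimelineMetadata` are already at hand; error handling elided):
//!
//! ```ignore
//! // After writing a new layer file to local disk:
//! client.schedule_layer_file_upload(&layer_name, &layer_metadata)?;
//! // After updating the timeline metadata file:
//! client.schedule_index_upload_for_metadata_update(&new_metadata)?;
//! // Or, if only the set of layer files changed since the last index upload:
//! client.schedule_index_upload_for_file_changes()?;
//! ```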
40 : //!
41 : //! There are also APIs for downloading files.
42 : //! These are not part of the aforementioned queuing and will not be discussed
43 : //! further here, except in the section covering tenant attach.
44 : //!
45 : //! # Remote Storage Structure & [`IndexPart`] Index File
46 : //!
47 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
48 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
49 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
50 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
51 : //! files for a given timeline.
52 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
53 : //!
54 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
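//!
//! For illustration, the path helpers at the bottom of this module produce keys of roughly this
//! shape (a sketch only; `tenant_id`, `timeline_id`, `layer_name`, `layer_meta`, and `generation`
//! are assumed to be in scope):
//!
//! ```ignore
//! // "tenants/<tenant_id>/timelines/<timeline_id>/<layer file name><generation suffix>"
//! let layer_path = remote_layer_path(&tenant_id, &timeline_id, &layer_name, &layer_meta);
//! // "tenants/<tenant_id>/timelines/<timeline_id>/index_part.json<generation suffix>"
//! let index_path = remote_index_path(&tenant_id, &timeline_id, generation);
//! ```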
55 : //!
56 : //! # Consistency
57 : //!
58 : //! To have a consistent remote structure, it's important that uploads and
59 : //! deletions are performed in the right order. For example, the index file
60 : //! contains a list of layer files, so it must not be uploaded until all the
61 : //! layer files that are in its list have been successfully uploaded.
62 : //!
63 : //! The contract between the client and its user is that the user is responsible for
64 : //! scheduling operations in an order that keeps the remote consistent as
65 : //! described above.
66 : //! From the user's perspective, the operations are executed sequentially.
67 : //! Internally, the client knows which operations can be performed in parallel,
68 : //! and which operations act like a "barrier" that require preceding operations
69 : //! to finish. The calling code just needs to call the schedule-functions in the
70 : //! correct order, and the client will parallelize the operations in a way that
71 : //! is safe.
72 : //!
73 : //! The caller should be careful with deletions, though: it must not delete
74 : //! local files that have been scheduled for upload but whose upload has not yet
75 : //! finished, or the upload will fail. To wait for an upload to finish, use
76 : //! the `wait_completion` function (more on that later).
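//!
//! For example, a hedged sketch of removing a layer file that may still have an upload in
//! flight (assuming `client`, `layer_name`, and the layer's `local_path` are in scope):
//!
//! ```ignore
//! // Wait for all previously scheduled uploads (including this layer's) to finish ...
//! client.wait_completion().await?;
//! // ... only then delete the local file and mirror the deletion to remote storage.
//! std::fs::remove_file(&local_path)?;
//! client.schedule_layer_file_deletion(&[layer_name])?;
//! ```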
77 : //!
78 : //! All of this relies on the following invariants:
79 : //!
80 : //! - We rely on read-after-write consistency in the remote storage.
81 : //! - Layer files are immutable.
82 : //!
83 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
84 : //! storage. Different tenants can be attached to different pageservers, but if the
85 : //! same tenant is attached to two pageservers at the same time, they will overwrite
86 : //! each other's index file updates, and confusion will ensue. There's no interlock or
87 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
88 : //! that this doesn't happen.
89 : //!
90 : //! ## Implementation Note
91 : //!
92 : //! The *actual* remote state lags behind the *desired* remote state while
93 : //! there are in-flight operations.
94 : //! We keep track of the desired remote state in
95 : //! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
96 : //! It is initialized based on the [`IndexPart`] that was passed during init
97 : //! and updated with every `schedule_*` function call.
98 : //! All this is necessary to compute the future [`IndexPart`]s
99 : //! when scheduling an operation while other operations that also affect the
100 : //! remote [`IndexPart`] are in flight.
101 : //!
102 : //! # Retries & Error Handling
103 : //!
104 : //! The client retries operations indefinitely, using exponential back-off.
105 : //! There is no way to force a retry, i.e., interrupt the back-off.
106 : //! This could be built easily.
107 : //!
108 : //! # Cancellation
109 : //!
110 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
111 : //! the client's tenant and timeline.
112 : //! Dropping the client will drop queued operations, but not operations that are already executing.
113 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
114 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
115 : //!
116 : //! # Completion
117 : //!
118 : //! Once an operation has completed, we update
119 : //! [`UploadQueueInitialized::last_uploaded_consistent_lsn`], which indicates
120 : //! to safekeepers that they can delete the WAL up to that LSN.
121 : //!
122 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
123 : //! for all pending operations to complete. It does not prevent more
124 : //! operations from getting scheduled.
125 : //!
126 : //! # Crash Consistency
127 : //!
128 : //! We do not persist the upload queue state.
129 : //! If we drop the client, or crash, all unfinished operations are lost.
130 : //!
131 : //! To recover, the following steps need to be taken:
132 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
133 : //! consistent remote state, assuming the user scheduled the operations in
134 : //! the correct order.
135 : //! - Initialize the upload queue with that [`IndexPart`].
136 : //! - Reschedule all lost operations by comparing the local filesystem state
137 : //! and remote state as per [`IndexPart`]. This is done in
138 : //! [`Tenant::timeline_init_and_sync`].
139 : //!
140 : //! Note that if we crash during a file deletion, after the index update
141 : //! that removes the file from the list of files but before the remote file is deleted,
142 : //! the file is leaked in the remote storage. Similarly, if a new file is created
143 : //! and uploaded, but the pageserver dies permanently before updating the
144 : //! remote index file, the new file is leaked in remote storage. We accept and
145 : //! tolerate that for now.
146 : //! Note further that we cannot easily fix this by scheduling deletes for every
147 : //! file that is present only on the remote, because we cannot distinguish the
148 : //! following two cases:
149 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
150 : //! but crashed before it finished remotely.
151 : //! - (2) We never had the file locally because we haven't on-demand downloaded
152 : //! it yet.
153 : //!
154 : //! # Downloads
155 : //!
156 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
157 : //! downloading files from the remote storage. Downloads are performed immediately
158 : //! against the `RemoteStorage`, independently of the upload queue.
159 : //!
160 : //! When we attach a tenant, we perform the following steps (see the sketch after this list):
161 : //! - create `Tenant` object in `TenantState::Attaching` state
162 : //! - List timelines that are present in remote storage, and for each:
163 : //! - download their remote [`IndexPart`]s
164 : //! - create `Timeline` struct and a `RemoteTimelineClient`
165 : //! - initialize the client's upload queue with its `IndexPart`
166 : //! - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
167 : //! for layers that are referenced by `IndexPart` but not present locally
168 : //! - schedule uploads for layers that are only present locally.
169 : //! - if the remote `IndexPart`'s metadata was newer than the metadata in
170 : //! the local filesystem, write the remote metadata to the local filesystem
171 : //! - After the above is done for each timeline, open the tenant for business by
172 : //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
173 : //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
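//!
//! A hedged pseudocode sketch of the per-timeline part of the above (it omits `RemoteLayer`
//! creation, metadata comparison, and error handling, and assumes `client: Arc<RemoteTimelineClient>`):
//!
//! ```ignore
//! let index_part = match client.download_index_file().await? {
//!     MaybeDeletedIndexPart::IndexPart(p) => p,
//!     MaybeDeletedIndexPart::Deleted(_) => return Ok(()), // timeline was deleted remotely
//! };
//! client.init_upload_queue(&index_part)?;
//! // ... then create RemoteLayer instances for remote-only layers and
//! // schedule uploads for layers that exist only locally.
//! ```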
174 : //!
175 : //! We keep track of the fact that a tenant is in `Attaching` state in a marker
176 : //! file on the local disk. This is critical because, when we restart the pageserver,
177 : //! we do not want to do the `List timelines` step for each tenant that has already
178 : //! been successfully attached (for performance & cost reasons).
179 : //! Instead, for a tenant without the attach marker file, we assume that the
180 : //! local state is in sync or ahead of the remote state. This includes the list
181 : //! of all of the tenant's timelines, which is particularly critical to be up-to-date:
182 : //! if there's a timeline on the remote that the pageserver doesn't know about,
183 : //! the GC will not consider its branch point, leading to data loss.
184 : //! So, for a tenant with the attach marker file, we know that we have not yet
185 : //! persisted all of the remote timelines' metadata files locally. To exclude the
186 : //! risk above, we re-run the attach procedure for such tenants.
187 : //!
188 : //! # Operating Without Remote Storage
189 : //!
190 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
191 : //! not created and the uploads are skipped.
192 : //! Theoretically, it should be ok to remove and re-add remote storage configuration to
193 : //! the pageserver config at any time, since it doesn't make a difference to
194 : //! [`Timeline::load_layer_map`].
195 : //! Of course, the remote timeline dir must not change while we have de-configured
196 : //! remote storage, i.e., the pageserver must remain the owner of the given prefix
197 : //! in remote storage.
198 : //! But note that we don't test any of this right now.
199 : //!
200 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
201 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
202 :
203 : mod delete;
204 : mod download;
205 : pub mod index;
206 : mod upload;
207 :
208 : use anyhow::Context;
209 : use chrono::{NaiveDateTime, Utc};
210 : // re-export these
211 : pub use download::{is_temp_download_file, list_remote_timelines};
212 : use scopeguard::ScopeGuard;
213 : use tokio_util::sync::CancellationToken;
214 : use utils::backoff::{
215 : self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
216 : };
217 :
218 : use std::collections::{HashMap, VecDeque};
219 : use std::path::{Path, PathBuf};
220 : use std::sync::atomic::{AtomicU32, Ordering};
221 : use std::sync::{Arc, Mutex};
222 :
223 : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
224 : use std::ops::DerefMut;
225 : use tracing::{debug, error, info, instrument, warn};
226 : use tracing::{info_span, Instrument};
227 : use utils::lsn::Lsn;
228 :
229 : use crate::metrics::{
230 : MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
231 : RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
232 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
233 : };
234 : use crate::task_mgr::shutdown_token;
235 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
236 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
237 : use crate::tenant::upload_queue::Delete;
238 : use crate::tenant::TIMELINES_SEGMENT_NAME;
239 : use crate::{
240 : config::PageServerConf,
241 : task_mgr,
242 : task_mgr::TaskKind,
243 : task_mgr::BACKGROUND_RUNTIME,
244 : tenant::metadata::TimelineMetadata,
245 : tenant::upload_queue::{
246 : UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
247 : },
248 : };
249 :
250 : use utils::id::{TenantId, TimelineId};
251 :
252 : use self::index::IndexPart;
253 :
254 : use super::storage_layer::LayerFileName;
255 : use super::upload_queue::SetDeletedFlagProgress;
256 : use super::Generation;
257 :
258 : // Occasional network issues and such can cause remote operations to fail, and
259 : // that's expected. If a download fails, we log it at info-level, and retry.
260 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
261 : // level instead, as repeated failures can mean a more serious problem. If it
262 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
263 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
264 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
265 :
266 : // Similarly log failed uploads and deletions at WARN level, after this many
267 : // retries. Uploads and deletions are retried forever, though.
268 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
269 :
270 : pub enum MaybeDeletedIndexPart {
271 : IndexPart(IndexPart),
272 : Deleted(IndexPart),
273 : }
274 :
275 : /// Errors that can arise when calling [`RemoteTimelineClient::stop`].
276 0 : #[derive(Debug, thiserror::Error)]
277 : pub enum StopError {
278 : /// Returned if the upload queue was never initialized.
279 : /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
280 : #[error("queue is not initialized")]
281 : QueueUninitialized,
282 : }
283 :
284 0 : #[derive(Debug, thiserror::Error)]
285 : pub enum PersistIndexPartWithDeletedFlagError {
286 : #[error("another task is already setting the deleted_flag, started at {0:?}")]
287 : AlreadyInProgress(NaiveDateTime),
288 : #[error("the deleted_flag was already set, value is {0:?}")]
289 : AlreadyDeleted(NaiveDateTime),
290 : #[error(transparent)]
291 : Other(#[from] anyhow::Error),
292 : }
293 :
294 : /// A client for accessing a timeline's data in remote storage.
295 : ///
296 : /// This takes care of managing the number of connections, and balancing them
297 : /// across tenants. This also handles retries of failed uploads.
298 : ///
299 : /// Upload and delete requests are ordered so that before a deletion is
300 : /// performed, we wait for all preceding uploads to finish. This ensures
301 : /// that if you perform a compaction operation that reshuffles data in layer
302 : /// files, we don't have a transient state where the old files have already been
303 : /// deleted, but new files have not yet been uploaded.
304 : ///
305 : /// Similarly, this enforces an order between index-file uploads, and layer
306 : /// uploads. Before an index-file upload is performed, all preceding layer
307 : /// uploads must be finished.
308 : ///
309 : /// This also maintains a list of remote files, and automatically includes that
310 : /// in the index part file, whenever timeline metadata is uploaded.
311 : ///
312 : /// Downloads are not queued, they are performed immediately.
313 : pub struct RemoteTimelineClient {
314 : conf: &'static PageServerConf,
315 :
316 : runtime: tokio::runtime::Handle,
317 :
318 : tenant_id: TenantId,
319 : timeline_id: TimelineId,
320 : generation: Generation,
321 :
322 : upload_queue: Mutex<UploadQueue>,
323 :
324 : metrics: Arc<RemoteTimelineClientMetrics>,
325 :
326 : storage_impl: GenericRemoteStorage,
327 : }
328 :
329 : impl RemoteTimelineClient {
330 : ///
331 : /// Create a remote storage client for given timeline
332 : ///
333 : /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
334 : /// by calling init_upload_queue.
335 : ///
336 765 : pub fn new(
337 765 : remote_storage: GenericRemoteStorage,
338 765 : conf: &'static PageServerConf,
339 765 : tenant_id: TenantId,
340 765 : timeline_id: TimelineId,
341 765 : generation: Generation,
342 765 : ) -> RemoteTimelineClient {
343 765 : RemoteTimelineClient {
344 765 : conf,
345 765 : runtime: if cfg!(test) {
346 : // remote_timeline_client.rs tests rely on current-thread runtime
347 143 : tokio::runtime::Handle::current()
348 : } else {
349 622 : BACKGROUND_RUNTIME.handle().clone()
350 : },
351 765 : tenant_id,
352 765 : timeline_id,
353 765 : generation,
354 765 : storage_impl: remote_storage,
355 765 : upload_queue: Mutex::new(UploadQueue::Uninitialized),
356 765 : metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
357 765 : }
358 765 : }
359 :
360 : /// Initialize the upload queue for a remote storage that already received
361 : /// an index file upload, i.e., it's not empty.
362 : /// The given `index_part` must be the one on the remote.
363 178 : pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
364 178 : let mut upload_queue = self.upload_queue.lock().unwrap();
365 178 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
366 178 : self.update_remote_physical_size_gauge(Some(index_part));
367 178 : info!(
368 178 : "initialized upload queue from remote index with {} layer files",
369 178 : index_part.layer_metadata.len()
370 178 : );
371 178 : Ok(())
372 178 : }
373 :
374 : /// Initialize the upload queue for the case where the remote storage is empty,
375 : /// i.e., it doesn't have an `IndexPart`.
376 564 : pub fn init_upload_queue_for_empty_remote(
377 564 : &self,
378 564 : local_metadata: &TimelineMetadata,
379 564 : ) -> anyhow::Result<()> {
380 564 : let mut upload_queue = self.upload_queue.lock().unwrap();
381 564 : upload_queue.initialize_empty_remote(local_metadata)?;
382 564 : self.update_remote_physical_size_gauge(None);
383 564 : info!("initialized upload queue as empty");
384 564 : Ok(())
385 564 : }
386 :
387 : /// Initialize the queue in stopped state. Used in startup path
388 : /// to continue deletion operation interrupted by pageserver crash or restart.
389 21 : pub fn init_upload_queue_stopped_to_continue_deletion(
390 21 : &self,
391 21 : index_part: &IndexPart,
392 21 : ) -> anyhow::Result<()> {
393 : // FIXME: consider newtype for DeletedIndexPart.
394 21 : let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
395 21 : "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
396 21 : ))?;
397 :
398 : {
399 21 : let mut upload_queue = self.upload_queue.lock().unwrap();
400 21 : upload_queue.initialize_with_current_remote_index_part(index_part)?;
401 21 : self.update_remote_physical_size_gauge(Some(index_part));
402 21 : }
403 21 : // also locks upload queue, without dropping the guard above it will be a deadlock
404 21 : self.stop().expect("initialized line above");
405 21 :
406 21 : let mut upload_queue = self.upload_queue.lock().unwrap();
407 21 :
408 21 : upload_queue
409 21 : .stopped_mut()
410 21 : .expect("stopped above")
411 21 : .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
412 21 :
413 21 : Ok(())
414 21 : }
415 :
416 116151 : pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
417 116151 : match &*self.upload_queue.lock().unwrap() {
418 0 : UploadQueue::Uninitialized => None,
419 116020 : UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
420 131 : UploadQueue::Stopped(q) => {
421 131 : Some(q.upload_queue_for_deletion.last_uploaded_consistent_lsn)
422 : }
423 : }
424 116151 : }
425 :
426 5039 : fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
427 5039 : let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
428 4475 : current_remote_index_part
429 4475 : .layer_metadata
430 4475 : .values()
431 4475 : // If we don't have the file size for the layer, don't account for it in the metric.
432 337618 : .map(|ilmd| ilmd.file_size)
433 4475 : .sum()
434 : } else {
435 564 : 0
436 : };
437 5039 : self.metrics.remote_physical_size_gauge().set(size);
438 5039 : }
439 :
440 38 : pub fn get_remote_physical_size(&self) -> u64 {
441 38 : self.metrics.remote_physical_size_gauge().get()
442 38 : }
443 :
444 : //
445 : // Download operations.
446 : //
447 : // These don't use the per-timeline queue. They do use the global semaphore in
448 : // S3Bucket, to limit the total number of concurrent operations, though.
449 : //
450 :
451 : /// Download index file
452 204 : pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
453 204 : let _unfinished_gauge_guard = self.metrics.call_begin(
454 204 : &RemoteOpFileKind::Index,
455 204 : &RemoteOpKind::Download,
456 204 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
457 204 : reason: "no need for a downloads gauge",
458 204 : },
459 204 : );
460 :
461 204 : let index_part = download::download_index_part(
462 204 : &self.storage_impl,
463 204 : &self.tenant_id,
464 204 : &self.timeline_id,
465 204 : self.generation,
466 204 : )
467 204 : .measure_remote_op(
468 204 : self.tenant_id,
469 204 : self.timeline_id,
470 204 : RemoteOpFileKind::Index,
471 204 : RemoteOpKind::Download,
472 204 : Arc::clone(&self.metrics),
473 204 : )
474 667 : .await?;
475 :
476 201 : if index_part.deleted_at.is_some() {
477 21 : Ok(MaybeDeletedIndexPart::Deleted(index_part))
478 : } else {
479 180 : Ok(MaybeDeletedIndexPart::IndexPart(index_part))
480 : }
481 204 : }
482 :
483 : /// Download a (layer) file from `path`, into local filesystem.
484 : ///
485 : /// 'layer_metadata' is the metadata from the remote index file.
486 : ///
487 : /// On success, returns the size of the downloaded file.
488 1043 : pub async fn download_layer_file(
489 1043 : &self,
490 1043 : layer_file_name: &LayerFileName,
491 1043 : layer_metadata: &LayerFileMetadata,
492 1043 : ) -> anyhow::Result<u64> {
493 1008 : let downloaded_size = {
494 1043 : let _unfinished_gauge_guard = self.metrics.call_begin(
495 1043 : &RemoteOpFileKind::Layer,
496 1043 : &RemoteOpKind::Download,
497 1043 : crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
498 1043 : reason: "no need for a downloads gauge",
499 1043 : },
500 1043 : );
501 1043 : download::download_layer_file(
502 1043 : self.conf,
503 1043 : &self.storage_impl,
504 1043 : self.tenant_id,
505 1043 : self.timeline_id,
506 1043 : layer_file_name,
507 1043 : layer_metadata,
508 1043 : )
509 1043 : .measure_remote_op(
510 1043 : self.tenant_id,
511 1043 : self.timeline_id,
512 1043 : RemoteOpFileKind::Layer,
513 1043 : RemoteOpKind::Download,
514 1043 : Arc::clone(&self.metrics),
515 1043 : )
516 360525 : .await?
517 : };
518 :
519 1008 : REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
520 1008 : REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
521 1008 :
522 1008 : Ok(downloaded_size)
523 1040 : }
524 :
525 : //
526 : // Upload operations.
527 : //
528 :
529 : ///
530 : /// Launch an index-file upload operation in the background, with
531 : /// updated metadata.
532 : ///
533 : /// The upload will be added to the queue immediately, but it
534 : /// won't be performed until all previously scheduled layer file
535 : /// upload operations have completed successfully. This is to
536 : /// ensure that when the index file claims that layers X, Y and Z
537 : /// exist in remote storage, they really do. To wait for the upload
538 : /// to complete, use `wait_completion`.
539 : ///
540 : /// If there were any changes to the list of files, i.e. if any
541 : /// layer file uploads were scheduled, since the last index file
542 : /// upload, those will be included too.
543 4109 : pub fn schedule_index_upload_for_metadata_update(
544 4109 : self: &Arc<Self>,
545 4109 : metadata: &TimelineMetadata,
546 4109 : ) -> anyhow::Result<()> {
547 4109 : let mut guard = self.upload_queue.lock().unwrap();
548 4109 : let upload_queue = guard.initialized_mut()?;
549 :
550 : // As documented in the struct definition, it's ok for latest_metadata to be
551 : // ahead of what's _actually_ on the remote during index upload.
552 4109 : upload_queue.latest_metadata = metadata.clone();
553 4109 :
554 4109 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
555 4109 :
556 4109 : Ok(())
557 4109 : }
558 :
559 : ///
560 : /// Launch an index-file upload operation in the background, if necessary.
561 : ///
562 : /// Use this function to schedule the update of the index file after
563 : /// scheduling file uploads or deletions. If no file uploads or deletions
564 : /// have been scheduled since the last index file upload, this does
565 : /// nothing.
566 : ///
567 : /// Like schedule_index_upload_for_metadata_update(), this merely adds
568 : /// the upload to the upload queue and returns quickly.
569 812 : pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
570 812 : let mut guard = self.upload_queue.lock().unwrap();
571 812 : let upload_queue = guard.initialized_mut()?;
572 :
573 812 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
574 11 : self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
575 801 : }
576 :
577 812 : Ok(())
578 812 : }
579 :
580 : /// Launch an index-file upload operation in the background (internal function)
581 4336 : fn schedule_index_upload(
582 4336 : self: &Arc<Self>,
583 4336 : upload_queue: &mut UploadQueueInitialized,
584 4336 : metadata: TimelineMetadata,
585 4336 : ) {
586 4336 : info!(
587 4336 : "scheduling metadata upload with {} files ({} changed)",
588 4336 : upload_queue.latest_files.len(),
589 4336 : upload_queue.latest_files_changes_since_metadata_upload_scheduled,
590 4336 : );
591 :
592 4336 : let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
593 4336 :
594 4336 : let index_part = IndexPart::new(
595 4336 : upload_queue.latest_files.clone(),
596 4336 : disk_consistent_lsn,
597 4336 : metadata,
598 4336 : );
599 4336 : let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
600 4336 : self.calls_unfinished_metric_begin(&op);
601 4336 : upload_queue.queued_operations.push_back(op);
602 4336 : upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
603 4336 :
604 4336 : // Launch the task immediately, if possible
605 4336 : self.launch_queued_tasks(upload_queue);
606 4336 : }
607 :
608 : ///
609 : /// Launch an upload operation in the background.
610 : ///
611 12223 : pub fn schedule_layer_file_upload(
612 12223 : self: &Arc<Self>,
613 12223 : layer_file_name: &LayerFileName,
614 12223 : layer_metadata: &LayerFileMetadata,
615 12223 : ) -> anyhow::Result<()> {
616 12223 : let mut guard = self.upload_queue.lock().unwrap();
617 12223 : let upload_queue = guard.initialized_mut()?;
618 :
619 12223 : upload_queue
620 12223 : .latest_files
621 12223 : .insert(layer_file_name.clone(), layer_metadata.clone());
622 12223 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
623 12223 :
624 12223 : let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
625 12223 : self.calls_unfinished_metric_begin(&op);
626 12223 : upload_queue.queued_operations.push_back(op);
627 12223 :
628 12223 : info!("scheduled layer file upload {layer_file_name}");
629 :
630 : // Launch the task immediately, if possible
631 12223 : self.launch_queued_tasks(upload_queue);
632 12223 : Ok(())
633 12223 : }
634 :
635 : /// Launch a delete operation in the background.
636 : ///
637 : /// The operation does not modify local state but assumes the local files have already been
638 : /// deleted, and is used to mirror those changes to remote.
639 : ///
640 : /// Note: This schedules an index file upload before the deletions. The
641 : /// deletion won't actually be performed, until any previously scheduled
642 : /// upload operations, and the index file upload, have completed
643 : /// successfully.
644 387 : pub fn schedule_layer_file_deletion(
645 387 : self: &Arc<Self>,
646 387 : names: &[LayerFileName],
647 387 : ) -> anyhow::Result<()> {
648 387 : let mut guard = self.upload_queue.lock().unwrap();
649 387 : let upload_queue = guard.initialized_mut()?;
650 :
651 : // Deleting layers doesn't affect the values stored in TimelineMetadata,
652 : // so we don't need to update it. Just serialize it.
653 387 : let metadata = upload_queue.latest_metadata.clone();
654 387 :
655 387 : // Update the remote index file, removing the to-be-deleted files from the index,
656 387 : // before deleting the actual files.
657 387 : //
658 387 : // Once we start removing files from upload_queue.latest_files, there's
659 387 : // no going back! Otherwise, some of the files would already be removed
660 387 : // from latest_files, but not yet scheduled for deletion. Use a closure
661 387 : // to syntactically forbid ? or bail! calls here.
662 387 : let no_bail_here = || {
663 387 : // Decorate our list of names with each name's generation, dropping
664 387 : // names that are unexpectedly missing from our metadata.
665 387 : let with_generations: Vec<_> = names
666 387 : .iter()
667 3548 : .filter_map(|name| {
668 3548 : // Remove from latest_files, learning the file's remote generation in the process
669 3548 : let meta = upload_queue.latest_files.remove(name);
670 :
671 3548 : if let Some(meta) = meta {
672 3547 : upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
673 3547 : Some((name, meta.generation))
674 : } else {
675 : // This can only happen if we forgot to schedule the file upload
676 : // before scheduling the delete. Log it because it is a rare/strange
677 : // situation, and in case something is misbehaving, we'd like to know which
678 : // layers experienced this.
679 1 : info!(
680 1 : "Deleting layer {name} not found in latest_files list, never uploaded?"
681 1 : );
682 1 : None
683 : }
684 3548 : })
685 387 : .collect();
686 387 :
687 387 : if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
688 216 : self.schedule_index_upload(upload_queue, metadata);
689 216 : }
690 :
691 : // schedule the actual deletions
692 3934 : for (name, generation) in with_generations {
693 3547 : let op = UploadOp::Delete(Delete {
694 3547 : file_kind: RemoteOpFileKind::Layer,
695 3547 : layer_file_name: name.clone(),
696 3547 : scheduled_from_timeline_delete: false,
697 3547 : generation,
698 3547 : });
699 3547 : self.calls_unfinished_metric_begin(&op);
700 3547 : upload_queue.queued_operations.push_back(op);
701 3547 : info!("scheduled layer file deletion {name}");
702 : }
703 :
704 : // Launch the tasks immediately, if possible
705 387 : self.launch_queued_tasks(upload_queue);
706 387 : };
707 387 : no_bail_here();
708 387 : Ok(())
709 387 : }
710 :
711 : ///
712 : /// Wait for all previously scheduled uploads/deletions to complete
713 : ///
714 1027 : pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
715 1027 : let mut receiver = {
716 1027 : let mut guard = self.upload_queue.lock().unwrap();
717 1027 : let upload_queue = guard.initialized_mut()?;
718 1027 : self.schedule_barrier(upload_queue)
719 1027 : };
720 1027 :
721 1027 : if receiver.changed().await.is_err() {
722 1 : anyhow::bail!("wait_completion aborted because upload queue was stopped");
723 1025 : }
724 1025 : Ok(())
725 1026 : }
726 :
727 1230 : fn schedule_barrier(
728 1230 : self: &Arc<Self>,
729 1230 : upload_queue: &mut UploadQueueInitialized,
730 1230 : ) -> tokio::sync::watch::Receiver<()> {
731 1230 : let (sender, receiver) = tokio::sync::watch::channel(());
732 1230 : let barrier_op = UploadOp::Barrier(sender);
733 1230 :
734 1230 : upload_queue.queued_operations.push_back(barrier_op);
735 1230 : // Don't count this kind of operation!
736 1230 :
737 1230 : // Launch the task immediately, if possible
738 1230 : self.launch_queued_tasks(upload_queue);
739 1230 :
740 1230 : receiver
741 1230 : }
742 :
743 : /// Set the deleted_at field in the remote index file.
744 : ///
745 : /// This fails if the upload queue has not been `stop()`ed.
746 : ///
747 : /// The caller is responsible for calling `stop()` AND for waiting
748 : /// for any ongoing upload tasks to finish after `stop()` has succeeded.
749 : /// Check method [`RemoteTimelineClient::stop`] for details.
750 802 : #[instrument(skip_all)]
751 : pub(crate) async fn persist_index_part_with_deleted_flag(
752 : self: &Arc<Self>,
753 : ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
754 : let index_part_with_deleted_at = {
755 : let mut locked = self.upload_queue.lock().unwrap();
756 :
757 : // We must be in stopped state because otherwise
758 : // we could have an in-progress index part upload that could overwrite the file,
759 : // losing the deleted flag that we are going to set below
760 : let stopped = locked.stopped_mut()?;
761 :
762 : match stopped.deleted_at {
763 : SetDeletedFlagProgress::NotRunning => (), // proceed
764 : SetDeletedFlagProgress::InProgress(at) => {
765 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
766 : }
767 : SetDeletedFlagProgress::Successful(at) => {
768 : return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
769 : }
770 : };
771 : let deleted_at = Utc::now().naive_utc();
772 : stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
773 :
774 : let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
775 : .context("IndexPart serialize")?;
776 : index_part.deleted_at = Some(deleted_at);
777 : index_part
778 : };
779 :
780 0 : let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
781 0 : let mut locked = self_clone.upload_queue.lock().unwrap();
782 0 : let stopped = locked
783 0 : .stopped_mut()
784 0 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
785 0 : stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
786 0 : });
787 :
788 184 : pausable_failpoint!("persist_deleted_index_part");
789 :
790 : backoff::retry(
791 245 : || {
792 245 : upload::upload_index_part(
793 245 : &self.storage_impl,
794 245 : &self.tenant_id,
795 245 : &self.timeline_id,
796 245 : self.generation,
797 245 : &index_part_with_deleted_at,
798 245 : )
799 245 : },
800 61 : |_e| false,
801 : 1,
802 : // have just a couple of attempts
803 : // when executed as part of timeline deletion this happens in context of api call
804 : // when executed as part of tenant deletion this happens in the background
805 : 2,
806 : "persist_index_part_with_deleted_flag",
807 : // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
808 0 : backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
809 : )
810 : .await?;
811 :
812 : // all good, disarm the guard and mark as success
813 : ScopeGuard::into_inner(undo_deleted_at);
814 : {
815 : let mut locked = self.upload_queue.lock().unwrap();
816 :
817 : let stopped = locked
818 : .stopped_mut()
819 : .expect("there's no way out of Stopping, and we checked it's Stopping above");
820 : stopped.deleted_at = SetDeletedFlagProgress::Successful(
821 : index_part_with_deleted_at
822 : .deleted_at
823 : .expect("we set it above"),
824 : );
825 : }
826 :
827 : Ok(())
828 : }
829 :
830 : /// Prerequisites: the UploadQueue should be in stopped state and deleted_at should be successfully set.
831 : /// The function deletes layer files one by one, then lists the prefix to see if we leaked anything,
832 : /// deletes any leaked files, and finally deletes the index file.
833 203 : pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
834 203 : debug_assert_current_span_has_tenant_and_timeline_id();
835 :
836 203 : let (mut receiver, deletions_queued) = {
837 203 : let mut deletions_queued = 0;
838 203 :
839 203 : let mut locked = self.upload_queue.lock().unwrap();
840 203 : let stopped = locked.stopped_mut()?;
841 :
842 203 : if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
843 0 : anyhow::bail!("deleted_at is not set")
844 203 : }
845 :
846 203 : debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
847 :
848 203 : stopped
849 203 : .upload_queue_for_deletion
850 203 : .queued_operations
851 203 : .reserve(stopped.upload_queue_for_deletion.latest_files.len());
852 :
853 : // schedule the actual deletions
854 5573 : for (name, meta) in &stopped.upload_queue_for_deletion.latest_files {
855 5370 : let op = UploadOp::Delete(Delete {
856 5370 : file_kind: RemoteOpFileKind::Layer,
857 5370 : layer_file_name: name.clone(),
858 5370 : scheduled_from_timeline_delete: true,
859 5370 : generation: meta.generation,
860 5370 : });
861 5370 :
862 5370 : self.calls_unfinished_metric_begin(&op);
863 5370 : stopped
864 5370 : .upload_queue_for_deletion
865 5370 : .queued_operations
866 5370 : .push_back(op);
867 :
868 5370 : info!("scheduled layer file deletion {name}");
869 5370 : deletions_queued += 1;
870 : }
871 :
872 203 : self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion);
873 203 :
874 203 : (
875 203 : self.schedule_barrier(&mut stopped.upload_queue_for_deletion),
876 203 : deletions_queued,
877 203 : )
878 203 : };
879 203 :
880 203 : receiver.changed().await.context("upload queue shut down")?;
881 :
882 : // Do not delete the index part yet; it is needed for a possible retry. If we removed it first
883 : // and the retry arrived at a different pageserver, there would be no trace of it left in remote storage.
884 203 : let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
885 :
886 203 : let remaining = backoff::retry(
887 267 : || async {
888 267 : self.storage_impl
889 267 : .list_files(Some(&timeline_storage_path))
890 1172 : .await
891 267 : },
892 203 : |_e| false,
893 203 : FAILED_DOWNLOAD_WARN_THRESHOLD,
894 203 : FAILED_REMOTE_OP_RETRIES,
895 203 : "list_prefixes",
896 203 : backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
897 203 : )
898 1172 : .await
899 203 : .context("list prefixes")?;
900 :
901 203 : let remaining: Vec<RemotePath> = remaining
902 203 : .into_iter()
903 1086 : .filter(|p| p.object_name() != Some(IndexPart::FILE_NAME))
904 203 : .inspect(|path| {
905 891 : if let Some(name) = path.object_name() {
906 891 : info!(%name, "deleting a file not referenced from index_part.json");
907 : } else {
908 0 : warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
909 : }
910 891 : })
911 203 : .collect();
912 203 :
913 203 : if !remaining.is_empty() {
914 46 : backoff::retry(
915 638 : || async { self.storage_impl.delete_objects(&remaining).await },
916 46 : |_e| false,
917 46 : FAILED_UPLOAD_WARN_THRESHOLD,
918 46 : FAILED_REMOTE_OP_RETRIES,
919 46 : "delete_objects",
920 46 : backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
921 46 : )
922 638 : .await
923 46 : .context("delete_objects")?;
924 157 : }
925 :
926 203 : fail::fail_point!("timeline-delete-before-index-delete", |_| {
927 12 : Err(anyhow::anyhow!(
928 12 : "failpoint: timeline-delete-before-index-delete"
929 12 : ))?
930 203 : });
931 :
932 191 : let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
933 :
934 0 : debug!("deleting index part");
935 :
936 191 : backoff::retry(
937 729 : || async { self.storage_impl.delete(&index_file_path).await },
938 191 : |_e| false,
939 191 : FAILED_UPLOAD_WARN_THRESHOLD,
940 191 : FAILED_REMOTE_OP_RETRIES,
941 191 : "delete_index",
942 191 : backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled")),
943 191 : )
944 729 : .await
945 191 : .context("delete_index")?;
946 :
947 191 : fail::fail_point!("timeline-delete-after-index-delete", |_| {
948 4 : Err(anyhow::anyhow!(
949 4 : "failpoint: timeline-delete-after-index-delete"
950 4 : ))?
951 191 : });
952 :
953 187 : info!(prefix=%timeline_storage_path, referenced=deletions_queued, not_referenced=%remaining.len(), "done deleting in timeline prefix, including index_part.json");
954 :
955 187 : Ok(())
956 203 : }
957 :
958 : ///
959 : /// Pick next tasks from the queue, and start as many of them as possible without violating
960 : /// the ordering constraints.
961 : ///
962 : /// The caller needs to already hold the `upload_queue` lock.
963 41305 : fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
964 67122 : while let Some(next_op) = upload_queue.queued_operations.front() {
965 : // Can we run this task now?
966 46320 : let can_run_now = match next_op {
967 : UploadOp::UploadLayer(_, _) => {
968 : // Can always be scheduled.
969 12215 : true
970 : }
971 : UploadOp::UploadMetadata(_, _) => {
972 : // These can only be performed after all the preceding operations
973 : // have finished.
974 18365 : upload_queue.inprogress_tasks.is_empty()
975 : }
976 : UploadOp::Delete(_) => {
977 : // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
978 8283 : upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
979 : }
980 :
981 7457 : UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
982 : };
983 :
984 : // If we cannot launch this task, don't look any further.
985 : //
986 : // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
987 : // them now, but we don't try to do that currently. For example, if the frontmost task
988 : // is an index-file upload that cannot proceed until preceding uploads have finished, we
989 : // could still start layer uploads that were scheduled later.
990 46320 : if !can_run_now {
991 20503 : break;
992 25817 : }
993 25817 :
994 25817 : // We can launch this task. Remove it from the queue first.
995 25817 : let next_op = upload_queue.queued_operations.pop_front().unwrap();
996 25817 :
997 25817 : debug!("starting op: {}", next_op);
998 :
999 : // Update the counters
1000 25817 : match next_op {
1001 12215 : UploadOp::UploadLayer(_, _) => {
1002 12215 : upload_queue.num_inprogress_layer_uploads += 1;
1003 12215 : }
1004 4280 : UploadOp::UploadMetadata(_, _) => {
1005 4280 : upload_queue.num_inprogress_metadata_uploads += 1;
1006 4280 : }
1007 8094 : UploadOp::Delete(_) => {
1008 8094 : upload_queue.num_inprogress_deletions += 1;
1009 8094 : }
1010 1228 : UploadOp::Barrier(sender) => {
1011 1228 : sender.send_replace(());
1012 1228 : continue;
1013 : }
1014 : };
1015 :
1016 : // Assign unique ID to this task
1017 24589 : upload_queue.task_counter += 1;
1018 24589 : let upload_task_id = upload_queue.task_counter;
1019 24589 :
1020 24589 : // Add it to the in-progress map
1021 24589 : let task = Arc::new(UploadTask {
1022 24589 : task_id: upload_task_id,
1023 24589 : op: next_op,
1024 24589 : retries: AtomicU32::new(0),
1025 24589 : });
1026 24589 : upload_queue
1027 24589 : .inprogress_tasks
1028 24589 : .insert(task.task_id, Arc::clone(&task));
1029 24589 :
1030 24589 : // Spawn task to perform the task
1031 24589 : let self_rc = Arc::clone(self);
1032 24589 : let tenant_id = self.tenant_id;
1033 24589 : let timeline_id = self.timeline_id;
1034 24589 : task_mgr::spawn(
1035 24589 : &self.runtime,
1036 24589 : TaskKind::RemoteUploadTask,
1037 24589 : Some(self.tenant_id),
1038 24589 : Some(self.timeline_id),
1039 24589 : "remote upload",
1040 : false,
1041 24588 : async move {
1042 1386546 : self_rc.perform_upload_task(task).await;
1043 24576 : Ok(())
1044 24576 : }
1045 24589 : .instrument(info_span!(parent: None, "remote_upload", %tenant_id, %timeline_id, %upload_task_id)),
1046 : );
1047 :
1048 : // Loop back to process next task
1049 : }
1050 41305 : }
1051 :
1052 : ///
1053 : /// Perform an upload task.
1054 : ///
1055 : /// The task is in the `inprogress_tasks` list. This function will try to
1056 : /// execute it, retrying forever. On successful completion, the task is
1057 : /// removed from the `inprogress_tasks` list, and any next task(s) in the
1058 : /// queue that were waiting for its completion are launched.
1059 : ///
1060 : /// The task can be shut down, however. That leads to stopping the whole
1061 : /// queue.
1062 : ///
1063 24588 : async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
1064 : // Loop to retry until it completes.
1065 29222 : loop {
1066 29222 : // If we're requested to shut down, close up shop and exit.
1067 29222 : //
1068 29222 : // Note: We only check for the shutdown requests between retries, so
1069 29222 : // if a shutdown request arrives while we're busy uploading, in the
1070 29222 : // upload::upload:*() call below, we will wait not exit until it has
1071 29222 : // finished. We probably could cancel the upload by simply dropping
1072 29222 : // the Future, but we're not 100% sure if the remote storage library
1073 29222 : // is cancellation safe, so we don't dare to do that. Hopefully, the
1074 29222 : // upload finishes or times out soon enough.
1075 29222 : if task_mgr::is_shutdown_requested() {
1076 109 : info!("upload task cancelled by shutdown request");
1077 109 : match self.stop() {
1078 109 : Ok(()) => {}
1079 : Err(StopError::QueueUninitialized) => {
1080 0 : unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
1081 : }
1082 : }
1083 109 : return;
1084 29113 : }
1085 :
1086 29113 : let upload_result: anyhow::Result<()> = match &task.op {
1087 14125 : UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
1088 14125 : let path = self
1089 14125 : .conf
1090 14125 : .timeline_path(&self.tenant_id, &self.timeline_id)
1091 14125 : .join(layer_file_name.file_name());
1092 14125 :
1093 14125 : upload::upload_timeline_layer(
1094 14125 : self.conf,
1095 14125 : &self.storage_impl,
1096 14125 : &path,
1097 14125 : layer_metadata,
1098 14125 : self.generation,
1099 14125 : )
1100 14125 : .measure_remote_op(
1101 14125 : self.tenant_id,
1102 14125 : self.timeline_id,
1103 14125 : RemoteOpFileKind::Layer,
1104 14125 : RemoteOpKind::Upload,
1105 14125 : Arc::clone(&self.metrics),
1106 14125 : )
1107 1341928 : .await
1108 : }
1109 5001 : UploadOp::UploadMetadata(ref index_part, _lsn) => {
1110 5001 : let mention_having_future_layers = if cfg!(feature = "testing") {
1111 5001 : index_part
1112 5001 : .layer_metadata
1113 5001 : .keys()
1114 339100 : .any(|x| x.is_in_future(*_lsn))
1115 : } else {
1116 0 : false
1117 : };
1118 :
1119 5001 : let res = upload::upload_index_part(
1120 5001 : &self.storage_impl,
1121 5001 : &self.tenant_id,
1122 5001 : &self.timeline_id,
1123 5001 : self.generation,
1124 5001 : index_part,
1125 5001 : )
1126 5001 : .measure_remote_op(
1127 5001 : self.tenant_id,
1128 5001 : self.timeline_id,
1129 5001 : RemoteOpFileKind::Index,
1130 5001 : RemoteOpKind::Upload,
1131 5001 : Arc::clone(&self.metrics),
1132 5001 : )
1133 13983 : .await;
1134 4999 : if res.is_ok() {
1135 4276 : self.update_remote_physical_size_gauge(Some(index_part));
1136 4276 : if mention_having_future_layers {
1137 : // find rationale near crate::tenant::timeline::init::cleanup_future_layer
1138 5 : tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
1139 4271 : }
1140 723 : }
1141 4999 : res
1142 : }
1143 9987 : UploadOp::Delete(delete) => {
1144 9987 : let path = &self
1145 9987 : .conf
1146 9987 : .timeline_path(&self.tenant_id, &self.timeline_id)
1147 9987 : .join(delete.layer_file_name.file_name());
1148 9987 : delete::delete_layer(self.conf, &self.storage_impl, path, delete.generation)
1149 9987 : .measure_remote_op(
1150 9987 : self.tenant_id,
1151 9987 : self.timeline_id,
1152 9987 : delete.file_kind,
1153 9987 : RemoteOpKind::Delete,
1154 9987 : Arc::clone(&self.metrics),
1155 9987 : )
1156 30629 : .await
1157 : }
1158 : UploadOp::Barrier(_) => {
1159 : // unreachable. Barrier operations are handled synchronously in
1160 : // launch_queued_tasks
1161 0 : warn!("unexpected Barrier operation in perform_upload_task");
1162 0 : break;
1163 : }
1164 : };
1165 :
1166 29102 : match upload_result {
1167 : Ok(()) => {
1168 24467 : break;
1169 : }
1170 4635 : Err(e) => {
1171 4635 : let retries = task.retries.fetch_add(1, Ordering::SeqCst);
1172 4635 :
1173 4635 : // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
1174 4635 : // or other external reasons. Such issues are relatively regular, so log them
1175 4635 : // at info level at first, and only WARN if the operation fails repeatedly.
1176 4635 : //
1177 4635 : // (See similar logic for downloads in `download::download_retry`)
1178 4635 : if retries < FAILED_UPLOAD_WARN_THRESHOLD {
1179 4633 : info!(
1180 4633 : "failed to perform remote task {}, will retry (attempt {}): {:#}",
1181 4633 : task.op, retries, e
1182 4633 : );
1183 : } else {
1184 2 : warn!(
1185 2 : "failed to perform remote task {}, will retry (attempt {}): {:?}",
1186 2 : task.op, retries, e
1187 2 : );
1188 : }
1189 :
1190 : // sleep until it's time to retry, or we're cancelled
1191 4635 : exponential_backoff(
1192 4635 : retries,
1193 4635 : DEFAULT_BASE_BACKOFF_SECONDS,
1194 4635 : DEFAULT_MAX_BACKOFF_SECONDS,
1195 4635 : &shutdown_token(),
1196 4635 : )
1197 6 : .await;
1198 : }
1199 : }
1200 : }
1201 :
1202 24467 : let retries = task.retries.load(Ordering::SeqCst);
1203 24467 : if retries > 0 {
1204 4517 : info!(
1205 4517 : "remote task {} completed successfully after {} retries",
1206 4517 : task.op, retries
1207 4517 : );
1208 : } else {
1209 0 : debug!("remote task {} completed successfully", task.op);
1210 : }
1211 :
1212 : // The task has completed successfully. Remove it from the in-progress list.
1213 : {
1214 24467 : let mut upload_queue_guard = self.upload_queue.lock().unwrap();
1215 24467 : let upload_queue = match upload_queue_guard.deref_mut() {
1216 0 : UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
1217 6911 : UploadQueue::Stopped(stopped) => {
1218 6911 : // Special care is needed for deletions: if it was an earlier deletion (not scheduled from timeline deletion),
1219 6911 : // then stop() took care of it, so we just return.
1220 6911 : // For deletions that come from delete_all we still want to maintain metrics, launch following tasks, etc.
1221 6911 : match &task.op {
1222 5440 : UploadOp::Delete(delete) if delete.scheduled_from_timeline_delete => Some(&mut stopped.upload_queue_for_deletion),
1223 1541 : _ => None
1224 : }
1225 : },
1226 17556 : UploadQueue::Initialized(qi) => { Some(qi) }
1227 : };
1228 :
1229 24467 : let upload_queue = match upload_queue {
1230 22926 : Some(upload_queue) => upload_queue,
1231 : None => {
1232 1541 : info!("another concurrent task already stopped the queue");
1233 1541 : return;
1234 : }
1235 : };
1236 :
1237 22926 : upload_queue.inprogress_tasks.remove(&task.task_id);
1238 22926 :
1239 22926 : match task.op {
1240 10742 : UploadOp::UploadLayer(_, _) => {
1241 10742 : upload_queue.num_inprogress_layer_uploads -= 1;
1242 10742 : }
1243 4268 : UploadOp::UploadMetadata(_, lsn) => {
1244 4268 : upload_queue.num_inprogress_metadata_uploads -= 1;
1245 4268 : upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check?
1246 4268 : }
1247 7916 : UploadOp::Delete(_) => {
1248 7916 : upload_queue.num_inprogress_deletions -= 1;
1249 7916 : }
1250 0 : UploadOp::Barrier(_) => unreachable!(),
1251 : };
1252 :
1253 : // Launch any queued tasks that were unblocked by this one.
1254 22926 : self.launch_queued_tasks(upload_queue);
1255 22926 : }
1256 22926 : self.calls_unfinished_metric_end(&task.op);
1257 24576 : }
1258 :
1259 49278 : fn calls_unfinished_metric_impl(
1260 49278 : &self,
1261 49278 : op: &UploadOp,
1262 49278 : ) -> Option<(
1263 49278 : RemoteOpFileKind,
1264 49278 : RemoteOpKind,
1265 49278 : RemoteTimelineClientMetricsCallTrackSize,
1266 49278 : )> {
1267 : use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
1268 49278 : let res = match op {
1269 22973 : UploadOp::UploadLayer(_, m) => (
1270 22973 : RemoteOpFileKind::Layer,
1271 22973 : RemoteOpKind::Upload,
1272 22973 : RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
1273 22973 : ),
1274 8651 : UploadOp::UploadMetadata(_, _) => (
1275 8651 : RemoteOpFileKind::Index,
1276 8651 : RemoteOpKind::Upload,
1277 8651 : DontTrackSize {
1278 8651 : reason: "metadata uploads are tiny",
1279 8651 : },
1280 8651 : ),
1281 17653 : UploadOp::Delete(delete) => (
1282 17653 : delete.file_kind,
1283 17653 : RemoteOpKind::Delete,
1284 17653 : DontTrackSize {
1285 17653 : reason: "should we track deletes? positive or negative sign?",
1286 17653 : },
1287 17653 : ),
1288 : UploadOp::Barrier(_) => {
1289 : // we do not account these
1290 1 : return None;
1291 : }
1292 : };
1293 49277 : Some(res)
1294 49278 : }
1295 :
1296 25476 : fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
1297 25476 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1298 25476 : Some(x) => x,
1299 0 : None => return,
1300 : };
1301 25476 : let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
1302 25476 : guard.will_decrement_manually(); // in unfinished_ops_metric_end()
1303 25476 : }
1304 :
1305 23802 : fn calls_unfinished_metric_end(&self, op: &UploadOp) {
1306 23802 : let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
1307 23801 : Some(x) => x,
1308 1 : None => return,
1309 : };
1310 23801 : self.metrics.call_end(&file_kind, &op_kind, track_bytes);
1311 23802 : }
1312 :
1313 : /// Close the upload queue for new operations and cancel queued operations.
1314 : /// In-progress operations will still be running after this function returns.
1315 : /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
1316 : /// to wait for them to complete, after calling this function.
1317 344 : pub fn stop(&self) -> Result<(), StopError> {
1318 344 : // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
1319 344 : // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
1320 344 : // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
1321 344 : let mut guard = self.upload_queue.lock().unwrap();
1322 344 : match &mut *guard {
1323 0 : UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
1324 : UploadQueue::Stopped(_) => {
1325 : // nothing to do
1326 135 : info!("another concurrent task already shut down the queue");
1327 135 : Ok(())
1328 : }
1329 209 : UploadQueue::Initialized(initialized) => {
1330 209 : info!("shutting down upload queue");
1331 :
1332 : // Replace the queue with the Stopped state, taking ownership of the old
1333 : // Initialized queue. We will do some checks on it, and then drop it.
1334 209 : let qi = {
1335 : // Here we preserve working version of the upload queue for possible use during deletions.
1336 : // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
1337 : // but for this use case it doesn't really make sense to bring in unsafe code just for this one spot.
1338 : // Deletion is not really perf sensitive, so there shouldn't be any problems with cloning a fraction of it.
1339 209 : let upload_queue_for_deletion = UploadQueueInitialized {
1340 209 : task_counter: 0,
1341 209 : latest_files: initialized.latest_files.clone(),
1342 209 : latest_files_changes_since_metadata_upload_scheduled: 0,
1343 209 : latest_metadata: initialized.latest_metadata.clone(),
1344 209 : last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn,
1345 209 : num_inprogress_layer_uploads: 0,
1346 209 : num_inprogress_metadata_uploads: 0,
1347 209 : num_inprogress_deletions: 0,
1348 209 : inprogress_tasks: HashMap::default(),
1349 209 : queued_operations: VecDeque::default(),
1350 209 : };
1351 209 :
1352 209 : let upload_queue = std::mem::replace(
1353 209 : &mut *guard,
1354 209 : UploadQueue::Stopped(UploadQueueStopped {
1355 209 : upload_queue_for_deletion,
1356 209 : deleted_at: SetDeletedFlagProgress::NotRunning,
1357 209 : }),
1358 209 : );
1359 209 : if let UploadQueue::Initialized(qi) = upload_queue {
1360 209 : qi
1361 : } else {
1362 0 : unreachable!("we checked in the match above that it is Initialized");
1363 : }
1364 : };
1365 :
1366 : // consistency check
1367 209 : assert_eq!(
1368 209 : qi.num_inprogress_layer_uploads
1369 209 : + qi.num_inprogress_metadata_uploads
1370 209 : + qi.num_inprogress_deletions,
1371 209 : qi.inprogress_tasks.len()
1372 209 : );
1373 :
1374 : // We don't need to do anything here for in-progress tasks. They will finish
1375 : // on their own, decrement the unfinished-task counter themselves, and observe
1376 : // that the queue is Stopped.
1377 209 : drop(qi.inprogress_tasks);
1378 :
1379 : // Tear down queued ops
1380 876 : for op in qi.queued_operations.into_iter() {
1381 876 : self.calls_unfinished_metric_end(&op);
1382 876 : // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
1383 876 : // which is exactly what we want to happen.
1384 876 : drop(op);
1385 876 : }
1386 :
1387 : // We're done.
1388 209 : drop(guard);
1389 209 : Ok(())
1390 : }
1391 : }
1392 344 : }
1393 : }
1394 :
1395 245 : pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
1396 245 : let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
1397 245 : RemotePath::from_string(&path).expect("Failed to construct path")
1398 245 : }
1399 :
1400 203 : pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
1401 203 : remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string()))
1402 203 : }
1403 :
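     : /// Remote path (key) for a single layer file: the layer file name with the
     : /// generation suffix appended, under the timeline's remote directory.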
1404 1043 : pub fn remote_layer_path(
1405 1043 : tenant_id: &TenantId,
1406 1043 : timeline_id: &TimelineId,
1407 1043 : layer_file_name: &LayerFileName,
1408 1043 : layer_meta: &LayerFileMetadata,
1409 1043 : ) -> RemotePath {
1410 1043 : // Generation-aware key format
1411 1043 : let path = format!(
1412 1043 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1413 1043 : layer_file_name.file_name(),
1414 1043 : layer_meta.generation.get_suffix()
1415 1043 : );
1416 1043 :
1417 1043 : RemotePath::from_string(&path).expect("Failed to construct path")
1418 1043 : }
1419 :
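     : /// Remote path (key) for the timeline's index file ([`IndexPart::FILE_NAME`]),
     : /// with the generation suffix appended, under the timeline's remote directory.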
1420 5443 : pub fn remote_index_path(
1421 5443 : tenant_id: &TenantId,
1422 5443 : timeline_id: &TimelineId,
1423 5443 : generation: Generation,
1424 5443 : ) -> RemotePath {
1425 5443 : RemotePath::from_string(&format!(
1426 5443 : "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
1427 5443 : IndexPart::FILE_NAME,
1428 5443 : generation.get_suffix()
1429 5443 : ))
1430 5443 : .expect("Failed to construct path")
1431 5443 : }
1432 :
1433 : /// Files on the remote storage are stored with paths relative to the pageserver's workdir.
1434 : /// Such a path includes both the tenant and timeline ids, which makes the remote storage path unique.
1435 : ///
1436 : /// Errors if the provided path does not start with the pageserver's workdir.
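     : ///
     : /// For example (illustrative shape only), a local file at
     : /// `<workdir>/tenants/<tenant_id>/timelines/<timeline_id>/<layer file>`
     : /// maps to the remote path
     : /// `tenants/<tenant_id>/timelines/<timeline_id>/<layer file><generation suffix>`.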
1437 24108 : pub fn remote_path(
1438 24108 : conf: &PageServerConf,
1439 24108 : local_path: &Path,
1440 24108 : generation: Generation,
1441 24108 : ) -> anyhow::Result<RemotePath> {
1442 24108 : let stripped = local_path
1443 24108 : .strip_prefix(&conf.workdir)
1444 24108 : .context("Failed to strip workdir prefix")?;
1445 :
1446 24108 : let suffixed = format!(
1447 24108 : "{0}{1}",
1448 24108 : stripped.to_string_lossy(),
1449 24108 : generation.get_suffix()
1450 24108 : );
1451 24108 :
1452 24108 : RemotePath::new(&PathBuf::from(suffixed)).with_context(|| {
1453 0 : format!(
1454 0 : "to resolve remote part of path {:?} for base {:?}",
1455 0 : local_path, conf.workdir
1456 0 : )
1457 24108 : })
1458 24108 : }
1459 :
1460 : #[cfg(test)]
1461 : mod tests {
1462 : use super::*;
1463 : use crate::{
1464 : context::RequestContext,
1465 : tenant::{
1466 : harness::{TenantHarness, TIMELINE_ID},
1467 : Generation, Tenant, Timeline,
1468 : },
1469 : DEFAULT_PG_VERSION,
1470 : };
1471 :
1472 : use std::{collections::HashSet, path::Path};
1473 : use utils::lsn::Lsn;
1474 :
1475 4 : pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
1476 4 : format!("contents for {name}").into()
1477 4 : }
1478 :
1479 1 : pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
1480 1 : let metadata = TimelineMetadata::new(
1481 1 : disk_consistent_lsn,
1482 1 : None,
1483 1 : None,
1484 1 : Lsn(0),
1485 1 : Lsn(0),
1486 1 : Lsn(0),
1487 1 : // Any version will do
1488 1 : // but it should be consistent with the one in the tests
1489 1 : crate::DEFAULT_PG_VERSION,
1490 1 : );
1491 1 :
1492 1 : // go through serialize + deserialize to fix the header, including checksum
1493 1 : TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
1494 1 : }
1495 :
1496 1 : fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
1497 3 : let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
1498 1 : avec.sort();
1499 1 :
1500 1 : let mut bvec = b.to_vec();
1501 1 : bvec.sort_unstable();
1502 1 :
1503 1 : assert_eq!(avec, bvec);
1504 1 : }
1505 :
1506 2 : fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) {
1507 2 : let mut expected: Vec<String> = expected
1508 2 : .iter()
1509 8 : .map(|x| format!("{}{}", x, generation.get_suffix()))
1510 2 : .collect();
1511 2 : expected.sort();
1512 2 :
1513 2 : let mut found: Vec<String> = Vec::new();
1514 8 : for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
1515 8 : let entry_name = entry.file_name();
1516 8 : let fname = entry_name.to_str().unwrap();
1517 8 : found.push(String::from(fname));
1518 8 : }
1519 2 : found.sort();
1520 2 :
1521 2 : assert_eq!(found, expected);
1522 2 : }
1523 :
1524 : struct TestSetup {
1525 : harness: TenantHarness,
1526 : tenant: Arc<Tenant>,
1527 : timeline: Arc<Timeline>,
1528 : tenant_ctx: RequestContext,
1529 : }
1530 :
1531 : impl TestSetup {
1532 2 : async fn new(test_name: &str) -> anyhow::Result<Self> {
1533 2 : // Use a current-thread runtime in the test
1534 2 : let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
1535 2 : let harness = TenantHarness::create(test_name)?;
1536 2 : let (tenant, ctx) = harness.load().await;
1537 :
1538 2 : let timeline = tenant
1539 2 : .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
1540 4 : .await?;
1541 :
1542 2 : Ok(Self {
1543 2 : harness,
1544 2 : tenant,
1545 2 : timeline,
1546 2 : tenant_ctx: ctx,
1547 2 : })
1548 2 : }
1549 : }
1550 :
1551 : // Test scheduling
1552 1 : #[tokio::test]
1553 1 : async fn upload_scheduling() {
1554 : // Test outline:
1555 : //
1556 : // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
1557 : // Schedule upload of index. Check that it is queued
1558 : // let the layer file uploads finish. Check that the index-upload is now started
1559 : // let the index-upload finish.
1560 : //
1561 : // Download back the index.json. Check that the list of files is correct
1562 : //
1563 : // Schedule upload. Schedule deletion. Check that the deletion is queued
1564 : // let upload finish. Check that deletion is now started
1565 : // Schedule another deletion. Check that it's launched immediately.
1566 : // Schedule index upload. Check that it's queued
1567 :
1568 : let TestSetup {
1569 1 : harness,
1570 1 : tenant: _tenant,
1571 1 : timeline,
1572 1 : tenant_ctx: _tenant_ctx,
1573 3 : } = TestSetup::new("upload_scheduling").await.unwrap();
1574 1 :
1575 1 : let client = timeline.remote_client.as_ref().unwrap();
1576 :
1577 : // Download back the index.json, and check that the list of files is correct
1578 3 : let initial_index_part = match client.download_index_file().await.unwrap() {
1579 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1580 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
1581 : };
1582 1 : let initial_layers = initial_index_part
1583 1 : .layer_metadata
1584 1 : .keys()
1585 1 : .map(|f| f.to_owned())
1586 1 : .collect::<HashSet<LayerFileName>>();
1587 1 : let initial_layer = {
1588 1 : assert!(initial_layers.len() == 1);
1589 1 : initial_layers.into_iter().next().unwrap()
1590 1 : };
1591 1 :
1592 1 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
1593 1 :
1594 1 : println!("workdir: {}", harness.conf.workdir.display());
1595 1 :
1596 1 : let remote_timeline_dir = harness
1597 1 : .remote_fs_dir
1598 1 : .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
1599 1 : println!("remote_timeline_dir: {}", remote_timeline_dir.display());
1600 1 :
1601 1 : let generation = harness.generation;
1602 1 :
1603 1 : // Create a couple of dummy files, schedule upload for them
1604 1 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
1605 1 : let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
1606 1 : let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
1607 1 : let content_1 = dummy_contents("foo");
1608 1 : let content_2 = dummy_contents("bar");
1609 1 : let content_3 = dummy_contents("baz");
1610 :
1611 3 : for (filename, content) in [
1612 1 : (&layer_file_name_1, &content_1),
1613 1 : (&layer_file_name_2, &content_2),
1614 1 : (&layer_file_name_3, &content_3),
1615 3 : ] {
1616 3 : std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
1617 3 : }
1618 :
1619 1 : client
1620 1 : .schedule_layer_file_upload(
1621 1 : &layer_file_name_1,
1622 1 : &LayerFileMetadata::new(content_1.len() as u64, generation),
1623 1 : )
1624 1 : .unwrap();
1625 1 : client
1626 1 : .schedule_layer_file_upload(
1627 1 : &layer_file_name_2,
1628 1 : &LayerFileMetadata::new(content_2.len() as u64, generation),
1629 1 : )
1630 1 : .unwrap();
1631 1 :
1632 1 : // Check that they are started immediately, not queued
1633 1 : //
1634 1 :         // this works because we are running within block_on, so any futures are now queued up until
1635 1 : // our next await point.
1636 1 : {
1637 1 : let mut guard = client.upload_queue.lock().unwrap();
1638 1 : let upload_queue = guard.initialized_mut().unwrap();
1639 1 : assert!(upload_queue.queued_operations.is_empty());
1640 1 : assert!(upload_queue.inprogress_tasks.len() == 2);
1641 1 : assert!(upload_queue.num_inprogress_layer_uploads == 2);
1642 :
1643 :             // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
1644 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
1645 : }
1646 :
1647 : // Schedule upload of index. Check that it is queued
1648 1 : let metadata = dummy_metadata(Lsn(0x20));
1649 1 : client
1650 1 : .schedule_index_upload_for_metadata_update(&metadata)
1651 1 : .unwrap();
1652 1 : {
1653 1 : let mut guard = client.upload_queue.lock().unwrap();
1654 1 : let upload_queue = guard.initialized_mut().unwrap();
1655 1 : assert!(upload_queue.queued_operations.len() == 1);
1656 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
1657 : }
1658 :
1659 : // Wait for the uploads to finish
1660 1 : client.wait_completion().await.unwrap();
1661 1 : {
1662 1 : let mut guard = client.upload_queue.lock().unwrap();
1663 1 : let upload_queue = guard.initialized_mut().unwrap();
1664 1 :
1665 1 : assert!(upload_queue.queued_operations.is_empty());
1666 1 : assert!(upload_queue.inprogress_tasks.is_empty());
1667 : }
1668 :
1669 : // Download back the index.json, and check that the list of files is correct
1670 3 : let index_part = match client.download_index_file().await.unwrap() {
1671 1 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1672 0 : MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
1673 : };
1674 :
1675 1 : assert_file_list(
1676 1 : &index_part
1677 1 : .layer_metadata
1678 1 : .keys()
1679 3 : .map(|f| f.to_owned())
1680 1 : .collect(),
1681 1 : &[
1682 1 : &initial_layer.file_name(),
1683 1 : &layer_file_name_1.file_name(),
1684 1 : &layer_file_name_2.file_name(),
1685 1 : ],
1686 1 : );
1687 1 : assert_eq!(index_part.metadata, metadata);
1688 :
1689 : // Schedule upload and then a deletion. Check that the deletion is queued
1690 1 : client
1691 1 : .schedule_layer_file_upload(
1692 1 : &layer_file_name_3,
1693 1 : &LayerFileMetadata::new(content_3.len() as u64, generation),
1694 1 : )
1695 1 : .unwrap();
1696 1 : client
1697 1 : .schedule_layer_file_deletion(&[layer_file_name_1.clone()])
1698 1 : .unwrap();
1699 1 : {
1700 1 : let mut guard = client.upload_queue.lock().unwrap();
1701 1 : let upload_queue = guard.initialized_mut().unwrap();
1702 1 :
1703 1 : // Deletion schedules upload of the index file, and the file deletion itself
1704 1 : assert!(upload_queue.queued_operations.len() == 2);
1705 1 : assert!(upload_queue.inprogress_tasks.len() == 1);
1706 1 : assert!(upload_queue.num_inprogress_layer_uploads == 1);
1707 1 : assert!(upload_queue.num_inprogress_deletions == 0);
1708 1 : assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
1709 : }
1710 1 : assert_remote_files(
1711 1 : &[
1712 1 : &initial_layer.file_name(),
1713 1 : &layer_file_name_1.file_name(),
1714 1 : &layer_file_name_2.file_name(),
1715 1 : "index_part.json",
1716 1 : ],
1717 1 : &remote_timeline_dir,
1718 1 : generation,
1719 1 : );
1720 1 :
1721 1 : // Finish them
1722 1 : client.wait_completion().await.unwrap();
1723 1 :
1724 1 : assert_remote_files(
1725 1 : &[
1726 1 : &initial_layer.file_name(),
1727 1 : &layer_file_name_2.file_name(),
1728 1 : &layer_file_name_3.file_name(),
1729 1 : "index_part.json",
1730 1 : ],
1731 1 : &remote_timeline_dir,
1732 1 : generation,
1733 1 : );
1734 : }
1735 :
1736 1 : #[tokio::test]
1737 1 : async fn bytes_unfinished_gauge_for_layer_file_uploads() {
1738 : // Setup
1739 :
1740 : let TestSetup {
1741 1 : harness,
1742 1 : tenant: _tenant,
1743 1 : timeline,
1744 : ..
1745 3 : } = TestSetup::new("metrics").await.unwrap();
1746 1 : let client = timeline.remote_client.as_ref().unwrap();
1747 1 : let timeline_path = harness.timeline_path(&TIMELINE_ID);
1748 1 :
1749 1 : let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
1750 1 : let content_1 = dummy_contents("foo");
1751 1 : std::fs::write(
1752 1 : timeline_path.join(layer_file_name_1.file_name()),
1753 1 : &content_1,
1754 1 : )
1755 1 : .unwrap();
1756 1 :
1757 2 : #[derive(Debug, PartialEq, Clone, Copy)]
1758 1 : struct BytesStartedFinished {
1759 1 : started: Option<usize>,
1760 1 : finished: Option<usize>,
1761 1 : }
1762 1 : impl std::ops::Add for BytesStartedFinished {
1763 1 : type Output = Self;
1764 2 : fn add(self, rhs: Self) -> Self::Output {
1765 2 : Self {
1766 2 : started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
1767 2 : finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
1768 2 : }
1769 2 : }
1770 1 : }
1771 3 : let get_bytes_started_stopped = || {
1772 3 : let started = client
1773 3 : .metrics
1774 3 : .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
1775 3 : .map(|v| v.try_into().unwrap());
1776 3 : let stopped = client
1777 3 : .metrics
1778 3 : .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
1779 3 : .map(|v| v.try_into().unwrap());
1780 3 : BytesStartedFinished {
1781 3 : started,
1782 3 : finished: stopped,
1783 3 : }
1784 3 : };
1785 :
1786 : // Test
1787 1 : tracing::info!("now doing actual test");
1788 :
1789 1 : let actual_a = get_bytes_started_stopped();
1790 1 :
1791 1 : client
1792 1 : .schedule_layer_file_upload(
1793 1 : &layer_file_name_1,
1794 1 : &LayerFileMetadata::new(content_1.len() as u64, harness.generation),
1795 1 : )
1796 1 : .unwrap();
1797 1 :
1798 1 : let actual_b = get_bytes_started_stopped();
1799 1 :
1800 1 : client.wait_completion().await.unwrap();
1801 1 :
1802 1 : let actual_c = get_bytes_started_stopped();
1803 1 :
1804 1 : // Validate
1805 1 :
1806 1 : let expected_b = actual_a
1807 1 : + BytesStartedFinished {
1808 1 : started: Some(content_1.len()),
1809 1 : // assert that the _finished metric is created eagerly so that subtractions work on first sample
1810 1 : finished: Some(0),
1811 1 : };
1812 1 : assert_eq!(actual_b, expected_b);
1813 :
1814 1 : let expected_c = actual_a
1815 1 : + BytesStartedFinished {
1816 1 : started: Some(content_1.len()),
1817 1 : finished: Some(content_1.len()),
1818 1 : };
1819 1 : assert_eq!(actual_c, expected_c);
1820 : }
1821 : }