Line data Source code
1 : use std::collections::HashSet;
2 : use std::sync::Arc;
3 :
4 : use anyhow::Context;
5 : use bytes::Bytes;
6 : use http_utils::error::ApiError;
7 : use pageserver_api::key::Key;
8 : use pageserver_api::keyspace::KeySpace;
9 : use pageserver_api::models::DetachBehavior;
10 : use pageserver_api::models::detach_ancestor::AncestorDetached;
11 : use pageserver_api::shard::ShardIdentity;
12 : use pageserver_compaction::helpers::overlaps_with;
13 : use tokio::sync::Semaphore;
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::Instrument;
16 : use utils::completion;
17 : use utils::generation::Generation;
18 : use utils::id::TimelineId;
19 : use utils::lsn::Lsn;
20 : use utils::sync::gate::GateError;
21 :
22 : use super::layer_manager::{LayerManager, LayerManagerLockHolder};
23 : use super::{FlushLayerError, Timeline};
24 : use crate::context::{DownloadBehavior, RequestContext};
25 : use crate::task_mgr::TaskKind;
26 : use crate::tenant::TenantShard;
27 : use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
28 : use crate::tenant::storage_layer::layer::local_layer_path;
29 : use crate::tenant::storage_layer::{
30 : AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
31 : ValuesReconstructState,
32 : };
33 : use crate::tenant::timeline::VersionedKeySpaceQuery;
34 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
35 :
36 : #[derive(Debug, thiserror::Error)]
37 : pub(crate) enum Error {
38 : #[error("no ancestors")]
39 : NoAncestor,
40 :
41 : #[error("too many ancestors")]
42 : TooManyAncestors,
43 :
44 : #[error("ancestor is not empty")]
45 : AncestorNotEmpty,
46 :
47 : #[error("shutting down, please retry later")]
48 : ShuttingDown,
49 :
50 : #[error("archived: {}", .0)]
51 : Archived(TimelineId),
52 :
53 : #[error(transparent)]
54 : NotFound(crate::tenant::GetTimelineError),
55 :
56 : #[error("failed to reparent all candidate timelines, please retry")]
57 : FailedToReparentAll,
58 :
59 : #[error("ancestor is already being detached by: {}", .0)]
60 : OtherTimelineDetachOngoing(TimelineId),
61 :
62 : #[error("preparing to timeline ancestor detach failed")]
63 : Prepare(#[source] anyhow::Error),
64 :
65 : #[error("detaching and reparenting failed")]
66 : DetachReparent(#[source] anyhow::Error),
67 :
68 : #[error("completing ancestor detach failed")]
69 : Complete(#[source] anyhow::Error),
70 :
71 : #[error("failpoint: {}", .0)]
72 : Failpoint(&'static str),
73 : }
74 :
75 : impl Error {
76 : /// Try to detect cancellation within the `anyhow::Error`; if found, return
77 : /// `Error::ShuttingDown`, otherwise wrap the error in the variant produced by `or_else`.
78 0 : fn launder<F>(e: anyhow::Error, or_else: F) -> Error
79 0 : where
80 0 : F: Fn(anyhow::Error) -> Error,
81 : {
82 : use remote_storage::TimeoutOrCancel;
83 :
84 : use crate::tenant::remote_timeline_client::WaitCompletionError;
85 : use crate::tenant::upload_queue::NotInitialized;
86 :
87 0 : if e.is::<NotInitialized>()
88 0 : || TimeoutOrCancel::caused_by_cancel(&e)
89 0 : || e.downcast_ref::<remote_storage::DownloadError>()
90 0 : .is_some_and(|e| e.is_cancelled())
91 0 : || e.is::<WaitCompletionError>()
92 : {
93 0 : Error::ShuttingDown
94 : } else {
95 0 : or_else(e)
96 : }
97 0 : }
98 : }
99 :
100 : impl From<Error> for ApiError {
101 0 : fn from(value: Error) -> Self {
102 0 : match value {
103 0 : Error::NoAncestor => ApiError::Conflict(value.to_string()),
104 : Error::TooManyAncestors | Error::AncestorNotEmpty => {
105 0 : ApiError::BadRequest(anyhow::anyhow!("{value}"))
106 : }
107 0 : Error::ShuttingDown => ApiError::ShuttingDown,
108 0 : Error::Archived(_) => ApiError::BadRequest(anyhow::anyhow!("{value}")),
109 : Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
110 0 : ApiError::ResourceUnavailable(value.to_string().into())
111 : }
112 0 : Error::NotFound(e) => ApiError::from(e),
113 : // these variants should have no cancellation errors because of Error::launder
114 : Error::Prepare(_)
115 : | Error::DetachReparent(_)
116 : | Error::Complete(_)
117 0 : | Error::Failpoint(_) => ApiError::InternalServerError(value.into()),
118 : }
119 0 : }
120 : }
121 :
122 : impl From<crate::tenant::upload_queue::NotInitialized> for Error {
123 0 : fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
124 : // treat all as shutting down signals, even though that is not entirely correct
125 : // (uninitialized state)
126 0 : Error::ShuttingDown
127 0 : }
128 : }
129 : impl From<super::layer_manager::Shutdown> for Error {
130 0 : fn from(_: super::layer_manager::Shutdown) -> Self {
131 0 : Error::ShuttingDown
132 0 : }
133 : }
134 :
135 : pub(crate) enum Progress {
136 : Prepared(Attempt, PreparedTimelineDetach),
137 : Done(AncestorDetached),
138 : }
139 :
140 : pub(crate) struct PreparedTimelineDetach {
141 : layers: Vec<Layer>,
142 : }
143 :
144 : // TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
145 : #[derive(Debug)]
146 : pub(crate) struct Options {
147 : pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
148 : pub(crate) copy_concurrency: std::num::NonZeroUsize,
149 : }
150 :
151 : impl Default for Options {
152 0 : fn default() -> Self {
153 0 : Self {
154 0 : rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
155 0 : copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
156 0 : }
157 0 : }
158 : }
159 :
160 : /// Represents a single detach-ancestor attempt that remains exclusive across a tenant reset.
161 : #[derive(Debug)]
162 : pub(crate) struct Attempt {
163 : pub(crate) timeline_id: TimelineId,
164 : pub(crate) ancestor_timeline_id: TimelineId,
165 : pub(crate) ancestor_lsn: Lsn,
166 : _guard: completion::Completion,
167 : gate_entered: Option<utils::sync::gate::GateGuard>,
168 : }
169 :
170 : impl Attempt {
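 : /// Drops the held gate guard ahead of a tenant reset so the timeline can shut down; the
 : /// attempt itself stays exclusive through the completion guard.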
171 0 : pub(crate) fn before_reset_tenant(&mut self) {
172 0 : let taken = self.gate_entered.take();
173 0 : assert!(taken.is_some());
174 0 : }
175 :
176 0 : pub(crate) fn new_barrier(&self) -> completion::Barrier {
177 0 : self._guard.barrier()
178 0 : }
179 : }
180 :
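 : /// Writes an image layer of tombstones (empty images) covering the sparse non-inherited
 : /// keyspace at the detach LSN, so that keys which must not be inherited (e.g. aux files) do
 : /// not leak from the ancestor into the detached timeline. Returns `None` if a suitable image
 : /// layer already exists or the ancestor holds no such keys.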
181 0 : pub(crate) async fn generate_tombstone_image_layer(
182 0 : detached: &Arc<Timeline>,
183 0 : ancestor: &Arc<Timeline>,
184 0 : ancestor_lsn: Lsn,
185 0 : historic_layers_to_copy: &Vec<Layer>,
186 0 : ctx: &RequestContext,
187 0 : ) -> Result<Option<ResidentLayer>, Error> {
188 0 : tracing::info!(
189 0 : "removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
190 : );
191 0 : let io_concurrency = IoConcurrency::spawn_from_conf(
192 0 : detached.conf.get_vectored_concurrent_io,
193 0 : detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
194 : );
195 0 : let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
196 : // Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
197 : // not contain too many keys, otherwise this takes a lot of memory. Currently we limit it to 10k keys in the compute.
198 0 : let key_range = Key::sparse_non_inherited_keyspace();
199 : // avoid generating a "future layer" which will then be removed
200 0 : let image_lsn = ancestor_lsn;
201 :
202 : {
203 0 : for layer in historic_layers_to_copy {
204 0 : let desc = layer.layer_desc();
205 0 : if !desc.is_delta
206 0 : && desc.lsn_range.start == image_lsn
207 0 : && overlaps_with(&key_range, &desc.key_range)
208 : {
209 0 : tracing::info!(
210 0 : layer=%layer, "will copy tombstone from ancestor instead of creating a new one"
211 : );
212 :
213 0 : return Ok(None);
214 0 : }
215 : }
216 :
217 0 : let layers = detached
218 0 : .layers
219 0 : .read(LayerManagerLockHolder::DetachAncestor)
220 0 : .await;
221 0 : for layer in layers.all_persistent_layers() {
222 0 : if !layer.is_delta
223 0 : && layer.lsn_range.start == image_lsn
224 0 : && overlaps_with(&key_range, &layer.key_range)
225 : {
226 0 : tracing::warn!(
227 0 : layer=%layer, "image layer at the detach LSN already exists, skipping removing aux files"
228 : );
229 0 : return Ok(None);
230 0 : }
231 : }
232 : }
233 :
234 0 : let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key_range.clone()), image_lsn);
235 0 : let data = ancestor
236 0 : .get_vectored_impl(query, &mut reconstruct_state, ctx)
237 0 : .await
238 0 : .context("failed to retrieve aux keys")
239 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
240 0 : if !data.is_empty() {
241 : // TODO: is it possible that we can have an image at `image_lsn`? Unlikely because image layers are only generated
242 : // upon compaction but theoretically possible.
243 0 : let mut image_layer_writer = ImageLayerWriter::new(
244 0 : detached.conf,
245 0 : detached.timeline_id,
246 0 : detached.tenant_shard_id,
247 0 : &key_range,
248 0 : image_lsn,
249 0 : &detached.gate,
250 0 : detached.cancel.clone(),
251 0 : ctx,
252 0 : )
253 0 : .await
254 0 : .context("failed to create image layer writer")
255 0 : .map_err(Error::Prepare)?;
256 0 : for key in data.keys() {
257 0 : image_layer_writer
258 0 : .put_image(*key, Bytes::new(), ctx)
259 0 : .await
260 0 : .context("failed to write key")
261 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
262 : }
263 0 : let (desc, path) = image_layer_writer
264 0 : .finish(ctx)
265 0 : .await
266 0 : .context("failed to finish image layer writer for removing the metadata keys")
267 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
268 0 : let generated = Layer::finish_creating(detached.conf, detached, desc, &path)
269 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
270 0 : detached
271 0 : .remote_client
272 0 : .upload_layer_file(&generated, &detached.cancel)
273 0 : .await
274 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
275 0 : tracing::info!(layer=%generated, "wrote image layer");
276 0 : Ok(Some(generated))
277 : } else {
278 0 : tracing::info!("no aux keys found in ancestor");
279 0 : Ok(None)
280 : }
281 0 : }
282 :
283 : /// See [`Timeline::prepare_to_detach_from_ancestor`]
284 0 : pub(super) async fn prepare(
285 0 : detached: &Arc<Timeline>,
286 0 : tenant: &TenantShard,
287 0 : behavior: DetachBehavior,
288 0 : options: Options,
289 0 : ctx: &RequestContext,
290 0 : ) -> Result<Progress, Error> {
291 : use Error::*;
292 :
293 0 : let Some((mut ancestor, mut ancestor_lsn)) = detached
294 0 : .ancestor_timeline
295 0 : .as_ref()
296 0 : .map(|tl| (tl.clone(), detached.ancestor_lsn))
297 : else {
298 : let ancestor_id;
299 : let ancestor_lsn;
300 0 : let still_in_progress = {
301 0 : let accessor = detached.remote_client.initialized_upload_queue()?;
302 :
303 : // we can safely inspect the latest uploaded index_part, because we can only witness this
304 : // state after the restart has completed and the ancestor is gone.
305 0 : let latest = accessor.latest_uploaded_index_part();
306 0 : let Some((id, lsn)) = latest.lineage.detached_previous_ancestor() else {
307 0 : return Err(NoAncestor);
308 : };
309 0 : ancestor_id = id;
310 0 : ancestor_lsn = lsn;
311 :
312 0 : latest
313 0 : .gc_blocking
314 0 : .as_ref()
315 0 : .is_some_and(|b| b.blocked_by(DetachAncestor))
316 : };
317 :
318 0 : if still_in_progress {
319 : // gc is still blocked, so we can still reparent and complete.
320 : // reparenting the remaining timelines is safe, because they were locked in at the beginning.
321 0 : let attempt =
322 0 : continue_with_blocked_gc(detached, tenant, ancestor_id, ancestor_lsn).await?;
323 :
324 : // because the ancestor of detached is already set to none, we have published all
325 : // of the layers, so we are still "prepared."
326 0 : return Ok(Progress::Prepared(
327 0 : attempt,
328 0 : PreparedTimelineDetach { layers: Vec::new() },
329 0 : ));
330 0 : }
331 :
332 0 : let reparented_timelines = reparented_direct_children(detached, tenant)?;
333 0 : return Ok(Progress::Done(AncestorDetached {
334 0 : reparented_timelines,
335 0 : }));
336 : };
337 :
338 0 : if detached.is_archived() != Some(false) {
339 0 : return Err(Archived(detached.timeline_id));
340 0 : }
341 :
342 0 : if !ancestor_lsn.is_valid() {
343 : // rare case, probably wouldn't even load
344 0 : tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
345 0 : return Err(NoAncestor);
346 0 : }
347 :
348 0 : check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn, behavior)?;
349 :
350 0 : if let DetachBehavior::MultiLevelAndNoReparent = behavior {
351 : // If the ancestor has an ancestor, we might be able to fast-path detach it if the current ancestor does not have any data written/used by the detaching timeline.
352 0 : while let Some(ancestor_of_ancestor) = ancestor.ancestor_timeline.clone() {
353 0 : if ancestor_lsn != ancestor.ancestor_lsn {
354 : // non-technical requirement; we could still flatten even if the ancestor LSNs do not match,
355 : // but that would require copying and cutting more layers.
356 0 : return Err(AncestorNotEmpty);
357 0 : }
358 : // Use the ancestor of the ancestor as the new ancestor (only when the ancestor LSNs are the same)
359 0 : ancestor_lsn = ancestor.ancestor_lsn; // Get the LSN first before resetting the `ancestor` variable
360 0 : ancestor = ancestor_of_ancestor;
361 : // TODO: do we still need to check if we don't want to reparent?
362 0 : check_no_archived_children_of_ancestor(
363 0 : tenant,
364 0 : detached,
365 0 : &ancestor,
366 0 : ancestor_lsn,
367 0 : behavior,
368 0 : )?;
369 : }
370 0 : } else if ancestor.ancestor_timeline.is_some() {
371 : // non-technical requirement; we could flatten N ancestors just as easily but we chose
372 : // not to, at least initially
373 0 : return Err(TooManyAncestors);
374 0 : }
375 :
376 0 : tracing::info!(
377 0 : "attempt to detach the timeline from the ancestor: {}@{}, behavior={:?}",
378 0 : ancestor.timeline_id,
379 : ancestor_lsn,
380 : behavior
381 : );
382 :
383 0 : let attempt = start_new_attempt(detached, tenant, ancestor.timeline_id, ancestor_lsn).await?;
384 :
385 0 : utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");
386 :
387 0 : fail::fail_point!(
388 0 : "timeline-detach-ancestor::before_starting_after_locking",
389 0 : |_| Err(Error::Failpoint(
390 0 : "timeline-detach-ancestor::before_starting_after_locking"
391 0 : ))
392 : );
393 :
394 0 : if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
395 0 : let span =
396 0 : tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
397 0 : async {
398 0 : let started_at = std::time::Instant::now();
399 0 : let freeze_and_flush = ancestor.freeze_and_flush0();
400 0 : let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
401 :
402 0 : let res =
403 0 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
404 0 : .await;
405 :
406 0 : let res = match res {
407 0 : Ok(res) => res,
408 0 : Err(_elapsed) => {
409 0 : tracing::info!("freezing and flushing ancestor is still ongoing");
410 0 : freeze_and_flush.await
411 : }
412 : };
413 :
414 0 : res.map_err(|e| {
415 : use FlushLayerError::*;
416 0 : match e {
417 : Cancelled | NotRunning(_) => {
418 : // FIXME(#6424): technically statically unreachable right now, given how we never
419 : // drop the sender
420 0 : Error::ShuttingDown
421 : }
422 0 : CreateImageLayersError(_) | Other(_) => Error::Prepare(e.into()),
423 : }
424 0 : })?;
425 :
426 : // we do not need to wait for uploads to complete, but we do need `struct Layer`;
427 : // copying a delta prefix is currently unsupported for `InMemoryLayer`.
428 0 : tracing::info!(
429 0 : elapsed_ms = started_at.elapsed().as_millis(),
430 0 : "froze and flushed the ancestor"
431 : );
432 0 : Ok::<_, Error>(())
433 0 : }
434 0 : .instrument(span)
435 0 : .await?;
436 0 : }
437 :
438 0 : let end_lsn = ancestor_lsn + 1;
439 :
440 0 : let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
441 : // we do not need to include our own layers, because they can only be layers that come
442 : // *after* ancestor_lsn
443 0 : let layers = tokio::select! {
444 0 : guard = ancestor.layers.read(LayerManagerLockHolder::DetachAncestor) => guard,
445 0 : _ = detached.cancel.cancelled() => {
446 0 : return Err(ShuttingDown);
447 : }
448 0 : _ = ancestor.cancel.cancelled() => {
449 0 : return Err(ShuttingDown);
450 : }
451 : };
452 :
453 : // between retries, these can change if compaction or gc ran in between. this will mean
454 : // we have to redo work.
455 0 : partition_work(ancestor_lsn, &layers)?
456 : };
457 :
458 : // TODO: layers are already sorted by something: use that to determine how many of the remote
459 : // copies are already done -- gc is blocked, but a compaction could have happened on the ancestor,
460 : // which is something to keep in mind if copy skipping is implemented.
461 0 : tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
462 :
463 : // TODO: remote copying and lsn-prefix copying could be done at the same time, with a single fsync afterwards
464 0 : let mut new_layers: Vec<Layer> =
465 0 : Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
466 :
467 0 : if let Some(tombstone_layer) =
468 0 : generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, &rest_of_historic, ctx)
469 0 : .await?
470 0 : {
471 0 : new_layers.push(tombstone_layer.into());
472 0 : }
473 :
474 : {
475 0 : tracing::info!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
476 :
477 0 : let mut tasks = tokio::task::JoinSet::new();
478 :
479 0 : let mut wrote_any = false;
480 :
481 0 : let limiter = Arc::new(Semaphore::new(options.rewrite_concurrency.get()));
482 :
483 0 : for layer in straddling_branchpoint {
484 0 : let limiter = limiter.clone();
485 0 : let timeline = detached.clone();
486 0 : let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
487 :
488 0 : let span = tracing::info_span!("upload_rewritten_layer", %layer);
489 0 : tasks.spawn(
490 0 : async move {
491 0 : let _permit = limiter.acquire().await;
492 0 : let copied =
493 0 : upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
494 0 : .await?;
495 0 : if let Some(copied) = copied.as_ref() {
496 0 : tracing::info!(%copied, "rewrote and uploaded");
497 0 : }
498 0 : Ok(copied)
499 0 : }
500 0 : .instrument(span),
501 : );
502 : }
503 :
504 0 : while let Some(res) = tasks.join_next().await {
505 0 : match res {
506 0 : Ok(Ok(Some(copied))) => {
507 0 : wrote_any = true;
508 0 : new_layers.push(copied);
509 0 : }
510 0 : Ok(Ok(None)) => {}
511 0 : Ok(Err(e)) => return Err(e),
512 0 : Err(je) => return Err(Error::Prepare(je.into())),
513 : }
514 : }
515 :
516 : // FIXME: the fsync should be mandatory, after both rewrites and copies
517 0 : if wrote_any {
518 0 : fsync_timeline_dir(detached, ctx).await;
519 0 : }
520 : }
521 :
522 0 : let mut tasks = tokio::task::JoinSet::new();
523 0 : let limiter = Arc::new(Semaphore::new(options.copy_concurrency.get()));
524 0 : let cancel_eval = CancellationToken::new();
525 :
526 0 : for adopted in rest_of_historic {
527 0 : let limiter = limiter.clone();
528 0 : let timeline = detached.clone();
529 0 : let cancel_eval = cancel_eval.clone();
530 :
531 0 : tasks.spawn(
532 0 : async move {
533 0 : let _permit = tokio::select! {
534 0 : permit = limiter.acquire() => {
535 0 : permit
536 : }
537 : // Wait for the cancellation here instead of letting the entire task be cancelled.
538 : // Cancellations are racy in that they might leave layers on disk.
539 0 : _ = cancel_eval.cancelled() => {
540 0 : Err(Error::ShuttingDown)?
541 : }
542 : };
543 0 : let (owned, did_hardlink) = remote_copy(
544 0 : &adopted,
545 0 : &timeline,
546 0 : timeline.generation,
547 0 : timeline.shard_identity,
548 0 : &timeline.cancel,
549 0 : )
550 0 : .await?;
551 0 : tracing::info!(layer=%owned, did_hard_link=%did_hardlink, "remote copied");
552 0 : Ok((owned, did_hardlink))
553 0 : }
554 0 : .in_current_span(),
555 : );
556 : }
557 :
558 0 : fn delete_layers(timeline: &Timeline, layers: Vec<Layer>) -> Result<(), Error> {
559 : // We are deleting layers, so we must hold the gate
560 0 : let _gate = timeline.gate.enter().map_err(|e| match e {
561 0 : GateError::GateClosed => Error::ShuttingDown,
562 0 : })?;
563 : {
564 0 : layers.into_iter().for_each(|l: Layer| {
565 0 : l.delete_on_drop();
566 0 : std::mem::drop(l);
567 0 : });
568 : }
569 0 : Ok(())
570 0 : }
571 :
572 0 : let mut should_fsync = false;
573 0 : let mut first_err = None;
574 0 : while let Some(res) = tasks.join_next().await {
575 0 : match res {
576 0 : Ok(Ok((owned, did_hardlink))) => {
577 0 : if did_hardlink {
578 0 : should_fsync = true;
579 0 : }
580 0 : new_layers.push(owned);
581 : }
582 :
583 : // Don't stop the evaluation on errors, so that we get the full set of hardlinked layers to delete.
584 0 : Ok(Err(failed)) => {
585 0 : cancel_eval.cancel();
586 0 : first_err.get_or_insert(failed);
587 0 : }
588 0 : Err(je) => {
589 0 : cancel_eval.cancel();
590 0 : first_err.get_or_insert(Error::Prepare(je.into()));
591 0 : }
592 : }
593 : }
594 :
595 0 : if let Some(failed) = first_err {
596 0 : delete_layers(detached, new_layers)?;
597 0 : return Err(failed);
598 0 : }
599 :
600 : // fsync directory again if we hardlinked something
601 0 : if should_fsync {
602 0 : fsync_timeline_dir(detached, ctx).await;
603 0 : }
604 :
605 0 : let prepared = PreparedTimelineDetach { layers: new_layers };
606 :
607 0 : Ok(Progress::Prepared(attempt, prepared))
608 0 : }
609 :
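 : /// Starts a new detach attempt: claims the tenant-wide exclusive slot and persists the
 : /// `DetachAncestor` gc-blocking reason in `index_part.json` if it is not already there.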
610 0 : async fn start_new_attempt(
611 0 : detached: &Timeline,
612 0 : tenant: &TenantShard,
613 0 : ancestor_timeline_id: TimelineId,
614 0 : ancestor_lsn: Lsn,
615 0 : ) -> Result<Attempt, Error> {
616 0 : let attempt = obtain_exclusive_attempt(detached, tenant, ancestor_timeline_id, ancestor_lsn)?;
617 :
618 : // insert the block in the index_part.json, if not already there.
619 0 : let _dont_care = tenant
620 0 : .gc_block
621 0 : .insert(
622 0 : detached,
623 0 : crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
624 0 : )
625 0 : .await
626 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
627 :
628 0 : Ok(attempt)
629 0 : }
630 :
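 : /// Continues a previously started attempt: gc blocking is already persisted, so only the
 : /// in-memory exclusive slot needs to be re-acquired.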
631 0 : async fn continue_with_blocked_gc(
632 0 : detached: &Timeline,
633 0 : tenant: &TenantShard,
634 0 : ancestor_timeline_id: TimelineId,
635 0 : ancestor_lsn: Lsn,
636 0 : ) -> Result<Attempt, Error> {
637 : // FIXME: it would be nice to confirm that there is an in-memory version, since we've just
638 : // verified there is a persistent one?
639 0 : obtain_exclusive_attempt(detached, tenant, ancestor_timeline_id, ancestor_lsn)
640 0 : }
641 :
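 : /// Claims the per-tenant slot that makes this the only ongoing detach-ancestor attempt, and
 : /// enters the detached timeline's gate so it cannot shut down mid-attempt.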
642 0 : fn obtain_exclusive_attempt(
643 0 : detached: &Timeline,
644 0 : tenant: &TenantShard,
645 0 : ancestor_timeline_id: TimelineId,
646 0 : ancestor_lsn: Lsn,
647 0 : ) -> Result<Attempt, Error> {
648 : use Error::{OtherTimelineDetachOngoing, ShuttingDown};
649 :
650 : // ensure we are the only active attempt for this tenant
651 0 : let (guard, barrier) = completion::channel();
652 : {
653 0 : let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
654 0 : if let Some((tl, other)) = guard.as_ref() {
655 0 : if !other.is_ready() {
656 0 : return Err(OtherTimelineDetachOngoing(*tl));
657 0 : }
658 : // FIXME: no test enters here
659 0 : }
660 0 : *guard = Some((detached.timeline_id, barrier));
661 : }
662 :
663 : // ensure the gate is still open
664 0 : let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
665 :
666 0 : Ok(Attempt {
667 0 : timeline_id: detached.timeline_id,
668 0 : ancestor_timeline_id,
669 0 : ancestor_lsn,
670 0 : _guard: guard,
671 0 : gate_entered: Some(_gate_entered),
672 0 : })
673 0 : }
674 :
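 : /// Collects the ids of `detached`'s direct children whose latest uploaded `index_part.json`
 : /// already records them as reparented; errors with `ShuttingDown` if any child's upload queue
 : /// is unavailable.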
675 0 : fn reparented_direct_children(
676 0 : detached: &Arc<Timeline>,
677 0 : tenant: &TenantShard,
678 0 : ) -> Result<HashSet<TimelineId>, Error> {
679 0 : let mut all_direct_children = tenant
680 0 : .timelines
681 0 : .lock()
682 0 : .unwrap()
683 0 : .values()
684 0 : .filter_map(|tl| {
685 0 : let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
686 :
687 0 : if is_direct_child {
688 0 : Some(tl.clone())
689 : } else {
690 0 : if let Some(timeline) = tl.ancestor_timeline.as_ref() {
691 0 : assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live");
692 0 : }
693 0 : None
694 : }
695 0 : })
696 : // Collect to avoid lock-ordering problems between Tenant::timelines and
697 : // Timeline::remote_client
698 0 : .collect::<Vec<_>>();
699 :
700 0 : let mut any_shutdown = false;
701 :
702 0 : all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
703 0 : Ok(accessor) => accessor
704 0 : .latest_uploaded_index_part()
705 0 : .lineage
706 0 : .is_reparented(),
707 0 : Err(_shutdownalike) => {
708 : // not 100% a shutdown, but let's bail early so we don't give inconsistent results in a
709 : // sharded environment.
710 0 : any_shutdown = true;
711 0 : true
712 : }
713 0 : });
714 :
715 0 : if any_shutdown {
716 : // it could be one or many being deleted; have client retry
717 0 : return Err(Error::ShuttingDown);
718 0 : }
719 :
720 0 : Ok(all_direct_children
721 0 : .into_iter()
722 0 : .map(|tl| tl.timeline_id)
723 0 : .collect())
724 0 : }
725 :
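 : /// Partitions the ancestor's historic layers relative to `ancestor_lsn`: returns the count of
 : /// layers starting above the branch point (skipped), the delta layers straddling it (their lsn
 : /// prefix must be rewritten), and the remaining layers (adopted as-is via remote copy).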
726 0 : fn partition_work(
727 0 : ancestor_lsn: Lsn,
728 0 : source: &LayerManager,
729 0 : ) -> Result<(usize, Vec<Layer>, Vec<Layer>), Error> {
730 0 : let mut straddling_branchpoint = vec![];
731 0 : let mut rest_of_historic = vec![];
732 :
733 0 : let mut later_by_lsn = 0;
734 :
735 0 : for desc in source.layer_map()?.iter_historic_layers() {
736 : // off-by-one hazards here:
737 : // - start is inclusive
738 : // - end is exclusive
739 0 : if desc.lsn_range.start > ancestor_lsn {
740 0 : later_by_lsn += 1;
741 0 : continue;
742 0 : }
743 :
744 0 : let target = if desc.lsn_range.start <= ancestor_lsn
745 0 : && desc.lsn_range.end > ancestor_lsn
746 0 : && desc.is_delta
747 : {
748 : // TODO: image layer at Lsn optimization
749 0 : &mut straddling_branchpoint
750 : } else {
751 0 : &mut rest_of_historic
752 : };
753 :
754 0 : target.push(source.get_from_desc(&desc));
755 : }
756 :
757 0 : Ok((later_by_lsn, straddling_branchpoint, rest_of_historic))
758 0 : }
759 :
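 : /// Copies the lsn prefix (up to `end_lsn`) of a straddling delta layer onto `target` and
 : /// uploads the result; returns `None` when the prefix contains no records.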
760 0 : async fn upload_rewritten_layer(
761 0 : end_lsn: Lsn,
762 0 : layer: &Layer,
763 0 : target: &Arc<Timeline>,
764 0 : cancel: &CancellationToken,
765 0 : ctx: &RequestContext,
766 0 : ) -> Result<Option<Layer>, Error> {
767 0 : let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
768 :
769 0 : let Some(copied) = copied else {
770 0 : return Ok(None);
771 : };
772 :
773 0 : target
774 0 : .remote_client
775 0 : .upload_layer_file(&copied, cancel)
776 0 : .await
777 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
778 :
779 0 : Ok(Some(copied.into()))
780 0 : }
781 :
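 : /// Writes a new delta layer on `target_timeline` containing only the records of `layer` below
 : /// `end_lsn`; returns `None` if there are no such records.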
782 0 : async fn copy_lsn_prefix(
783 0 : end_lsn: Lsn,
784 0 : layer: &Layer,
785 0 : target_timeline: &Arc<Timeline>,
786 0 : ctx: &RequestContext,
787 0 : ) -> Result<Option<ResidentLayer>, Error> {
788 0 : if target_timeline.cancel.is_cancelled() {
789 0 : return Err(Error::ShuttingDown);
790 0 : }
791 :
792 0 : tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
793 :
794 0 : let mut writer = DeltaLayerWriter::new(
795 0 : target_timeline.conf,
796 0 : target_timeline.timeline_id,
797 0 : target_timeline.tenant_shard_id,
798 0 : layer.layer_desc().key_range.start,
799 0 : layer.layer_desc().lsn_range.start..end_lsn,
800 0 : &target_timeline.gate,
801 0 : target_timeline.cancel.clone(),
802 0 : ctx,
803 0 : )
804 0 : .await
805 0 : .with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
806 0 : .map_err(Error::Prepare)?;
807 :
808 0 : let resident = layer.download_and_keep_resident(ctx).await.map_err(|e| {
809 0 : if e.is_cancelled() {
810 0 : Error::ShuttingDown
811 : } else {
812 0 : Error::Prepare(e.into())
813 : }
814 0 : })?;
815 :
816 0 : let records = resident
817 0 : .copy_delta_prefix(&mut writer, end_lsn, ctx)
818 0 : .await
819 0 : .with_context(|| format!("copy lsn prefix of ancestors {layer}"))
820 0 : .map_err(Error::Prepare)?;
821 :
822 0 : drop(resident);
823 :
824 0 : tracing::debug!(%layer, records, "copied records");
825 :
826 0 : if records == 0 {
827 0 : drop(writer);
828 : // TODO: we might want to store an empty marker in remote storage for this
829 : // layer so that we will not needlessly walk `layer` on repeated attempts.
830 0 : Ok(None)
831 : } else {
832 : // reuse the layer's original end key instead of the real highest key in the layer,
833 : // which would add more holes between layers.
834 0 : let reused_highest_key = layer.layer_desc().key_range.end;
835 0 : let (desc, path) = writer
836 0 : .finish(reused_highest_key, ctx)
837 0 : .await
838 0 : .map_err(Error::Prepare)?;
839 0 : let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
840 0 : .map_err(Error::Prepare)?;
841 :
842 0 : tracing::debug!(%layer, %copied, "new layer produced");
843 :
844 0 : Ok(Some(copied))
845 : }
846 0 : }
847 :
848 : /// Creates a new `Layer` instance for the adopted layer and ensures it is present in remote
849 : /// storage on successful return, without the adopted layer being added to `index_part.json`.
850 : /// Returns `(Layer, did_hardlink)`.
851 0 : async fn remote_copy(
852 0 : adopted: &Layer,
853 0 : adoptee: &Arc<Timeline>,
854 0 : generation: Generation,
855 0 : shard_identity: ShardIdentity,
856 0 : cancel: &CancellationToken,
857 0 : ) -> Result<(Layer, bool), Error> {
858 0 : let mut metadata = adopted.metadata();
859 0 : debug_assert!(metadata.generation <= generation);
860 0 : metadata.generation = generation;
861 0 : metadata.shard = shard_identity.shard_index();
862 :
863 0 : let conf = adoptee.conf;
864 0 : let file_name = adopted.layer_desc().layer_name();
865 :
866 : // We don't want to shut the timeline down during this operation because we do `delete_on_drop` below
867 0 : let _gate = adoptee.gate.enter().map_err(|e| match e {
868 0 : GateError::GateClosed => Error::ShuttingDown,
869 0 : })?;
870 :
871 : // depending on whether Layer::keep_resident succeeds, do a hardlink
872 : let did_hardlink;
873 0 : let owned = if let Some(adopted_resident) = adopted.keep_resident().await {
874 0 : let adopted_path = adopted_resident.local_path();
875 0 : let adoptee_path = local_layer_path(
876 0 : conf,
877 0 : &adoptee.tenant_shard_id,
878 0 : &adoptee.timeline_id,
879 0 : &file_name,
880 0 : &metadata.generation,
881 : );
882 :
883 0 : match std::fs::hard_link(adopted_path, &adoptee_path) {
884 0 : Ok(()) => {}
885 0 : Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
886 : // In theory we should not get into this situation as we are doing cleanups of the layer file after errors.
887 : // However, we don't do cleanups for errors past `prepare`, so there is a slight chance of reaching this branch.
888 :
889 : // Double check that the file is an orphan (probably from an earlier attempt), then delete it
890 0 : let key = file_name.clone().into();
891 0 : if adoptee
892 0 : .layers
893 0 : .read(LayerManagerLockHolder::DetachAncestor)
894 0 : .await
895 0 : .contains_key(&key)
896 : {
897 : // We are supposed to filter out such cases before coming to this function
898 0 : return Err(Error::Prepare(anyhow::anyhow!(
899 0 : "layer file {file_name} already present and inside layer map"
900 0 : )));
901 0 : }
902 0 : tracing::info!("Deleting orphan layer file to make way for hard linking");
903 : // Delete orphan layer file and try again, to ensure this layer has a well understood source
904 0 : std::fs::remove_file(&adoptee_path)
905 0 : .map_err(|e| Error::launder(e.into(), Error::Prepare))?;
906 0 : std::fs::hard_link(adopted_path, &adoptee_path)
907 0 : .map_err(|e| Error::launder(e.into(), Error::Prepare))?;
908 : }
909 0 : Err(e) => {
910 0 : return Err(Error::launder(e.into(), Error::Prepare));
911 : }
912 : };
913 0 : did_hardlink = true;
914 0 : Layer::for_resident(conf, adoptee, adoptee_path, file_name, metadata).drop_eviction_guard()
915 : } else {
916 0 : did_hardlink = false;
917 0 : Layer::for_evicted(conf, adoptee, file_name, metadata)
918 : };
919 :
920 0 : let layer = match adoptee
921 0 : .remote_client
922 0 : .copy_timeline_layer(adopted, &owned, cancel)
923 0 : .await
924 : {
925 0 : Ok(()) => owned,
926 0 : Err(e) => {
927 0 : {
928 0 : // Clean up the layer so that on a retry we don't get errors that the file already exists
929 0 : owned.delete_on_drop();
930 0 : std::mem::drop(owned);
931 0 : }
932 0 : return Err(Error::launder(e, Error::Prepare));
933 : }
934 : };
935 :
936 0 : Ok((layer, did_hardlink))
937 0 : }
938 :
939 : pub(crate) enum DetachingAndReparenting {
940 : /// All of the following timeline ids were reparented and the timeline ancestor detach must be
941 : /// marked as completed.
942 : Reparented(HashSet<TimelineId>),
943 :
944 : /// Some of the reparentings failed. The timeline ancestor detach must **not** be marked as
945 : /// completed.
946 : ///
947 : /// Nested `must_reset_tenant` is set to true when any restart-requiring changes were made.
948 : SomeReparentingFailed { must_reset_tenant: bool },
949 :
950 : /// Detaching and reparentings were completed in a previous attempt. Timeline ancestor detach
951 : /// must be marked as completed.
952 : AlreadyDone(HashSet<TimelineId>),
953 : }
954 :
955 : impl DetachingAndReparenting {
956 0 : pub(crate) fn reset_tenant_required(&self) -> bool {
957 : use DetachingAndReparenting::*;
958 0 : match self {
959 0 : Reparented(_) => true,
960 0 : SomeReparentingFailed { must_reset_tenant } => *must_reset_tenant,
961 0 : AlreadyDone(_) => false,
962 : }
963 0 : }
964 :
965 0 : pub(crate) fn completed(self) -> Option<HashSet<TimelineId>> {
966 : use DetachingAndReparenting::*;
967 0 : match self {
968 0 : Reparented(x) | AlreadyDone(x) => Some(x),
969 0 : SomeReparentingFailed { .. } => None,
970 : }
971 0 : }
972 : }
973 :
974 : /// See [`Timeline::detach_from_ancestor_and_reparent`].
975 0 : pub(super) async fn detach_and_reparent(
976 0 : detached: &Arc<Timeline>,
977 0 : tenant: &TenantShard,
978 0 : prepared: PreparedTimelineDetach,
979 0 : ancestor_timeline_id: TimelineId,
980 0 : ancestor_lsn: Lsn,
981 0 : behavior: DetachBehavior,
982 0 : _ctx: &RequestContext,
983 0 : ) -> Result<DetachingAndReparenting, Error> {
984 0 : let PreparedTimelineDetach { layers } = prepared;
985 :
986 : #[derive(Debug)]
987 : enum Ancestor {
988 : NotDetached(Arc<Timeline>, Lsn),
989 : Detached(Arc<Timeline>, Lsn),
990 : }
991 :
992 0 : let (recorded_branchpoint, still_ongoing) = {
993 0 : let access = detached.remote_client.initialized_upload_queue()?;
994 0 : let latest = access.latest_uploaded_index_part();
995 :
996 : (
997 0 : latest.lineage.detached_previous_ancestor(),
998 0 : latest
999 0 : .gc_blocking
1000 0 : .as_ref()
1001 0 : .is_some_and(|b| b.blocked_by(DetachAncestor)),
1002 : )
1003 : };
1004 0 : assert!(
1005 0 : still_ongoing,
1006 0 : "cannot (detach? reparent)? complete if the operation is not still ongoing"
1007 : );
1008 :
1009 0 : let ancestor_to_detach = match detached.ancestor_timeline.as_ref() {
1010 0 : Some(mut ancestor) => {
1011 0 : while ancestor.timeline_id != ancestor_timeline_id {
1012 0 : match ancestor.ancestor_timeline.as_ref() {
1013 0 : Some(found) => {
1014 0 : if ancestor_lsn != ancestor.ancestor_lsn {
1015 0 : return Err(Error::DetachReparent(anyhow::anyhow!(
1016 0 : "cannot find the ancestor timeline to detach from: wrong ancestor lsn"
1017 0 : )));
1018 0 : }
1019 0 : ancestor = found;
1020 : }
1021 : None => {
1022 0 : return Err(Error::DetachReparent(anyhow::anyhow!(
1023 0 : "cannot find the ancestor timeline to detach from"
1024 0 : )));
1025 : }
1026 : }
1027 : }
1028 0 : Some(ancestor)
1029 : }
1030 0 : None => None,
1031 : };
1032 0 : let ancestor = match (ancestor_to_detach, recorded_branchpoint) {
1033 0 : (Some(ancestor), None) => {
1034 0 : assert!(
1035 0 : !layers.is_empty(),
1036 0 : "there should always be at least one layer to inherit"
1037 : );
1038 0 : Ancestor::NotDetached(ancestor.clone(), detached.ancestor_lsn)
1039 : }
1040 : (Some(_), Some(_)) => {
1041 0 : panic!(
1042 0 : "it should be impossible to get to here without having gone through the tenant reset; if the tenant was reset, then the ancestor_timeline would be None"
1043 : );
1044 : }
1045 0 : (None, Some((ancestor_id, ancestor_lsn))) => {
1046 : // it has been either:
1047 : // - detached but still exists => we can try reparenting
1048 : // - detached and deleted
1049 : //
1050 : // either way, we must complete
1051 0 : assert!(
1052 0 : layers.is_empty(),
1053 0 : "no layers should had been copied as detach is done"
1054 : );
1055 :
1056 0 : let existing = tenant.timelines.lock().unwrap().get(&ancestor_id).cloned();
1057 :
1058 0 : if let Some(ancestor) = existing {
1059 0 : Ancestor::Detached(ancestor, ancestor_lsn)
1060 : } else {
1061 0 : let direct_children = reparented_direct_children(detached, tenant)?;
1062 0 : return Ok(DetachingAndReparenting::AlreadyDone(direct_children));
1063 : }
1064 : }
1065 : (None, None) => {
1066 : // TODO: make sure there are no `?` (fallible early returns) between here and the
1067 : // tenant_reset.
1068 0 : panic!(
1069 0 : "bug: detach_and_reparent called on a timeline which has not been detached or which has no live ancestor"
1070 : );
1071 : }
1072 : };
1073 :
1074 : // publish the prepared layers before we reparent any of the timelines, so that on restart
1075 : // reparented timelines find layers. also do the actual detaching.
1076 : //
1077 : // if we crash after this operation, a retry will allow reparenting the remaining timelines as
1078 : // gc is blocked.
1079 :
1080 0 : let (ancestor, ancestor_lsn, was_detached) = match ancestor {
1081 0 : Ancestor::NotDetached(ancestor, ancestor_lsn) => {
1082 : // this has to complete before any reparentings because otherwise they would not have
1083 : // layers on the new parent.
1084 0 : detached
1085 0 : .remote_client
1086 0 : .schedule_adding_existing_layers_to_index_detach_and_wait(
1087 0 : &layers,
1088 0 : (ancestor.timeline_id, ancestor_lsn),
1089 0 : )
1090 0 : .await
1091 0 : .context("publish layers and detach ancestor")
1092 0 : .map_err(|e| Error::launder(e, Error::DetachReparent))?;
1093 :
1094 0 : tracing::info!(
1095 0 : ancestor=%ancestor.timeline_id,
1096 : %ancestor_lsn,
1097 0 : inherited_layers=%layers.len(),
1098 0 : "detached from ancestor"
1099 : );
1100 0 : (ancestor, ancestor_lsn, true)
1101 : }
1102 0 : Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn, false),
1103 : };
1104 :
1105 0 : if let DetachBehavior::MultiLevelAndNoReparent = behavior {
1106 : // Do not reparent if the user requests to behave so.
1107 0 : return Ok(DetachingAndReparenting::Reparented(HashSet::new()));
1108 0 : }
1109 :
1110 0 : let mut tasks = tokio::task::JoinSet::new();
1111 :
1112 : // Returns a single-permit semaphore which will be used to make one reparenting succeed;
1113 : // the others will fail as if those timelines had been stopped for whatever reason.
1114 : #[cfg(feature = "testing")]
1115 0 : let failpoint_sem = || -> Option<Arc<Semaphore>> {
1116 0 : fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
1117 0 : Arc::new(Semaphore::new(1))
1118 : ));
1119 0 : None
1120 0 : }();
1121 :
1122 : // because we are now keeping the slot in progress, it is unlikely that there will be any
1123 : // timeline deletions during this time. if we raced one, then we'll just ignore it.
1124 : {
1125 0 : let g = tenant.timelines.lock().unwrap();
1126 0 : reparentable_timelines(g.values(), detached, &ancestor, ancestor_lsn)
1127 0 : .cloned()
1128 0 : .for_each(|timeline| {
1129 : // important in this scope: we are holding the Tenant::timelines lock
1130 0 : let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
1131 0 : let new_parent = detached.timeline_id;
1132 : #[cfg(feature = "testing")]
1133 0 : let failpoint_sem = failpoint_sem.clone();
1134 :
1135 0 : tasks.spawn(
1136 0 : async move {
1137 0 : let res = async {
1138 : #[cfg(feature = "testing")]
1139 0 : if let Some(failpoint_sem) = failpoint_sem {
1140 0 : let _permit = failpoint_sem.acquire().await.map_err(|_| {
1141 0 : anyhow::anyhow!(
1142 0 : "failpoint: timeline-detach-ancestor::allow_one_reparented",
1143 : )
1144 0 : })?;
1145 0 : failpoint_sem.close();
1146 0 : }
1147 :
1148 0 : timeline
1149 0 : .remote_client
1150 0 : .schedule_reparenting_and_wait(&new_parent)
1151 0 : .await
1152 0 : }
1153 0 : .await;
1154 :
1155 0 : match res {
1156 : Ok(()) => {
1157 0 : tracing::info!("reparented");
1158 0 : Some(timeline)
1159 : }
1160 0 : Err(e) => {
1161 : // with the use of tenant slot, raced timeline deletion is the most
1162 : // likely reason.
1163 0 : tracing::warn!("reparenting failed: {e:#}");
1164 0 : None
1165 : }
1166 : }
1167 0 : }
1168 0 : .instrument(span),
1169 : );
1170 0 : });
1171 : }
1172 :
1173 0 : let reparenting_candidates = tasks.len();
1174 0 : let mut reparented = HashSet::with_capacity(tasks.len());
1175 :
1176 0 : while let Some(res) = tasks.join_next().await {
1177 0 : match res {
1178 0 : Ok(Some(timeline)) => {
1179 0 : assert!(
1180 0 : reparented.insert(timeline.timeline_id),
1181 0 : "duplicate reparenting? timeline_id={}",
1182 0 : timeline.timeline_id
1183 : );
1184 : }
1185 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
1186 : // just ignore failures now, we can retry
1187 0 : Ok(None) => {}
1188 0 : Err(je) if je.is_panic() => {}
1189 0 : Err(je) => tracing::error!("unexpected join error: {je:?}"),
1190 : }
1191 : }
1192 :
1193 0 : let reparented_all = reparenting_candidates == reparented.len();
1194 :
1195 0 : if reparented_all {
1196 0 : Ok(DetachingAndReparenting::Reparented(reparented))
1197 : } else {
1198 0 : tracing::info!(
1199 0 : reparented = reparented.len(),
1200 : candidates = reparenting_candidates,
1201 0 : "failed to reparent all candidates; they can be retried after the tenant_reset",
1202 : );
1203 :
1204 0 : let must_reset_tenant = !reparented.is_empty() || was_detached;
1205 0 : Ok(DetachingAndReparenting::SomeReparentingFailed { must_reset_tenant })
1206 : }
1207 0 : }
1208 :
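 : /// Final step of detaching: re-enters the gate if the tenant was reset in between, and
 : /// unblocks gc by removing the `DetachAncestor` reason from `index_part.json`.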
1209 0 : pub(super) async fn complete(
1210 0 : detached: &Arc<Timeline>,
1211 0 : tenant: &TenantShard,
1212 0 : mut attempt: Attempt,
1213 0 : _ctx: &RequestContext,
1214 0 : ) -> Result<(), Error> {
1215 0 : assert_eq!(detached.timeline_id, attempt.timeline_id);
1216 :
1217 0 : if attempt.gate_entered.is_none() {
1218 0 : let entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
1219 0 : attempt.gate_entered = Some(entered);
1220 0 : } else {
1221 0 : // Some(gate_entered) means the tenant was not restarted, as a restart is not always required
1222 0 : }
1223 :
1224 0 : assert!(detached.ancestor_timeline.is_none());
1225 :
1226 : // this should be a 503 at least...?
1227 0 : fail::fail_point!(
1228 0 : "timeline-detach-ancestor::complete_before_uploading",
1229 0 : |_| Err(Error::Failpoint(
1230 0 : "timeline-detach-ancestor::complete_before_uploading"
1231 0 : ))
1232 : );
1233 :
1234 0 : tenant
1235 0 : .gc_block
1236 0 : .remove(
1237 0 : detached,
1238 0 : crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
1239 0 : )
1240 0 : .await
1241 0 : .map_err(|e| Error::launder(e, Error::Complete))?;
1242 :
1243 0 : Ok(())
1244 0 : }
1245 :
1246 : /// Query against a locked `Tenant::timelines`.
1247 : ///
1248 : /// A timeline is reparentable if:
1249 : ///
1250 : /// - It is not the timeline being detached.
1251 : /// - It has the same ancestor as the timeline being detached. Note that the ancestor might not be the direct ancestor.
1252 0 : fn reparentable_timelines<'a, I>(
1253 0 : timelines: I,
1254 0 : detached: &'a Arc<Timeline>,
1255 0 : ancestor: &'a Arc<Timeline>,
1256 0 : ancestor_lsn: Lsn,
1257 0 : ) -> impl Iterator<Item = &'a Arc<Timeline>> + 'a
1258 0 : where
1259 0 : I: Iterator<Item = &'a Arc<Timeline>> + 'a,
1260 : {
1261 0 : timelines.filter_map(move |tl| {
1262 0 : if Arc::ptr_eq(tl, detached) {
1263 0 : return None;
1264 0 : }
1265 :
1266 0 : let tl_ancestor = tl.ancestor_timeline.as_ref()?;
1267 0 : let is_same = Arc::ptr_eq(ancestor, tl_ancestor);
1268 0 : let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
1269 :
1270 0 : let is_deleting = tl
1271 0 : .delete_progress
1272 0 : .try_lock()
1273 0 : .map(|flow| !flow.is_not_started())
1274 0 : .unwrap_or(true);
1275 :
1276 0 : if is_same && is_earlier && !is_deleting {
1277 0 : Some(tl)
1278 : } else {
1279 0 : None
1280 : }
1281 0 : })
1282 0 : }
1283 :
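 : /// Fails with `Error::Archived` if any reparentable child of the ancestor, live or offloaded,
 : /// is archived; nothing needs to be checked when reparenting is skipped.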
1284 0 : fn check_no_archived_children_of_ancestor(
1285 0 : tenant: &TenantShard,
1286 0 : detached: &Arc<Timeline>,
1287 0 : ancestor: &Arc<Timeline>,
1288 0 : ancestor_lsn: Lsn,
1289 0 : detach_behavior: DetachBehavior,
1290 0 : ) -> Result<(), Error> {
1291 0 : match detach_behavior {
1292 : DetachBehavior::NoAncestorAndReparent => {
1293 0 : let timelines = tenant.timelines.lock().unwrap();
1294 0 : let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
1295 :
1296 0 : for timeline in
1297 0 : reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn)
1298 : {
1299 0 : if timeline.is_archived() == Some(true) {
1300 0 : return Err(Error::Archived(timeline.timeline_id));
1301 0 : }
1302 : }
1303 :
1304 0 : for timeline_offloaded in timelines_offloaded.values() {
1305 0 : if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
1306 0 : continue;
1307 0 : }
1308 : // This forbids the detach ancestor feature if flattened timelines are present,
1309 : // even if the ancestor_lsn is from after the branchpoint of the detached timeline.
1310 : // But as per current design, we don't record the ancestor_lsn of flattened timelines.
1311 : // This is a bit unfortunate, but as of writing this we don't support flattening
1312 : // anyway. Maybe we can evolve the data model in the future.
1313 0 : if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
1314 0 : let is_earlier = retain_lsn <= ancestor_lsn;
1315 0 : if !is_earlier {
1316 0 : continue;
1317 0 : }
1318 0 : }
1319 0 : return Err(Error::Archived(timeline_offloaded.timeline_id));
1320 : }
1321 : }
1322 0 : DetachBehavior::MultiLevelAndNoReparent => {
1323 0 : // We don't need to check anything if the user requested to not reparent.
1324 0 : }
1325 : }
1326 :
1327 0 : Ok(())
1328 0 : }
1329 :
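 : /// Fsyncs the timeline directory so that freshly hardlinked or written layer files survive a
 : /// crash.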
1330 0 : async fn fsync_timeline_dir(timeline: &Timeline, ctx: &RequestContext) {
1331 0 : let path = &timeline
1332 0 : .conf
1333 0 : .timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id);
1334 0 : let timeline_dir = VirtualFile::open(&path, ctx)
1335 0 : .await
1336 0 : .fatal_err("VirtualFile::open for timeline dir fsync");
1337 0 : timeline_dir
1338 0 : .sync_all()
1339 0 : .await
1340 0 : .fatal_err("VirtualFile::sync_all timeline dir");
1341 0 : }