Line data Source code
1 : use std::collections::HashSet;
2 : use std::sync::Arc;
3 :
4 : use anyhow::Context;
5 : use http_utils::error::ApiError;
6 : use pageserver_api::models::detach_ancestor::AncestorDetached;
7 : use pageserver_api::shard::ShardIdentity;
8 : use tokio::sync::Semaphore;
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::Instrument;
11 : use utils::completion;
12 : use utils::generation::Generation;
13 : use utils::id::TimelineId;
14 : use utils::lsn::Lsn;
15 : use utils::sync::gate::GateError;
16 :
17 : use super::layer_manager::LayerManager;
18 : use super::{FlushLayerError, Timeline};
19 : use crate::context::{DownloadBehavior, RequestContext};
20 : use crate::task_mgr::TaskKind;
21 : use crate::tenant::Tenant;
22 : use crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor;
23 : use crate::tenant::storage_layer::layer::local_layer_path;
24 : use crate::tenant::storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer};
25 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
26 :
27 : #[derive(Debug, thiserror::Error)]
28 : pub(crate) enum Error {
29 : #[error("no ancestors")]
30 : NoAncestor,
31 :
32 : #[error("too many ancestors")]
33 : TooManyAncestors,
34 :
35 : #[error("shutting down, please retry later")]
36 : ShuttingDown,
37 :
38 : #[error("archived: {}", .0)]
39 : Archived(TimelineId),
40 :
41 : #[error(transparent)]
42 : NotFound(crate::tenant::GetTimelineError),
43 :
44 : #[error("failed to reparent all candidate timelines, please retry")]
45 : FailedToReparentAll,
46 :
47 : #[error("ancestor is already being detached by: {}", .0)]
48 : OtherTimelineDetachOngoing(TimelineId),
49 :
50 : #[error("preparing timeline ancestor detach failed")]
51 : Prepare(#[source] anyhow::Error),
52 :
53 : #[error("detaching and reparenting failed")]
54 : DetachReparent(#[source] anyhow::Error),
55 :
56 : #[error("completing ancestor detach failed")]
57 : Complete(#[source] anyhow::Error),
58 :
59 : #[error("failpoint: {}", .0)]
60 : Failpoint(&'static str),
61 : }
62 :
63 : impl Error {
64 : /// Try to detect cancellation from within the `anyhow::Error`; if found, return
65 : /// `Error::ShuttingDown`, otherwise wrap the error with the given `or_else` constructor.
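/// # Example
///
/// Illustrative sketch only (not compiled as a doc-test): a non-cancellation error is wrapped
/// by the supplied constructor, while cancellation-like errors collapse to `Error::ShuttingDown`.
///
/// ```ignore
/// let err = Error::launder(anyhow::anyhow!("disk full"), Error::Prepare);
/// assert!(matches!(err, Error::Prepare(_)));
/// ```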
66 0 : fn launder<F>(e: anyhow::Error, or_else: F) -> Error
67 0 : where
68 0 : F: Fn(anyhow::Error) -> Error,
69 0 : {
70 : use remote_storage::TimeoutOrCancel;
71 :
72 : use crate::tenant::remote_timeline_client::WaitCompletionError;
73 : use crate::tenant::upload_queue::NotInitialized;
74 :
75 0 : if e.is::<NotInitialized>()
76 0 : || TimeoutOrCancel::caused_by_cancel(&e)
77 0 : || e.downcast_ref::<remote_storage::DownloadError>()
78 0 : .is_some_and(|e| e.is_cancelled())
79 0 : || e.is::<WaitCompletionError>()
80 : {
81 0 : Error::ShuttingDown
82 : } else {
83 0 : or_else(e)
84 : }
85 0 : }
86 : }
87 :
88 : impl From<Error> for ApiError {
89 0 : fn from(value: Error) -> Self {
90 0 : match value {
91 0 : Error::NoAncestor => ApiError::Conflict(value.to_string()),
92 0 : Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{value}")),
93 0 : Error::ShuttingDown => ApiError::ShuttingDown,
94 0 : Error::Archived(_) => ApiError::BadRequest(anyhow::anyhow!("{value}")),
95 : Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
96 0 : ApiError::ResourceUnavailable(value.to_string().into())
97 : }
98 0 : Error::NotFound(e) => ApiError::from(e),
99 : // these variants should have no cancellation errors because of Error::launder
100 : Error::Prepare(_)
101 : | Error::DetachReparent(_)
102 : | Error::Complete(_)
103 0 : | Error::Failpoint(_) => ApiError::InternalServerError(value.into()),
104 : }
105 0 : }
106 : }
107 :
108 : impl From<crate::tenant::upload_queue::NotInitialized> for Error {
109 0 : fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
110 0 : // treat all as shutting down signals, even though that is not entirely correct
111 0 : // (uninitialized state)
112 0 : Error::ShuttingDown
113 0 : }
114 : }
115 : impl From<super::layer_manager::Shutdown> for Error {
116 0 : fn from(_: super::layer_manager::Shutdown) -> Self {
117 0 : Error::ShuttingDown
118 0 : }
119 : }
120 :
121 : pub(crate) enum Progress {
122 : Prepared(Attempt, PreparedTimelineDetach),
123 : Done(AncestorDetached),
124 : }
125 :
126 : pub(crate) struct PreparedTimelineDetach {
127 : layers: Vec<Layer>,
128 : }
129 :
130 : /// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
131 : #[derive(Debug)]
132 : pub(crate) struct Options {
133 : pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
134 : pub(crate) copy_concurrency: std::num::NonZeroUsize,
135 : }
136 :
137 : impl Default for Options {
138 0 : fn default() -> Self {
139 0 : Self {
140 0 : rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
141 0 : copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
142 0 : }
143 0 : }
144 : }
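// Illustrative only: the defaults above can be overridden by whoever constructs `Options`
// (the call site lives outside this module); e.g. a larger copy fan-out could be requested
// with struct-update syntax:
//
//     let opts = Options {
//         copy_concurrency: std::num::NonZeroUsize::new(200).unwrap(),
//         ..Options::default()
//     };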
145 :
146 : /// Represents a single attempt to detach the ancestor; the attempt remains exclusive across tenant resets.
147 : #[derive(Debug)]
148 : pub(crate) struct Attempt {
149 : pub(crate) timeline_id: TimelineId,
150 :
151 : _guard: completion::Completion,
152 : gate_entered: Option<utils::sync::gate::GateGuard>,
153 : }
154 :
155 : impl Attempt {
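/// Releases the gate guard ahead of a tenant reset; the reset shuts the timeline down and
/// cannot complete while this attempt still holds the timeline's gate (see also `complete`,
/// which re-enters the gate when the tenant was reset).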
156 0 : pub(crate) fn before_reset_tenant(&mut self) {
157 0 : let taken = self.gate_entered.take();
158 0 : assert!(taken.is_some());
159 0 : }
160 :
161 0 : pub(crate) fn new_barrier(&self) -> completion::Barrier {
162 0 : self._guard.barrier()
163 0 : }
164 : }
165 :
166 : /// See [`Timeline::prepare_to_detach_from_ancestor`]
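/// High-level flow, summarizing the body below: validate preconditions (not archived, valid
/// ancestor_lsn, single ancestor, no archived children), take the tenant-exclusive attempt
/// and block gc, flush the ancestor if its disk-consistent LSN is at or behind the branch
/// point, then split the ancestor's historic layers into delta layers whose LSN prefix must
/// be rewritten and layers that can be remote-copied as-is, uploading both sets on behalf of
/// the detached timeline.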
167 0 : pub(super) async fn prepare(
168 0 : detached: &Arc<Timeline>,
169 0 : tenant: &Tenant,
170 0 : options: Options,
171 0 : ctx: &RequestContext,
172 0 : ) -> Result<Progress, Error> {
173 : use Error::*;
174 :
175 0 : let Some((ancestor, ancestor_lsn)) = detached
176 0 : .ancestor_timeline
177 0 : .as_ref()
178 0 : .map(|tl| (tl.clone(), detached.ancestor_lsn))
179 : else {
180 0 : let still_in_progress = {
181 0 : let accessor = detached.remote_client.initialized_upload_queue()?;
182 :
183 : // it is safe to inspect the latest uploaded index_part, because we can only witness this
184 : // after the restart is complete and the ancestor is gone.
185 0 : let latest = accessor.latest_uploaded_index_part();
186 0 : if latest.lineage.detached_previous_ancestor().is_none() {
187 0 : return Err(NoAncestor);
188 0 : };
189 0 :
190 0 : latest
191 0 : .gc_blocking
192 0 : .as_ref()
193 0 : .is_some_and(|b| b.blocked_by(DetachAncestor))
194 0 : };
195 0 :
196 0 : if still_in_progress {
197 : // gc is still blocked, so we can still reparent and complete.
198 : // reparenting the remaining timelines is safe, because they were locked in at the beginning.
199 0 : let attempt = continue_with_blocked_gc(detached, tenant).await?;
200 :
201 : // because the ancestor of detached is already set to none, we have published all
202 : // of the layers, so we are still "prepared."
203 0 : return Ok(Progress::Prepared(
204 0 : attempt,
205 0 : PreparedTimelineDetach { layers: Vec::new() },
206 0 : ));
207 0 : }
208 :
209 0 : let reparented_timelines = reparented_direct_children(detached, tenant)?;
210 0 : return Ok(Progress::Done(AncestorDetached {
211 0 : reparented_timelines,
212 0 : }));
213 : };
214 :
215 0 : if detached.is_archived() != Some(false) {
216 0 : return Err(Archived(detached.timeline_id));
217 0 : }
218 0 :
219 0 : if !ancestor_lsn.is_valid() {
220 : // rare case, probably wouldn't even load
221 0 : tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
222 0 : return Err(NoAncestor);
223 0 : }
224 0 :
225 0 : check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
226 :
227 0 : if ancestor.ancestor_timeline.is_some() {
228 : // non-technical requirement; we could flatten N ancestors just as easily but we chose
229 : // not to, at least initially
230 0 : return Err(TooManyAncestors);
231 0 : }
232 :
233 0 : let attempt = start_new_attempt(detached, tenant).await?;
234 :
235 0 : utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");
236 :
237 0 : fail::fail_point!(
238 0 : "timeline-detach-ancestor::before_starting_after_locking",
239 0 : |_| Err(Error::Failpoint(
240 0 : "timeline-detach-ancestor::before_starting_after_locking"
241 0 : ))
242 0 : );
243 :
244 0 : if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
245 0 : let span =
246 0 : tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
247 0 : async {
248 0 : let started_at = std::time::Instant::now();
249 0 : let freeze_and_flush = ancestor.freeze_and_flush0();
250 0 : let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
251 :
252 0 : let res =
253 0 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
254 0 : .await;
255 :
256 0 : let res = match res {
257 0 : Ok(res) => res,
258 0 : Err(_elapsed) => {
259 0 : tracing::info!("freezing and flushing ancestor is still ongoing");
260 0 : freeze_and_flush.await
261 : }
262 : };
263 :
264 0 : res.map_err(|e| {
265 : use FlushLayerError::*;
266 0 : match e {
267 : Cancelled | NotRunning(_) => {
268 : // FIXME(#6424): technically statically unreachable right now, given how we never
269 : // drop the sender
270 0 : Error::ShuttingDown
271 : }
272 0 : CreateImageLayersError(_) | Other(_) => Error::Prepare(e.into()),
273 : }
274 0 : })?;
275 :
276 : // we do not need to wait for uploads to complete, but we do need `struct Layer`;
277 : // copying the delta prefix is currently unsupported for `InMemoryLayer`.
278 0 : tracing::info!(
279 0 : elapsed_ms = started_at.elapsed().as_millis(),
280 0 : "froze and flushed the ancestor"
281 : );
282 0 : Ok::<_, Error>(())
283 0 : }
284 0 : .instrument(span)
285 0 : .await?;
286 0 : }
287 :
288 0 : let end_lsn = ancestor_lsn + 1;
289 :
290 0 : let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
291 : // we do not need to start from our layers, because they can only be layers that come
292 : // *after* ancestor_lsn
293 0 : let layers = tokio::select! {
294 0 : guard = ancestor.layers.read() => guard,
295 0 : _ = detached.cancel.cancelled() => {
296 0 : return Err(ShuttingDown);
297 : }
298 0 : _ = ancestor.cancel.cancelled() => {
299 0 : return Err(ShuttingDown);
300 : }
301 : };
302 :
303 : // between retries, these can change if compaction or gc ran in between. this will mean
304 : // we have to redo work.
305 0 : partition_work(ancestor_lsn, &layers)?
306 : };
307 :
308 : // TODO: layers are already sorted by something: use that to determine how many of the remote
309 : // copies are already done -- gc is blocked, but a compaction could have happened on the ancestor,
310 : // which is something to keep in mind if copy skipping is implemented.
311 0 : tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
312 :
313 : // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
314 0 : let mut new_layers: Vec<Layer> =
315 0 : Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
316 0 :
317 0 : {
318 0 : tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
319 :
320 0 : let mut tasks = tokio::task::JoinSet::new();
321 0 :
322 0 : let mut wrote_any = false;
323 0 :
324 0 : let limiter = Arc::new(Semaphore::new(options.rewrite_concurrency.get()));
325 :
326 0 : for layer in straddling_branchpoint {
327 0 : let limiter = limiter.clone();
328 0 : let timeline = detached.clone();
329 0 : let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
330 :
331 0 : let span = tracing::info_span!("upload_rewritten_layer", %layer);
332 0 : tasks.spawn(
333 0 : async move {
334 0 : let _permit = limiter.acquire().await;
335 0 : let copied =
336 0 : upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
337 0 : .await?;
338 0 : if let Some(copied) = copied.as_ref() {
339 0 : tracing::info!(%copied, "rewrote and uploaded");
340 0 : }
341 0 : Ok(copied)
342 0 : }
343 0 : .instrument(span),
344 0 : );
345 0 : }
346 :
347 0 : while let Some(res) = tasks.join_next().await {
348 0 : match res {
349 0 : Ok(Ok(Some(copied))) => {
350 0 : wrote_any = true;
351 0 : new_layers.push(copied);
352 0 : }
353 0 : Ok(Ok(None)) => {}
354 0 : Ok(Err(e)) => return Err(e),
355 0 : Err(je) => return Err(Error::Prepare(je.into())),
356 : }
357 : }
358 :
359 : // FIXME: the fsync should be mandatory, after both rewrites and copies
360 0 : if wrote_any {
361 0 : fsync_timeline_dir(detached, ctx).await;
362 0 : }
363 : }
364 :
365 0 : let mut tasks = tokio::task::JoinSet::new();
366 0 : let limiter = Arc::new(Semaphore::new(options.copy_concurrency.get()));
367 0 : let cancel_eval = CancellationToken::new();
368 :
369 0 : for adopted in rest_of_historic {
370 0 : let limiter = limiter.clone();
371 0 : let timeline = detached.clone();
372 0 : let cancel_eval = cancel_eval.clone();
373 0 :
374 0 : tasks.spawn(
375 0 : async move {
376 0 : let _permit = tokio::select! {
377 0 : permit = limiter.acquire() => {
378 0 : permit
379 : }
380 : // Wait for the cancellation here instead of letting the entire task be cancelled.
381 : // Cancellations are racy in that they might leave layers on disk.
382 0 : _ = cancel_eval.cancelled() => {
383 0 : Err(Error::ShuttingDown)?
384 : }
385 : };
386 0 : let (owned, did_hardlink) = remote_copy(
387 0 : &adopted,
388 0 : &timeline,
389 0 : timeline.generation,
390 0 : timeline.shard_identity,
391 0 : &timeline.cancel,
392 0 : )
393 0 : .await?;
394 0 : tracing::info!(layer=%owned, did_hard_link=%did_hardlink, "remote copied");
395 0 : Ok((owned, did_hardlink))
396 0 : }
397 0 : .in_current_span(),
398 0 : );
399 0 : }
400 :
401 0 : fn delete_layers(timeline: &Timeline, layers: Vec<Layer>) -> Result<(), Error> {
402 : // We are deleting layers, so we must hold the gate
403 0 : let _gate = timeline.gate.enter().map_err(|e| match e {
404 0 : GateError::GateClosed => Error::ShuttingDown,
405 0 : })?;
406 0 : {
407 0 : layers.into_iter().for_each(|l: Layer| {
408 0 : l.delete_on_drop();
409 0 : std::mem::drop(l);
410 0 : });
411 0 : }
412 0 : Ok(())
413 0 : }
414 :
415 0 : let mut should_fsync = false;
416 0 : let mut first_err = None;
417 0 : while let Some(res) = tasks.join_next().await {
418 0 : match res {
419 0 : Ok(Ok((owned, did_hardlink))) => {
420 0 : if did_hardlink {
421 0 : should_fsync = true;
422 0 : }
423 0 : new_layers.push(owned);
424 : }
425 :
426 : // Don't stop the evaluation on errors, so that we get the full set of hardlinked layers to delete.
427 0 : Ok(Err(failed)) => {
428 0 : cancel_eval.cancel();
429 0 : first_err.get_or_insert(failed);
430 0 : }
431 0 : Err(je) => {
432 0 : cancel_eval.cancel();
433 0 : first_err.get_or_insert(Error::Prepare(je.into()));
434 0 : }
435 : }
436 : }
437 :
438 0 : if let Some(failed) = first_err {
439 0 : delete_layers(detached, new_layers)?;
440 0 : return Err(failed);
441 0 : }
442 0 :
443 0 : // fsync directory again if we hardlinked something
444 0 : if should_fsync {
445 0 : fsync_timeline_dir(detached, ctx).await;
446 0 : }
447 :
448 0 : let prepared = PreparedTimelineDetach { layers: new_layers };
449 0 :
450 0 : Ok(Progress::Prepared(attempt, prepared))
451 0 : }
452 :
453 0 : async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
454 0 : let attempt = obtain_exclusive_attempt(detached, tenant)?;
455 :
456 : // insert the block in the index_part.json, if not already there.
457 0 : let _dont_care = tenant
458 0 : .gc_block
459 0 : .insert(
460 0 : detached,
461 0 : crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
462 0 : )
463 0 : .await
464 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
465 :
466 0 : Ok(attempt)
467 0 : }
468 :
469 0 : async fn continue_with_blocked_gc(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
470 0 : // FIXME: it would be nice to confirm that there is an in-memory version, since we've just
471 0 : // verified there is a persistent one?
472 0 : obtain_exclusive_attempt(detached, tenant)
473 0 : }
474 :
475 0 : fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
476 : use Error::{OtherTimelineDetachOngoing, ShuttingDown};
477 :
478 : // ensure we are the only active attempt for this tenant
479 0 : let (guard, barrier) = completion::channel();
480 0 : {
481 0 : let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
482 0 : if let Some((tl, other)) = guard.as_ref() {
483 0 : if !other.is_ready() {
484 0 : return Err(OtherTimelineDetachOngoing(*tl));
485 0 : }
486 : // FIXME: no test enters here
487 0 : }
488 0 : *guard = Some((detached.timeline_id, barrier));
489 : }
490 :
491 : // ensure the gate is still open
492 0 : let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
493 :
494 0 : Ok(Attempt {
495 0 : timeline_id: detached.timeline_id,
496 0 : _guard: guard,
497 0 : gate_entered: Some(_gate_entered),
498 0 : })
499 0 : }
500 :
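/// Returns the ids of `detached`'s direct children whose latest uploaded `index_part`
/// records them as already reparented. Bails out with `Error::ShuttingDown` if any child's
/// upload queue is not initialized, to avoid returning partial results.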
501 0 : fn reparented_direct_children(
502 0 : detached: &Arc<Timeline>,
503 0 : tenant: &Tenant,
504 0 : ) -> Result<HashSet<TimelineId>, Error> {
505 0 : let mut all_direct_children = tenant
506 0 : .timelines
507 0 : .lock()
508 0 : .unwrap()
509 0 : .values()
510 0 : .filter_map(|tl| {
511 0 : let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
512 :
513 0 : if is_direct_child {
514 0 : Some(tl.clone())
515 : } else {
516 0 : if let Some(timeline) = tl.ancestor_timeline.as_ref() {
517 0 : assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live");
518 0 : }
519 0 : None
520 : }
521 0 : })
522 0 : // Collect to avoid a lock ordering problem between Tenant::timelines and
523 0 : // Timeline::remote_client
524 0 : .collect::<Vec<_>>();
525 0 :
526 0 : let mut any_shutdown = false;
527 0 :
528 0 : all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
529 0 : Ok(accessor) => accessor
530 0 : .latest_uploaded_index_part()
531 0 : .lineage
532 0 : .is_reparented(),
533 0 : Err(_shutdownalike) => {
534 0 : // not 100% a shutdown, but let's bail early so we don't give inconsistent results in a
535 0 : // sharded environment.
536 0 : any_shutdown = true;
537 0 : true
538 : }
539 0 : });
540 0 :
541 0 : if any_shutdown {
542 : // it could be one or many being deleted; have client retry
543 0 : return Err(Error::ShuttingDown);
544 0 : }
545 0 :
546 0 : Ok(all_direct_children
547 0 : .into_iter()
548 0 : .map(|tl| tl.timeline_id)
549 0 : .collect())
550 0 : }
551 :
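/// Splits the ancestor's historic layers at `ancestor_lsn`: delta layers whose LSN range
/// straddles the branch point must have their prefix rewritten, every other layer starting at
/// or below the branch point is copied as-is, and layers starting above it are only counted.
///
/// Worked example with hypothetical LSNs: for `ancestor_lsn = 0/200`, a delta layer covering
/// `0/100..0/300` goes to the rewrite set, a layer covering `0/100..0/180` goes to the
/// copy-as-is set, and a layer starting at `0/250` is skipped and only counted.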
552 0 : fn partition_work(
553 0 : ancestor_lsn: Lsn,
554 0 : source: &LayerManager,
555 0 : ) -> Result<(usize, Vec<Layer>, Vec<Layer>), Error> {
556 0 : let mut straddling_branchpoint = vec![];
557 0 : let mut rest_of_historic = vec![];
558 0 :
559 0 : let mut later_by_lsn = 0;
560 :
561 0 : for desc in source.layer_map()?.iter_historic_layers() {
562 : // off-by-one considerations here:
563 : // - start is inclusive
564 : // - end is exclusive
565 0 : if desc.lsn_range.start > ancestor_lsn {
566 0 : later_by_lsn += 1;
567 0 : continue;
568 0 : }
569 :
570 0 : let target = if desc.lsn_range.start <= ancestor_lsn
571 0 : && desc.lsn_range.end > ancestor_lsn
572 0 : && desc.is_delta
573 : {
574 : // TODO: image layer at Lsn optimization
575 0 : &mut straddling_branchpoint
576 : } else {
577 0 : &mut rest_of_historic
578 : };
579 :
580 0 : target.push(source.get_from_desc(&desc));
581 : }
582 :
583 0 : Ok((later_by_lsn, straddling_branchpoint, rest_of_historic))
584 0 : }
585 :
586 0 : async fn upload_rewritten_layer(
587 0 : end_lsn: Lsn,
588 0 : layer: &Layer,
589 0 : target: &Arc<Timeline>,
590 0 : cancel: &CancellationToken,
591 0 : ctx: &RequestContext,
592 0 : ) -> Result<Option<Layer>, Error> {
593 0 : let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
594 :
595 0 : let Some(copied) = copied else {
596 0 : return Ok(None);
597 : };
598 :
599 0 : target
600 0 : .remote_client
601 0 : .upload_layer_file(&copied, cancel)
602 0 : .await
603 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
604 :
605 0 : Ok(Some(copied.into()))
606 0 : }
607 :
608 0 : async fn copy_lsn_prefix(
609 0 : end_lsn: Lsn,
610 0 : layer: &Layer,
611 0 : target_timeline: &Arc<Timeline>,
612 0 : ctx: &RequestContext,
613 0 : ) -> Result<Option<ResidentLayer>, Error> {
614 0 : if target_timeline.cancel.is_cancelled() {
615 0 : return Err(Error::ShuttingDown);
616 0 : }
617 0 :
618 0 : tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
619 :
620 0 : let mut writer = DeltaLayerWriter::new(
621 0 : target_timeline.conf,
622 0 : target_timeline.timeline_id,
623 0 : target_timeline.tenant_shard_id,
624 0 : layer.layer_desc().key_range.start,
625 0 : layer.layer_desc().lsn_range.start..end_lsn,
626 0 : ctx,
627 0 : )
628 0 : .await
629 0 : .with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
630 0 : .map_err(Error::Prepare)?;
631 :
632 0 : let resident = layer.download_and_keep_resident(ctx).await.map_err(|e| {
633 0 : if e.is_cancelled() {
634 0 : Error::ShuttingDown
635 : } else {
636 0 : Error::Prepare(e.into())
637 : }
638 0 : })?;
639 :
640 0 : let records = resident
641 0 : .copy_delta_prefix(&mut writer, end_lsn, ctx)
642 0 : .await
643 0 : .with_context(|| format!("copy lsn prefix of ancestors {layer}"))
644 0 : .map_err(Error::Prepare)?;
645 :
646 0 : drop(resident);
647 0 :
648 0 : tracing::debug!(%layer, records, "copied records");
649 :
650 0 : if records == 0 {
651 0 : drop(writer);
652 0 : // TODO: we might want to store an empty marker in remote storage for this
653 0 : // layer so that we will not needlessly walk `layer` on repeated attempts.
654 0 : Ok(None)
655 : } else {
656 : // reuse the layer's end key instead of the real highest key in the layer, which would
657 : // add more holes between layers.
658 0 : let reused_highest_key = layer.layer_desc().key_range.end;
659 0 : let (desc, path) = writer
660 0 : .finish(reused_highest_key, ctx)
661 0 : .await
662 0 : .map_err(Error::Prepare)?;
663 0 : let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
664 0 : .map_err(Error::Prepare)?;
665 :
666 0 : tracing::debug!(%layer, %copied, "new layer produced");
667 :
668 0 : Ok(Some(copied))
669 : }
670 0 : }
671 :
672 : /// Creates a new Layer instance for the adopted layer and, on successful return, ensures it is
673 : /// present in remote storage, without the adopted layer being added to `index_part.json`.
674 : /// Returns `(Layer, did_hardlink)`.
675 0 : async fn remote_copy(
676 0 : adopted: &Layer,
677 0 : adoptee: &Arc<Timeline>,
678 0 : generation: Generation,
679 0 : shard_identity: ShardIdentity,
680 0 : cancel: &CancellationToken,
681 0 : ) -> Result<(Layer, bool), Error> {
682 0 : let mut metadata = adopted.metadata();
683 0 : debug_assert!(metadata.generation <= generation);
684 0 : metadata.generation = generation;
685 0 : metadata.shard = shard_identity.shard_index();
686 0 :
687 0 : let conf = adoptee.conf;
688 0 : let file_name = adopted.layer_desc().layer_name();
689 :
690 : // We don't want to shut the timeline down during this operation because we do `delete_on_drop` below
691 0 : let _gate = adoptee.gate.enter().map_err(|e| match e {
692 0 : GateError::GateClosed => Error::ShuttingDown,
693 0 : })?;
694 :
695 : // depending on whether Layer::keep_resident finds a resident copy, do a hardlink
696 : let did_hardlink;
697 0 : let owned = if let Some(adopted_resident) = adopted.keep_resident().await {
698 0 : let adopted_path = adopted_resident.local_path();
699 0 : let adoptee_path = local_layer_path(
700 0 : conf,
701 0 : &adoptee.tenant_shard_id,
702 0 : &adoptee.timeline_id,
703 0 : &file_name,
704 0 : &metadata.generation,
705 0 : );
706 0 :
707 0 : match std::fs::hard_link(adopted_path, &adoptee_path) {
708 0 : Ok(()) => {}
709 0 : Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
710 0 : // In theory we should not get into this situation, as we clean up the layer file after errors.
711 0 : // However, we don't do cleanups for errors past `prepare`, so there is a slight chance of reaching this branch.
712 0 :
713 0 : // Double check that the file is orphan (probably from an earlier attempt), then delete it
714 0 : let key = file_name.clone().into();
715 0 : if adoptee.layers.read().await.contains_key(&key) {
716 : // We are supposed to filter out such cases before coming to this function
717 0 : return Err(Error::Prepare(anyhow::anyhow!(
718 0 : "layer file {file_name} already present and inside layer map"
719 0 : )));
720 0 : }
721 0 : tracing::info!("Deleting orphan layer file to make way for hard linking");
722 : // Delete orphan layer file and try again, to ensure this layer has a well understood source
723 0 : std::fs::remove_file(adopted_path)
724 0 : .map_err(|e| Error::launder(e.into(), Error::Prepare))?;
725 0 : std::fs::hard_link(adopted_path, &adoptee_path)
726 0 : .map_err(|e| Error::launder(e.into(), Error::Prepare))?;
727 : }
728 0 : Err(e) => {
729 0 : return Err(Error::launder(e.into(), Error::Prepare));
730 : }
731 : };
732 0 : did_hardlink = true;
733 0 : Layer::for_resident(conf, adoptee, adoptee_path, file_name, metadata).drop_eviction_guard()
734 : } else {
735 0 : did_hardlink = false;
736 0 : Layer::for_evicted(conf, adoptee, file_name, metadata)
737 : };
738 :
739 0 : let layer = match adoptee
740 0 : .remote_client
741 0 : .copy_timeline_layer(adopted, &owned, cancel)
742 0 : .await
743 : {
744 0 : Ok(()) => owned,
745 0 : Err(e) => {
746 0 : {
747 0 : // Clean up the layer so that on a retry we don't get errors that the file already exists
748 0 : owned.delete_on_drop();
749 0 : std::mem::drop(owned);
750 0 : }
751 0 : return Err(Error::launder(e, Error::Prepare));
752 : }
753 : };
754 :
755 0 : Ok((layer, did_hardlink))
756 0 : }
757 :
758 : pub(crate) enum DetachingAndReparenting {
759 : /// All of the following timeline ids were reparented and the timeline ancestor detach must be
760 : /// marked as completed.
761 : Reparented(HashSet<TimelineId>),
762 :
763 : /// Some of the reparentings failed. The timeline ancestor detach must **not** be marked as
764 : /// completed.
765 : ///
766 : /// Nested `must_reset_tenant` is set to true when any changes requiring a tenant restart were made.
767 : SomeReparentingFailed { must_reset_tenant: bool },
768 :
769 : /// Detaching and reparentings were completed in a previous attempt. Timeline ancestor detach
770 : /// must be marked as completed.
771 : AlreadyDone(HashSet<TimelineId>),
772 : }
773 :
774 : impl DetachingAndReparenting {
775 0 : pub(crate) fn reset_tenant_required(&self) -> bool {
776 : use DetachingAndReparenting::*;
777 0 : match self {
778 0 : Reparented(_) => true,
779 0 : SomeReparentingFailed { must_reset_tenant } => *must_reset_tenant,
780 0 : AlreadyDone(_) => false,
781 : }
782 0 : }
783 :
784 0 : pub(crate) fn completed(self) -> Option<HashSet<TimelineId>> {
785 : use DetachingAndReparenting::*;
786 0 : match self {
787 0 : Reparented(x) | AlreadyDone(x) => Some(x),
788 0 : SomeReparentingFailed { .. } => None,
789 : }
790 0 : }
791 : }
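// Illustrative, crate-internal sketch of how the variants above map to the two queries
// (not a test in this module):
//
//     let r = DetachingAndReparenting::SomeReparentingFailed { must_reset_tenant: true };
//     assert!(r.reset_tenant_required());
//     assert!(r.completed().is_none());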
792 :
793 : /// See [`Timeline::detach_from_ancestor_and_reparent`].
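/// Outline, summarizing the body below: inspect the uploaded `index_part` to decide whether
/// the detach has already happened; if not, publish the prepared layers and schedule the
/// detach, then try to reparent every eligible child timeline concurrently and report whether
/// a tenant reset is required before completion.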
794 0 : pub(super) async fn detach_and_reparent(
795 0 : detached: &Arc<Timeline>,
796 0 : tenant: &Tenant,
797 0 : prepared: PreparedTimelineDetach,
798 0 : _ctx: &RequestContext,
799 0 : ) -> Result<DetachingAndReparenting, Error> {
800 0 : let PreparedTimelineDetach { layers } = prepared;
801 :
802 : #[derive(Debug)]
803 : enum Ancestor {
804 : NotDetached(Arc<Timeline>, Lsn),
805 : Detached(Arc<Timeline>, Lsn),
806 : }
807 :
808 0 : let (recorded_branchpoint, still_ongoing) = {
809 0 : let access = detached.remote_client.initialized_upload_queue()?;
810 0 : let latest = access.latest_uploaded_index_part();
811 0 :
812 0 : (
813 0 : latest.lineage.detached_previous_ancestor(),
814 0 : latest
815 0 : .gc_blocking
816 0 : .as_ref()
817 0 : .is_some_and(|b| b.blocked_by(DetachAncestor)),
818 0 : )
819 0 : };
820 0 : assert!(
821 0 : still_ongoing,
822 0 : "cannot (detach? reparent)? complete if the operation is not still ongoing"
823 : );
824 :
825 0 : let ancestor = match (detached.ancestor_timeline.as_ref(), recorded_branchpoint) {
826 0 : (Some(ancestor), None) => {
827 0 : assert!(
828 0 : !layers.is_empty(),
829 0 : "there should always be at least one layer to inherit"
830 : );
831 0 : Ancestor::NotDetached(ancestor.clone(), detached.ancestor_lsn)
832 : }
833 : (Some(_), Some(_)) => {
834 0 : panic!(
835 0 : "it should be impossible to get to here without having gone through the tenant reset; if the tenant was reset, then the ancestor_timeline would be None"
836 0 : );
837 : }
838 0 : (None, Some((ancestor_id, ancestor_lsn))) => {
839 0 : // it has been either:
840 0 : // - detached but still exists => we can try reparenting
841 0 : // - detached and deleted
842 0 : //
843 0 : // either way, we must complete
844 0 : assert!(
845 0 : layers.is_empty(),
846 0 : "no layers should had been copied as detach is done"
847 : );
848 :
849 0 : let existing = tenant.timelines.lock().unwrap().get(&ancestor_id).cloned();
850 :
851 0 : if let Some(ancestor) = existing {
852 0 : Ancestor::Detached(ancestor, ancestor_lsn)
853 : } else {
854 0 : let direct_children = reparented_direct_children(detached, tenant)?;
855 0 : return Ok(DetachingAndReparenting::AlreadyDone(direct_children));
856 : }
857 : }
858 : (None, None) => {
859 : // TODO: make sure there are no `?` early returns on the path between here and the
860 : // tenant_reset.
861 0 : panic!(
862 0 : "bug: detach_and_reparent called on a timeline which has not been detached or which has no live ancestor"
863 0 : );
864 : }
865 : };
866 :
867 : // publish the prepared layers before we reparent any of the timelines, so that on restart
868 : // reparented timelines find layers. also do the actual detaching.
869 : //
870 : // if we crash after this operation, a retry will allow reparenting the remaining timelines as
871 : // gc is blocked.
872 :
873 0 : let (ancestor, ancestor_lsn, was_detached) = match ancestor {
874 0 : Ancestor::NotDetached(ancestor, ancestor_lsn) => {
875 0 : // this has to complete before any reparentings because otherwise they would not have
876 0 : // layers on the new parent.
877 0 : detached
878 0 : .remote_client
879 0 : .schedule_adding_existing_layers_to_index_detach_and_wait(
880 0 : &layers,
881 0 : (ancestor.timeline_id, ancestor_lsn),
882 0 : )
883 0 : .await
884 0 : .context("publish layers and detach ancestor")
885 0 : .map_err(|e| Error::launder(e, Error::DetachReparent))?;
886 :
887 0 : tracing::info!(
888 0 : ancestor=%ancestor.timeline_id,
889 0 : %ancestor_lsn,
890 0 : inherited_layers=%layers.len(),
891 0 : "detached from ancestor"
892 : );
893 0 : (ancestor, ancestor_lsn, true)
894 : }
895 0 : Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn, false),
896 : };
897 :
898 0 : let mut tasks = tokio::task::JoinSet::new();
899 0 :
900 0 : // Returns a single-permit semaphore which will be used to let one reparenting succeed;
901 0 : // the others will fail as if those timelines had been stopped for whatever reason.
902 0 : #[cfg(feature = "testing")]
903 0 : let failpoint_sem = || -> Option<Arc<Semaphore>> {
904 0 : fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
905 0 : Arc::new(Semaphore::new(1))
906 0 : ));
907 0 : None
908 0 : }();
909 0 :
910 0 : // because we are now keeping the slot in progress, it is unlikely that there will be any
911 0 : // timeline deletions during this time. if we raced one, then we'll just ignore it.
912 0 : {
913 0 : let g = tenant.timelines.lock().unwrap();
914 0 : reparentable_timelines(g.values(), detached, &ancestor, ancestor_lsn)
915 0 : .cloned()
916 0 : .for_each(|timeline| {
917 : // important in this scope: we are holding the Tenant::timelines lock
918 0 : let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
919 0 : let new_parent = detached.timeline_id;
920 0 : #[cfg(feature = "testing")]
921 0 : let failpoint_sem = failpoint_sem.clone();
922 0 :
923 0 : tasks.spawn(
924 0 : async move {
925 0 : let res = async {
926 : #[cfg(feature = "testing")]
927 0 : if let Some(failpoint_sem) = failpoint_sem {
928 0 : let _permit = failpoint_sem.acquire().await.map_err(|_| {
929 0 : anyhow::anyhow!(
930 0 : "failpoint: timeline-detach-ancestor::allow_one_reparented",
931 0 : )
932 0 : })?;
933 0 : failpoint_sem.close();
934 0 : }
935 :
936 0 : timeline
937 0 : .remote_client
938 0 : .schedule_reparenting_and_wait(&new_parent)
939 0 : .await
940 0 : }
941 0 : .await;
942 :
943 0 : match res {
944 : Ok(()) => {
945 0 : tracing::info!("reparented");
946 0 : Some(timeline)
947 : }
948 0 : Err(e) => {
949 0 : // with the use of tenant slot, raced timeline deletion is the most
950 0 : // likely reason.
951 0 : tracing::warn!("reparenting failed: {e:#}");
952 0 : None
953 : }
954 : }
955 0 : }
956 0 : .instrument(span),
957 0 : );
958 0 : });
959 0 : }
960 0 :
961 0 : let reparenting_candidates = tasks.len();
962 0 : let mut reparented = HashSet::with_capacity(tasks.len());
963 :
964 0 : while let Some(res) = tasks.join_next().await {
965 0 : match res {
966 0 : Ok(Some(timeline)) => {
967 0 : assert!(
968 0 : reparented.insert(timeline.timeline_id),
969 0 : "duplicate reparenting? timeline_id={}",
970 0 : timeline.timeline_id
971 : );
972 : }
973 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
974 : // just ignore failures now, we can retry
975 0 : Ok(None) => {}
976 0 : Err(je) if je.is_panic() => {}
977 0 : Err(je) => tracing::error!("unexpected join error: {je:?}"),
978 : }
979 : }
980 :
981 0 : let reparented_all = reparenting_candidates == reparented.len();
982 0 :
983 0 : if reparented_all {
984 0 : Ok(DetachingAndReparenting::Reparented(reparented))
985 : } else {
986 0 : tracing::info!(
987 0 : reparented = reparented.len(),
988 0 : candidates = reparenting_candidates,
989 0 : "failed to reparent all candidates; they can be retried after the tenant_reset",
990 : );
991 :
992 0 : let must_reset_tenant = !reparented.is_empty() || was_detached;
993 0 : Ok(DetachingAndReparenting::SomeReparentingFailed { must_reset_tenant })
994 : }
995 0 : }
996 :
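/// Final step of the detach: re-enter the timeline gate if the tenant was reset in between,
/// then remove the `DetachAncestor` gc-blocking reason from `index_part` so gc can resume.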
997 0 : pub(super) async fn complete(
998 0 : detached: &Arc<Timeline>,
999 0 : tenant: &Tenant,
1000 0 : mut attempt: Attempt,
1001 0 : _ctx: &RequestContext,
1002 0 : ) -> Result<(), Error> {
1003 0 : assert_eq!(detached.timeline_id, attempt.timeline_id);
1004 :
1005 0 : if attempt.gate_entered.is_none() {
1006 0 : let entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
1007 0 : attempt.gate_entered = Some(entered);
1008 0 : } else {
1009 0 : // Some(gate_entered) means the tenant was not restarted, because a restart was not required
1010 0 : }
1011 :
1012 0 : assert!(detached.ancestor_timeline.is_none());
1013 :
1014 : // this should be at least a 503...?
1015 0 : fail::fail_point!(
1016 0 : "timeline-detach-ancestor::complete_before_uploading",
1017 0 : |_| Err(Error::Failpoint(
1018 0 : "timeline-detach-ancestor::complete_before_uploading"
1019 0 : ))
1020 0 : );
1021 :
1022 0 : tenant
1023 0 : .gc_block
1024 0 : .remove(
1025 0 : detached,
1026 0 : crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
1027 0 : )
1028 0 : .await
1029 0 : .map_err(|e| Error::launder(e, Error::Complete))?;
1030 :
1031 0 : Ok(())
1032 0 : }
1033 :
1034 : /// Query against a locked `Tenant::timelines`.
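/// A timeline is reparentable when, per the filter below, it is not `detached` itself, its
/// ancestor is `ancestor`, its branch point is at or before `ancestor_lsn`, and its deletion
/// has not started.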
1035 0 : fn reparentable_timelines<'a, I>(
1036 0 : timelines: I,
1037 0 : detached: &'a Arc<Timeline>,
1038 0 : ancestor: &'a Arc<Timeline>,
1039 0 : ancestor_lsn: Lsn,
1040 0 : ) -> impl Iterator<Item = &'a Arc<Timeline>> + 'a
1041 0 : where
1042 0 : I: Iterator<Item = &'a Arc<Timeline>> + 'a,
1043 0 : {
1044 0 : timelines.filter_map(move |tl| {
1045 0 : if Arc::ptr_eq(tl, detached) {
1046 0 : return None;
1047 0 : }
1048 :
1049 0 : let tl_ancestor = tl.ancestor_timeline.as_ref()?;
1050 0 : let is_same = Arc::ptr_eq(ancestor, tl_ancestor);
1051 0 : let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
1052 0 :
1053 0 : let is_deleting = tl
1054 0 : .delete_progress
1055 0 : .try_lock()
1056 0 : .map(|flow| !flow.is_not_started())
1057 0 : .unwrap_or(true);
1058 0 :
1059 0 : if is_same && is_earlier && !is_deleting {
1060 0 : Some(tl)
1061 : } else {
1062 0 : None
1063 : }
1064 0 : })
1065 0 : }
1066 :
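/// Rejects the detach with `Error::Archived` if any reparentable child of `ancestor` is
/// archived, or if any offloaded timeline branched off `ancestor` at or before `ancestor_lsn`
/// (or with no recorded branch point).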
1067 0 : fn check_no_archived_children_of_ancestor(
1068 0 : tenant: &Tenant,
1069 0 : detached: &Arc<Timeline>,
1070 0 : ancestor: &Arc<Timeline>,
1071 0 : ancestor_lsn: Lsn,
1072 0 : ) -> Result<(), Error> {
1073 0 : let timelines = tenant.timelines.lock().unwrap();
1074 0 : let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
1075 0 : for timeline in reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn) {
1076 0 : if timeline.is_archived() == Some(true) {
1077 0 : return Err(Error::Archived(timeline.timeline_id));
1078 0 : }
1079 : }
1080 0 : for timeline_offloaded in timelines_offloaded.values() {
1081 0 : if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
1082 0 : continue;
1083 0 : }
1084 : // This forbids the detach ancestor feature if flattened timelines are present,
1085 : // even if the ancestor_lsn is from after the branchpoint of the detached timeline.
1086 : // But as per current design, we don't record the ancestor_lsn of flattened timelines.
1087 : // This is a bit unfortunate, but as of writing this we don't support flattening
1088 : // anyway. Maybe we can evolve the data model in the future.
1089 0 : if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
1090 0 : let is_earlier = retain_lsn <= ancestor_lsn;
1091 0 : if !is_earlier {
1092 0 : continue;
1093 0 : }
1094 0 : }
1095 0 : return Err(Error::Archived(timeline_offloaded.timeline_id));
1096 : }
1097 0 : Ok(())
1098 0 : }
1099 :
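/// Opens the timeline directory and fsyncs it so that directory entries for newly created or
/// hardlinked layer files are durable; I/O failures here are treated as fatal (see
/// `MaybeFatalIo`).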
1100 0 : async fn fsync_timeline_dir(timeline: &Timeline, ctx: &RequestContext) {
1101 0 : let path = &timeline
1102 0 : .conf
1103 0 : .timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id);
1104 0 : let timeline_dir = VirtualFile::open(&path, ctx)
1105 0 : .await
1106 0 : .fatal_err("VirtualFile::open for timeline dir fsync");
1107 0 : timeline_dir
1108 0 : .sync_all()
1109 0 : .await
1110 0 : .fatal_err("VirtualFile::sync_all timeline dir");
1111 0 : }
|