Line data Source code
1 : use std::{collections::HashSet, sync::Arc};
2 :
3 : use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
4 : use crate::{
5 : context::{DownloadBehavior, RequestContext},
6 : task_mgr::TaskKind,
7 : tenant::{
8 : remote_timeline_client::index::GcBlockingReason::DetachAncestor,
9 : storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
10 : Tenant,
11 : },
12 : virtual_file::{MaybeFatalIo, VirtualFile},
13 : };
14 : use anyhow::Context;
15 : use pageserver_api::{models::detach_ancestor::AncestorDetached, shard::ShardIdentity};
16 : use tokio::sync::Semaphore;
17 : use tokio_util::sync::CancellationToken;
18 : use tracing::Instrument;
19 : use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn};
20 :
21 0 : #[derive(Debug, thiserror::Error)]
22 : pub(crate) enum Error {
23 : #[error("no ancestors")]
24 : NoAncestor,
25 :
26 : #[error("too many ancestors")]
27 : TooManyAncestors,
28 :
29 : #[error("shutting down, please retry later")]
30 : ShuttingDown,
31 :
32 : #[error("archived: {}", .0)]
33 : Archived(TimelineId),
34 :
35 : #[error(transparent)]
36 : NotFound(crate::tenant::GetTimelineError),
37 :
38 : #[error("failed to reparent all candidate timelines, please retry")]
39 : FailedToReparentAll,
40 :
41 : #[error("ancestor is already being detached by: {}", .0)]
42 : OtherTimelineDetachOngoing(TimelineId),
43 :
44 : #[error("preparing timeline ancestor detach failed")]
45 : Prepare(#[source] anyhow::Error),
46 :
47 : #[error("detaching and reparenting failed")]
48 : DetachReparent(#[source] anyhow::Error),
49 :
50 : #[error("completing ancestor detach failed")]
51 : Complete(#[source] anyhow::Error),
52 :
53 : #[error("failpoint: {}", .0)]
54 : Failpoint(&'static str),
55 : }
56 :
57 : impl Error {
58 : /// Try to detect cancellation from within the `anyhow::Error`; if found, map it to
59 : /// `Error::ShuttingDown`, otherwise wrap the error with the given `or_else` variant constructor.
60 0 : fn launder<F>(e: anyhow::Error, or_else: F) -> Error
61 0 : where
62 0 : F: Fn(anyhow::Error) -> Error,
63 0 : {
64 : use crate::tenant::remote_timeline_client::WaitCompletionError;
65 : use crate::tenant::upload_queue::NotInitialized;
66 : use remote_storage::TimeoutOrCancel;
67 :
68 0 : if e.is::<NotInitialized>()
69 0 : || TimeoutOrCancel::caused_by_cancel(&e)
70 0 : || e.downcast_ref::<remote_storage::DownloadError>()
71 0 : .is_some_and(|e| e.is_cancelled())
72 0 : || e.is::<WaitCompletionError>()
73 : {
74 0 : Error::ShuttingDown
75 : } else {
76 0 : or_else(e)
77 : }
78 0 : }
79 : }
80 :
81 : impl From<Error> for ApiError {
82 0 : fn from(value: Error) -> Self {
83 0 : match value {
84 0 : Error::NoAncestor => ApiError::Conflict(value.to_string()),
85 0 : Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{value}")),
86 0 : Error::ShuttingDown => ApiError::ShuttingDown,
87 0 : Error::Archived(_) => ApiError::BadRequest(anyhow::anyhow!("{value}")),
88 : Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
89 0 : ApiError::ResourceUnavailable(value.to_string().into())
90 : }
91 0 : Error::NotFound(e) => ApiError::from(e),
92 : // these variants should have no cancellation errors because of Error::launder
93 : Error::Prepare(_)
94 : | Error::DetachReparent(_)
95 : | Error::Complete(_)
96 0 : | Error::Failpoint(_) => ApiError::InternalServerError(value.into()),
97 : }
98 0 : }
99 : }
100 :
101 : impl From<crate::tenant::upload_queue::NotInitialized> for Error {
102 0 : fn from(_: crate::tenant::upload_queue::NotInitialized) -> Self {
103 0 : // treat all as shutting down signals, even though that is not entirely correct
104 0 : // (uninitialized state)
105 0 : Error::ShuttingDown
106 0 : }
107 : }
108 : impl From<super::layer_manager::Shutdown> for Error {
109 0 : fn from(_: super::layer_manager::Shutdown) -> Self {
110 0 : Error::ShuttingDown
111 0 : }
112 : }
113 :
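/// Outcome of the preparation phase: either a freshly prepared attempt together with the layers to
/// publish, or evidence that the detach has already completed.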
114 : pub(crate) enum Progress {
115 : Prepared(Attempt, PreparedTimelineDetach),
116 : Done(AncestorDetached),
117 : }
118 :
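/// Layers rewritten or remote-copied during [`prepare`]; they are published to the index when the
/// timeline is detached in [`detach_and_reparent`].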
119 : pub(crate) struct PreparedTimelineDetach {
120 : layers: Vec<Layer>,
121 : }
122 :
123 : /// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
124 : #[derive(Debug)]
125 : pub(crate) struct Options {
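// These bound the semaphore-gated prefix rewrite and remote copy tasks spawned in `prepare`.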
126 : pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
127 : pub(crate) copy_concurrency: std::num::NonZeroUsize,
128 : }
129 :
130 : impl Default for Options {
131 0 : fn default() -> Self {
132 0 : Self {
133 0 : rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
134 0 : copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
135 0 : }
136 0 : }
137 : }
138 :
139 : /// Represents a single attempt to detach an ancestor; the attempt stays exclusive across a tenant reset.
140 : #[derive(Debug)]
141 : pub(crate) struct Attempt {
142 : pub(crate) timeline_id: TimelineId,
143 :
144 : _guard: completion::Completion,
145 : gate_entered: Option<utils::sync::gate::GateGuard>,
146 : }
147 :
148 : impl Attempt {
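/// Releases the timeline gate guard before the tenant reset; the reset shuts the timeline down,
/// which waits for open gate guards.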
149 0 : pub(crate) fn before_reset_tenant(&mut self) {
150 0 : let taken = self.gate_entered.take();
151 0 : assert!(taken.is_some());
152 0 : }
153 :
154 0 : pub(crate) fn new_barrier(&self) -> completion::Barrier {
155 0 : self._guard.barrier()
156 0 : }
157 : }
158 :
159 : /// See [`Timeline::prepare_to_detach_from_ancestor`]
160 0 : pub(super) async fn prepare(
161 0 : detached: &Arc<Timeline>,
162 0 : tenant: &Tenant,
163 0 : options: Options,
164 0 : ctx: &RequestContext,
165 0 : ) -> Result<Progress, Error> {
166 : use Error::*;
167 :
168 0 : let Some((ancestor, ancestor_lsn)) = detached
169 0 : .ancestor_timeline
170 0 : .as_ref()
171 0 : .map(|tl| (tl.clone(), detached.ancestor_lsn))
172 : else {
173 0 : let still_in_progress = {
174 0 : let accessor = detached.remote_client.initialized_upload_queue()?;
175 :
176 : // it is safe to inspect the latest uploaded index_part, because we can only witness this
177 : // after the restart is complete and the ancestor is gone.
178 0 : let latest = accessor.latest_uploaded_index_part();
179 0 : if latest.lineage.detached_previous_ancestor().is_none() {
180 0 : return Err(NoAncestor);
181 0 : };
182 0 :
183 0 : latest
184 0 : .gc_blocking
185 0 : .as_ref()
186 0 : .is_some_and(|b| b.blocked_by(DetachAncestor))
187 0 : };
188 0 :
189 0 : if still_in_progress {
190 : // gc is still blocked, so we can still reparent and complete.
191 : // we are safe to reparent the remaining timelines, because they were locked in at the beginning.
192 0 : let attempt = continue_with_blocked_gc(detached, tenant).await?;
193 :
194 : // because the ancestor of detached is already set to none, we have published all
195 : // of the layers, so we are still "prepared."
196 0 : return Ok(Progress::Prepared(
197 0 : attempt,
198 0 : PreparedTimelineDetach { layers: Vec::new() },
199 0 : ));
200 0 : }
201 :
202 0 : let reparented_timelines = reparented_direct_children(detached, tenant)?;
203 0 : return Ok(Progress::Done(AncestorDetached {
204 0 : reparented_timelines,
205 0 : }));
206 : };
207 :
208 0 : if detached.is_archived() != Some(false) {
209 0 : return Err(Archived(detached.timeline_id));
210 0 : }
211 0 :
212 0 : if !ancestor_lsn.is_valid() {
213 : // rare case, probably wouldn't even load
214 0 : tracing::error!("ancestor is set, but ancestor_lsn is invalid, this timeline needs fixing");
215 0 : return Err(NoAncestor);
216 0 : }
217 0 :
218 0 : check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
219 :
220 0 : if ancestor.ancestor_timeline.is_some() {
221 : // non-technical requirement; we could flatten N ancestors just as easily but we chose
222 : // not to, at least initially
223 0 : return Err(TooManyAncestors);
224 0 : }
225 :
226 0 : let attempt = start_new_attempt(detached, tenant).await?;
227 :
228 0 : utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");
229 :
230 0 : fail::fail_point!(
231 0 : "timeline-detach-ancestor::before_starting_after_locking",
232 0 : |_| Err(Error::Failpoint(
233 0 : "timeline-detach-ancestor::before_starting_after_locking"
234 0 : ))
235 0 : );
236 :
237 0 : if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
238 0 : let span =
239 0 : tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
240 0 : async {
241 0 : let started_at = std::time::Instant::now();
242 0 : let freeze_and_flush = ancestor.freeze_and_flush0();
243 0 : let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
244 :
245 0 : let res =
246 0 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
247 0 : .await;
248 :
249 0 : let res = match res {
250 0 : Ok(res) => res,
251 0 : Err(_elapsed) => {
252 0 : tracing::info!("freezing and flushing ancestor is still ongoing");
253 0 : freeze_and_flush.await
254 : }
255 : };
256 :
257 0 : res.map_err(|e| {
258 : use FlushLayerError::*;
259 0 : match e {
260 : Cancelled | NotRunning(_) => {
261 : // FIXME(#6424): technically statically unreachable right now, given how we never
262 : // drop the sender
263 0 : Error::ShuttingDown
264 : }
265 0 : CreateImageLayersError(_) | Other(_) => Error::Prepare(e.into()),
266 : }
267 0 : })?;
268 :
269 : // we do not need to wait for uploads to complete, but we do need the `struct Layer`:
270 : // copying a delta prefix is currently unsupported for `InMemoryLayer`.
271 0 : tracing::info!(
272 0 : elapsed_ms = started_at.elapsed().as_millis(),
273 0 : "froze and flushed the ancestor"
274 : );
275 0 : Ok::<_, Error>(())
276 0 : }
277 0 : .instrument(span)
278 0 : .await?;
279 0 : }
280 :
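// lsn ranges are end-exclusive, so ancestor_lsn + 1 keeps records at exactly ancestor_lsn in the
// copied prefix.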
281 0 : let end_lsn = ancestor_lsn + 1;
282 :
283 0 : let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
284 : // we do not need to include the detached timeline's own layers, because they can only
285 : // come *after* ancestor_lsn
286 0 : let layers = tokio::select! {
287 0 : guard = ancestor.layers.read() => guard,
288 0 : _ = detached.cancel.cancelled() => {
289 0 : return Err(ShuttingDown);
290 : }
291 0 : _ = ancestor.cancel.cancelled() => {
292 0 : return Err(ShuttingDown);
293 : }
294 : };
295 :
296 : // between retries, these can change if compaction or gc ran in between. this will mean
297 : // we have to redo work.
298 0 : partition_work(ancestor_lsn, &layers)?
299 : };
300 :
301 : // TODO: layers are already sorted by something: use that to determine how many of the remote
302 : // copies are already done -- gc is blocked, but a compaction could have happened on the ancestor,
303 : // which is something to keep in mind if copy skipping is implemented.
304 0 : tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
305 :
306 : // TODO: remote copying and lsn prefix copying could be done at the same time, with a single fsync afterwards
307 0 : let mut new_layers: Vec<Layer> =
308 0 : Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
309 0 :
310 0 : {
311 0 : tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
312 :
313 0 : let mut tasks = tokio::task::JoinSet::new();
314 0 :
315 0 : let mut wrote_any = false;
316 0 :
317 0 : let limiter = Arc::new(Semaphore::new(options.rewrite_concurrency.get()));
318 :
319 0 : for layer in straddling_branchpoint {
320 0 : let limiter = limiter.clone();
321 0 : let timeline = detached.clone();
322 0 : let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
323 :
324 0 : let span = tracing::info_span!("upload_rewritten_layer", %layer);
325 0 : tasks.spawn(
326 0 : async move {
327 0 : let _permit = limiter.acquire().await;
328 0 : let copied =
329 0 : upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
330 0 : .await?;
331 0 : if let Some(copied) = copied.as_ref() {
332 0 : tracing::info!(%copied, "rewrote and uploaded");
333 0 : }
334 0 : Ok(copied)
335 0 : }
336 0 : .instrument(span),
337 0 : );
338 0 : }
339 :
340 0 : while let Some(res) = tasks.join_next().await {
341 0 : match res {
342 0 : Ok(Ok(Some(copied))) => {
343 0 : wrote_any = true;
344 0 : new_layers.push(copied);
345 0 : }
346 0 : Ok(Ok(None)) => {}
347 0 : Ok(Err(e)) => return Err(e),
348 0 : Err(je) => return Err(Error::Prepare(je.into())),
349 : }
350 : }
351 :
352 : // FIXME: the fsync should be mandatory, after both rewrites and copies
353 0 : if wrote_any {
354 0 : let timeline_dir = VirtualFile::open(
355 0 : &detached
356 0 : .conf
357 0 : .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
358 0 : ctx,
359 0 : )
360 0 : .await
361 0 : .fatal_err("VirtualFile::open for timeline dir fsync");
362 0 : timeline_dir
363 0 : .sync_all()
364 0 : .await
365 0 : .fatal_err("VirtualFile::sync_all timeline dir");
366 0 : }
367 : }
368 :
369 0 : let mut tasks = tokio::task::JoinSet::new();
370 0 : let limiter = Arc::new(Semaphore::new(options.copy_concurrency.get()));
371 :
372 0 : for adopted in rest_of_historic {
373 0 : let limiter = limiter.clone();
374 0 : let timeline = detached.clone();
375 0 :
376 0 : tasks.spawn(
377 0 : async move {
378 0 : let _permit = limiter.acquire().await;
379 0 : let owned = remote_copy(
380 0 : &adopted,
381 0 : &timeline,
382 0 : timeline.generation,
383 0 : timeline.shard_identity,
384 0 : &timeline.cancel,
385 0 : )
386 0 : .await?;
387 0 : tracing::info!(layer=%owned, "remote copied");
388 0 : Ok(owned)
389 0 : }
390 0 : .in_current_span(),
391 0 : );
392 0 : }
393 :
394 0 : while let Some(res) = tasks.join_next().await {
395 0 : match res {
396 0 : Ok(Ok(owned)) => {
397 0 : new_layers.push(owned);
398 0 : }
399 0 : Ok(Err(failed)) => {
400 0 : return Err(failed);
401 : }
402 0 : Err(je) => return Err(Error::Prepare(je.into())),
403 : }
404 : }
405 :
406 : // TODO: fsync directory again if we hardlinked something
407 :
408 0 : let prepared = PreparedTimelineDetach { layers: new_layers };
409 0 :
410 0 : Ok(Progress::Prepared(attempt, prepared))
411 0 : }
412 :
413 0 : async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
414 0 : let attempt = obtain_exclusive_attempt(detached, tenant)?;
415 :
416 : // insert the block in the index_part.json, if not already there.
417 0 : let _dont_care = tenant
418 0 : .gc_block
419 0 : .insert(
420 0 : detached,
421 0 : crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
422 0 : )
423 0 : .await
424 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
425 :
426 0 : Ok(attempt)
427 0 : }
428 :
429 0 : async fn continue_with_blocked_gc(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
430 0 : // FIXME: it would be nice to confirm that there is an in-memory version, since we've just
431 0 : // verified there is a persistent one?
432 0 : obtain_exclusive_attempt(detached, tenant)
433 0 : }
434 :
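/// Registers this as the only active detach-ancestor attempt for the tenant and checks that the
/// detached timeline's gate is still open.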
435 0 : fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
436 : use Error::{OtherTimelineDetachOngoing, ShuttingDown};
437 :
438 : // ensure we are the only active attempt for this tenant
439 0 : let (guard, barrier) = completion::channel();
440 0 : {
441 0 : let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
442 0 : if let Some((tl, other)) = guard.as_ref() {
443 0 : if !other.is_ready() {
444 0 : return Err(OtherTimelineDetachOngoing(*tl));
445 0 : }
446 : // FIXME: no test enters here
447 0 : }
448 0 : *guard = Some((detached.timeline_id, barrier));
449 : }
450 :
451 : // ensure the gate is still open
452 0 : let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
453 :
454 0 : Ok(Attempt {
455 0 : timeline_id: detached.timeline_id,
456 0 : _guard: guard,
457 0 : gate_entered: Some(_gate_entered),
458 0 : })
459 0 : }
460 :
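/// Returns the ids of `detached`'s direct children whose latest uploaded index_part already
/// records them as reparented.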
461 0 : fn reparented_direct_children(
462 0 : detached: &Arc<Timeline>,
463 0 : tenant: &Tenant,
464 0 : ) -> Result<HashSet<TimelineId>, Error> {
465 0 : let mut all_direct_children = tenant
466 0 : .timelines
467 0 : .lock()
468 0 : .unwrap()
469 0 : .values()
470 0 : .filter_map(|tl| {
471 0 : let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
472 :
473 0 : if is_direct_child {
474 0 : Some(tl.clone())
475 : } else {
476 0 : if let Some(timeline) = tl.ancestor_timeline.as_ref() {
477 0 : assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live");
478 0 : }
479 0 : None
480 : }
481 0 : })
482 0 : // Collect to avoid a lock ordering problem between Tenant::timelines and
483 0 : // Timeline::remote_client
484 0 : .collect::<Vec<_>>();
485 0 :
486 0 : let mut any_shutdown = false;
487 0 :
488 0 : all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
489 0 : Ok(accessor) => accessor
490 0 : .latest_uploaded_index_part()
491 0 : .lineage
492 0 : .is_reparented(),
493 0 : Err(_shutdownalike) => {
494 0 : // not 100% a shutdown, but let's bail early so as not to give inconsistent results in a
495 0 : // sharded environment.
496 0 : any_shutdown = true;
497 0 : true
498 : }
499 0 : });
500 0 :
501 0 : if any_shutdown {
502 : // it could be one or many being deleted; have client retry
503 0 : return Err(Error::ShuttingDown);
504 0 : }
505 0 :
506 0 : Ok(all_direct_children
507 0 : .into_iter()
508 0 : .map(|tl| tl.timeline_id)
509 0 : .collect())
510 0 : }
511 :
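/// Partitions the ancestor's historic layers around the branch point: the count of layers starting
/// after `ancestor_lsn` (skipped), delta layers straddling it (to be rewritten up to the branch
/// point), and the remaining layers (to be remote-copied as-is).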
512 0 : fn partition_work(
513 0 : ancestor_lsn: Lsn,
514 0 : source: &LayerManager,
515 0 : ) -> Result<(usize, Vec<Layer>, Vec<Layer>), Error> {
516 0 : let mut straddling_branchpoint = vec![];
517 0 : let mut rest_of_historic = vec![];
518 0 :
519 0 : let mut later_by_lsn = 0;
520 :
521 0 : for desc in source.layer_map()?.iter_historic_layers() {
522 : // off-by-one hazards here:
523 : // - start is inclusive
524 : // - end is exclusive
525 0 : if desc.lsn_range.start > ancestor_lsn {
526 0 : later_by_lsn += 1;
527 0 : continue;
528 0 : }
529 :
530 0 : let target = if desc.lsn_range.start <= ancestor_lsn
531 0 : && desc.lsn_range.end > ancestor_lsn
532 0 : && desc.is_delta
533 : {
534 : // TODO: image layer at Lsn optimization
535 0 : &mut straddling_branchpoint
536 : } else {
537 0 : &mut rest_of_historic
538 : };
539 :
540 0 : target.push(source.get_from_desc(&desc));
541 : }
542 :
543 0 : Ok((later_by_lsn, straddling_branchpoint, rest_of_historic))
544 0 : }
545 :
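/// Rewrites the lsn prefix of `layer` (up to `end_lsn`) into a new delta layer on `target` and
/// uploads it; returns `None` when there is nothing to copy.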
546 0 : async fn upload_rewritten_layer(
547 0 : end_lsn: Lsn,
548 0 : layer: &Layer,
549 0 : target: &Arc<Timeline>,
550 0 : cancel: &CancellationToken,
551 0 : ctx: &RequestContext,
552 0 : ) -> Result<Option<Layer>, Error> {
553 0 : let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
554 :
555 0 : let Some(copied) = copied else {
556 0 : return Ok(None);
557 : };
558 :
559 0 : target
560 0 : .remote_client
561 0 : .upload_layer_file(&copied, cancel)
562 0 : .await
563 0 : .map_err(|e| Error::launder(e, Error::Prepare))?;
564 :
565 0 : Ok(Some(copied.into()))
566 0 : }
567 :
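/// Copies the records of `layer` below `end_lsn` into a new delta layer on `target_timeline`;
/// returns `None` if no records fall below `end_lsn`.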
568 0 : async fn copy_lsn_prefix(
569 0 : end_lsn: Lsn,
570 0 : layer: &Layer,
571 0 : target_timeline: &Arc<Timeline>,
572 0 : ctx: &RequestContext,
573 0 : ) -> Result<Option<ResidentLayer>, Error> {
574 0 : if target_timeline.cancel.is_cancelled() {
575 0 : return Err(Error::ShuttingDown);
576 0 : }
577 0 :
578 0 : tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
579 :
580 0 : let mut writer = DeltaLayerWriter::new(
581 0 : target_timeline.conf,
582 0 : target_timeline.timeline_id,
583 0 : target_timeline.tenant_shard_id,
584 0 : layer.layer_desc().key_range.start,
585 0 : layer.layer_desc().lsn_range.start..end_lsn,
586 0 : ctx,
587 0 : )
588 0 : .await
589 0 : .with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
590 0 : .map_err(Error::Prepare)?;
591 :
592 0 : let resident = layer.download_and_keep_resident().await.map_err(|e| {
593 0 : if e.is_cancelled() {
594 0 : Error::ShuttingDown
595 : } else {
596 0 : Error::Prepare(e.into())
597 : }
598 0 : })?;
599 :
600 0 : let records = resident
601 0 : .copy_delta_prefix(&mut writer, end_lsn, ctx)
602 0 : .await
603 0 : .with_context(|| format!("copy lsn prefix of ancestors {layer}"))
604 0 : .map_err(Error::Prepare)?;
605 :
606 0 : drop(resident);
607 0 :
608 0 : tracing::debug!(%layer, records, "copied records");
609 :
610 0 : if records == 0 {
611 0 : drop(writer);
612 0 : // TODO: we might want to store an empty marker in remote storage for this
613 0 : // layer so that we will not needlessly walk `layer` on repeated attempts.
614 0 : Ok(None)
615 : } else {
616 : // reuse the layer's recorded end key instead of the real highest key in the layer,
617 : // to avoid adding more holes between layers.
618 0 : let reused_highest_key = layer.layer_desc().key_range.end;
619 0 : let (desc, path) = writer
620 0 : .finish(reused_highest_key, ctx)
621 0 : .await
622 0 : .map_err(Error::Prepare)?;
623 0 : let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
624 0 : .map_err(Error::Prepare)?;
625 :
626 0 : tracing::debug!(%layer, %copied, "new layer produced");
627 :
628 0 : Ok(Some(copied))
629 : }
630 0 : }
631 :
632 : /// Creates a new `Layer` instance for the adopted layer and ensures that, on successful return,
633 : /// it can be found in remote storage without the adopted layer having been added to `index_part.json`.
634 0 : async fn remote_copy(
635 0 : adopted: &Layer,
636 0 : adoptee: &Arc<Timeline>,
637 0 : generation: Generation,
638 0 : shard_identity: ShardIdentity,
639 0 : cancel: &CancellationToken,
640 0 : ) -> Result<Layer, Error> {
641 0 : // depending on Layer::keep_resident we could hardlink
642 0 :
643 0 : let mut metadata = adopted.metadata();
644 0 : debug_assert!(metadata.generation <= generation);
645 0 : metadata.generation = generation;
646 0 : metadata.shard = shard_identity.shard_index();
647 0 :
648 0 : let owned = crate::tenant::storage_layer::Layer::for_evicted(
649 0 : adoptee.conf,
650 0 : adoptee,
651 0 : adopted.layer_desc().layer_name(),
652 0 : metadata,
653 0 : );
654 0 :
655 0 : adoptee
656 0 : .remote_client
657 0 : .copy_timeline_layer(adopted, &owned, cancel)
658 0 : .await
659 0 : .map(move |()| owned)
660 0 : .map_err(|e| Error::launder(e, Error::Prepare))
661 0 : }
662 :
663 : pub(crate) enum DetachingAndReparenting {
664 : /// All of the following timeline ids were reparented and the timeline ancestor detach must be
665 : /// marked as completed.
666 : Reparented(HashSet<TimelineId>),
667 :
668 : /// Some of the reparentings failed. The timeline ancestor detach must **not** be marked as
669 : /// completed.
670 : ///
671 : /// Nested `must_reset_tenant` is set to true when any restart-requiring changes were made.
672 : SomeReparentingFailed { must_reset_tenant: bool },
673 :
674 : /// Detaching and reparentings were completed in a previous attempt. Timeline ancestor detach
675 : /// must be marked as completed.
676 : AlreadyDone(HashSet<TimelineId>),
677 : }
678 :
679 : impl DetachingAndReparenting {
680 0 : pub(crate) fn reset_tenant_required(&self) -> bool {
681 : use DetachingAndReparenting::*;
682 0 : match self {
683 0 : Reparented(_) => true,
684 0 : SomeReparentingFailed { must_reset_tenant } => *must_reset_tenant,
685 0 : AlreadyDone(_) => false,
686 : }
687 0 : }
688 :
689 0 : pub(crate) fn completed(self) -> Option<HashSet<TimelineId>> {
690 : use DetachingAndReparenting::*;
691 0 : match self {
692 0 : Reparented(x) | AlreadyDone(x) => Some(x),
693 0 : SomeReparentingFailed { .. } => None,
694 : }
695 0 : }
696 : }
697 :
698 : /// See [`Timeline::detach_from_ancestor_and_reparent`].
699 0 : pub(super) async fn detach_and_reparent(
700 0 : detached: &Arc<Timeline>,
701 0 : tenant: &Tenant,
702 0 : prepared: PreparedTimelineDetach,
703 0 : _ctx: &RequestContext,
704 0 : ) -> Result<DetachingAndReparenting, Error> {
705 0 : let PreparedTimelineDetach { layers } = prepared;
706 :
707 : #[derive(Debug)]
708 : enum Ancestor {
709 : NotDetached(Arc<Timeline>, Lsn),
710 : Detached(Arc<Timeline>, Lsn),
711 : }
712 :
713 0 : let (recorded_branchpoint, still_ongoing) = {
714 0 : let access = detached.remote_client.initialized_upload_queue()?;
715 0 : let latest = access.latest_uploaded_index_part();
716 0 :
717 0 : (
718 0 : latest.lineage.detached_previous_ancestor(),
719 0 : latest
720 0 : .gc_blocking
721 0 : .as_ref()
722 0 : .is_some_and(|b| b.blocked_by(DetachAncestor)),
723 0 : )
724 0 : };
725 0 : assert!(
726 0 : still_ongoing,
727 0 : "cannot (detach? reparent)? complete if the operation is not still ongoing"
728 : );
729 :
730 0 : let ancestor = match (detached.ancestor_timeline.as_ref(), recorded_branchpoint) {
731 0 : (Some(ancestor), None) => {
732 0 : assert!(
733 0 : !layers.is_empty(),
734 0 : "there should always be at least one layer to inherit"
735 : );
736 0 : Ancestor::NotDetached(ancestor.clone(), detached.ancestor_lsn)
737 : }
738 : (Some(_), Some(_)) => {
739 0 : panic!(
740 0 : "it should be impossible to get to here without having gone through the tenant reset; if the tenant was reset, then the ancestor_timeline would be None"
741 0 : );
742 : }
743 0 : (None, Some((ancestor_id, ancestor_lsn))) => {
744 0 : // it has been either:
745 0 : // - detached but still exists => we can try reparenting
746 0 : // - detached and deleted
747 0 : //
748 0 : // either way, we must complete
749 0 : assert!(
750 0 : layers.is_empty(),
751 0 : "no layers should have been copied as detach is done"
752 : );
753 :
754 0 : let existing = tenant.timelines.lock().unwrap().get(&ancestor_id).cloned();
755 :
756 0 : if let Some(ancestor) = existing {
757 0 : Ancestor::Detached(ancestor, ancestor_lsn)
758 : } else {
759 0 : let direct_children = reparented_direct_children(detached, tenant)?;
760 0 : return Ok(DetachingAndReparenting::AlreadyDone(direct_children));
761 : }
762 : }
763 : (None, None) => {
764 : // TODO: make sure there are no `?` early returns between here and the tenant_reset.
766 0 : panic!(
767 0 : "bug: detach_and_reparent called on a timeline which has not been detached or which has no live ancestor"
768 0 : );
769 : }
770 : };
771 :
772 : // publish the prepared layers before we reparent any of the timelines, so that on restart
773 : // reparented timelines find layers. also do the actual detaching.
774 : //
775 : // if we crash after this operation, a retry will allow reparenting the remaining timelines as
776 : // gc is blocked.
777 :
778 0 : let (ancestor, ancestor_lsn, was_detached) = match ancestor {
779 0 : Ancestor::NotDetached(ancestor, ancestor_lsn) => {
780 0 : // this has to complete before any reparentings because otherwise they would not have
781 0 : // layers on the new parent.
782 0 : detached
783 0 : .remote_client
784 0 : .schedule_adding_existing_layers_to_index_detach_and_wait(
785 0 : &layers,
786 0 : (ancestor.timeline_id, ancestor_lsn),
787 0 : )
788 0 : .await
789 0 : .context("publish layers and detach ancestor")
790 0 : .map_err(|e| Error::launder(e, Error::DetachReparent))?;
791 :
792 0 : tracing::info!(
793 0 : ancestor=%ancestor.timeline_id,
794 0 : %ancestor_lsn,
795 0 : inherited_layers=%layers.len(),
796 0 : "detached from ancestor"
797 : );
798 0 : (ancestor, ancestor_lsn, true)
799 : }
800 0 : Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn, false),
801 : };
802 :
803 0 : let mut tasks = tokio::task::JoinSet::new();
804 0 :
805 0 : // Returns a single-permit semaphore which will be used to make one reparenting succeed,
806 0 : // while the others fail as if those timelines had been stopped for whatever reason.
807 0 : #[cfg(feature = "testing")]
808 0 : let failpoint_sem = || -> Option<Arc<Semaphore>> {
809 0 : fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
810 0 : Arc::new(Semaphore::new(1))
811 0 : ));
812 0 : None
813 0 : }();
814 0 :
815 0 : // because we are now keeping the slot in progress, it is unlikely that there will be any
816 0 : // timeline deletions during this time. if we raced one, then we'll just ignore it.
817 0 : {
818 0 : let g = tenant.timelines.lock().unwrap();
819 0 : reparentable_timelines(g.values(), detached, &ancestor, ancestor_lsn)
820 0 : .cloned()
821 0 : .for_each(|timeline| {
822 : // important in this scope: we are holding the Tenant::timelines lock
823 0 : let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
824 0 : let new_parent = detached.timeline_id;
825 0 : #[cfg(feature = "testing")]
826 0 : let failpoint_sem = failpoint_sem.clone();
827 0 :
828 0 : tasks.spawn(
829 0 : async move {
830 0 : let res = async {
831 : #[cfg(feature = "testing")]
832 0 : if let Some(failpoint_sem) = failpoint_sem {
833 0 : let _permit = failpoint_sem.acquire().await.map_err(|_| {
834 0 : anyhow::anyhow!(
835 0 : "failpoint: timeline-detach-ancestor::allow_one_reparented",
836 0 : )
837 0 : })?;
838 0 : failpoint_sem.close();
839 0 : }
840 :
841 0 : timeline
842 0 : .remote_client
843 0 : .schedule_reparenting_and_wait(&new_parent)
844 0 : .await
845 0 : }
846 0 : .await;
847 :
848 0 : match res {
849 : Ok(()) => {
850 0 : tracing::info!("reparented");
851 0 : Some(timeline)
852 : }
853 0 : Err(e) => {
854 0 : // with the tenant slot in use, a raced timeline deletion is the most
855 0 : // likely reason.
856 0 : tracing::warn!("reparenting failed: {e:#}");
857 0 : None
858 : }
859 : }
860 0 : }
861 0 : .instrument(span),
862 0 : );
863 0 : });
864 0 : }
865 0 :
866 0 : let reparenting_candidates = tasks.len();
867 0 : let mut reparented = HashSet::with_capacity(tasks.len());
868 :
869 0 : while let Some(res) = tasks.join_next().await {
870 0 : match res {
871 0 : Ok(Some(timeline)) => {
872 0 : assert!(
873 0 : reparented.insert(timeline.timeline_id),
874 0 : "duplicate reparenting? timeline_id={}",
875 0 : timeline.timeline_id
876 : );
877 : }
878 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
879 : // just ignore failures now, we can retry
880 0 : Ok(None) => {}
881 0 : Err(je) if je.is_panic() => {}
882 0 : Err(je) => tracing::error!("unexpected join error: {je:?}"),
883 : }
884 : }
885 :
886 0 : let reparented_all = reparenting_candidates == reparented.len();
887 0 :
888 0 : if reparented_all {
889 0 : Ok(DetachingAndReparenting::Reparented(reparented))
890 : } else {
891 0 : tracing::info!(
892 0 : reparented = reparented.len(),
893 0 : candidates = reparenting_candidates,
894 0 : "failed to reparent all candidates; they can be retried after the tenant_reset",
895 : );
896 :
897 0 : let must_reset_tenant = !reparented.is_empty() || was_detached;
898 0 : Ok(DetachingAndReparenting::SomeReparentingFailed { must_reset_tenant })
899 : }
900 0 : }
901 :
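/// Finishes the detach by removing the gc blocking entry that [`prepare`] installed.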
902 0 : pub(super) async fn complete(
903 0 : detached: &Arc<Timeline>,
904 0 : tenant: &Tenant,
905 0 : mut attempt: Attempt,
906 0 : _ctx: &RequestContext,
907 0 : ) -> Result<(), Error> {
908 0 : assert_eq!(detached.timeline_id, attempt.timeline_id);
909 :
910 0 : if attempt.gate_entered.is_none() {
911 0 : let entered = detached.gate.enter().map_err(|_| Error::ShuttingDown)?;
912 0 : attempt.gate_entered = Some(entered);
913 0 : } else {
914 0 : // Some(gate_entered) means the tenant was not restarted, as is not required
915 0 : }
916 :
917 0 : assert!(detached.ancestor_timeline.is_none());
918 :
919 : // this should be an 503 at least...?
920 0 : fail::fail_point!(
921 0 : "timeline-detach-ancestor::complete_before_uploading",
922 0 : |_| Err(Error::Failpoint(
923 0 : "timeline-detach-ancestor::complete_before_uploading"
924 0 : ))
925 0 : );
926 :
927 0 : tenant
928 0 : .gc_block
929 0 : .remove(
930 0 : detached,
931 0 : crate::tenant::remote_timeline_client::index::GcBlockingReason::DetachAncestor,
932 0 : )
933 0 : .await
934 0 : .map_err(|e| Error::launder(e, Error::Complete))?;
935 :
936 0 : Ok(())
937 0 : }
938 :
939 : /// Query against a locked `Tenant::timelines`.
940 0 : fn reparentable_timelines<'a, I>(
941 0 : timelines: I,
942 0 : detached: &'a Arc<Timeline>,
943 0 : ancestor: &'a Arc<Timeline>,
944 0 : ancestor_lsn: Lsn,
945 0 : ) -> impl Iterator<Item = &'a Arc<Timeline>> + 'a
946 0 : where
947 0 : I: Iterator<Item = &'a Arc<Timeline>> + 'a,
948 0 : {
949 0 : timelines.filter_map(move |tl| {
950 0 : if Arc::ptr_eq(tl, detached) {
951 0 : return None;
952 0 : }
953 :
954 0 : let tl_ancestor = tl.ancestor_timeline.as_ref()?;
955 0 : let is_same = Arc::ptr_eq(ancestor, tl_ancestor);
956 0 : let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
957 0 :
958 0 : let is_deleting = tl
959 0 : .delete_progress
960 0 : .try_lock()
961 0 : .map(|flow| !flow.is_not_started())
962 0 : .unwrap_or(true);
963 0 :
964 0 : if is_same && is_earlier && !is_deleting {
965 0 : Some(tl)
966 : } else {
967 0 : None
968 : }
969 0 : })
970 0 : }
971 :
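/// Fails with [`Error::Archived`] if any reparentable child of `ancestor`, live or offloaded, is
/// archived.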
972 0 : fn check_no_archived_children_of_ancestor(
973 0 : tenant: &Tenant,
974 0 : detached: &Arc<Timeline>,
975 0 : ancestor: &Arc<Timeline>,
976 0 : ancestor_lsn: Lsn,
977 0 : ) -> Result<(), Error> {
978 0 : let timelines = tenant.timelines.lock().unwrap();
979 0 : let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
980 0 : for timeline in reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn) {
981 0 : if timeline.is_archived() == Some(true) {
982 0 : return Err(Error::Archived(timeline.timeline_id));
983 0 : }
984 : }
985 0 : for timeline_offloaded in timelines_offloaded.values() {
986 0 : if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
987 0 : continue;
988 0 : }
989 : // This forbids the detach ancestor feature if flattened timelines are present,
990 : // even if the ancestor_lsn is from after the branchpoint of the detached timeline.
991 : // But as per current design, we don't record the ancestor_lsn of flattened timelines.
992 : // This is a bit unfortunate, but as of writing this we don't support flattening
993 : // anyway. Maybe we can evolve the data model in the future.
994 0 : if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
995 0 : let is_earlier = retain_lsn <= ancestor_lsn;
996 0 : if !is_earlier {
997 0 : continue;
998 0 : }
999 0 : }
1000 0 : return Err(Error::Archived(timeline_offloaded.timeline_id));
1001 : }
1002 0 : Ok(())
1003 0 : }