Line data Source code
1 : use std::sync::Arc;
2 :
3 : use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
4 : use crate::{
5 : context::{DownloadBehavior, RequestContext},
6 : task_mgr::TaskKind,
7 : tenant::{
8 : storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
9 : Tenant,
10 : },
11 : virtual_file::{MaybeFatalIo, VirtualFile},
12 : };
13 : use tokio_util::sync::CancellationToken;
14 : use tracing::Instrument;
15 : use utils::{completion, generation::Generation, http::error::ApiError, id::TimelineId, lsn::Lsn};
16 :
17 0 : #[derive(Debug, thiserror::Error)]
18 : pub(crate) enum Error {
19 : #[error("no ancestors")]
20 : NoAncestor,
21 : #[error("too many ancestors")]
22 : TooManyAncestors,
23 : #[error("shutting down, please retry later")]
24 : ShuttingDown,
25 : #[error("flushing failed")]
26 : FlushAncestor(#[source] FlushLayerError),
27 : #[error("layer download failed")]
28 : RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
29 : #[error("copying LSN prefix locally failed")]
30 : CopyDeltaPrefix(#[source] anyhow::Error),
31 : #[error("upload rewritten layer")]
32 : UploadRewritten(#[source] anyhow::Error),
33 :
34 : #[error("ancestor is already being detached by: {}", .0)]
35 : OtherTimelineDetachOngoing(TimelineId),
36 :
37 : #[error("remote copying layer failed")]
38 : CopyFailed(#[source] anyhow::Error),
39 :
40 : #[error("unexpected error")]
41 : Unexpected(#[source] anyhow::Error),
42 : }
43 :
44 : impl From<Error> for ApiError {
45 0 : fn from(value: Error) -> Self {
46 0 : match value {
47 0 : e @ Error::NoAncestor => ApiError::Conflict(e.to_string()),
48 : // TODO: ApiError converts the anyhow using debug formatting ... just stop using ApiError?
49 0 : e @ Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{}", e)),
50 0 : Error::ShuttingDown => ApiError::ShuttingDown,
51 : Error::OtherTimelineDetachOngoing(_) => {
52 0 : ApiError::ResourceUnavailable("other timeline detach is already ongoing".into())
53 : }
54 : // All of these contain shutdown errors, in fact, it's the most common
55 0 : e @ Error::FlushAncestor(_)
56 0 : | e @ Error::RewrittenDeltaDownloadFailed(_)
57 0 : | e @ Error::CopyDeltaPrefix(_)
58 0 : | e @ Error::UploadRewritten(_)
59 0 : | e @ Error::CopyFailed(_)
60 0 : | e @ Error::Unexpected(_) => ApiError::InternalServerError(e.into()),
61 : }
62 0 : }
63 : }
64 :
65 : pub(crate) struct PreparedTimelineDetach {
66 : layers: Vec<Layer>,
67 : }
68 :
/// Concurrency limits used while preparing a timeline detach.
///
/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
    /// Maximum number of delta layers rewritten (and uploaded) at the same time.
    pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
    /// Maximum number of remote layer copies in flight at the same time.
    pub(crate) copy_concurrency: std::num::NonZeroUsize,
}

impl Default for Options {
    /// Defaults: 2 concurrent rewrites, 100 concurrent remote copies.
    fn default() -> Self {
        let rewrite_concurrency = std::num::NonZeroUsize::new(2).unwrap();
        let copy_concurrency = std::num::NonZeroUsize::new(100).unwrap();

        Self {
            rewrite_concurrency,
            copy_concurrency,
        }
    }
}
84 :
85 : /// See [`Timeline::prepare_to_detach_from_ancestor`]
86 0 : pub(super) async fn prepare(
87 0 : detached: &Arc<Timeline>,
88 0 : tenant: &Tenant,
89 0 : options: Options,
90 0 : ctx: &RequestContext,
91 0 : ) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
92 : use Error::*;
93 :
94 0 : let Some((ancestor, ancestor_lsn)) = detached
95 0 : .ancestor_timeline
96 0 : .as_ref()
97 0 : .map(|tl| (tl.clone(), detached.ancestor_lsn))
98 : else {
99 : // TODO: check if we have already been detached; for this we need to read the stored data
100 : // on remote client, for that we need a follow-up which makes uploads cheaper and maintains
101 : // a projection of the commited data.
102 : //
103 : // the error is wrong per openapi
104 0 : return Err(NoAncestor);
105 : };
106 :
107 0 : if !ancestor_lsn.is_valid() {
108 0 : return Err(NoAncestor);
109 0 : }
110 0 :
111 0 : if ancestor.ancestor_timeline.is_some() {
112 : // non-technical requirement; we could flatten N ancestors just as easily but we chose
113 : // not to, at least initially
114 0 : return Err(TooManyAncestors);
115 0 : }
116 0 :
117 0 : // before we acquire the gate, we must mark the ancestor as having a detach operation
118 0 : // ongoing which will block other concurrent detach operations so we don't get to ackward
119 0 : // situations where there would be two branches trying to reparent earlier branches.
120 0 : let (guard, barrier) = completion::channel();
121 0 :
122 0 : {
123 0 : let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
124 0 : if let Some((tl, other)) = guard.as_ref() {
125 0 : if !other.is_ready() {
126 0 : return Err(OtherTimelineDetachOngoing(*tl));
127 0 : }
128 0 : }
129 0 : *guard = Some((detached.timeline_id, barrier));
130 : }
131 :
132 0 : let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
133 :
134 0 : if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
135 0 : let span =
136 0 : tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
137 0 : async {
138 0 : let started_at = std::time::Instant::now();
139 0 : let freeze_and_flush = ancestor.freeze_and_flush0();
140 0 : let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
141 :
142 0 : let res =
143 0 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
144 0 : .await;
145 :
146 0 : let res = match res {
147 0 : Ok(res) => res,
148 0 : Err(_elapsed) => {
149 0 : tracing::info!("freezing and flushing ancestor is still ongoing");
150 0 : freeze_and_flush.await
151 : }
152 : };
153 :
154 0 : res.map_err(FlushAncestor)?;
155 :
156 : // we do not need to wait for uploads to complete but we do need `struct Layer`,
157 : // copying delta prefix is unsupported currently for `InMemoryLayer`.
158 0 : tracing::info!(
159 0 : elapsed_ms = started_at.elapsed().as_millis(),
160 0 : "froze and flushed the ancestor"
161 : );
162 0 : Ok(())
163 0 : }
164 0 : .instrument(span)
165 0 : .await?;
166 0 : }
167 :
168 0 : let end_lsn = ancestor_lsn + 1;
169 :
170 0 : let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
171 0 : // we do not need to start from our layers, because they can only be layers that come
172 0 : // *after* ancestor_lsn
173 0 : let layers = tokio::select! {
174 : guard = ancestor.layers.read() => guard,
175 : _ = detached.cancel.cancelled() => {
176 : return Err(ShuttingDown);
177 : }
178 : _ = ancestor.cancel.cancelled() => {
179 : return Err(ShuttingDown);
180 : }
181 0 : };
182 0 :
183 0 : // between retries, these can change if compaction or gc ran in between. this will mean
184 0 : // we have to redo work.
185 0 : partition_work(ancestor_lsn, &layers)
186 0 : };
187 0 :
188 0 : // TODO: layers are already sorted by something: use that to determine how much of remote
189 0 : // copies are already done.
190 0 : tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
191 :
192 : // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
193 0 : let mut new_layers: Vec<Layer> =
194 0 : Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
195 0 :
196 0 : {
197 0 : tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
198 :
199 0 : let mut tasks = tokio::task::JoinSet::new();
200 0 :
201 0 : let mut wrote_any = false;
202 0 :
203 0 : let limiter = Arc::new(tokio::sync::Semaphore::new(
204 0 : options.rewrite_concurrency.get(),
205 0 : ));
206 :
207 0 : for layer in straddling_branchpoint {
208 0 : let limiter = limiter.clone();
209 0 : let timeline = detached.clone();
210 0 : let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
211 0 :
212 0 : tasks.spawn(async move {
213 0 : let _permit = limiter.acquire().await;
214 0 : let copied =
215 0 : upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
216 0 : .await?;
217 0 : Ok(copied)
218 0 : });
219 0 : }
220 :
221 0 : while let Some(res) = tasks.join_next().await {
222 0 : match res {
223 0 : Ok(Ok(Some(copied))) => {
224 0 : wrote_any = true;
225 0 : tracing::info!(layer=%copied, "rewrote and uploaded");
226 0 : new_layers.push(copied);
227 : }
228 0 : Ok(Ok(None)) => {}
229 0 : Ok(Err(e)) => return Err(e),
230 0 : Err(je) => return Err(Unexpected(je.into())),
231 : }
232 : }
233 :
234 : // FIXME: the fsync should be mandatory, after both rewrites and copies
235 0 : if wrote_any {
236 0 : let timeline_dir = VirtualFile::open(
237 0 : &detached
238 0 : .conf
239 0 : .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
240 0 : ctx,
241 0 : )
242 0 : .await
243 0 : .fatal_err("VirtualFile::open for timeline dir fsync");
244 0 : timeline_dir
245 0 : .sync_all()
246 0 : .await
247 0 : .fatal_err("VirtualFile::sync_all timeline dir");
248 0 : }
249 : }
250 :
251 0 : let mut tasks = tokio::task::JoinSet::new();
252 0 : let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));
253 :
254 0 : for adopted in rest_of_historic {
255 0 : let limiter = limiter.clone();
256 0 : let timeline = detached.clone();
257 0 :
258 0 : tasks.spawn(
259 0 : async move {
260 0 : let _permit = limiter.acquire().await;
261 0 : let owned =
262 0 : remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
263 0 : tracing::info!(layer=%owned, "remote copied");
264 0 : Ok(owned)
265 0 : }
266 0 : .in_current_span(),
267 0 : );
268 0 : }
269 :
270 0 : while let Some(res) = tasks.join_next().await {
271 0 : match res {
272 0 : Ok(Ok(owned)) => {
273 0 : new_layers.push(owned);
274 0 : }
275 0 : Ok(Err(failed)) => {
276 0 : return Err(failed);
277 : }
278 0 : Err(je) => return Err(Unexpected(je.into())),
279 : }
280 : }
281 :
282 : // TODO: fsync directory again if we hardlinked something
283 :
284 0 : let prepared = PreparedTimelineDetach { layers: new_layers };
285 0 :
286 0 : Ok((guard, prepared))
287 0 : }
288 :
289 0 : fn partition_work(
290 0 : ancestor_lsn: Lsn,
291 0 : source_layermap: &LayerManager,
292 0 : ) -> (usize, Vec<Layer>, Vec<Layer>) {
293 0 : let mut straddling_branchpoint = vec![];
294 0 : let mut rest_of_historic = vec![];
295 0 :
296 0 : let mut later_by_lsn = 0;
297 :
298 0 : for desc in source_layermap.layer_map().iter_historic_layers() {
299 : // off by one chances here:
300 : // - start is inclusive
301 : // - end is exclusive
302 0 : if desc.lsn_range.start > ancestor_lsn {
303 0 : later_by_lsn += 1;
304 0 : continue;
305 0 : }
306 :
307 0 : let target = if desc.lsn_range.start <= ancestor_lsn
308 0 : && desc.lsn_range.end > ancestor_lsn
309 0 : && desc.is_delta
310 : {
311 : // TODO: image layer at Lsn optimization
312 0 : &mut straddling_branchpoint
313 : } else {
314 0 : &mut rest_of_historic
315 : };
316 :
317 0 : target.push(source_layermap.get_from_desc(&desc));
318 : }
319 :
320 0 : (later_by_lsn, straddling_branchpoint, rest_of_historic)
321 0 : }
322 :
323 0 : async fn upload_rewritten_layer(
324 0 : end_lsn: Lsn,
325 0 : layer: &Layer,
326 0 : target: &Arc<Timeline>,
327 0 : cancel: &CancellationToken,
328 0 : ctx: &RequestContext,
329 0 : ) -> Result<Option<Layer>, Error> {
330 : use Error::UploadRewritten;
331 0 : let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
332 :
333 0 : let Some(copied) = copied else {
334 0 : return Ok(None);
335 : };
336 :
337 : // FIXME: better shuttingdown error
338 0 : target
339 0 : .remote_client
340 0 : .upload_layer_file(&copied, cancel)
341 0 : .await
342 0 : .map_err(UploadRewritten)?;
343 :
344 0 : Ok(Some(copied.into()))
345 0 : }
346 :
347 0 : async fn copy_lsn_prefix(
348 0 : end_lsn: Lsn,
349 0 : layer: &Layer,
350 0 : target_timeline: &Arc<Timeline>,
351 0 : ctx: &RequestContext,
352 0 : ) -> Result<Option<ResidentLayer>, Error> {
353 0 : use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
354 0 :
355 0 : tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
356 :
357 0 : let mut writer = DeltaLayerWriter::new(
358 0 : target_timeline.conf,
359 0 : target_timeline.timeline_id,
360 0 : target_timeline.tenant_shard_id,
361 0 : layer.layer_desc().key_range.start,
362 0 : layer.layer_desc().lsn_range.start..end_lsn,
363 0 : ctx,
364 0 : )
365 0 : .await
366 0 : .map_err(CopyDeltaPrefix)?;
367 :
368 0 : let resident = layer
369 0 : .download_and_keep_resident()
370 0 : .await
371 : // likely shutdown
372 0 : .map_err(RewrittenDeltaDownloadFailed)?;
373 :
374 0 : let records = resident
375 0 : .copy_delta_prefix(&mut writer, end_lsn, ctx)
376 0 : .await
377 0 : .map_err(CopyDeltaPrefix)?;
378 :
379 0 : drop(resident);
380 0 :
381 0 : tracing::debug!(%layer, records, "copied records");
382 :
383 0 : if records == 0 {
384 0 : drop(writer);
385 0 : // TODO: we might want to store an empty marker in remote storage for this
386 0 : // layer so that we will not needlessly walk `layer` on repeated attempts.
387 0 : Ok(None)
388 : } else {
389 : // reuse the key instead of adding more holes between layers by using the real
390 : // highest key in the layer.
391 0 : let reused_highest_key = layer.layer_desc().key_range.end;
392 0 : let copied = writer
393 0 : .finish(reused_highest_key, target_timeline, ctx)
394 0 : .await
395 0 : .map_err(CopyDeltaPrefix)?;
396 :
397 0 : tracing::debug!(%layer, %copied, "new layer produced");
398 :
399 0 : Ok(Some(copied))
400 : }
401 0 : }
402 :
403 : /// Creates a new Layer instance for the adopted layer, and ensures it is found from the remote
404 : /// storage on successful return without the adopted layer being added to `index_part.json`.
405 0 : async fn remote_copy(
406 0 : adopted: &Layer,
407 0 : adoptee: &Arc<Timeline>,
408 0 : generation: Generation,
409 0 : cancel: &CancellationToken,
410 0 : ) -> Result<Layer, Error> {
411 0 : use Error::CopyFailed;
412 0 :
413 0 : // depending if Layer::keep_resident we could hardlink
414 0 :
415 0 : let mut metadata = adopted.metadata();
416 0 : debug_assert!(metadata.generation <= generation);
417 0 : metadata.generation = generation;
418 0 :
419 0 : let owned = crate::tenant::storage_layer::Layer::for_evicted(
420 0 : adoptee.conf,
421 0 : adoptee,
422 0 : adopted.layer_desc().layer_name(),
423 0 : metadata,
424 0 : );
425 0 :
426 0 : // FIXME: better shuttingdown error
427 0 : adoptee
428 0 : .remote_client
429 0 : .copy_timeline_layer(adopted, &owned, cancel)
430 0 : .await
431 0 : .map(move |()| owned)
432 0 : .map_err(CopyFailed)
433 0 : }
434 :
435 : /// See [`Timeline::complete_detaching_timeline_ancestor`].
436 0 : pub(super) async fn complete(
437 0 : detached: &Arc<Timeline>,
438 0 : tenant: &Tenant,
439 0 : prepared: PreparedTimelineDetach,
440 0 : _ctx: &RequestContext,
441 0 : ) -> Result<Vec<TimelineId>, anyhow::Error> {
442 0 : let PreparedTimelineDetach { layers } = prepared;
443 0 :
444 0 : let ancestor = detached
445 0 : .get_ancestor_timeline()
446 0 : .expect("must still have a ancestor");
447 0 : let ancestor_lsn = detached.get_ancestor_lsn();
448 0 :
449 0 : // publish the prepared layers before we reparent any of the timelines, so that on restart
450 0 : // reparented timelines find layers. also do the actual detaching.
451 0 : //
452 0 : // if we crash after this operation, we will at least come up having detached a timeline, but
453 0 : // we cannot go back and reparent the timelines which would had been reparented in normal
454 0 : // execution.
455 0 : //
456 0 : // this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
457 0 : // which could give us a completely wrong layer combination.
458 0 : detached
459 0 : .remote_client
460 0 : .schedule_adding_existing_layers_to_index_detach_and_wait(
461 0 : &layers,
462 0 : (ancestor.timeline_id, ancestor_lsn),
463 0 : )
464 0 : .await?;
465 :
466 0 : let mut tasks = tokio::task::JoinSet::new();
467 0 :
468 0 : // because we are now keeping the slot in progress, it is unlikely that there will be any
469 0 : // timeline deletions during this time. if we raced one, then we'll just ignore it.
470 0 : tenant
471 0 : .timelines
472 0 : .lock()
473 0 : .unwrap()
474 0 : .values()
475 0 : .filter_map(|tl| {
476 0 : if Arc::ptr_eq(tl, detached) {
477 0 : return None;
478 0 : }
479 0 :
480 0 : if !tl.is_active() {
481 0 : return None;
482 0 : }
483 :
484 0 : let tl_ancestor = tl.ancestor_timeline.as_ref()?;
485 0 : let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
486 0 : let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
487 0 :
488 0 : let is_deleting = tl
489 0 : .delete_progress
490 0 : .try_lock()
491 0 : .map(|flow| !flow.is_not_started())
492 0 : .unwrap_or(true);
493 0 :
494 0 : if is_same && is_earlier && !is_deleting {
495 0 : Some(tl.clone())
496 : } else {
497 0 : None
498 : }
499 0 : })
500 0 : .for_each(|timeline| {
501 : // important in this scope: we are holding the Tenant::timelines lock
502 0 : let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
503 0 : let new_parent = detached.timeline_id;
504 0 :
505 0 : tasks.spawn(
506 0 : async move {
507 0 : let res = timeline
508 0 : .remote_client
509 0 : .schedule_reparenting_and_wait(&new_parent)
510 0 : .await;
511 :
512 0 : match res {
513 0 : Ok(()) => Some(timeline),
514 0 : Err(e) => {
515 0 : // with the use of tenant slot, we no longer expect these.
516 0 : tracing::warn!("reparenting failed: {e:#}");
517 0 : None
518 : }
519 : }
520 0 : }
521 0 : .instrument(span),
522 0 : );
523 0 : });
524 0 :
525 0 : let reparenting_candidates = tasks.len();
526 0 : let mut reparented = Vec::with_capacity(tasks.len());
527 :
528 0 : while let Some(res) = tasks.join_next().await {
529 0 : match res {
530 0 : Ok(Some(timeline)) => {
531 0 : tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
532 0 : reparented.push(timeline.timeline_id);
533 : }
534 0 : Ok(None) => {
535 0 : // lets just ignore this for now. one or all reparented timelines could had
536 0 : // started deletion, and that is fine.
537 0 : }
538 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
539 0 : Err(je) if je.is_panic() => {
540 0 : // ignore; it's better to continue with a single reparenting failing (or even
541 0 : // all of them) in order to get to the goal state.
542 0 : //
543 0 : // these timelines will never be reparentable, but they can be always detached as
544 0 : // separate tree roots.
545 0 : }
546 0 : Err(je) => tracing::error!("unexpected join error: {je:?}"),
547 : }
548 : }
549 :
550 0 : if reparenting_candidates != reparented.len() {
551 0 : tracing::info!("failed to reparent some candidates");
552 0 : }
553 :
554 0 : Ok(reparented)
555 0 : }
|