Line data Source code
1 : use std::sync::Arc;
2 :
3 : use super::{layer_manager::LayerManager, Timeline};
4 : use crate::{
5 : context::{DownloadBehavior, RequestContext},
6 : task_mgr::TaskKind,
7 : tenant::{
8 : storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
9 : Tenant,
10 : },
11 : virtual_file::{MaybeFatalIo, VirtualFile},
12 : };
13 : use tokio_util::sync::CancellationToken;
14 : use tracing::Instrument;
15 : use utils::{completion, generation::Generation, id::TimelineId, lsn::Lsn};
16 :
/// Errors produced while preparing a timeline for detaching from its ancestor.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    /// The timeline has no ancestor timeline, or its ancestor LSN is not valid.
    #[error("no ancestors")]
    NoAncestor,
    /// The ancestor itself has an ancestor; only a single level is handled.
    #[error("too many ancestors")]
    TooManyAncestors,
    /// The gate could not be entered or a cancellation token fired.
    #[error("shutting down, please retry later")]
    ShuttingDown,
    /// The timeline to detach has not yet received writes of its own.
    #[error("detached timeline must receive writes before the operation")]
    DetachedTimelineNeedsWrites,
    /// Freezing and flushing the ancestor failed.
    #[error("flushing failed")]
    FlushAncestor(#[source] anyhow::Error),
    /// Making the layer to be rewritten resident failed.
    #[error("layer download failed")]
    RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
    /// Creating, filling or finishing the truncated delta layer failed.
    #[error("copying LSN prefix locally failed")]
    CopyDeltaPrefix(#[source] anyhow::Error),
    /// Uploading the rewritten layer failed.
    #[error("upload rewritten layer")]
    UploadRewritten(#[source] anyhow::Error),

    /// Another detach operation, started by the contained timeline, has not yet completed.
    #[error("ancestor is already being detached by: {}", .0)]
    OtherTimelineDetachOngoing(TimelineId),

    /// Remote copy of an ancestor layer failed.
    #[error("remote copying layer failed")]
    CopyFailed(#[source] anyhow::Error),

    /// Anything else, for example a join error from a spawned task.
    #[error("unexpected error")]
    Unexpected(#[source] anyhow::Error),
}
45 :
/// Outcome of a successful `prepare`, to be handed over to `complete`.
pub(crate) struct PreparedTimelineDetach {
    /// Layers produced during preparation: rewritten delta prefixes and remote copies of the
    /// ancestor's layers. `complete` adds these to the detached timeline's index.
    layers: Vec<Layer>,
}
49 :
/// Tunables for the detach ancestor preparation.
///
/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
    /// Maximum number of delta layer rewrites running at the same time.
    pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
    /// Maximum number of remote layer copies running at the same time.
    pub(crate) copy_concurrency: std::num::NonZeroUsize,
}

impl Default for Options {
    /// Defaults to 2 concurrent rewrites and 10 concurrent remote copies.
    fn default() -> Self {
        use std::num::NonZeroUsize;

        // the literals below are non-zero, so the unwrap cannot fail
        let non_zero = |value: usize| NonZeroUsize::new(value).unwrap();

        Self {
            rewrite_concurrency: non_zero(2),
            copy_concurrency: non_zero(10),
        }
    }
}
65 :
66 : /// See [`Timeline::prepare_to_detach_from_ancestor`]
67 0 : pub(super) async fn prepare(
68 0 : detached: &Arc<Timeline>,
69 0 : tenant: &Tenant,
70 0 : options: Options,
71 0 : ctx: &RequestContext,
72 0 : ) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
73 0 : use Error::*;
74 0 :
75 0 : if detached.remote_client.as_ref().is_none() {
76 0 : unimplemented!("no new code for running without remote storage");
77 0 : }
78 :
79 0 : let Some((ancestor, ancestor_lsn)) = detached
80 0 : .ancestor_timeline
81 0 : .as_ref()
82 0 : .map(|tl| (tl.clone(), detached.ancestor_lsn))
83 : else {
84 0 : return Err(NoAncestor);
85 : };
86 :
87 0 : if !ancestor_lsn.is_valid() {
88 0 : return Err(NoAncestor);
89 0 : }
90 0 :
91 0 : if ancestor.ancestor_timeline.is_some() {
92 : // non-technical requirement; we could flatten N ancestors just as easily but we chose
93 : // not to
94 0 : return Err(TooManyAncestors);
95 0 : }
96 0 :
97 0 : if detached.get_prev_record_lsn() == Lsn::INVALID
98 0 : || detached.disk_consistent_lsn.load() == ancestor_lsn
99 : {
100 : // this is to avoid a problem that after detaching we would be unable to start up the
101 : // compute because of "PREV_LSN: invalid".
102 0 : return Err(DetachedTimelineNeedsWrites);
103 0 : }
104 0 :
105 0 : // before we acquire the gate, we must mark the ancestor as having a detach operation
106 0 : // ongoing which will block other concurrent detach operations so we don't get to ackward
107 0 : // situations where there would be two branches trying to reparent earlier branches.
108 0 : let (guard, barrier) = completion::channel();
109 0 :
110 0 : {
111 0 : let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
112 0 : if let Some((tl, other)) = guard.as_ref() {
113 0 : if !other.is_ready() {
114 0 : return Err(OtherTimelineDetachOngoing(*tl));
115 0 : }
116 0 : }
117 0 : *guard = Some((detached.timeline_id, barrier));
118 : }
119 :
120 0 : let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
121 :
122 0 : if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
123 0 : let span =
124 0 : tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
125 0 : async {
126 0 : let started_at = std::time::Instant::now();
127 0 : let freeze_and_flush = ancestor.freeze_and_flush0();
128 0 : let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
129 :
130 0 : let res =
131 0 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
132 0 : .await;
133 :
134 0 : let res = match res {
135 0 : Ok(res) => res,
136 0 : Err(_elapsed) => {
137 0 : tracing::info!("freezing and flushing ancestor is still ongoing");
138 0 : freeze_and_flush.await
139 : }
140 : };
141 :
142 0 : res.map_err(FlushAncestor)?;
143 :
144 : // we do not need to wait for uploads to complete but we do need `struct Layer`,
145 : // copying delta prefix is unsupported currently for `InMemoryLayer`.
146 0 : tracing::info!(
147 0 : elapsed_ms = started_at.elapsed().as_millis(),
148 0 : "froze and flushed the ancestor"
149 : );
150 0 : Ok(())
151 0 : }
152 0 : .instrument(span)
153 0 : .await?;
154 0 : }
155 :
156 0 : let end_lsn = ancestor_lsn + 1;
157 :
158 0 : let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
159 0 : // we do not need to start from our layers, because they can only be layers that come
160 0 : // *after* ancestor_lsn
161 0 : let layers = tokio::select! {
162 : guard = ancestor.layers.read() => guard,
163 : _ = detached.cancel.cancelled() => {
164 : return Err(ShuttingDown);
165 : }
166 : _ = ancestor.cancel.cancelled() => {
167 : return Err(ShuttingDown);
168 : }
169 0 : };
170 0 :
171 0 : // between retries, these can change if compaction or gc ran in between. this will mean
172 0 : // we have to redo work.
173 0 : partition_work(ancestor_lsn, &layers)
174 0 : };
175 0 :
176 0 : // TODO: layers are already sorted by something: use that to determine how much of remote
177 0 : // copies are already done.
178 0 : tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
179 :
180 : // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
181 0 : let mut new_layers: Vec<Layer> =
182 0 : Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
183 0 :
184 0 : {
185 0 : tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
186 :
187 0 : let mut tasks = tokio::task::JoinSet::new();
188 0 :
189 0 : let mut wrote_any = false;
190 0 :
191 0 : let limiter = Arc::new(tokio::sync::Semaphore::new(
192 0 : options.rewrite_concurrency.get(),
193 0 : ));
194 :
195 0 : for layer in straddling_branchpoint {
196 0 : let limiter = limiter.clone();
197 0 : let timeline = detached.clone();
198 0 : let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
199 0 :
200 0 : tasks.spawn(async move {
201 0 : let _permit = limiter.acquire().await;
202 0 : let copied =
203 0 : upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
204 0 : .await?;
205 0 : Ok(copied)
206 0 : });
207 0 : }
208 :
209 0 : while let Some(res) = tasks.join_next().await {
210 0 : match res {
211 0 : Ok(Ok(Some(copied))) => {
212 0 : wrote_any = true;
213 0 : tracing::info!(layer=%copied, "rewrote and uploaded");
214 0 : new_layers.push(copied);
215 : }
216 0 : Ok(Ok(None)) => {}
217 0 : Ok(Err(e)) => return Err(e),
218 0 : Err(je) => return Err(Unexpected(je.into())),
219 : }
220 : }
221 :
222 : // FIXME: the fsync should be mandatory, after both rewrites and copies
223 0 : if wrote_any {
224 0 : let timeline_dir = VirtualFile::open(
225 0 : &detached
226 0 : .conf
227 0 : .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
228 0 : )
229 0 : .await
230 0 : .fatal_err("VirtualFile::open for timeline dir fsync");
231 0 : timeline_dir
232 0 : .sync_all()
233 0 : .await
234 0 : .fatal_err("VirtualFile::sync_all timeline dir");
235 0 : }
236 : }
237 :
238 0 : let mut tasks = tokio::task::JoinSet::new();
239 0 : let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));
240 :
241 0 : for adopted in rest_of_historic {
242 0 : let limiter = limiter.clone();
243 0 : let timeline = detached.clone();
244 0 :
245 0 : tasks.spawn(
246 0 : async move {
247 0 : let _permit = limiter.acquire().await;
248 0 : let owned =
249 0 : remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
250 0 : tracing::info!(layer=%owned, "remote copied");
251 0 : Ok(owned)
252 0 : }
253 0 : .in_current_span(),
254 0 : );
255 0 : }
256 :
257 0 : while let Some(res) = tasks.join_next().await {
258 0 : match res {
259 0 : Ok(Ok(owned)) => {
260 0 : new_layers.push(owned);
261 0 : }
262 0 : Ok(Err(failed)) => {
263 0 : return Err(failed);
264 : }
265 0 : Err(je) => return Err(Unexpected(je.into())),
266 : }
267 : }
268 :
269 : // TODO: fsync directory again if we hardlinked something
270 :
271 0 : let prepared = PreparedTimelineDetach { layers: new_layers };
272 0 :
273 0 : Ok((guard, prepared))
274 0 : }
275 :
276 0 : fn partition_work(
277 0 : ancestor_lsn: Lsn,
278 0 : source_layermap: &LayerManager,
279 0 : ) -> (usize, Vec<Layer>, Vec<Layer>) {
280 0 : let mut straddling_branchpoint = vec![];
281 0 : let mut rest_of_historic = vec![];
282 0 :
283 0 : let mut later_by_lsn = 0;
284 :
285 0 : for desc in source_layermap.layer_map().iter_historic_layers() {
286 : // off by one chances here:
287 : // - start is inclusive
288 : // - end is exclusive
289 0 : if desc.lsn_range.start > ancestor_lsn {
290 0 : later_by_lsn += 1;
291 0 : continue;
292 0 : }
293 :
294 0 : let target = if desc.lsn_range.start <= ancestor_lsn
295 0 : && desc.lsn_range.end > ancestor_lsn
296 0 : && desc.is_delta
297 : {
298 : // TODO: image layer at Lsn optimization
299 0 : &mut straddling_branchpoint
300 : } else {
301 0 : &mut rest_of_historic
302 : };
303 :
304 0 : target.push(source_layermap.get_from_desc(&desc));
305 : }
306 :
307 0 : (later_by_lsn, straddling_branchpoint, rest_of_historic)
308 0 : }
309 :
310 0 : async fn upload_rewritten_layer(
311 0 : end_lsn: Lsn,
312 0 : layer: &Layer,
313 0 : target: &Arc<Timeline>,
314 0 : cancel: &CancellationToken,
315 0 : ctx: &RequestContext,
316 0 : ) -> Result<Option<Layer>, Error> {
317 : use Error::UploadRewritten;
318 0 : let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
319 :
320 0 : let Some(copied) = copied else {
321 0 : return Ok(None);
322 : };
323 :
324 : // FIXME: better shuttingdown error
325 0 : target
326 0 : .remote_client
327 0 : .as_ref()
328 0 : .unwrap()
329 0 : .upload_layer_file(&copied, cancel)
330 0 : .await
331 0 : .map_err(UploadRewritten)?;
332 :
333 0 : Ok(Some(copied.into()))
334 0 : }
335 :
336 0 : async fn copy_lsn_prefix(
337 0 : end_lsn: Lsn,
338 0 : layer: &Layer,
339 0 : target_timeline: &Arc<Timeline>,
340 0 : ctx: &RequestContext,
341 0 : ) -> Result<Option<ResidentLayer>, Error> {
342 0 : use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
343 0 :
344 0 : tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
345 :
346 0 : let mut writer = DeltaLayerWriter::new(
347 0 : target_timeline.conf,
348 0 : target_timeline.timeline_id,
349 0 : target_timeline.tenant_shard_id,
350 0 : layer.layer_desc().key_range.start,
351 0 : layer.layer_desc().lsn_range.start..end_lsn,
352 0 : )
353 0 : .await
354 0 : .map_err(CopyDeltaPrefix)?;
355 :
356 0 : let resident = layer
357 0 : .download_and_keep_resident()
358 0 : .await
359 : // likely shutdown
360 0 : .map_err(RewrittenDeltaDownloadFailed)?;
361 :
362 0 : let records = resident
363 0 : .copy_delta_prefix(&mut writer, end_lsn, ctx)
364 0 : .await
365 0 : .map_err(CopyDeltaPrefix)?;
366 :
367 0 : drop(resident);
368 0 :
369 0 : tracing::debug!(%layer, records, "copied records");
370 :
371 0 : if records == 0 {
372 0 : drop(writer);
373 0 : // TODO: we might want to store an empty marker in remote storage for this
374 0 : // layer so that we will not needlessly walk `layer` on repeated attempts.
375 0 : Ok(None)
376 : } else {
377 : // reuse the key instead of adding more holes between layers by using the real
378 : // highest key in the layer.
379 0 : let reused_highest_key = layer.layer_desc().key_range.end;
380 0 : let copied = writer
381 0 : .finish(reused_highest_key, target_timeline, ctx)
382 0 : .await
383 0 : .map_err(CopyDeltaPrefix)?;
384 :
385 0 : tracing::debug!(%layer, %copied, "new layer produced");
386 :
387 0 : Ok(Some(copied))
388 : }
389 0 : }
390 :
391 : /// Creates a new Layer instance for the adopted layer, and ensures it is found from the remote
392 : /// storage on successful return without the adopted layer being added to `index_part.json`.
393 0 : async fn remote_copy(
394 0 : adopted: &Layer,
395 0 : adoptee: &Arc<Timeline>,
396 0 : generation: Generation,
397 0 : cancel: &CancellationToken,
398 0 : ) -> Result<Layer, Error> {
399 0 : use Error::CopyFailed;
400 0 :
401 0 : // depending if Layer::keep_resident we could hardlink
402 0 :
403 0 : let mut metadata = adopted.metadata();
404 0 : debug_assert!(metadata.generation <= generation);
405 0 : metadata.generation = generation;
406 0 :
407 0 : let owned = crate::tenant::storage_layer::Layer::for_evicted(
408 0 : adoptee.conf,
409 0 : adoptee,
410 0 : adopted.layer_desc().layer_name(),
411 0 : metadata,
412 0 : );
413 0 :
414 0 : // FIXME: better shuttingdown error
415 0 : adoptee
416 0 : .remote_client
417 0 : .as_ref()
418 0 : .unwrap()
419 0 : .copy_timeline_layer(adopted, &owned, cancel)
420 0 : .await
421 0 : .map(move |()| owned)
422 0 : .map_err(CopyFailed)
423 0 : }
424 :
425 : /// See [`Timeline::complete_detaching_timeline_ancestor`].
426 0 : pub(super) async fn complete(
427 0 : detached: &Arc<Timeline>,
428 0 : tenant: &Tenant,
429 0 : prepared: PreparedTimelineDetach,
430 0 : _ctx: &RequestContext,
431 0 : ) -> Result<Vec<TimelineId>, anyhow::Error> {
432 0 : let rtc = detached
433 0 : .remote_client
434 0 : .as_ref()
435 0 : .expect("has to have a remote timeline client for timeline ancestor detach");
436 0 :
437 0 : let PreparedTimelineDetach { layers } = prepared;
438 0 :
439 0 : let ancestor = detached
440 0 : .get_ancestor_timeline()
441 0 : .expect("must still have a ancestor");
442 0 : let ancestor_lsn = detached.get_ancestor_lsn();
443 0 :
444 0 : // publish the prepared layers before we reparent any of the timelines, so that on restart
445 0 : // reparented timelines find layers. also do the actual detaching.
446 0 : //
447 0 : // if we crash after this operation, we will at least come up having detached a timeline, but
448 0 : // we cannot go back and reparent the timelines which would had been reparented in normal
449 0 : // execution.
450 0 : //
451 0 : // this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
452 0 : // which could give us a completely wrong layer combination.
453 0 : rtc.schedule_adding_existing_layers_to_index_detach_and_wait(
454 0 : &layers,
455 0 : (ancestor.timeline_id, ancestor_lsn),
456 0 : )
457 0 : .await?;
458 :
459 0 : let mut tasks = tokio::task::JoinSet::new();
460 0 :
461 0 : // because we are now keeping the slot in progress, it is unlikely that there will be any
462 0 : // timeline deletions during this time. if we raced one, then we'll just ignore it.
463 0 : tenant
464 0 : .timelines
465 0 : .lock()
466 0 : .unwrap()
467 0 : .values()
468 0 : .filter_map(|tl| {
469 0 : if Arc::ptr_eq(tl, detached) {
470 0 : return None;
471 0 : }
472 0 :
473 0 : if !tl.is_active() {
474 0 : return None;
475 0 : }
476 :
477 0 : let tl_ancestor = tl.ancestor_timeline.as_ref()?;
478 0 : let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
479 0 : let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
480 0 :
481 0 : let is_deleting = tl
482 0 : .delete_progress
483 0 : .try_lock()
484 0 : .map(|flow| !flow.is_not_started())
485 0 : .unwrap_or(true);
486 0 :
487 0 : if is_same && is_earlier && !is_deleting {
488 0 : Some(tl.clone())
489 : } else {
490 0 : None
491 : }
492 0 : })
493 0 : .for_each(|timeline| {
494 : // important in this scope: we are holding the Tenant::timelines lock
495 0 : let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
496 0 : let new_parent = detached.timeline_id;
497 0 :
498 0 : tasks.spawn(
499 0 : async move {
500 0 : let res = timeline
501 0 : .remote_client
502 0 : .as_ref()
503 0 : .expect("reparented has to have remote client because detached has one")
504 0 : .schedule_reparenting_and_wait(&new_parent)
505 0 : .await;
506 :
507 0 : match res {
508 0 : Ok(()) => Some(timeline),
509 0 : Err(e) => {
510 0 : // with the use of tenant slot, we no longer expect these.
511 0 : tracing::warn!("reparenting failed: {e:#}");
512 0 : None
513 : }
514 : }
515 0 : }
516 0 : .instrument(span),
517 0 : );
518 0 : });
519 0 :
520 0 : let reparenting_candidates = tasks.len();
521 0 : let mut reparented = Vec::with_capacity(tasks.len());
522 :
523 0 : while let Some(res) = tasks.join_next().await {
524 0 : match res {
525 0 : Ok(Some(timeline)) => {
526 0 : tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
527 0 : reparented.push(timeline.timeline_id);
528 : }
529 0 : Ok(None) => {
530 0 : // lets just ignore this for now. one or all reparented timelines could had
531 0 : // started deletion, and that is fine.
532 0 : }
533 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
534 0 : Err(je) if je.is_panic() => {
535 0 : // ignore; it's better to continue with a single reparenting failing (or even
536 0 : // all of them) in order to get to the goal state.
537 0 : //
538 0 : // these timelines will never be reparentable, but they can be always detached as
539 0 : // separate tree roots.
540 0 : }
541 0 : Err(je) => tracing::error!("unexpected join error: {je:?}"),
542 : }
543 : }
544 :
545 0 : if reparenting_candidates != reparented.len() {
546 0 : tracing::info!("failed to reparent some candidates");
547 0 : }
548 :
549 0 : Ok(reparented)
550 0 : }
|