use std::sync::Arc;

use super::{layer_manager::LayerManager, Timeline};
use crate::{
    context::{DownloadBehavior, RequestContext},
    task_mgr::TaskKind,
    tenant::{
        storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
        Tenant,
    },
    virtual_file::{MaybeFatalIo, VirtualFile},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{completion, generation::Generation, id::TimelineId, lsn::Lsn};

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    #[error("no ancestors")]
    NoAncestor,
    #[error("too many ancestors")]
    TooManyAncestors,
    #[error("shutting down, please retry later")]
    ShuttingDown,
    #[error("flushing failed")]
    FlushAncestor(#[source] anyhow::Error),
    #[error("layer download failed")]
    RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
    #[error("copying LSN prefix locally failed")]
    CopyDeltaPrefix(#[source] anyhow::Error),
    #[error("uploading rewritten layer failed")]
    UploadRewritten(#[source] anyhow::Error),

    #[error("ancestor is already being detached by: {}", .0)]
    OtherTimelineDetachOngoing(TimelineId),

    #[error("remote copying layer failed")]
    CopyFailed(#[source] anyhow::Error),

    #[error("unexpected error")]
    Unexpected(#[source] anyhow::Error),
}

pub(crate) struct PreparedTimelineDetach {
    layers: Vec<Layer>,
}

/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
    pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
    pub(crate) copy_concurrency: std::num::NonZeroUsize,
}

impl Default for Options {
    fn default() -> Self {
        Self {
            rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
            copy_concurrency: std::num::NonZeroUsize::new(100).unwrap(),
        }
    }
}
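
// A minimal sketch (hypothetical, for illustration; not called anywhere) of the
// concurrency-limiting pattern these knobs feed below: work items are spawned into a
// `JoinSet` and gated by a semaphore sized from `Options`.
//
// async fn bounded_fan_out(concurrency: std::num::NonZeroUsize) {
//     let limiter = std::sync::Arc::new(tokio::sync::Semaphore::new(concurrency.get()));
//     let mut tasks = tokio::task::JoinSet::new();
//     for work_item in 0..32u32 {
//         let limiter = limiter.clone();
//         tasks.spawn(async move {
//             let _permit = limiter.acquire().await;
//             work_item // rewrite or copy a layer here
//         });
//     }
//     while let Some(res) = tasks.join_next().await {
//         let _ = res.expect("no panics expected");
//     }
// }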

/// See [`Timeline::prepare_to_detach_from_ancestor`]
pub(super) async fn prepare(
    detached: &Arc<Timeline>,
    tenant: &Tenant,
    options: Options,
    ctx: &RequestContext,
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
    use Error::*;

    let Some((ancestor, ancestor_lsn)) = detached
        .ancestor_timeline
        .as_ref()
        .map(|tl| (tl.clone(), detached.ancestor_lsn))
    else {
        return Err(NoAncestor);
    };

    if !ancestor_lsn.is_valid() {
        return Err(NoAncestor);
    }

    if ancestor.ancestor_timeline.is_some() {
        // non-technical requirement; we could flatten N ancestors just as easily, but we
        // chose not to
        return Err(TooManyAncestors);
    }

    // before we acquire the gate, we must mark the ancestor as having a detach operation
    // ongoing; this blocks other concurrent detach operations, so we don't get into awkward
    // situations where two branches would be trying to reparent earlier branches.
    let (guard, barrier) = completion::channel();

    {
        let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
        if let Some((tl, other)) = guard.as_ref() {
            if !other.is_ready() {
                return Err(OtherTimelineDetachOngoing(*tl));
            }
        }
        *guard = Some((detached.timeline_id, barrier));
    }

    let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;

    if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
        let span =
            tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
        async {
            let started_at = std::time::Instant::now();
            let freeze_and_flush = ancestor.freeze_and_flush0();
            let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);

            let res =
                tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
                    .await;

            let res = match res {
                Ok(res) => res,
                Err(_elapsed) => {
                    tracing::info!("freezing and flushing ancestor is still ongoing");
                    freeze_and_flush.await
                }
            };

            res.map_err(FlushAncestor)?;

            // we do not need to wait for uploads to complete, but we do need `struct Layer`:
            // copying a delta prefix is currently unsupported for `InMemoryLayer`.
            tracing::info!(
                elapsed_ms = started_at.elapsed().as_millis(),
                "froze and flushed the ancestor"
            );
            Ok(())
        }
        .instrument(span)
        .await?;
    }

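    // `lsn_range.end` is exclusive, so `ancestor_lsn + 1` makes the rewritten prefix
    // include records at the branchpoint LSN itself.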
    let end_lsn = ancestor_lsn + 1;

    let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
        // we do not need to start from our own layers, because they can only be layers
        // that come *after* ancestor_lsn
        let layers = tokio::select! {
            guard = ancestor.layers.read() => guard,
            _ = detached.cancel.cancelled() => {
                return Err(ShuttingDown);
            }
            _ = ancestor.cancel.cancelled() => {
                return Err(ShuttingDown);
            }
        };

        // between retries, these can change if compaction or gc ran in between, which
        // means we will have to redo some work.
        partition_work(ancestor_lsn, &layers)
    };

    // TODO: layers are already sorted by something: use that to determine how much of the
    // remote copies are already done.
    tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");

    // TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
    let mut new_layers: Vec<Layer> =
        Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());

    {
        tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");

        let mut tasks = tokio::task::JoinSet::new();

        let mut wrote_any = false;

        let limiter = Arc::new(tokio::sync::Semaphore::new(
            options.rewrite_concurrency.get(),
        ));

        for layer in straddling_branchpoint {
            let limiter = limiter.clone();
            let timeline = detached.clone();
            let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);

            tasks.spawn(async move {
                let _permit = limiter.acquire().await;
                let copied =
                    upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
                        .await?;
                Ok(copied)
            });
        }

        while let Some(res) = tasks.join_next().await {
            match res {
                Ok(Ok(Some(copied))) => {
                    wrote_any = true;
                    tracing::info!(layer=%copied, "rewrote and uploaded");
                    new_layers.push(copied);
                }
                Ok(Ok(None)) => {}
                Ok(Err(e)) => return Err(e),
                Err(je) => return Err(Unexpected(je.into())),
            }
        }

        // FIXME: the fsync should be mandatory, after both rewrites and copies
        if wrote_any {
            let timeline_dir = VirtualFile::open(
                &detached
                    .conf
                    .timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
                ctx,
            )
            .await
            .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }
    }

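    // adopt the remaining historic layers: nothing is downloaded locally; each layer is
    // copied within remote storage from the ancestor's prefix to this timeline's.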
    let mut tasks = tokio::task::JoinSet::new();
    let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));

    for adopted in rest_of_historic {
        let limiter = limiter.clone();
        let timeline = detached.clone();

        tasks.spawn(
            async move {
                let _permit = limiter.acquire().await;
                let owned =
                    remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
                tracing::info!(layer=%owned, "remote copied");
                Ok(owned)
            }
            .in_current_span(),
        );
    }

    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Ok(owned)) => {
                new_layers.push(owned);
            }
            Ok(Err(failed)) => {
                return Err(failed);
            }
            Err(je) => return Err(Unexpected(je.into())),
        }
    }

    // TODO: fsync directory again if we hardlinked something

    let prepared = PreparedTimelineDetach { layers: new_layers };

    Ok((guard, prepared))
}

fn partition_work(
    ancestor_lsn: Lsn,
    source_layermap: &LayerManager,
) -> (usize, Vec<Layer>, Vec<Layer>) {
    let mut straddling_branchpoint = vec![];
    let mut rest_of_historic = vec![];

    let mut later_by_lsn = 0;

    for desc in source_layermap.layer_map().iter_historic_layers() {
        // off-by-one chances here:
        // - start is inclusive
        // - end is exclusive
        if desc.lsn_range.start > ancestor_lsn {
            later_by_lsn += 1;
            continue;
        }

        let target = if desc.lsn_range.start <= ancestor_lsn
            && desc.lsn_range.end > ancestor_lsn
            && desc.is_delta
        {
            // TODO: image layer at Lsn optimization
            &mut straddling_branchpoint
        } else {
            &mut rest_of_historic
        };

        target.push(source_layermap.get_from_desc(&desc));
    }

    (later_by_lsn, straddling_branchpoint, rest_of_historic)
}
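
// A test sketch (hypothetical: it mirrors the straddling predicate above rather than
// calling `partition_work`, which would require a full `LayerManager`), pinning down
// the half-open interval semantics: `lsn_range.start` is inclusive, `lsn_range.end`
// is exclusive.
#[cfg(test)]
mod partition_work_tests {
    use utils::lsn::Lsn;

    /// Mirror of the branchpoint-straddling check in `partition_work`.
    fn straddles_branchpoint(
        lsn_range: &std::ops::Range<Lsn>,
        ancestor_lsn: Lsn,
        is_delta: bool,
    ) -> bool {
        lsn_range.start <= ancestor_lsn && lsn_range.end > ancestor_lsn && is_delta
    }

    #[test]
    fn straddling_is_half_open() {
        let ancestor_lsn = Lsn(0x30);
        // contains the branchpoint: would be rewritten
        assert!(straddles_branchpoint(&(Lsn(0x10)..Lsn(0x40)), ancestor_lsn, true));
        // ends exactly at the branchpoint: end is exclusive, so it is adopted as-is
        assert!(!straddles_branchpoint(&(Lsn(0x10)..Lsn(0x30)), ancestor_lsn, true));
        // image layers are never rewritten, only remote copied
        assert!(!straddles_branchpoint(&(Lsn(0x10)..Lsn(0x40)), ancestor_lsn, false));
    }
}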

async fn upload_rewritten_layer(
    end_lsn: Lsn,
    layer: &Layer,
    target: &Arc<Timeline>,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<Option<Layer>, Error> {
    use Error::UploadRewritten;
    let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;

    let Some(copied) = copied else {
        return Ok(None);
    };

    // FIXME: better `ShuttingDown` error
    target
        .remote_client
        .upload_layer_file(&copied, cancel)
        .await
        .map_err(UploadRewritten)?;

    Ok(Some(copied.into()))
}

async fn copy_lsn_prefix(
    end_lsn: Lsn,
    layer: &Layer,
    target_timeline: &Arc<Timeline>,
    ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
    use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};

    tracing::debug!(%layer, %end_lsn, "copying lsn prefix");

    let mut writer = DeltaLayerWriter::new(
        target_timeline.conf,
        target_timeline.timeline_id,
        target_timeline.tenant_shard_id,
        layer.layer_desc().key_range.start,
        layer.layer_desc().lsn_range.start..end_lsn,
        ctx,
    )
    .await
    .map_err(CopyDeltaPrefix)?;

    let resident = layer
        .download_and_keep_resident()
        .await
        // likely shutdown
        .map_err(RewrittenDeltaDownloadFailed)?;

    let records = resident
        .copy_delta_prefix(&mut writer, end_lsn, ctx)
        .await
        .map_err(CopyDeltaPrefix)?;

    drop(resident);

    tracing::debug!(%layer, records, "copied records");

    if records == 0 {
        drop(writer);
        // TODO: we might want to store an empty marker in remote storage for this
        // layer so that we will not needlessly walk `layer` on repeated attempts.
        Ok(None)
    } else {
        // reuse the end key of the whole layer instead of the real highest key within the
        // prefix; using the real key would add more holes between layers.
        let reused_highest_key = layer.layer_desc().key_range.end;
        let copied = writer
            .finish(reused_highest_key, target_timeline, ctx)
            .await
            .map_err(CopyDeltaPrefix)?;

        tracing::debug!(%layer, %copied, "new layer produced");

        Ok(Some(copied))
    }
}
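
// Illustration: with ancestor_lsn = 0x2f (so end_lsn = 0x30), a delta layer covering
// lsn_range 0x10..0x50 is rewritten into a new delta covering 0x10..0x30, keeping only
// the records at or below the branchpoint; a straddling layer with no such records
// yields `None` and is skipped.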

/// Creates a new `Layer` instance for the adopted layer, and ensures that on successful return
/// the layer is found in remote storage, without the adopted layer having been added to
/// `index_part.json`.
async fn remote_copy(
    adopted: &Layer,
    adoptee: &Arc<Timeline>,
    generation: Generation,
    cancel: &CancellationToken,
) -> Result<Layer, Error> {
    use Error::CopyFailed;

    // depending on `Layer::keep_resident`, we could hardlink

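    // stamp the adoptee's current generation on the copied layer's metadata; the remote
    // copy is created under this generation (remote layer paths are generation-suffixed),
    // and the debug_assert documents that generations only move forward.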
    let mut metadata = adopted.metadata();
    debug_assert!(metadata.generation <= generation);
    metadata.generation = generation;

    let owned = crate::tenant::storage_layer::Layer::for_evicted(
        adoptee.conf,
        adoptee,
        adopted.layer_desc().layer_name(),
        metadata,
    );

    // FIXME: better `ShuttingDown` error
    adoptee
        .remote_client
        .copy_timeline_layer(adopted, &owned, cancel)
        .await
        .map(move |()| owned)
        .map_err(CopyFailed)
}

/// See [`Timeline::complete_detaching_timeline_ancestor`].
pub(super) async fn complete(
    detached: &Arc<Timeline>,
    tenant: &Tenant,
    prepared: PreparedTimelineDetach,
    _ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
    let PreparedTimelineDetach { layers } = prepared;

    let ancestor = detached
        .get_ancestor_timeline()
        .expect("must still have an ancestor");
    let ancestor_lsn = detached.get_ancestor_lsn();

    // publish the prepared layers before we reparent any of the timelines, so that on restart
    // the reparented timelines find the layers. also do the actual detaching.
    //
    // if we crash after this operation, we will at least come up having detached a timeline, but
    // we cannot go back and reparent the timelines which would have been reparented in normal
    // execution.
    //
    // this is not perfect, but it avoids a retry happening after a compaction or gc on restart,
    // which could give us a completely wrong layer combination.
    detached
        .remote_client
        .schedule_adding_existing_layers_to_index_detach_and_wait(
            &layers,
            (ancestor.timeline_id, ancestor_lsn),
        )
        .await?;

    let mut tasks = tokio::task::JoinSet::new();

    // because we are now keeping the slot in progress, it is unlikely that there will be any
    // timeline deletions during this time. if we raced one, then we'll just ignore it.
    tenant
        .timelines
        .lock()
        .unwrap()
        .values()
        .filter_map(|tl| {
            if Arc::ptr_eq(tl, detached) {
                return None;
            }

            if !tl.is_active() {
                return None;
            }

            let tl_ancestor = tl.ancestor_timeline.as_ref()?;
            let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
            let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;

            let is_deleting = tl
                .delete_progress
                .try_lock()
                .map(|flow| !flow.is_not_started())
                .unwrap_or(true);

            if is_same && is_earlier && !is_deleting {
                Some(tl.clone())
            } else {
                None
            }
        })
        .for_each(|timeline| {
            // important in this scope: we are holding the Tenant::timelines lock
            let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
            let new_parent = detached.timeline_id;

            tasks.spawn(
                async move {
                    let res = timeline
                        .remote_client
                        .schedule_reparenting_and_wait(&new_parent)
                        .await;

                    match res {
                        Ok(()) => Some(timeline),
                        Err(e) => {
                            // with the use of the tenant slot, we no longer expect these.
                            tracing::warn!("reparenting failed: {e:#}");
                            None
                        }
                    }
                }
                .instrument(span),
            );
        });

    let reparenting_candidates = tasks.len();
    let mut reparented = Vec::with_capacity(tasks.len());

    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Some(timeline)) => {
                tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
                reparented.push(timeline.timeline_id);
            }
            Ok(None) => {
                // let's just ignore this for now. one or all of the reparented timelines
                // could have started deletion, and that is fine.
            }
            Err(je) if je.is_cancelled() => unreachable!("not used"),
            Err(je) if je.is_panic() => {
                // ignore; it's better to continue with a single reparenting failing (or even
                // all of them) in order to get to the goal state.
                //
                // these timelines will never be reparentable, but they can always be detached
                // as separate tree roots.
            }
            Err(je) => tracing::error!("unexpected join error: {je:?}"),
        }
    }

    if reparenting_candidates != reparented.len() {
        tracing::info!("failed to reparent some candidates");
    }

    Ok(reparented)
}
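
// A sketch of the intended two-phase call sequence (hypothetical caller; the real
// entry points are `Timeline::prepare_to_detach_from_ancestor` and
// `Timeline::complete_detaching_timeline_ancestor`):
//
// let (completion, prepared) = prepare(&detached, &tenant, Options::default(), &ctx).await?;
// let reparented = complete(&detached, &tenant, prepared, &ctx).await?;
// drop(completion); // releases the per-tenant "detach ongoing" marker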