Line data Source code
1 : use std::{collections::hash_map::Entry, fs, sync::Arc};
2 :
3 : use anyhow::Context;
4 : use camino::Utf8PathBuf;
5 : use tracing::{error, info, info_span};
6 : use utils::{fs_ext, id::TimelineId, lsn::Lsn};
7 :
8 : use crate::{context::RequestContext, import_datadir, tenant::Tenant};
9 :
10 : use super::Timeline;
11 :
12 : /// A timeline with some of its files on disk, being initialized.
13 : /// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
14 : /// its local files are removed. If we crash while this class exists, then the timeline's local
15 : /// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
16 : ///
17 : /// The caller is responsible for proper timeline data filling before the final init.
18 : #[must_use]
19 : pub struct UninitializedTimeline<'t> {
20 : pub(crate) owning_tenant: &'t Tenant,
21 : timeline_id: TimelineId,
22 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
23 : }
24 :
25 : impl<'t> UninitializedTimeline<'t> {
26 1224 : pub(crate) fn new(
27 1224 : owning_tenant: &'t Tenant,
28 1224 : timeline_id: TimelineId,
29 1224 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
30 1224 : ) -> Self {
31 1224 : Self {
32 1224 : owning_tenant,
33 1224 : timeline_id,
34 1224 : raw_timeline,
35 1224 : }
36 1224 : }
37 :
38 : /// Finish timeline creation: insert it into the Tenant's timelines map
39 : ///
40 : /// This function launches the flush loop if not already done.
41 : ///
42 : /// The caller is responsible for activating the timeline (function `.activate()`).
43 1200 : pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
44 1200 : let timeline_id = self.timeline_id;
45 1200 : let tenant_shard_id = self.owning_tenant.tenant_shard_id;
46 1200 :
47 1200 : if self.raw_timeline.is_none() {
48 0 : return Err(anyhow::anyhow!(
49 0 : "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
50 0 : ));
51 1200 : }
52 1200 :
53 1200 : // Check that the caller initialized disk_consistent_lsn
54 1200 : let new_disk_consistent_lsn = self
55 1200 : .raw_timeline
56 1200 : .as_ref()
57 1200 : .expect("checked above")
58 1200 : .0
59 1200 : .get_disk_consistent_lsn();
60 1200 :
61 1200 : anyhow::ensure!(
62 1200 : new_disk_consistent_lsn.is_valid(),
63 0 : "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
64 : );
65 :
66 1200 : let mut timelines = self.owning_tenant.timelines.lock().unwrap();
67 1200 : match timelines.entry(timeline_id) {
68 0 : Entry::Occupied(_) => anyhow::bail!(
69 0 : "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
70 0 : ),
71 1200 : Entry::Vacant(v) => {
72 1200 : // after taking here should be no fallible operations, because the drop guard will not
73 1200 : // cleanup after and would block for example the tenant deletion
74 1200 : let (new_timeline, _create_guard) =
75 1200 : self.raw_timeline.take().expect("already checked");
76 1200 :
77 1200 : v.insert(Arc::clone(&new_timeline));
78 1200 :
79 1200 : new_timeline.maybe_spawn_flush_loop();
80 1200 :
81 1200 : Ok(new_timeline)
82 : }
83 : }
84 1200 : }
85 :
86 : /// Prepares timeline data by loading it from the basebackup archive.
87 0 : pub(crate) async fn import_basebackup_from_tar(
88 0 : self,
89 0 : tenant: Arc<Tenant>,
90 0 : copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
91 0 : base_lsn: Lsn,
92 0 : broker_client: storage_broker::BrokerClientChannel,
93 0 : ctx: &RequestContext,
94 0 : ) -> anyhow::Result<Arc<Timeline>> {
95 0 : let raw_timeline = self.raw_timeline()?;
96 :
97 0 : import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
98 0 : .await
99 0 : .context("Failed to import basebackup")?;
100 :
101 : // Flush the new layer files to disk, before we make the timeline as available to
102 : // the outside world.
103 : //
104 : // Flush loop needs to be spawned in order to be able to flush.
105 0 : raw_timeline.maybe_spawn_flush_loop();
106 0 :
107 0 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
108 0 : anyhow::bail!("failpoint before-checkpoint-new-timeline");
109 0 : });
110 :
111 0 : raw_timeline
112 0 : .freeze_and_flush()
113 0 : .await
114 0 : .context("Failed to flush after basebackup import")?;
115 :
116 : // All the data has been imported. Insert the Timeline into the tenant's timelines map
117 0 : let tl = self.finish_creation()?;
118 0 : tl.activate(tenant, broker_client, None, ctx);
119 0 : Ok(tl)
120 0 : }
121 :
122 546 : pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
123 546 : Ok(&self
124 546 : .raw_timeline
125 546 : .as_ref()
126 546 : .with_context(|| {
127 0 : format!(
128 0 : "No raw timeline {}/{} found",
129 0 : self.owning_tenant.tenant_shard_id, self.timeline_id
130 0 : )
131 546 : })?
132 : .0)
133 546 : }
134 : }
135 :
136 : impl Drop for UninitializedTimeline<'_> {
137 1218 : fn drop(&mut self) {
138 1218 : if let Some((_, create_guard)) = self.raw_timeline.take() {
139 18 : let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
140 18 : error!("Timeline got dropped without initializing, cleaning its files");
141 18 : cleanup_timeline_directory(create_guard);
142 1200 : }
143 1218 : }
144 : }
145 :
146 18 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
147 18 : let timeline_path = &create_guard.timeline_path;
148 18 : match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
149 : Ok(()) => {
150 18 : info!("Timeline dir {timeline_path:?} removed successfully")
151 : }
152 0 : Err(e) => {
153 0 : error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
154 : }
155 : }
156 : // Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
157 : // timeline creation attempts under this TimelineId to proceed
158 18 : drop(create_guard);
159 18 : }
160 :
161 : /// A guard for timeline creations in process: as long as this object exists, the timeline ID
162 : /// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
163 : #[must_use]
164 : pub(crate) struct TimelineCreateGuard<'t> {
165 : owning_tenant: &'t Tenant,
166 : timeline_id: TimelineId,
167 : pub(crate) timeline_path: Utf8PathBuf,
168 : }
169 :
170 : /// Errors when acquiring exclusive access to a timeline ID for creation
171 6 : #[derive(thiserror::Error, Debug)]
172 : pub(crate) enum TimelineExclusionError {
173 : #[error("Already exists")]
174 : AlreadyExists(Arc<Timeline>),
175 : #[error("Already creating")]
176 : AlreadyCreating,
177 :
178 : // e.g. I/O errors, or some failure deep in postgres initdb
179 : #[error(transparent)]
180 : Other(#[from] anyhow::Error),
181 : }
182 :
183 : impl<'t> TimelineCreateGuard<'t> {
184 1242 : pub(crate) fn new(
185 1242 : owning_tenant: &'t Tenant,
186 1242 : timeline_id: TimelineId,
187 1242 : timeline_path: Utf8PathBuf,
188 1242 : ) -> Result<Self, TimelineExclusionError> {
189 1242 : // Lock order: this is the only place we take both locks. During drop() we only
190 1242 : // lock creating_timelines
191 1242 : let timelines = owning_tenant.timelines.lock().unwrap();
192 1242 : let mut creating_timelines: std::sync::MutexGuard<
193 1242 : '_,
194 1242 : std::collections::HashSet<TimelineId>,
195 1242 : > = owning_tenant.timelines_creating.lock().unwrap();
196 :
197 1242 : if let Some(existing) = timelines.get(&timeline_id) {
198 6 : Err(TimelineExclusionError::AlreadyExists(existing.clone()))
199 1236 : } else if creating_timelines.contains(&timeline_id) {
200 0 : Err(TimelineExclusionError::AlreadyCreating)
201 : } else {
202 1236 : creating_timelines.insert(timeline_id);
203 1236 : Ok(Self {
204 1236 : owning_tenant,
205 1236 : timeline_id,
206 1236 : timeline_path,
207 1236 : })
208 : }
209 1242 : }
210 : }
211 :
212 : impl Drop for TimelineCreateGuard<'_> {
213 1230 : fn drop(&mut self) {
214 1230 : self.owning_tenant
215 1230 : .timelines_creating
216 1230 : .lock()
217 1230 : .unwrap()
218 1230 : .remove(&self.timeline_id);
219 1230 : }
220 : }
|