Line data Source code
1 : use std::{collections::hash_map::Entry, fs, future::Future, sync::Arc};
2 :
3 : use anyhow::Context;
4 : use camino::Utf8PathBuf;
5 : use tracing::{error, info, info_span};
6 : use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
7 :
8 : use crate::{
9 : context::RequestContext,
10 : import_datadir,
11 : span::debug_assert_current_span_has_tenant_and_timeline_id,
12 : tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
13 : };
14 :
15 : use super::Timeline;
16 :
17 : /// A timeline with some of its files on disk, being initialized.
18 : /// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
19 : /// its local files are removed. If we crash while this class exists, then the timeline's local
20 : /// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
21 : ///
22 : /// The caller is responsible for proper timeline data filling before the final init.
23 : #[must_use]
24 : pub struct UninitializedTimeline<'t> {
25 : pub(crate) owning_tenant: &'t Tenant,
26 : timeline_id: TimelineId,
27 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
28 : /// Whether we spawned the inner Timeline's tasks such that we must later shut it down
29 : /// if aborting the timeline creation
30 : needs_shutdown: bool,
31 : }
32 :
33 : impl<'t> UninitializedTimeline<'t> {
34 884 : pub(crate) fn new(
35 884 : owning_tenant: &'t Tenant,
36 884 : timeline_id: TimelineId,
37 884 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
38 884 : ) -> Self {
39 884 : Self {
40 884 : owning_tenant,
41 884 : timeline_id,
42 884 : raw_timeline,
43 884 : needs_shutdown: false,
44 884 : }
45 884 : }
46 :
47 : /// When writing data to this timeline during creation, use this wrapper: it will take care of
48 : /// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
49 : /// later.
50 4 : pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
51 4 : where
52 4 : F: FnOnce(Arc<Timeline>) -> Fut,
53 4 : Fut: Future<Output = Result<(), CreateTimelineError>>,
54 4 : {
55 4 : debug_assert_current_span_has_tenant_and_timeline_id();
56 4 :
57 4 : // Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
58 4 : self.needs_shutdown = true;
59 :
60 4 : let timeline = self.raw_timeline()?;
61 :
62 : // Spawn flush loop so that the Timeline is ready to accept writes
63 4 : timeline.maybe_spawn_flush_loop();
64 :
65 : // Invoke the provided function, which will write some data into the new timeline
66 4 : if let Err(e) = f(timeline.clone()).await {
67 0 : self.abort().await;
68 0 : return Err(e.into());
69 4 : }
70 :
71 : // Flush the underlying timeline's ephemeral layers to disk
72 4 : if let Err(e) = timeline
73 4 : .freeze_and_flush()
74 4 : .await
75 4 : .context("Failed to flush after timeline creation writes")
76 : {
77 0 : self.abort().await;
78 0 : return Err(e);
79 4 : }
80 4 :
81 4 : Ok(())
82 4 : }
83 :
84 0 : pub(crate) async fn abort(&self) {
85 0 : if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
86 0 : raw_timeline.shutdown(super::ShutdownMode::Hard).await;
87 0 : }
88 0 : }
89 :
90 : /// Finish timeline creation: insert it into the Tenant's timelines map
91 : ///
92 : /// This function launches the flush loop if not already done.
93 : ///
94 : /// The caller is responsible for activating the timeline (function `.activate()`).
95 868 : pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
96 868 : let timeline_id = self.timeline_id;
97 868 : let tenant_shard_id = self.owning_tenant.tenant_shard_id;
98 868 :
99 868 : if self.raw_timeline.is_none() {
100 0 : self.abort().await;
101 :
102 0 : return Err(anyhow::anyhow!(
103 0 : "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
104 0 : ));
105 868 : }
106 868 :
107 868 : // Check that the caller initialized disk_consistent_lsn
108 868 : let new_disk_consistent_lsn = self
109 868 : .raw_timeline
110 868 : .as_ref()
111 868 : .expect("checked above")
112 868 : .0
113 868 : .get_disk_consistent_lsn();
114 868 :
115 868 : if !new_disk_consistent_lsn.is_valid() {
116 0 : self.abort().await;
117 :
118 0 : return Err(anyhow::anyhow!(
119 0 : "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
120 0 : ));
121 868 : }
122 868 :
123 868 : let mut timelines = self.owning_tenant.timelines.lock().unwrap();
124 868 : match timelines.entry(timeline_id) {
125 : Entry::Occupied(_) => {
126 : // Unexpected, bug in the caller. Tenant is responsible for preventing concurrent creation of the same timeline.
127 : //
128 : // We do not call Self::abort here. Because we don't cleanly shut down our Timeline, [`Self::drop`] should
129 : // skip trying to delete the timeline directory too.
130 0 : anyhow::bail!(
131 0 : "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
132 0 : )
133 : }
134 868 : Entry::Vacant(v) => {
135 868 : // after taking here should be no fallible operations, because the drop guard will not
136 868 : // cleanup after and would block for example the tenant deletion
137 868 : let (new_timeline, _create_guard) =
138 868 : self.raw_timeline.take().expect("already checked");
139 868 :
140 868 : v.insert(Arc::clone(&new_timeline));
141 868 :
142 868 : new_timeline.maybe_spawn_flush_loop();
143 868 :
144 868 : Ok(new_timeline)
145 : }
146 : }
147 868 : }
148 :
149 0 : pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
150 0 : self.raw_timeline.take().expect("already checked")
151 0 : }
152 :
153 : /// Prepares timeline data by loading it from the basebackup archive.
154 0 : pub(crate) async fn import_basebackup_from_tar(
155 0 : mut self,
156 0 : tenant: Arc<Tenant>,
157 0 : copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
158 0 : base_lsn: Lsn,
159 0 : broker_client: storage_broker::BrokerClientChannel,
160 0 : ctx: &RequestContext,
161 0 : ) -> anyhow::Result<Arc<Timeline>> {
162 0 : self.write(|raw_timeline| async move {
163 0 : import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
164 0 : .await
165 0 : .context("Failed to import basebackup")
166 0 : .map_err(CreateTimelineError::Other)?;
167 :
168 0 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
169 0 : Err(CreateTimelineError::Other(anyhow::anyhow!(
170 0 : "failpoint before-checkpoint-new-timeline"
171 0 : )))
172 0 : });
173 :
174 0 : Ok(())
175 0 : })
176 0 : .await?;
177 :
178 : // All the data has been imported. Insert the Timeline into the tenant's timelines map
179 0 : let tl = self.finish_creation().await?;
180 0 : tl.activate(tenant, broker_client, None, ctx);
181 0 : Ok(tl)
182 0 : }
183 :
184 428 : pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
185 428 : Ok(&self
186 428 : .raw_timeline
187 428 : .as_ref()
188 428 : .with_context(|| {
189 0 : format!(
190 0 : "No raw timeline {}/{} found",
191 0 : self.owning_tenant.tenant_shard_id, self.timeline_id
192 0 : )
193 428 : })?
194 : .0)
195 428 : }
196 : }
197 :
198 : impl Drop for UninitializedTimeline<'_> {
199 880 : fn drop(&mut self) {
200 880 : if let Some((timeline, create_guard)) = self.raw_timeline.take() {
201 12 : let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
202 12 : if self.needs_shutdown && !timeline.gate.close_complete() {
203 : // This should not happen: caller should call [`Self::abort`] on failures
204 0 : tracing::warn!(
205 0 : "Timeline not shut down after initialization failure, cannot clean up files"
206 : );
207 : } else {
208 : // This is unusual, but can happen harmlessly if the pageserver is stopped while
209 : // creating a timeline.
210 12 : info!("Timeline got dropped without initializing, cleaning its files");
211 12 : cleanup_timeline_directory(create_guard);
212 : }
213 868 : }
214 880 : }
215 : }
216 :
217 12 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
218 12 : let timeline_path = &create_guard.timeline_path;
219 12 : match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
220 : Ok(()) => {
221 12 : info!("Timeline dir {timeline_path:?} removed successfully")
222 : }
223 0 : Err(e) => {
224 0 : error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
225 : }
226 : }
227 : // Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
228 : // timeline creation attempts under this TimelineId to proceed
229 12 : drop(create_guard);
230 12 : }
231 :
/// A guard for timeline creations in process: as long as this object exists, the timeline ID
/// is kept in `Tenant::timelines_creating` to exclude concurrent attempts to create the same timeline.
///
/// The ID is removed from that set by this type's `Drop` impl.
#[must_use]
pub(crate) struct TimelineCreateGuard {
    /// Guard obtained by entering the owning tenant's gate in `TimelineCreateGuard::new`;
    /// held for the lifetime of this creation guard.
    pub(crate) _tenant_gate_guard: GateGuard,
    /// Tenant whose `timelines_creating` set contains `timeline_id` while this guard lives.
    pub(crate) owning_tenant: Arc<Tenant>,
    /// The timeline ID reserved by this guard.
    pub(crate) timeline_id: TimelineId,
    /// Local directory of the timeline being created; `cleanup_timeline_directory` removes it.
    pub(crate) timeline_path: Utf8PathBuf,
    /// Idempotency information passed in with the creation request.
    pub(crate) idempotency: CreateTimelineIdempotency,
}
242 :
243 : /// Errors when acquiring exclusive access to a timeline ID for creation
244 : #[derive(thiserror::Error, Debug)]
245 : pub(crate) enum TimelineExclusionError {
246 : #[error("Already exists")]
247 : AlreadyExists {
248 : existing: TimelineOrOffloaded,
249 : arg: CreateTimelineIdempotency,
250 : },
251 : #[error("Already creating")]
252 : AlreadyCreating,
253 : #[error("Shutting down")]
254 : ShuttingDown,
255 :
256 : // e.g. I/O errors, or some failure deep in postgres initdb
257 : #[error(transparent)]
258 : Other(#[from] anyhow::Error),
259 : }
260 :
261 : impl TimelineCreateGuard {
262 896 : pub(crate) fn new(
263 896 : owning_tenant: &Arc<Tenant>,
264 896 : timeline_id: TimelineId,
265 896 : timeline_path: Utf8PathBuf,
266 896 : idempotency: CreateTimelineIdempotency,
267 896 : allow_offloaded: bool,
268 896 : ) -> Result<Self, TimelineExclusionError> {
269 896 : let _tenant_gate_guard = owning_tenant
270 896 : .gate
271 896 : .enter()
272 896 : .map_err(|_| TimelineExclusionError::ShuttingDown)?;
273 :
274 : // Lock order: this is the only place we take both locks. During drop() we only
275 : // lock creating_timelines
276 896 : let timelines = owning_tenant.timelines.lock().unwrap();
277 896 : let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
278 896 : let mut creating_timelines: std::sync::MutexGuard<
279 896 : '_,
280 896 : std::collections::HashSet<TimelineId>,
281 896 : > = owning_tenant.timelines_creating.lock().unwrap();
282 :
283 896 : if let Some(existing) = timelines.get(&timeline_id) {
284 4 : return Err(TimelineExclusionError::AlreadyExists {
285 4 : existing: TimelineOrOffloaded::Timeline(existing.clone()),
286 4 : arg: idempotency,
287 4 : });
288 892 : }
289 892 : if !allow_offloaded {
290 892 : if let Some(existing) = timelines_offloaded.get(&timeline_id) {
291 0 : return Err(TimelineExclusionError::AlreadyExists {
292 0 : existing: TimelineOrOffloaded::Offloaded(existing.clone()),
293 0 : arg: idempotency,
294 0 : });
295 892 : }
296 0 : }
297 892 : if creating_timelines.contains(&timeline_id) {
298 0 : return Err(TimelineExclusionError::AlreadyCreating);
299 892 : }
300 892 : creating_timelines.insert(timeline_id);
301 892 : drop(creating_timelines);
302 892 : drop(timelines_offloaded);
303 892 : drop(timelines);
304 892 : Ok(Self {
305 892 : _tenant_gate_guard,
306 892 : owning_tenant: Arc::clone(owning_tenant),
307 892 : timeline_id,
308 892 : timeline_path,
309 892 : idempotency,
310 892 : })
311 896 : }
312 : }
313 :
314 : impl Drop for TimelineCreateGuard {
315 888 : fn drop(&mut self) {
316 888 : self.owning_tenant
317 888 : .timelines_creating
318 888 : .lock()
319 888 : .unwrap()
320 888 : .remove(&self.timeline_id);
321 888 : }
322 : }
|