Line data Source code
1 : use std::collections::hash_map::Entry;
2 : use std::fs;
3 : use std::future::Future;
4 : use std::sync::Arc;
5 :
6 : use anyhow::Context;
7 : use camino::Utf8PathBuf;
8 : use tracing::{error, info, info_span};
9 : use utils::fs_ext;
10 : use utils::id::TimelineId;
11 : use utils::lsn::Lsn;
12 : use utils::sync::gate::GateGuard;
13 :
14 : use super::Timeline;
15 : use crate::context::RequestContext;
16 : use crate::import_datadir;
17 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
18 : use crate::tenant::{
19 : CreateTimelineError, CreateTimelineIdempotency, TenantShard, TimelineOrOffloaded,
20 : };
21 :
/// A timeline with some of its files on disk, being initialized.
///
/// This struct ensures the atomicity of the timeline init: the timeline is either properly
/// created and inserted into pageserver's memory, or its local files are removed. If we crash
/// while this struct exists, then the timeline's local state is cleaned up during
/// [`TenantShard::clean_up_timelines`], because the timeline's content isn't in remote storage.
///
/// The caller is responsible for proper timeline data filling before the final init.
#[must_use]
pub struct UninitializedTimeline<'t> {
    /// Tenant shard this timeline is being created for.
    pub(crate) owning_tenant: &'t TenantShard,
    /// ID of the timeline being created.
    timeline_id: TimelineId,
    /// The timeline under construction together with its creation guard.
    ///
    /// Becomes `None` once the pair has been taken out, which happens in `finish_creation`,
    /// `finish_creation_myself` and in `Drop`.
    raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
    /// Whether we spawned the inner Timeline's tasks such that we must later shut it down
    /// if aborting the timeline creation
    needs_shutdown: bool,
}
37 :
38 : impl<'t> UninitializedTimeline<'t> {
39 2748 : pub(crate) fn new(
40 2748 : owning_tenant: &'t TenantShard,
41 2748 : timeline_id: TimelineId,
42 2748 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
43 2748 : ) -> Self {
44 2748 : Self {
45 2748 : owning_tenant,
46 2748 : timeline_id,
47 2748 : raw_timeline,
48 2748 : needs_shutdown: false,
49 2748 : }
50 2748 : }
51 :
52 : /// When writing data to this timeline during creation, use this wrapper: it will take care of
53 : /// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
54 : /// later.
55 12 : pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
56 12 : where
57 12 : F: FnOnce(Arc<Timeline>) -> Fut,
58 12 : Fut: Future<Output = Result<(), CreateTimelineError>>,
59 12 : {
60 12 : debug_assert_current_span_has_tenant_and_timeline_id();
61 12 :
62 12 : // Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
63 12 : self.needs_shutdown = true;
64 :
65 12 : let timeline = self.raw_timeline()?;
66 :
67 : // Spawn flush loop so that the Timeline is ready to accept writes
68 12 : timeline.maybe_spawn_flush_loop();
69 :
70 : // Invoke the provided function, which will write some data into the new timeline
71 12 : if let Err(e) = f(timeline.clone()).await {
72 0 : self.abort().await;
73 0 : return Err(e.into());
74 12 : }
75 :
76 : // Flush the underlying timeline's ephemeral layers to disk
77 12 : if let Err(e) = timeline
78 12 : .freeze_and_flush()
79 12 : .await
80 12 : .context("Failed to flush after timeline creation writes")
81 : {
82 0 : self.abort().await;
83 0 : return Err(e);
84 12 : }
85 12 :
86 12 : Ok(())
87 12 : }
88 :
89 0 : pub(crate) async fn abort(&self) {
90 0 : if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
91 0 : raw_timeline.shutdown(super::ShutdownMode::Hard).await;
92 0 : }
93 0 : }
94 :
95 : /// Finish timeline creation: insert it into the Tenant's timelines map
96 : ///
97 : /// This function launches the flush loop if not already done.
98 : ///
99 : /// The caller is responsible for activating the timeline (function `.activate()`).
100 2700 : pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
101 2700 : let timeline_id = self.timeline_id;
102 2700 : let tenant_shard_id = self.owning_tenant.tenant_shard_id;
103 2700 :
104 2700 : if self.raw_timeline.is_none() {
105 0 : self.abort().await;
106 :
107 0 : return Err(anyhow::anyhow!(
108 0 : "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
109 0 : ));
110 2700 : }
111 2700 :
112 2700 : // Check that the caller initialized disk_consistent_lsn
113 2700 : let new_disk_consistent_lsn = self
114 2700 : .raw_timeline
115 2700 : .as_ref()
116 2700 : .expect("checked above")
117 2700 : .0
118 2700 : .get_disk_consistent_lsn();
119 2700 :
120 2700 : if !new_disk_consistent_lsn.is_valid() {
121 0 : self.abort().await;
122 :
123 0 : return Err(anyhow::anyhow!(
124 0 : "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
125 0 : ));
126 2700 : }
127 2700 :
128 2700 : let mut timelines = self.owning_tenant.timelines.lock().unwrap();
129 2700 : match timelines.entry(timeline_id) {
130 : Entry::Occupied(_) => {
131 : // Unexpected, bug in the caller. Tenant is responsible for preventing concurrent creation of the same timeline.
132 : //
133 : // We do not call Self::abort here. Because we don't cleanly shut down our Timeline, [`Self::drop`] should
134 : // skip trying to delete the timeline directory too.
135 0 : anyhow::bail!(
136 0 : "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
137 0 : )
138 : }
139 2700 : Entry::Vacant(v) => {
140 2700 : // after taking here should be no fallible operations, because the drop guard will not
141 2700 : // cleanup after and would block for example the tenant deletion
142 2700 : let (new_timeline, _create_guard) =
143 2700 : self.raw_timeline.take().expect("already checked");
144 2700 :
145 2700 : v.insert(Arc::clone(&new_timeline));
146 2700 :
147 2700 : new_timeline.maybe_spawn_flush_loop();
148 2700 :
149 2700 : Ok(new_timeline)
150 : }
151 : }
152 2700 : }
153 :
154 0 : pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
155 0 : self.raw_timeline.take().expect("already checked")
156 0 : }
157 :
158 : /// Prepares timeline data by loading it from the basebackup archive.
159 0 : pub(crate) async fn import_basebackup_from_tar(
160 0 : mut self,
161 0 : tenant: Arc<TenantShard>,
162 0 : copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
163 0 : base_lsn: Lsn,
164 0 : broker_client: storage_broker::BrokerClientChannel,
165 0 : ctx: &RequestContext,
166 0 : ) -> anyhow::Result<Arc<Timeline>> {
167 0 : self.write(|raw_timeline| async move {
168 0 : import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
169 0 : .await
170 0 : .context("Failed to import basebackup")
171 0 : .map_err(CreateTimelineError::Other)?;
172 :
173 0 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
174 0 : Err(CreateTimelineError::Other(anyhow::anyhow!(
175 0 : "failpoint before-checkpoint-new-timeline"
176 0 : )))
177 0 : });
178 :
179 0 : Ok(())
180 0 : })
181 0 : .await?;
182 :
183 : // All the data has been imported. Insert the Timeline into the tenant's timelines map
184 0 : let tl = self.finish_creation().await?;
185 0 : tl.activate(tenant, broker_client, None, ctx);
186 0 : Ok(tl)
187 0 : }
188 :
189 1344 : pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
190 1344 : Ok(&self
191 1344 : .raw_timeline
192 1344 : .as_ref()
193 1344 : .with_context(|| {
194 0 : format!(
195 0 : "No raw timeline {}/{} found",
196 0 : self.owning_tenant.tenant_shard_id, self.timeline_id
197 0 : )
198 1344 : })?
199 : .0)
200 1344 : }
201 : }
202 :
203 : impl Drop for UninitializedTimeline<'_> {
204 2736 : fn drop(&mut self) {
205 2736 : if let Some((timeline, create_guard)) = self.raw_timeline.take() {
206 36 : let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
207 36 : if self.needs_shutdown && !timeline.gate.close_complete() {
208 : // This should not happen: caller should call [`Self::abort`] on failures
209 0 : tracing::warn!(
210 0 : "Timeline not shut down after initialization failure, cannot clean up files"
211 : );
212 : } else {
213 : // This is unusual, but can happen harmlessly if the pageserver is stopped while
214 : // creating a timeline.
215 36 : info!("Timeline got dropped without initializing, cleaning its files");
216 36 : cleanup_timeline_directory(create_guard);
217 : }
218 2700 : }
219 2736 : }
220 : }
221 :
222 36 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
223 36 : let timeline_path = &create_guard.timeline_path;
224 36 : match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
225 : Ok(()) => {
226 36 : info!("Timeline dir {timeline_path:?} removed successfully")
227 : }
228 0 : Err(e) => {
229 0 : error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
230 : }
231 : }
232 : // Having cleaned up, we can release this TimelineId in `[TenantShard::timelines_creating]` to allow other
233 : // timeline creation attempts under this TimelineId to proceed
234 36 : drop(create_guard);
235 36 : }
236 :
/// A guard for timeline creations in process: as long as this object exists, the timeline ID
/// is kept in [`TenantShard::timelines_creating`] to exclude concurrent attempts to create the
/// same timeline.
///
/// Dropping the guard removes the timeline ID from that set again.
#[must_use]
pub(crate) struct TimelineCreateGuard {
    /// Guard obtained by entering the owning tenant's gate; held for as long as this
    /// creation guard exists.
    pub(crate) _tenant_gate_guard: GateGuard,
    /// Tenant shard whose `timelines_creating` set holds the reserved timeline ID.
    pub(crate) owning_tenant: Arc<TenantShard>,
    /// The timeline ID reserved by this guard.
    pub(crate) timeline_id: TimelineId,
    /// Local directory of the timeline; this is what `cleanup_timeline_directory` removes.
    pub(crate) timeline_path: Utf8PathBuf,
    /// Idempotency information supplied by whoever created the guard.
    pub(crate) idempotency: CreateTimelineIdempotency,
}
247 :
/// Errors when acquiring exclusive access to a timeline ID for creation
#[derive(thiserror::Error, Debug)]
pub(crate) enum TimelineExclusionError {
    /// The tenant already has a timeline (or, unless offloaded timelines are allowed, an
    /// offloaded timeline) with the requested ID.
    #[error("Already exists")]
    AlreadyExists {
        /// The timeline that was found under the requested ID.
        existing: TimelineOrOffloaded,
        /// The idempotency argument of the rejected creation attempt.
        arg: CreateTimelineIdempotency,
    },
    /// Another creation of the same timeline ID is currently in progress.
    #[error("Already creating")]
    AlreadyCreating,
    /// The tenant's gate could not be entered.
    #[error("Shutting down")]
    ShuttingDown,

    // e.g. I/O errors, or some failure deep in postgres initdb
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
265 :
266 : impl TimelineCreateGuard {
267 2784 : pub(crate) fn new(
268 2784 : owning_tenant: &Arc<TenantShard>,
269 2784 : timeline_id: TimelineId,
270 2784 : timeline_path: Utf8PathBuf,
271 2784 : idempotency: CreateTimelineIdempotency,
272 2784 : allow_offloaded: bool,
273 2784 : ) -> Result<Self, TimelineExclusionError> {
274 2784 : let _tenant_gate_guard = owning_tenant
275 2784 : .gate
276 2784 : .enter()
277 2784 : .map_err(|_| TimelineExclusionError::ShuttingDown)?;
278 :
279 : // Lock order: this is the only place we take both locks. During drop() we only
280 : // lock creating_timelines
281 2784 : let timelines = owning_tenant.timelines.lock().unwrap();
282 2784 : let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
283 2784 : let mut creating_timelines: std::sync::MutexGuard<
284 2784 : '_,
285 2784 : std::collections::HashSet<TimelineId>,
286 2784 : > = owning_tenant.timelines_creating.lock().unwrap();
287 :
288 2784 : if let Some(existing) = timelines.get(&timeline_id) {
289 12 : return Err(TimelineExclusionError::AlreadyExists {
290 12 : existing: TimelineOrOffloaded::Timeline(existing.clone()),
291 12 : arg: idempotency,
292 12 : });
293 2772 : }
294 2772 : if !allow_offloaded {
295 2772 : if let Some(existing) = timelines_offloaded.get(&timeline_id) {
296 0 : return Err(TimelineExclusionError::AlreadyExists {
297 0 : existing: TimelineOrOffloaded::Offloaded(existing.clone()),
298 0 : arg: idempotency,
299 0 : });
300 2772 : }
301 0 : }
302 2772 : if creating_timelines.contains(&timeline_id) {
303 0 : return Err(TimelineExclusionError::AlreadyCreating);
304 2772 : }
305 2772 : creating_timelines.insert(timeline_id);
306 2772 : drop(creating_timelines);
307 2772 : drop(timelines_offloaded);
308 2772 : drop(timelines);
309 2772 : Ok(Self {
310 2772 : _tenant_gate_guard,
311 2772 : owning_tenant: Arc::clone(owning_tenant),
312 2772 : timeline_id,
313 2772 : timeline_path,
314 2772 : idempotency,
315 2772 : })
316 2784 : }
317 : }
318 :
319 : impl Drop for TimelineCreateGuard {
320 2760 : fn drop(&mut self) {
321 2760 : self.owning_tenant
322 2760 : .timelines_creating
323 2760 : .lock()
324 2760 : .unwrap()
325 2760 : .remove(&self.timeline_id);
326 2760 : }
327 : }
|