Line data Source code
1 : use std::collections::hash_map::Entry;
2 : use std::fs;
3 : use std::future::Future;
4 : use std::sync::Arc;
5 :
6 : use anyhow::Context;
7 : use camino::Utf8PathBuf;
8 : use tracing::{error, info, info_span};
9 : use utils::fs_ext;
10 : use utils::id::TimelineId;
11 : use utils::lsn::Lsn;
12 : use utils::sync::gate::GateGuard;
13 :
14 : use super::Timeline;
15 : use crate::context::RequestContext;
16 : use crate::import_datadir;
17 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
18 : use crate::tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded};
19 :
20 : /// A timeline with some of its files on disk, being initialized.
21 : /// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
22 : /// its local files are removed. If we crash while this class exists, then the timeline's local
23 : /// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
24 : ///
25 : /// The caller is responsible for proper timeline data filling before the final init.
26 : #[must_use]
27 : pub struct UninitializedTimeline<'t> {
28 : pub(crate) owning_tenant: &'t Tenant,
29 : timeline_id: TimelineId,
30 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
31 : /// Whether we spawned the inner Timeline's tasks such that we must later shut it down
32 : /// if aborting the timeline creation
33 : needs_shutdown: bool,
34 : }
35 :
36 : impl<'t> UninitializedTimeline<'t> {
37 892 : pub(crate) fn new(
38 892 : owning_tenant: &'t Tenant,
39 892 : timeline_id: TimelineId,
40 892 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
41 892 : ) -> Self {
42 892 : Self {
43 892 : owning_tenant,
44 892 : timeline_id,
45 892 : raw_timeline,
46 892 : needs_shutdown: false,
47 892 : }
48 892 : }
49 :
50 : /// When writing data to this timeline during creation, use this wrapper: it will take care of
51 : /// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
52 : /// later.
53 4 : pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
54 4 : where
55 4 : F: FnOnce(Arc<Timeline>) -> Fut,
56 4 : Fut: Future<Output = Result<(), CreateTimelineError>>,
57 4 : {
58 4 : debug_assert_current_span_has_tenant_and_timeline_id();
59 4 :
60 4 : // Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
61 4 : self.needs_shutdown = true;
62 :
63 4 : let timeline = self.raw_timeline()?;
64 :
65 : // Spawn flush loop so that the Timeline is ready to accept writes
66 4 : timeline.maybe_spawn_flush_loop();
67 :
68 : // Invoke the provided function, which will write some data into the new timeline
69 4 : if let Err(e) = f(timeline.clone()).await {
70 0 : self.abort().await;
71 0 : return Err(e.into());
72 4 : }
73 :
74 : // Flush the underlying timeline's ephemeral layers to disk
75 4 : if let Err(e) = timeline
76 4 : .freeze_and_flush()
77 4 : .await
78 4 : .context("Failed to flush after timeline creation writes")
79 : {
80 0 : self.abort().await;
81 0 : return Err(e);
82 4 : }
83 4 :
84 4 : Ok(())
85 4 : }
86 :
87 0 : pub(crate) async fn abort(&self) {
88 0 : if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
89 0 : raw_timeline.shutdown(super::ShutdownMode::Hard).await;
90 0 : }
91 0 : }
92 :
93 : /// Finish timeline creation: insert it into the Tenant's timelines map
94 : ///
95 : /// This function launches the flush loop if not already done.
96 : ///
97 : /// The caller is responsible for activating the timeline (function `.activate()`).
98 876 : pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
99 876 : let timeline_id = self.timeline_id;
100 876 : let tenant_shard_id = self.owning_tenant.tenant_shard_id;
101 876 :
102 876 : if self.raw_timeline.is_none() {
103 0 : self.abort().await;
104 :
105 0 : return Err(anyhow::anyhow!(
106 0 : "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
107 0 : ));
108 876 : }
109 876 :
110 876 : // Check that the caller initialized disk_consistent_lsn
111 876 : let new_disk_consistent_lsn = self
112 876 : .raw_timeline
113 876 : .as_ref()
114 876 : .expect("checked above")
115 876 : .0
116 876 : .get_disk_consistent_lsn();
117 876 :
118 876 : if !new_disk_consistent_lsn.is_valid() {
119 0 : self.abort().await;
120 :
121 0 : return Err(anyhow::anyhow!(
122 0 : "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
123 0 : ));
124 876 : }
125 876 :
126 876 : let mut timelines = self.owning_tenant.timelines.lock().unwrap();
127 876 : match timelines.entry(timeline_id) {
128 : Entry::Occupied(_) => {
129 : // Unexpected, bug in the caller. Tenant is responsible for preventing concurrent creation of the same timeline.
130 : //
131 : // We do not call Self::abort here. Because we don't cleanly shut down our Timeline, [`Self::drop`] should
132 : // skip trying to delete the timeline directory too.
133 0 : anyhow::bail!(
134 0 : "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
135 0 : )
136 : }
137 876 : Entry::Vacant(v) => {
138 876 : // after taking here should be no fallible operations, because the drop guard will not
139 876 : // cleanup after and would block for example the tenant deletion
140 876 : let (new_timeline, _create_guard) =
141 876 : self.raw_timeline.take().expect("already checked");
142 876 :
143 876 : v.insert(Arc::clone(&new_timeline));
144 876 :
145 876 : new_timeline.maybe_spawn_flush_loop();
146 876 :
147 876 : Ok(new_timeline)
148 : }
149 : }
150 876 : }
151 :
152 0 : pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
153 0 : self.raw_timeline.take().expect("already checked")
154 0 : }
155 :
156 : /// Prepares timeline data by loading it from the basebackup archive.
157 0 : pub(crate) async fn import_basebackup_from_tar(
158 0 : mut self,
159 0 : tenant: Arc<Tenant>,
160 0 : copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
161 0 : base_lsn: Lsn,
162 0 : broker_client: storage_broker::BrokerClientChannel,
163 0 : ctx: &RequestContext,
164 0 : ) -> anyhow::Result<Arc<Timeline>> {
165 0 : self.write(|raw_timeline| async move {
166 0 : import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
167 0 : .await
168 0 : .context("Failed to import basebackup")
169 0 : .map_err(CreateTimelineError::Other)?;
170 :
171 0 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
172 0 : Err(CreateTimelineError::Other(anyhow::anyhow!(
173 0 : "failpoint before-checkpoint-new-timeline"
174 0 : )))
175 0 : });
176 :
177 0 : Ok(())
178 0 : })
179 0 : .await?;
180 :
181 : // All the data has been imported. Insert the Timeline into the tenant's timelines map
182 0 : let tl = self.finish_creation().await?;
183 0 : tl.activate(tenant, broker_client, None, ctx);
184 0 : Ok(tl)
185 0 : }
186 :
187 436 : pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
188 436 : Ok(&self
189 436 : .raw_timeline
190 436 : .as_ref()
191 436 : .with_context(|| {
192 0 : format!(
193 0 : "No raw timeline {}/{} found",
194 0 : self.owning_tenant.tenant_shard_id, self.timeline_id
195 0 : )
196 436 : })?
197 : .0)
198 436 : }
199 : }
200 :
201 : impl Drop for UninitializedTimeline<'_> {
202 888 : fn drop(&mut self) {
203 888 : if let Some((timeline, create_guard)) = self.raw_timeline.take() {
204 12 : let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
205 12 : if self.needs_shutdown && !timeline.gate.close_complete() {
206 : // This should not happen: caller should call [`Self::abort`] on failures
207 0 : tracing::warn!(
208 0 : "Timeline not shut down after initialization failure, cannot clean up files"
209 : );
210 : } else {
211 : // This is unusual, but can happen harmlessly if the pageserver is stopped while
212 : // creating a timeline.
213 12 : info!("Timeline got dropped without initializing, cleaning its files");
214 12 : cleanup_timeline_directory(create_guard);
215 : }
216 876 : }
217 888 : }
218 : }
219 :
220 12 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
221 12 : let timeline_path = &create_guard.timeline_path;
222 12 : match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
223 : Ok(()) => {
224 12 : info!("Timeline dir {timeline_path:?} removed successfully")
225 : }
226 0 : Err(e) => {
227 0 : error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
228 : }
229 : }
230 : // Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
231 : // timeline creation attempts under this TimelineId to proceed
232 12 : drop(create_guard);
233 12 : }
234 :
235 : /// A guard for timeline creations in process: as long as this object exists, the timeline ID
236 : /// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
237 : #[must_use]
238 : pub(crate) struct TimelineCreateGuard {
239 : pub(crate) _tenant_gate_guard: GateGuard,
240 : pub(crate) owning_tenant: Arc<Tenant>,
241 : pub(crate) timeline_id: TimelineId,
242 : pub(crate) timeline_path: Utf8PathBuf,
243 : pub(crate) idempotency: CreateTimelineIdempotency,
244 : }
245 :
246 : /// Errors when acquiring exclusive access to a timeline ID for creation
247 : #[derive(thiserror::Error, Debug)]
248 : pub(crate) enum TimelineExclusionError {
249 : #[error("Already exists")]
250 : AlreadyExists {
251 : existing: TimelineOrOffloaded,
252 : arg: CreateTimelineIdempotency,
253 : },
254 : #[error("Already creating")]
255 : AlreadyCreating,
256 : #[error("Shutting down")]
257 : ShuttingDown,
258 :
259 : // e.g. I/O errors, or some failure deep in postgres initdb
260 : #[error(transparent)]
261 : Other(#[from] anyhow::Error),
262 : }
263 :
264 : impl TimelineCreateGuard {
265 904 : pub(crate) fn new(
266 904 : owning_tenant: &Arc<Tenant>,
267 904 : timeline_id: TimelineId,
268 904 : timeline_path: Utf8PathBuf,
269 904 : idempotency: CreateTimelineIdempotency,
270 904 : allow_offloaded: bool,
271 904 : ) -> Result<Self, TimelineExclusionError> {
272 904 : let _tenant_gate_guard = owning_tenant
273 904 : .gate
274 904 : .enter()
275 904 : .map_err(|_| TimelineExclusionError::ShuttingDown)?;
276 :
277 : // Lock order: this is the only place we take both locks. During drop() we only
278 : // lock creating_timelines
279 904 : let timelines = owning_tenant.timelines.lock().unwrap();
280 904 : let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
281 904 : let mut creating_timelines: std::sync::MutexGuard<
282 904 : '_,
283 904 : std::collections::HashSet<TimelineId>,
284 904 : > = owning_tenant.timelines_creating.lock().unwrap();
285 :
286 904 : if let Some(existing) = timelines.get(&timeline_id) {
287 4 : return Err(TimelineExclusionError::AlreadyExists {
288 4 : existing: TimelineOrOffloaded::Timeline(existing.clone()),
289 4 : arg: idempotency,
290 4 : });
291 900 : }
292 900 : if !allow_offloaded {
293 900 : if let Some(existing) = timelines_offloaded.get(&timeline_id) {
294 0 : return Err(TimelineExclusionError::AlreadyExists {
295 0 : existing: TimelineOrOffloaded::Offloaded(existing.clone()),
296 0 : arg: idempotency,
297 0 : });
298 900 : }
299 0 : }
300 900 : if creating_timelines.contains(&timeline_id) {
301 0 : return Err(TimelineExclusionError::AlreadyCreating);
302 900 : }
303 900 : creating_timelines.insert(timeline_id);
304 900 : drop(creating_timelines);
305 900 : drop(timelines_offloaded);
306 900 : drop(timelines);
307 900 : Ok(Self {
308 900 : _tenant_gate_guard,
309 900 : owning_tenant: Arc::clone(owning_tenant),
310 900 : timeline_id,
311 900 : timeline_path,
312 900 : idempotency,
313 900 : })
314 904 : }
315 : }
316 :
317 : impl Drop for TimelineCreateGuard {
318 896 : fn drop(&mut self) {
319 896 : self.owning_tenant
320 896 : .timelines_creating
321 896 : .lock()
322 896 : .unwrap()
323 896 : .remove(&self.timeline_id);
324 896 : }
325 : }
|