Line data Source code
1 : use std::{collections::hash_map::Entry, fs, sync::Arc};
2 :
3 : use anyhow::Context;
4 : use camino::Utf8PathBuf;
5 : use tracing::{error, info, info_span};
6 : use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
7 :
8 : use crate::{
9 : context::RequestContext,
10 : import_datadir,
11 : tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
12 : };
13 :
14 : use super::Timeline;
15 :
16 : /// A timeline with some of its files on disk, being initialized.
17 : /// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
18 : /// its local files are removed. If we crash while this class exists, then the timeline's local
19 : /// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
20 : ///
21 : /// The caller is responsible for proper timeline data filling before the final init.
22 : #[must_use]
23 : pub struct UninitializedTimeline<'t> {
24 : pub(crate) owning_tenant: &'t Tenant,
25 : timeline_id: TimelineId,
26 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
27 : }
28 :
29 : impl<'t> UninitializedTimeline<'t> {
30 412 : pub(crate) fn new(
31 412 : owning_tenant: &'t Tenant,
32 412 : timeline_id: TimelineId,
33 412 : raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
34 412 : ) -> Self {
35 412 : Self {
36 412 : owning_tenant,
37 412 : timeline_id,
38 412 : raw_timeline,
39 412 : }
40 412 : }
41 :
42 : /// Finish timeline creation: insert it into the Tenant's timelines map
43 : ///
44 : /// This function launches the flush loop if not already done.
45 : ///
46 : /// The caller is responsible for activating the timeline (function `.activate()`).
47 404 : pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
48 404 : let timeline_id = self.timeline_id;
49 404 : let tenant_shard_id = self.owning_tenant.tenant_shard_id;
50 404 :
51 404 : if self.raw_timeline.is_none() {
52 0 : return Err(anyhow::anyhow!(
53 0 : "No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
54 0 : ));
55 404 : }
56 404 :
57 404 : // Check that the caller initialized disk_consistent_lsn
58 404 : let new_disk_consistent_lsn = self
59 404 : .raw_timeline
60 404 : .as_ref()
61 404 : .expect("checked above")
62 404 : .0
63 404 : .get_disk_consistent_lsn();
64 404 :
65 404 : anyhow::ensure!(
66 404 : new_disk_consistent_lsn.is_valid(),
67 0 : "new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
68 : );
69 :
70 404 : let mut timelines = self.owning_tenant.timelines.lock().unwrap();
71 404 : match timelines.entry(timeline_id) {
72 0 : Entry::Occupied(_) => anyhow::bail!(
73 0 : "Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
74 0 : ),
75 404 : Entry::Vacant(v) => {
76 404 : // after taking here should be no fallible operations, because the drop guard will not
77 404 : // cleanup after and would block for example the tenant deletion
78 404 : let (new_timeline, _create_guard) =
79 404 : self.raw_timeline.take().expect("already checked");
80 404 :
81 404 : v.insert(Arc::clone(&new_timeline));
82 404 :
83 404 : new_timeline.maybe_spawn_flush_loop();
84 404 :
85 404 : Ok(new_timeline)
86 : }
87 : }
88 404 : }
89 :
90 0 : pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
91 0 : self.raw_timeline.take().expect("already checked")
92 0 : }
93 :
94 : /// Prepares timeline data by loading it from the basebackup archive.
95 0 : pub(crate) async fn import_basebackup_from_tar(
96 0 : self,
97 0 : tenant: Arc<Tenant>,
98 0 : copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
99 0 : base_lsn: Lsn,
100 0 : broker_client: storage_broker::BrokerClientChannel,
101 0 : ctx: &RequestContext,
102 0 : ) -> anyhow::Result<Arc<Timeline>> {
103 0 : let raw_timeline = self.raw_timeline()?;
104 :
105 0 : import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
106 0 : .await
107 0 : .context("Failed to import basebackup")?;
108 :
109 : // Flush the new layer files to disk, before we make the timeline as available to
110 : // the outside world.
111 : //
112 : // Flush loop needs to be spawned in order to be able to flush.
113 0 : raw_timeline.maybe_spawn_flush_loop();
114 0 :
115 0 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
116 0 : anyhow::bail!("failpoint before-checkpoint-new-timeline");
117 0 : });
118 :
119 0 : raw_timeline
120 0 : .freeze_and_flush()
121 0 : .await
122 0 : .context("Failed to flush after basebackup import")?;
123 :
124 : // All the data has been imported. Insert the Timeline into the tenant's timelines map
125 0 : let tl = self.finish_creation()?;
126 0 : tl.activate(tenant, broker_client, None, ctx);
127 0 : Ok(tl)
128 0 : }
129 :
130 184 : pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
131 184 : Ok(&self
132 184 : .raw_timeline
133 184 : .as_ref()
134 184 : .with_context(|| {
135 0 : format!(
136 0 : "No raw timeline {}/{} found",
137 0 : self.owning_tenant.tenant_shard_id, self.timeline_id
138 0 : )
139 184 : })?
140 : .0)
141 184 : }
142 : }
143 :
144 : impl Drop for UninitializedTimeline<'_> {
145 410 : fn drop(&mut self) {
146 410 : if let Some((_, create_guard)) = self.raw_timeline.take() {
147 6 : let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
148 6 : // This is unusual, but can happen harmlessly if the pageserver is stopped while
149 6 : // creating a timeline.
150 6 : info!("Timeline got dropped without initializing, cleaning its files");
151 6 : cleanup_timeline_directory(create_guard);
152 404 : }
153 410 : }
154 : }
155 :
156 6 : pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
157 6 : let timeline_path = &create_guard.timeline_path;
158 6 : match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
159 : Ok(()) => {
160 6 : info!("Timeline dir {timeline_path:?} removed successfully")
161 : }
162 0 : Err(e) => {
163 0 : error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
164 : }
165 : }
166 : // Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
167 : // timeline creation attempts under this TimelineId to proceed
168 6 : drop(create_guard);
169 6 : }
170 :
171 : /// A guard for timeline creations in process: as long as this object exists, the timeline ID
172 : /// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
173 : #[must_use]
174 : pub(crate) struct TimelineCreateGuard {
175 : pub(crate) _tenant_gate_guard: GateGuard,
176 : pub(crate) owning_tenant: Arc<Tenant>,
177 : pub(crate) timeline_id: TimelineId,
178 : pub(crate) timeline_path: Utf8PathBuf,
179 : pub(crate) idempotency: CreateTimelineIdempotency,
180 : }
181 :
182 : /// Errors when acquiring exclusive access to a timeline ID for creation
183 0 : #[derive(thiserror::Error, Debug)]
184 : pub(crate) enum TimelineExclusionError {
185 : #[error("Already exists")]
186 : AlreadyExists {
187 : existing: TimelineOrOffloaded,
188 : arg: CreateTimelineIdempotency,
189 : },
190 : #[error("Already creating")]
191 : AlreadyCreating,
192 : #[error("Shutting down")]
193 : ShuttingDown,
194 :
195 : // e.g. I/O errors, or some failure deep in postgres initdb
196 : #[error(transparent)]
197 : Other(#[from] anyhow::Error),
198 : }
199 :
200 : impl TimelineCreateGuard {
201 418 : pub(crate) fn new(
202 418 : owning_tenant: &Arc<Tenant>,
203 418 : timeline_id: TimelineId,
204 418 : timeline_path: Utf8PathBuf,
205 418 : idempotency: CreateTimelineIdempotency,
206 418 : allow_offloaded: bool,
207 418 : ) -> Result<Self, TimelineExclusionError> {
208 418 : let _tenant_gate_guard = owning_tenant
209 418 : .gate
210 418 : .enter()
211 418 : .map_err(|_| TimelineExclusionError::ShuttingDown)?;
212 :
213 : // Lock order: this is the only place we take both locks. During drop() we only
214 : // lock creating_timelines
215 418 : let timelines = owning_tenant.timelines.lock().unwrap();
216 418 : let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
217 418 : let mut creating_timelines: std::sync::MutexGuard<
218 418 : '_,
219 418 : std::collections::HashSet<TimelineId>,
220 418 : > = owning_tenant.timelines_creating.lock().unwrap();
221 :
222 418 : if let Some(existing) = timelines.get(&timeline_id) {
223 2 : return Err(TimelineExclusionError::AlreadyExists {
224 2 : existing: TimelineOrOffloaded::Timeline(existing.clone()),
225 2 : arg: idempotency,
226 2 : });
227 416 : }
228 416 : if !allow_offloaded {
229 416 : if let Some(existing) = timelines_offloaded.get(&timeline_id) {
230 0 : return Err(TimelineExclusionError::AlreadyExists {
231 0 : existing: TimelineOrOffloaded::Offloaded(existing.clone()),
232 0 : arg: idempotency,
233 0 : });
234 416 : }
235 0 : }
236 416 : if creating_timelines.contains(&timeline_id) {
237 0 : return Err(TimelineExclusionError::AlreadyCreating);
238 416 : }
239 416 : creating_timelines.insert(timeline_id);
240 416 : drop(creating_timelines);
241 416 : drop(timelines_offloaded);
242 416 : drop(timelines);
243 416 : Ok(Self {
244 416 : _tenant_gate_guard,
245 416 : owning_tenant: Arc::clone(owning_tenant),
246 416 : timeline_id,
247 416 : timeline_path,
248 416 : idempotency,
249 416 : })
250 418 : }
251 : }
252 :
253 : impl Drop for TimelineCreateGuard {
254 414 : fn drop(&mut self) {
255 414 : self.owning_tenant
256 414 : .timelines_creating
257 414 : .lock()
258 414 : .unwrap()
259 414 : .remove(&self.timeline_id);
260 414 : }
261 : }
|