use std::sync::Arc;

use anyhow::{Context, bail};
use importbucket_client::{ControlFile, RemoteStorageWrapper};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;

use super::{Timeline, TimelineDeleteProgress};
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;

mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;

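/// Handle to a timeline that is still being populated by an import task:
/// the task's join handle, a gate used to wait for it on shutdown, the
/// timeline itself, and its deletion progress.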
pub struct ImportingTimeline {
    pub import_task_handle: JoinHandle<()>,
    pub import_task_gate: Gate,
    pub timeline: Arc<Timeline>,
    pub delete_progress: TimelineDeleteProgress,
}

impl std::fmt::Debug for ImportingTimeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ImportingTimeline<{}>", self.timeline.timeline_id)
    }
}

impl ImportingTimeline {
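    /// Abort the import task, wait for it to exit via the gate, and then
    /// shut down the timeline's remote client.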
    pub async fn shutdown(&self) {
        self.import_task_handle.abort();
        self.import_task_gate.close().await;

        self.timeline.remote_client.shutdown().await;
    }
}

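/// Run the import for this timeline shard, driving it towards a terminal state.
///
/// Consults the storage controller for the shard's import status: if the import
/// is already done (or failed), this returns early; otherwise it prepares the
/// timeline if needed, runs the import flow, flushes the resulting index to
/// remote storage, and reports completion back to the storage controller.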
pub async fn doit(
    timeline: &Arc<Timeline>,
    index_part: index_part_format::Root,
    ctx: &RequestContext,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    let index_part_format::Root::V1(v1) = index_part;
    let index_part_format::InProgress {
        location,
        idempotency_key: _,
        started_at: _,
    } = match v1 {
        index_part_format::V1::Done(_) => return Ok(()),
        index_part_format::V1::InProgress(in_progress) => in_progress,
    };

    let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);

    let shard_status = storcon_client
        .get_timeline_import_status(
            timeline.tenant_shard_id,
            timeline.timeline_id,
            timeline.generation,
        )
        .await
        .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;

    info!(?shard_status, "peeking shard status");
    match shard_status {
        ShardImportStatus::InProgress(maybe_progress) => {
            let storage =
                importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;

            let control_file_res = if maybe_progress.is_none() {
                // Only prepare the import once, when there's no progress.
                prepare_import(timeline, storage.clone(), &cancel).await
            } else {
                storage.get_control_file().await
            };

            let control_file = match control_file_res {
                Ok(cf) => cf,
                Err(err) => {
                    return Err(
                        terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
                    );
                }
            };

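            // Run the import flow, resuming from the progress checkpoint recorded
            // by the storage controller, if any.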
            let res = flow::run(
                timeline.clone(),
                control_file,
                storage.clone(),
                maybe_progress,
                ctx,
            )
            .await;
            if let Err(err) = res {
                return Err(
                    terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
                );
            }

            tracing::info!("Import plan executed. Flushing remote changes and notifying storcon");

            timeline
                .remote_client
                .schedule_index_upload_for_file_changes()?;
            timeline.remote_client.wait_completion().await?;

            pausable_failpoint!("import-timeline-pre-success-notify-pausable");

            // Communicate that the shard is done.
            // Ensure at-least-once delivery of the upcall to the storage controller
            // before we mark the task as done and never come here again.
            //
            // Note that we do not mark the import complete in the index part now.
            // That happens in [`Tenant::finalize_importing_timeline`], in response
            // to the storage controller calling
            // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
            storcon_client
                .put_timeline_import_status(
                    timeline.tenant_shard_id,
                    timeline.timeline_id,
                    timeline.generation,
                    ShardImportStatus::Done,
                )
                .await
                .map_err(|_err| {
                    anyhow::anyhow!("Shut down while putting timeline import status")
                })?;
        }
        ShardImportStatus::Error(err) => {
            info!(
                "shard status indicates that the shard is done (error), skipping import {}",
                err
            );
        }
        ShardImportStatus::Done => {
            info!("shard status indicates that the shard is done (success), skipping import");
        }
    }

    Ok(())
}

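/// One-time preparation before the import flow runs for the first time:
/// drops any pre-existing layers, waits for the pgdata upload in the import
/// bucket to be marked done, and seeds the timeline metadata from the LSNs
/// in the control file.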
async fn prepare_import(
    timeline: &Arc<Timeline>,
    storage: RemoteStorageWrapper,
    cancel: &CancellationToken,
) -> anyhow::Result<ControlFile> {
    // Wipe the slate clean before starting the import, as a precaution.
    // This function is only called when there's no recorded checkpoint for the import
    // in the storage controller.
    //
    // Note that this is split-brain safe (two imports for the same timeline shard running
    // in different generations) because we go through the usual deletion path, including
    // the deletion queue.
    info!("wipe the slate clean");
    {
        // TODO: do we need to hold the GC lock for this?
        let mut guard = timeline
            .layers
            .write(LayerManagerLockHolder::ImportPgData)
            .await;
        assert!(
            guard.layer_map()?.open_layer.is_none(),
            "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
        );
        let all_layers_keys = guard.all_persistent_layers();
        let all_layers: Vec<_> = all_layers_keys
            .iter()
            .map(|key| guard.get_from_key(key))
            .collect();
        let open = guard.open_mut().context("open_mut")?;

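        // Schedule removal of all existing persistent layers (via the usual
        // deletion path) and drop them from the layer map.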
        timeline.remote_client.schedule_gc_update(&all_layers)?;
        open.finish_gc_timeline(&all_layers);
    }

    //
    // Wait for pgdata to finish uploading
    //
    info!("wait for pgdata to reach status 'done'");
    let status_prefix = RemotePath::from_string("status").unwrap();
    let pgdata_status_key = status_prefix.join("pgdata");
    loop {
        let res = async {
            let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
                .get_json(&pgdata_status_key)
                .await
                .context("get pgdata status")?;
            info!(?pgdata_status, "peeking pgdata status");
            if pgdata_status.map(|st| st.done).unwrap_or(false) {
                Ok(())
            } else {
                Err(anyhow::anyhow!("pgdata not done yet"))
            }
        }
        .await;
        match res {
            Ok(_) => break,
            Err(_err) => {
                info!("indefinitely waiting for pgdata to finish");
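                // Poll again after 10 seconds; if cancellation fires before the
                // timeout elapses, give up instead of retrying.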
                if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
                    .await
                    .is_ok()
                {
                    bail!("cancelled while waiting for pgdata");
                }
            }
        }
    }

    let control_file = storage.get_control_file().await?;
    let base_lsn = control_file.base_lsn();

    info!("update TimelineMetadata based on LSNs from control file");
    {
        let pg_version = control_file.pg_version();
        async move {
            // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
            // checkpoint record, and prev_record_lsn should point to its beginning.
            // We should read the real end of the record from the WAL, but here we
            // just fake it.
            let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
            let prev_record_lsn = base_lsn;
            let metadata = TimelineMetadata::new(
                disk_consistent_lsn,
                Some(prev_record_lsn),
                None,     // no ancestor
                Lsn(0),   // no ancestor lsn
                base_lsn, // latest_gc_cutoff_lsn
                base_lsn, // initdb_lsn
                pg_version,
            );

            let _start_lsn = disk_consistent_lsn + 1;

            timeline
                .remote_client
                .schedule_index_upload_for_full_metadata_update(&metadata)?;

            timeline.remote_client.wait_completion().await?;

            anyhow::Ok(())
        }
    }
    .await?;

    Ok(control_file)
}

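/// Report a permanent import error to the storage controller (unless we are
/// shutting down) and return the error that the import flow terminates with.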
async fn terminate_flow_with_error(
    timeline: &Arc<Timeline>,
    error: anyhow::Error,
    storcon_client: &StorageControllerUpcallClient,
    cancel: &CancellationToken,
) -> anyhow::Error {
    // The import task is aborted on tenant shutdown, so in principle it should
    // never be cancelled. To be on the safe side, check the cancellation tokens
    // before marking the import as failed.
    if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
        let notify_res = storcon_client
            .put_timeline_import_status(
                timeline.tenant_shard_id,
                timeline.timeline_id,
                timeline.generation,
                ShardImportStatus::Error(format!("{error:#}")),
            )
            .await;

        if let Err(_notify_error) = notify_res {
            // [`StorageControllerUpcallClient::put_timeline_import_status`] retries
            // forever internally, so errors returned by it can only be due to cancellation.
            info!("failed to notify storcon about permanent import error");
        }

        // Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
        error
    } else {
        anyhow::anyhow!("Import task cancelled")
    }
}