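//! Import of a timeline from a `pgdata` directory staged in an import bucket.
//!
//! [`doit`] drives the import, coordinating with the storage controller so
//! that the import can be resumed after restarts and is skipped once the
//! shard has been recorded as done or failed.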
use std::sync::Arc;

use anyhow::{Context, bail};
use importbucket_client::{ControlFile, RemoteStorageWrapper};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;

use super::Timeline;
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;

mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;

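/// A [`Timeline`] whose pgdata import is still running.
///
/// Bundles the timeline with the handle of the task driving the import so
/// that [`ImportingTimeline::shutdown`] can tear both down together.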
pub(crate) struct ImportingTimeline {
    pub import_task_handle: JoinHandle<()>,
    pub timeline: Arc<Timeline>,
}

impl ImportingTimeline {
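    /// Abort the in-flight import task and wait for it to exit, then shut
    /// down the timeline's remote client.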
    pub(crate) async fn shutdown(self) {
        self.import_task_handle.abort();
        let _ = self.import_task_handle.await;

        self.timeline.remote_client.shutdown().await;
    }
}

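/// Drive the import described by `index_part` to completion.
///
/// The storage controller is consulted for the shard's import status: if the
/// import is still in progress, it is prepared (or resumed from the recorded
/// progress), the import flow is run, and `Done` is reported back to the
/// controller. If the controller already considers the shard done or failed,
/// the import is skipped.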
pub async fn doit(
    timeline: &Arc<Timeline>,
    index_part: index_part_format::Root,
    ctx: &RequestContext,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    let index_part_format::Root::V1(v1) = index_part;
    let index_part_format::InProgress {
        location,
        idempotency_key: _,
        started_at: _,
    } = match v1 {
        index_part_format::V1::Done(_) => return Ok(()),
        index_part_format::V1::InProgress(in_progress) => in_progress,
    };

    let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);

    let shard_status = storcon_client
        .get_timeline_import_status(
            timeline.tenant_shard_id,
            timeline.timeline_id,
            timeline.generation,
        )
        .await
        .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;

    info!(?shard_status, "peeking shard status");
    match shard_status {
        ShardImportStatus::InProgress(maybe_progress) => {
            let storage =
                importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;

            let control_file_res = if maybe_progress.is_none() {
                // Only prepare the import once when there's no progress.
                prepare_import(timeline, storage.clone(), &cancel).await
            } else {
                storage.get_control_file().await
            };

            let control_file = match control_file_res {
                Ok(cf) => cf,
                Err(err) => {
                    return Err(
                        terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
                    );
                }
            };

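            // Run the import flow, resuming from the progress previously
            // checkpointed with the storage controller, if any.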
            let res = flow::run(
                timeline.clone(),
                control_file,
                storage.clone(),
                maybe_progress,
                ctx,
            )
            .await;
            if let Err(err) = res {
                return Err(
                    terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
                );
            }

            timeline
                .remote_client
                .schedule_index_upload_for_file_changes()?;
            timeline.remote_client.wait_completion().await?;

            // Communicate that the shard is done.
            // Ensure at-least-once delivery of the upcall to the storage controller
            // before we mark the task as done and never come here again.
            //
            // Note that we do not mark the import complete in the index part now.
            // This happens in [`Tenant::finalize_importing_timeline`] in response
            // to the storage controller calling
            // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
            storcon_client
                .put_timeline_import_status(
                    timeline.tenant_shard_id,
                    timeline.timeline_id,
                    timeline.generation,
                    ShardImportStatus::Done,
                )
                .await
                .map_err(|_err| {
                    anyhow::anyhow!("Shut down while putting timeline import status")
                })?;
        }
        ShardImportStatus::Error(err) => {
            info!(
                "shard status indicates that the shard is done (error), skipping import: {}",
                err
            );
        }
        ShardImportStatus::Done => {
            info!("shard status indicates that the shard is done (success), skipping import");
        }
    }

    Ok(())
}

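/// Prepare a fresh import: delete any layers left over from a previous
/// attempt, wait for the external `pgdata` upload to report that it is done,
/// and seed the timeline metadata from the control file's LSNs.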
async fn prepare_import(
    timeline: &Arc<Timeline>,
    storage: RemoteStorageWrapper,
    cancel: &CancellationToken,
) -> anyhow::Result<ControlFile> {
    // Wipe the slate clean before starting the import, as a precaution.
    // This method is only called when there's no recorded checkpoint for the import
    // in the storage controller.
    //
    // Note that this is split-brain safe (two imports for the same timeline shard running in
    // different generations) because we go through the usual deletion path, including the deletion queue.
    info!("wipe the slate clean");
    {
        // TODO: do we need to hold the GC lock for this?
        let mut guard = timeline.layers.write().await;
        assert!(
            guard.layer_map()?.open_layer.is_none(),
            "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
        );
        let all_layers_keys = guard.all_persistent_layers();
        let all_layers: Vec<_> = all_layers_keys
            .iter()
            .map(|key| guard.get_from_key(key))
            .collect();
        let open = guard.open_mut().context("open_mut")?;

        timeline.remote_client.schedule_gc_update(&all_layers)?;
        open.finish_gc_timeline(&all_layers);
    }

    //
    // Wait for pgdata to finish uploading
    //
    info!("wait for pgdata to reach status 'done'");
    let status_prefix = RemotePath::from_string("status").unwrap();
    let pgdata_status_key = status_prefix.join("pgdata");
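    // Poll the status object until the pgdata upload reports `done`, waiting
    // 10 seconds between attempts and bailing out on cancellation.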
    loop {
        let res = async {
            let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
                .get_json(&pgdata_status_key)
                .await
                .context("get pgdata status")?;
            info!(?pgdata_status, "peeking pgdata status");
            if pgdata_status.map(|st| st.done).unwrap_or(false) {
                Ok(())
            } else {
                Err(anyhow::anyhow!("pgdata not done yet"))
            }
        }
        .await;
        match res {
            Ok(_) => break,
            Err(err) => {
                info!(?err, "indefinitely waiting for pgdata to finish");
                if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
                    .await
                    .is_ok()
                {
                    bail!("cancelled while waiting for pgdata");
                }
            }
        }
    }
201 :
202 0 : let control_file = storage.get_control_file().await?;
203 0 : let base_lsn = control_file.base_lsn();
204 0 :
205 0 : info!("update TimelineMetadata based on LSNs from control file");
206 0 : {
207 0 : let pg_version = control_file.pg_version();
208 0 : async move {
209 0 : // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
210 0 : // checkpoint record, and prev_record_lsn should point to its beginning.
211 0 : // We should read the real end of the record from the WAL, but here we
212 0 : // just fake it.
213 0 : let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
214 0 : let prev_record_lsn = base_lsn;
215 0 : let metadata = TimelineMetadata::new(
216 0 : disk_consistent_lsn,
217 0 : Some(prev_record_lsn),
218 0 : None, // no ancestor
219 0 : Lsn(0), // no ancestor lsn
220 0 : base_lsn, // latest_gc_cutoff_lsn
221 0 : base_lsn, // initdb_lsn
222 0 : pg_version,
223 0 : );
224 0 :
225 0 : let _start_lsn = disk_consistent_lsn + 1;
226 0 :
227 0 : timeline
228 0 : .remote_client
229 0 : .schedule_index_upload_for_full_metadata_update(&metadata)?;
230 :
231 0 : timeline.remote_client.wait_completion().await?;
232 :
233 0 : anyhow::Ok(())
234 0 : }
235 0 : }
236 0 : .await?;
237 :
238 0 : Ok(control_file)
239 0 : }
240 :
async fn terminate_flow_with_error(
    timeline: &Arc<Timeline>,
    error: anyhow::Error,
    storcon_client: &StorageControllerUpcallClient,
    cancel: &CancellationToken,
) -> anyhow::Error {
    // The import task is aborted on tenant shutdown, so, in principle, it should
    // never be cancelled. To be on the safe side, check the cancellation tokens
    // before marking the import as failed.
    if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
        let notify_res = storcon_client
            .put_timeline_import_status(
                timeline.tenant_shard_id,
                timeline.timeline_id,
                timeline.generation,
                ShardImportStatus::Error(format!("{error:#}")),
            )
            .await;

        if let Err(_notify_error) = notify_res {
            // [`StorageControllerUpcallClient::put_timeline_import_status`] retries
            // forever internally, so errors returned by it can only be due to cancellation.
            info!("failed to notify storcon about permanent import error");
        }

        // Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
        error
    } else {
        anyhow::anyhow!("Import task cancelled")
    }
}