Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::{Context, bail};
4 : use importbucket_client::{ControlFile, RemoteStorageWrapper};
5 : use pageserver_api::models::ShardImportStatus;
6 : use remote_storage::RemotePath;
7 : use tokio::task::JoinHandle;
8 : use tokio_util::sync::CancellationToken;
9 : use tracing::info;
10 : use utils::lsn::Lsn;
11 :
12 : use super::Timeline;
13 : use crate::context::RequestContext;
14 : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
15 : use crate::tenant::metadata::TimelineMetadata;
16 :
17 : mod flow;
18 : mod importbucket_client;
19 : mod importbucket_format;
20 : pub(crate) mod index_part_format;
21 :
22 : pub(crate) struct ImportingTimeline {
23 : pub import_task_handle: JoinHandle<()>,
24 : pub timeline: Arc<Timeline>,
25 : }
26 :
27 : impl ImportingTimeline {
28 0 : pub(crate) fn shutdown(self) {
29 0 : self.import_task_handle.abort();
30 0 : }
31 : }
32 :
33 0 : pub async fn doit(
34 0 : timeline: &Arc<Timeline>,
35 0 : index_part: index_part_format::Root,
36 0 : ctx: &RequestContext,
37 0 : cancel: CancellationToken,
38 0 : ) -> anyhow::Result<()> {
39 0 : let index_part_format::Root::V1(v1) = index_part;
40 : let index_part_format::InProgress {
41 0 : location,
42 : idempotency_key: _,
43 : started_at: _,
44 0 : } = match v1 {
45 0 : index_part_format::V1::Done(_) => return Ok(()),
46 0 : index_part_format::V1::InProgress(in_progress) => in_progress,
47 0 : };
48 0 :
49 0 : let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
50 :
51 0 : let shard_status = storcon_client
52 0 : .get_timeline_import_status(
53 0 : timeline.tenant_shard_id,
54 0 : timeline.timeline_id,
55 0 : timeline.generation,
56 0 : )
57 0 : .await
58 0 : .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
59 :
60 0 : info!(?shard_status, "peeking shard status");
61 0 : match shard_status {
62 0 : ShardImportStatus::InProgress(maybe_progress) => {
63 0 : let storage =
64 0 : importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
65 :
66 0 : let control_file_res = if maybe_progress.is_none() {
67 : // Only prepare the import once when there's no progress.
68 0 : prepare_import(timeline, storage.clone(), &cancel).await
69 : } else {
70 0 : storage.get_control_file().await
71 : };
72 :
73 0 : let control_file = match control_file_res {
74 0 : Ok(cf) => cf,
75 0 : Err(err) => {
76 0 : return Err(
77 0 : terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
78 : );
79 : }
80 : };
81 :
82 0 : let res = flow::run(
83 0 : timeline.clone(),
84 0 : control_file,
85 0 : storage.clone(),
86 0 : maybe_progress,
87 0 : ctx,
88 0 : )
89 0 : .await;
90 0 : if let Err(err) = res {
91 : return Err(
92 0 : terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
93 : );
94 0 : }
95 0 :
96 0 : // Communicate that shard is done.
97 0 : // Ensure at-least-once delivery of the upcall to storage controller
98 0 : // before we mark the task as done and never come here again.
99 0 : //
100 0 : // Note that we do not mark the import complete in the index part now.
101 0 : // This happens in [`Tenant::finalize_importing_timeline`] in response
102 0 : // to the storage controller calling
103 0 : // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
104 0 : storcon_client
105 0 : .put_timeline_import_status(
106 0 : timeline.tenant_shard_id,
107 0 : timeline.timeline_id,
108 0 : timeline.generation,
109 0 : ShardImportStatus::Done,
110 0 : )
111 0 : .await
112 0 : .map_err(|_err| {
113 0 : anyhow::anyhow!("Shut down while putting timeline import status")
114 0 : })?;
115 : }
116 0 : ShardImportStatus::Error(err) => {
117 0 : info!(
118 0 : "shard status indicates that the shard is done (error), skipping import {}",
119 : err
120 : );
121 : }
122 : ShardImportStatus::Done => {
123 0 : info!("shard status indicates that the shard is done (success), skipping import");
124 : }
125 : }
126 :
127 0 : Ok(())
128 0 : }
129 :
130 0 : async fn prepare_import(
131 0 : timeline: &Arc<Timeline>,
132 0 : storage: RemoteStorageWrapper,
133 0 : cancel: &CancellationToken,
134 0 : ) -> anyhow::Result<ControlFile> {
135 0 : // Wipe the slate clean before starting the import as a precaution.
136 0 : // This method is only called when there's no recorded checkpoint for the import
137 0 : // in the storage controller.
138 0 : //
139 0 : // Note that this is split-brain safe (two imports for same timeline shards running in
140 0 : // different generations) because we go through the usual deletion path, including deletion queue.
141 0 : info!("wipe the slate clean");
142 : {
143 : // TODO: do we need to hold GC lock for this?
144 0 : let mut guard = timeline.layers.write().await;
145 0 : assert!(
146 0 : guard.layer_map()?.open_layer.is_none(),
147 0 : "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
148 : );
149 0 : let all_layers_keys = guard.all_persistent_layers();
150 0 : let all_layers: Vec<_> = all_layers_keys
151 0 : .iter()
152 0 : .map(|key| guard.get_from_key(key))
153 0 : .collect();
154 0 : let open = guard.open_mut().context("open_mut")?;
155 :
156 0 : timeline.remote_client.schedule_gc_update(&all_layers)?;
157 0 : open.finish_gc_timeline(&all_layers);
158 0 : }
159 0 :
160 0 : //
161 0 : // Wait for pgdata to finish uploading
162 0 : //
163 0 : info!("wait for pgdata to reach status 'done'");
164 0 : let status_prefix = RemotePath::from_string("status").unwrap();
165 0 : let pgdata_status_key = status_prefix.join("pgdata");
166 : loop {
167 0 : let res = async {
168 0 : let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
169 0 : .get_json(&pgdata_status_key)
170 0 : .await
171 0 : .context("get pgdata status")?;
172 0 : info!(?pgdata_status, "peeking pgdata status");
173 0 : if pgdata_status.map(|st| st.done).unwrap_or(false) {
174 0 : Ok(())
175 : } else {
176 0 : Err(anyhow::anyhow!("pgdata not done yet"))
177 : }
178 0 : }
179 0 : .await;
180 0 : match res {
181 0 : Ok(_) => break,
182 0 : Err(err) => {
183 0 : info!(?err, "indefinitely waiting for pgdata to finish");
184 0 : if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
185 0 : .await
186 0 : .is_ok()
187 : {
188 0 : bail!("cancelled while waiting for pgdata");
189 0 : }
190 : }
191 : }
192 : }
193 :
194 0 : let control_file = storage.get_control_file().await?;
195 0 : let base_lsn = control_file.base_lsn();
196 0 :
197 0 : info!("update TimelineMetadata based on LSNs from control file");
198 0 : {
199 0 : let pg_version = control_file.pg_version();
200 0 : async move {
201 0 : // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
202 0 : // checkpoint record, and prev_record_lsn should point to its beginning.
203 0 : // We should read the real end of the record from the WAL, but here we
204 0 : // just fake it.
205 0 : let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
206 0 : let prev_record_lsn = base_lsn;
207 0 : let metadata = TimelineMetadata::new(
208 0 : disk_consistent_lsn,
209 0 : Some(prev_record_lsn),
210 0 : None, // no ancestor
211 0 : Lsn(0), // no ancestor lsn
212 0 : base_lsn, // latest_gc_cutoff_lsn
213 0 : base_lsn, // initdb_lsn
214 0 : pg_version,
215 0 : );
216 0 :
217 0 : let _start_lsn = disk_consistent_lsn + 1;
218 0 :
219 0 : timeline
220 0 : .remote_client
221 0 : .schedule_index_upload_for_full_metadata_update(&metadata)?;
222 :
223 0 : timeline.remote_client.wait_completion().await?;
224 :
225 0 : anyhow::Ok(())
226 0 : }
227 0 : }
228 0 : .await?;
229 :
230 0 : Ok(control_file)
231 0 : }
232 :
233 0 : async fn terminate_flow_with_error(
234 0 : timeline: &Arc<Timeline>,
235 0 : error: anyhow::Error,
236 0 : storcon_client: &StorageControllerUpcallClient,
237 0 : cancel: &CancellationToken,
238 0 : ) -> anyhow::Error {
239 0 : // The import task is a aborted on tenant shutdown, so in principle, it should
240 0 : // never be cancelled. To be on the safe side, check the cancellation tokens
241 0 : // before marking the import as failed.
242 0 : if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
243 0 : let notify_res = storcon_client
244 0 : .put_timeline_import_status(
245 0 : timeline.tenant_shard_id,
246 0 : timeline.timeline_id,
247 0 : timeline.generation,
248 0 : ShardImportStatus::Error(format!("{error:#}")),
249 0 : )
250 0 : .await;
251 :
252 0 : if let Err(_notify_error) = notify_res {
253 : // The [`StorageControllerUpcallClient::put_timeline_import_status`] retries
254 : // forever internally, so errors returned by it can only be due to cancellation.
255 0 : info!("failed to notify storcon about permanent import error");
256 0 : }
257 :
258 : // Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
259 0 : error
260 : } else {
261 0 : anyhow::anyhow!("Import task cancelled")
262 : }
263 0 : }
|