Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::{bail, Context};
4 : use remote_storage::RemotePath;
5 : use tokio_util::sync::CancellationToken;
6 : use tracing::{info, info_span, Instrument};
7 : use utils::lsn::Lsn;
8 :
9 : use crate::{context::RequestContext, tenant::metadata::TimelineMetadata};
10 :
11 : use super::Timeline;
12 :
13 : mod flow;
14 : mod importbucket_client;
15 : mod importbucket_format;
16 : pub(crate) mod index_part_format;
17 : pub(crate) mod upcall_api;
18 :
19 0 : pub async fn doit(
20 0 : timeline: &Arc<Timeline>,
21 0 : index_part: index_part_format::Root,
22 0 : ctx: &RequestContext,
23 0 : cancel: CancellationToken,
24 0 : ) -> anyhow::Result<()> {
25 0 : let index_part_format::Root::V1(v1) = index_part;
26 : let index_part_format::InProgress {
27 0 : location,
28 0 : idempotency_key,
29 0 : started_at,
30 0 : } = match v1 {
31 0 : index_part_format::V1::Done(_) => return Ok(()),
32 0 : index_part_format::V1::InProgress(in_progress) => in_progress,
33 : };
34 :
35 0 : let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
36 :
37 0 : info!("get spec early so we know we'll be able to upcall when done");
38 0 : let Some(spec) = storage.get_spec().await? else {
39 0 : bail!("spec not found")
40 : };
41 :
42 0 : let upcall_client =
43 0 : upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
44 :
45 : //
46 : // send an early progress update to clean up k8s job early and generate potentially useful logs
47 : //
48 0 : info!("send early progress update");
49 0 : upcall_client
50 0 : .send_progress_until_success(&spec)
51 0 : .instrument(info_span!("early_progress_update"))
52 0 : .await?;
53 :
54 0 : let status_prefix = RemotePath::from_string("status").unwrap();
55 0 :
56 0 : //
57 0 : // See if shard is done.
58 0 : // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing.
59 0 : //
60 0 : let shard_status_key =
61 0 : status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug()));
62 0 : let shard_status: Option<importbucket_format::ShardStatus> =
63 0 : storage.get_json(&shard_status_key).await?;
64 0 : info!(?shard_status, "peeking shard status");
65 0 : if shard_status.map(|st| st.done).unwrap_or(false) {
66 0 : info!("shard status indicates that the shard is done, skipping import");
67 : } else {
68 : // TODO: checkpoint the progress into the IndexPart instead of restarting
69 : // from the beginning.
70 :
71 : //
72 : // Wipe the slate clean - the flow does not allow resuming.
73 : // We can implement resuming in the future by checkpointing the progress into the IndexPart.
74 : //
75 0 : info!("wipe the slate clean");
76 : {
77 : // TODO: do we need to hold GC lock for this?
78 0 : let mut guard = timeline.layers.write().await;
79 0 : assert!(
80 0 : guard.layer_map()?.open_layer.is_none(),
81 0 : "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
82 : );
83 0 : let all_layers_keys = guard.all_persistent_layers();
84 0 : let all_layers: Vec<_> = all_layers_keys
85 0 : .iter()
86 0 : .map(|key| guard.get_from_key(key))
87 0 : .collect();
88 0 : let open = guard.open_mut().context("open_mut")?;
89 :
90 0 : timeline.remote_client.schedule_gc_update(&all_layers)?;
91 0 : open.finish_gc_timeline(&all_layers);
92 0 : }
93 0 :
94 0 : //
95 0 : // Wait for pgdata to finish uploading
96 0 : //
97 0 : info!("wait for pgdata to reach status 'done'");
98 0 : let pgdata_status_key = status_prefix.join("pgdata");
99 : loop {
100 0 : let res = async {
101 0 : let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
102 0 : .get_json(&pgdata_status_key)
103 0 : .await
104 0 : .context("get pgdata status")?;
105 0 : info!(?pgdata_status, "peeking pgdata status");
106 0 : if pgdata_status.map(|st| st.done).unwrap_or(false) {
107 0 : Ok(())
108 : } else {
109 0 : Err(anyhow::anyhow!("pgdata not done yet"))
110 : }
111 0 : }
112 0 : .await;
113 0 : match res {
114 0 : Ok(_) => break,
115 0 : Err(err) => {
116 0 : info!(?err, "indefintely waiting for pgdata to finish");
117 0 : if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
118 0 : .await
119 0 : .is_ok()
120 : {
121 0 : bail!("cancelled while waiting for pgdata");
122 0 : }
123 : }
124 : }
125 : }
126 :
127 : //
128 : // Do the import
129 : //
130 0 : info!("do the import");
131 0 : let control_file = storage.get_control_file().await?;
132 0 : let base_lsn = control_file.base_lsn();
133 0 :
134 0 : info!("update TimelineMetadata based on LSNs from control file");
135 : {
136 0 : let pg_version = control_file.pg_version();
137 0 : let _ctx: &RequestContext = ctx;
138 0 : async move {
139 0 : // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
140 0 : // checkpoint record, and prev_record_lsn should point to its beginning.
141 0 : // We should read the real end of the record from the WAL, but here we
142 0 : // just fake it.
143 0 : let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
144 0 : let prev_record_lsn = base_lsn;
145 0 : let metadata = TimelineMetadata::new(
146 0 : disk_consistent_lsn,
147 0 : Some(prev_record_lsn),
148 0 : None, // no ancestor
149 0 : Lsn(0), // no ancestor lsn
150 0 : base_lsn, // latest_gc_cutoff_lsn
151 0 : base_lsn, // initdb_lsn
152 0 : pg_version,
153 0 : );
154 0 :
155 0 : let _start_lsn = disk_consistent_lsn + 1;
156 0 :
157 0 : timeline
158 0 : .remote_client
159 0 : .schedule_index_upload_for_full_metadata_update(&metadata)?;
160 :
161 0 : timeline.remote_client.wait_completion().await?;
162 :
163 0 : anyhow::Ok(())
164 0 : }
165 : }
166 0 : .await?;
167 :
168 0 : flow::run(
169 0 : timeline.clone(),
170 0 : base_lsn,
171 0 : control_file,
172 0 : storage.clone(),
173 0 : ctx,
174 0 : )
175 0 : .await?;
176 :
177 : //
178 : // Communicate that shard is done.
179 : //
180 0 : storage
181 0 : .put_json(
182 0 : &shard_status_key,
183 0 : &importbucket_format::ShardStatus { done: true },
184 0 : )
185 0 : .await
186 0 : .context("put shard status")?;
187 : }
188 :
189 : //
190 : // Ensure at-least-once deliver of the upcall to cplane
191 : // before we mark the task as done and never come here again.
192 : //
193 0 : info!("send final progress update");
194 0 : upcall_client
195 0 : .send_progress_until_success(&spec)
196 0 : .instrument(info_span!("final_progress_update"))
197 0 : .await?;
198 :
199 : //
200 : // Mark as done in index_part.
201 : // This makes subsequent timeline loads enter the normal load code path
202 : // instead of spawning the import task and calling this here function.
203 : //
204 0 : info!("mark import as complete in index part");
205 0 : timeline
206 0 : .remote_client
207 0 : .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
208 0 : index_part_format::V1::Done(index_part_format::Done {
209 0 : idempotency_key,
210 0 : started_at,
211 0 : finished_at: chrono::Utc::now().naive_utc(),
212 0 : }),
213 0 : )))?;
214 :
215 0 : timeline.remote_client.wait_completion().await?;
216 :
217 0 : Ok(())
218 0 : }
|