Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::{Context, bail};
4 : use pageserver_api::models::ShardImportStatus;
5 : use remote_storage::RemotePath;
6 : use tokio_util::sync::CancellationToken;
7 : use tracing::info;
8 : use utils::lsn::Lsn;
9 :
10 : use super::Timeline;
11 : use crate::context::RequestContext;
12 : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
13 : use crate::tenant::metadata::TimelineMetadata;
14 :
15 : mod flow;
16 : mod importbucket_client;
17 : mod importbucket_format;
18 : pub(crate) mod index_part_format;
19 :
20 0 : pub async fn doit(
21 0 : timeline: &Arc<Timeline>,
22 0 : index_part: index_part_format::Root,
23 0 : ctx: &RequestContext,
24 0 : cancel: CancellationToken,
25 0 : ) -> anyhow::Result<()> {
26 0 : let index_part_format::Root::V1(v1) = index_part;
27 : let index_part_format::InProgress {
28 0 : location,
29 0 : idempotency_key,
30 0 : started_at,
31 0 : } = match v1 {
32 0 : index_part_format::V1::Done(_) => return Ok(()),
33 0 : index_part_format::V1::InProgress(in_progress) => in_progress,
34 : };
35 :
36 0 : let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
37 :
38 0 : let status_prefix = RemotePath::from_string("status").unwrap();
39 0 :
40 0 : //
41 0 : // See if shard is done.
42 0 : // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing.
43 0 : //
44 0 : let shard_status_key =
45 0 : status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug()));
46 0 : let shard_status: Option<importbucket_format::ShardStatus> =
47 0 : storage.get_json(&shard_status_key).await?;
48 0 : info!(?shard_status, "peeking shard status");
49 0 : if shard_status.map(|st| st.done).unwrap_or(false) {
50 0 : info!("shard status indicates that the shard is done, skipping import");
51 : } else {
52 : // TODO: checkpoint the progress into the IndexPart instead of restarting
53 : // from the beginning.
54 :
55 : //
56 : // Wipe the slate clean - the flow does not allow resuming.
57 : // We can implement resuming in the future by checkpointing the progress into the IndexPart.
58 : //
59 0 : info!("wipe the slate clean");
60 : {
61 : // TODO: do we need to hold GC lock for this?
62 0 : let mut guard = timeline.layers.write().await;
63 0 : assert!(
64 0 : guard.layer_map()?.open_layer.is_none(),
65 0 : "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
66 : );
67 0 : let all_layers_keys = guard.all_persistent_layers();
68 0 : let all_layers: Vec<_> = all_layers_keys
69 0 : .iter()
70 0 : .map(|key| guard.get_from_key(key))
71 0 : .collect();
72 0 : let open = guard.open_mut().context("open_mut")?;
73 :
74 0 : timeline.remote_client.schedule_gc_update(&all_layers)?;
75 0 : open.finish_gc_timeline(&all_layers);
76 0 : }
77 0 :
78 0 : //
79 0 : // Wait for pgdata to finish uploading
80 0 : //
81 0 : info!("wait for pgdata to reach status 'done'");
82 0 : let pgdata_status_key = status_prefix.join("pgdata");
83 : loop {
84 0 : let res = async {
85 0 : let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
86 0 : .get_json(&pgdata_status_key)
87 0 : .await
88 0 : .context("get pgdata status")?;
89 0 : info!(?pgdata_status, "peeking pgdata status");
90 0 : if pgdata_status.map(|st| st.done).unwrap_or(false) {
91 0 : Ok(())
92 : } else {
93 0 : Err(anyhow::anyhow!("pgdata not done yet"))
94 : }
95 0 : }
96 0 : .await;
97 0 : match res {
98 0 : Ok(_) => break,
99 0 : Err(err) => {
100 0 : info!(?err, "indefinitely waiting for pgdata to finish");
101 0 : if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
102 0 : .await
103 0 : .is_ok()
104 : {
105 0 : bail!("cancelled while waiting for pgdata");
106 0 : }
107 : }
108 : }
109 : }
110 :
111 : //
112 : // Do the import
113 : //
114 0 : info!("do the import");
115 0 : let control_file = storage.get_control_file().await?;
116 0 : let base_lsn = control_file.base_lsn();
117 0 :
118 0 : info!("update TimelineMetadata based on LSNs from control file");
119 0 : {
120 0 : let pg_version = control_file.pg_version();
121 0 : let _ctx: &RequestContext = ctx;
122 0 : async move {
123 0 : // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
124 0 : // checkpoint record, and prev_record_lsn should point to its beginning.
125 0 : // We should read the real end of the record from the WAL, but here we
126 0 : // just fake it.
127 0 : let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
128 0 : let prev_record_lsn = base_lsn;
129 0 : let metadata = TimelineMetadata::new(
130 0 : disk_consistent_lsn,
131 0 : Some(prev_record_lsn),
132 0 : None, // no ancestor
133 0 : Lsn(0), // no ancestor lsn
134 0 : base_lsn, // latest_gc_cutoff_lsn
135 0 : base_lsn, // initdb_lsn
136 0 : pg_version,
137 0 : );
138 0 :
139 0 : let _start_lsn = disk_consistent_lsn + 1;
140 0 :
141 0 : timeline
142 0 : .remote_client
143 0 : .schedule_index_upload_for_full_metadata_update(&metadata)?;
144 :
145 0 : timeline.remote_client.wait_completion().await?;
146 :
147 0 : anyhow::Ok(())
148 0 : }
149 0 : }
150 0 : .await?;
151 :
152 0 : flow::run(
153 0 : timeline.clone(),
154 0 : base_lsn,
155 0 : control_file,
156 0 : storage.clone(),
157 0 : ctx,
158 0 : )
159 0 : .await?;
160 :
161 : //
162 : // Communicate that shard is done.
163 : // Ensure at-least-once delivery of the upcall to storage controller
164 : // before we mark the task as done and never come here again.
165 : //
166 0 : let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
167 0 : .expect("storcon configured");
168 0 : storcon_client
169 0 : .put_timeline_import_status(
170 0 : timeline.tenant_shard_id,
171 0 : timeline.timeline_id,
172 0 : // TODO(vlad): What about import errors?
173 0 : ShardImportStatus::Done,
174 0 : )
175 0 : .await
176 0 : .map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
177 :
178 0 : storage
179 0 : .put_json(
180 0 : &shard_status_key,
181 0 : &importbucket_format::ShardStatus { done: true },
182 0 : )
183 0 : .await
184 0 : .context("put shard status")?;
185 : }
186 :
187 : //
188 : // Mark as done in index_part.
189 : // This makes subsequent timeline loads enter the normal load code path
190 : // instead of spawning the import task and calling this here function.
191 : //
192 0 : info!("mark import as complete in index part");
193 0 : timeline
194 0 : .remote_client
195 0 : .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
196 0 : index_part_format::V1::Done(index_part_format::Done {
197 0 : idempotency_key,
198 0 : started_at,
199 0 : finished_at: chrono::Utc::now().naive_utc(),
200 0 : }),
201 0 : )))?;
202 :
203 0 : timeline.remote_client.wait_completion().await?;
204 :
205 0 : Ok(())
206 0 : }
|