Line data Source code
1 : use std::time::Duration;
2 : use std::{collections::HashMap, str::FromStr};
3 :
4 : use http_utils::error::ApiError;
5 : use reqwest::Method;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
9 : use tokio_util::sync::CancellationToken;
10 : use utils::sync::gate::Gate;
11 : use utils::{
12 : id::{TenantId, TimelineId},
13 : shard::ShardIndex,
14 : };
15 :
16 : use crate::{persistence::TimelineImportPersistence, service::Config};
17 :
18 0 : #[derive(Deserialize, Serialize, PartialEq, Eq)]
19 : pub(crate) enum TimelineImportState {
20 : Importing,
21 : Idle,
22 : }
23 :
24 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
25 : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
26 :
27 : impl ShardImportStatuses {
28 0 : pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
29 : ShardImportStatuses(
30 0 : shards
31 0 : .into_iter()
32 0 : .map(|ts_id| {
33 0 : (
34 0 : ts_id,
35 0 : ShardImportStatus::InProgress(None::<ShardImportProgress>),
36 0 : )
37 0 : })
38 0 : .collect(),
39 : )
40 0 : }
41 : }
42 :
43 : #[derive(Debug)]
44 : pub(crate) struct TimelineImport {
45 : pub(crate) tenant_id: TenantId,
46 : pub(crate) timeline_id: TimelineId,
47 : pub(crate) shard_statuses: ShardImportStatuses,
48 : }
49 :
50 : pub(crate) enum TimelineImportUpdateFollowUp {
51 : Persist,
52 : None,
53 : }
54 :
55 : #[derive(thiserror::Error, Debug)]
56 : pub(crate) enum TimelineImportFinalizeError {
57 : #[error("Shut down interrupted import finalize")]
58 : ShuttingDown,
59 : #[error("Import finalization was cancelled")]
60 : Cancelled,
61 : #[error("Mismatched shard detected during import finalize: {0}")]
62 : MismatchedShards(ShardIndex),
63 : }
64 :
65 : pub(crate) enum TimelineImportUpdateError {
66 : ImportNotFound {
67 : tenant_id: TenantId,
68 : timeline_id: TimelineId,
69 : },
70 : MismatchedShards,
71 : UnexpectedUpdate,
72 : }
73 :
74 : impl From<TimelineImportUpdateError> for ApiError {
75 0 : fn from(err: TimelineImportUpdateError) -> ApiError {
76 0 : match err {
77 : TimelineImportUpdateError::ImportNotFound {
78 0 : tenant_id,
79 0 : timeline_id,
80 0 : } => ApiError::NotFound(
81 0 : anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
82 0 : ),
83 : TimelineImportUpdateError::MismatchedShards => {
84 0 : ApiError::InternalServerError(anyhow::anyhow!(
85 0 : "Import shards do not match update request, likely a shard split happened during import, this is a bug"
86 0 : ))
87 : }
88 : TimelineImportUpdateError::UnexpectedUpdate => {
89 0 : ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
90 : }
91 : }
92 0 : }
93 : }
94 :
95 : impl TimelineImport {
96 0 : pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
97 0 : let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
98 0 : let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
99 0 : let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
100 :
101 0 : Ok(TimelineImport {
102 0 : tenant_id,
103 0 : timeline_id,
104 0 : shard_statuses,
105 0 : })
106 0 : }
107 :
108 0 : pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
109 0 : TimelineImportPersistence {
110 0 : tenant_id: self.tenant_id.to_string(),
111 0 : timeline_id: self.timeline_id.to_string(),
112 0 : shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
113 0 : }
114 0 : }
115 :
116 0 : pub(crate) fn update(
117 0 : &mut self,
118 0 : shard: ShardIndex,
119 0 : status: ShardImportStatus,
120 0 : ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
121 : use std::collections::hash_map::Entry::*;
122 :
123 0 : match self.shard_statuses.0.entry(shard) {
124 0 : Occupied(mut occ) => {
125 0 : let crnt = occ.get_mut();
126 0 : if *crnt == status {
127 0 : Ok(TimelineImportUpdateFollowUp::None)
128 0 : } else if crnt.is_terminal() && *crnt != status {
129 0 : Err(TimelineImportUpdateError::UnexpectedUpdate)
130 : } else {
131 0 : *crnt = status;
132 0 : Ok(TimelineImportUpdateFollowUp::Persist)
133 : }
134 : }
135 0 : Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
136 : }
137 0 : }
138 :
139 0 : pub(crate) fn is_complete(&self) -> bool {
140 0 : self.shard_statuses
141 0 : .0
142 0 : .values()
143 0 : .all(|status| status.is_terminal())
144 0 : }
145 :
146 0 : pub(crate) fn completion_error(&self) -> Option<String> {
147 0 : assert!(self.is_complete());
148 :
149 0 : let shard_errors: HashMap<_, _> = self
150 0 : .shard_statuses
151 0 : .0
152 0 : .iter()
153 0 : .filter_map(|(shard, status)| {
154 0 : if let ShardImportStatus::Error(err) = status {
155 0 : Some((*shard, err.clone()))
156 : } else {
157 0 : None
158 : }
159 0 : })
160 0 : .collect();
161 :
162 0 : if shard_errors.is_empty() {
163 0 : None
164 : } else {
165 0 : Some(serde_json::to_string(&shard_errors).unwrap())
166 : }
167 0 : }
168 : }
169 :
170 : pub(crate) struct FinalizingImport {
171 : pub(crate) gate: Gate,
172 : pub(crate) cancel: CancellationToken,
173 : }
174 :
175 : pub(crate) type ImportResult = Result<(), String>;
176 :
177 : pub(crate) struct UpcallClient {
178 : authorization_header: Option<String>,
179 : client: reqwest::Client,
180 : cancel: CancellationToken,
181 : base_url: String,
182 : }
183 :
184 : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
185 :
186 0 : #[derive(Serialize, Deserialize, Debug)]
187 : struct ImportCompleteRequest {
188 : tenant_id: TenantId,
189 : timeline_id: TimelineId,
190 : error: Option<String>,
191 : }
192 :
193 : impl UpcallClient {
194 0 : pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
195 0 : let authorization_header = config
196 0 : .control_plane_jwt_token
197 0 : .clone()
198 0 : .map(|jwt| format!("Bearer {jwt}"));
199 :
200 0 : let client = reqwest::ClientBuilder::new()
201 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
202 0 : .build()
203 0 : .expect("Failed to construct HTTP client");
204 :
205 0 : let base_url = config
206 0 : .control_plane_url
207 0 : .clone()
208 0 : .expect("must be configured");
209 :
210 0 : Self {
211 0 : authorization_header,
212 0 : client,
213 0 : cancel,
214 0 : base_url,
215 0 : }
216 0 : }
217 :
218 : /// Notify control plane of a completed import
219 : ///
220 : /// This method guarantees at least once delivery semantics assuming
221 : /// eventual cplane availability. The cplane API is idempotent.
222 0 : pub(crate) async fn notify_import_complete(
223 0 : &self,
224 0 : tenant_id: TenantId,
225 0 : timeline_id: TimelineId,
226 0 : import_result: ImportResult,
227 0 : ) -> anyhow::Result<()> {
228 0 : let endpoint = if self.base_url.ends_with('/') {
229 0 : format!("{}import_complete", self.base_url)
230 : } else {
231 0 : format!("{}/import_complete", self.base_url)
232 : };
233 :
234 0 : let request = self
235 0 : .client
236 0 : .request(Method::PUT, endpoint)
237 0 : .json(&ImportCompleteRequest {
238 0 : tenant_id,
239 0 : timeline_id,
240 0 : error: import_result.err(),
241 0 : })
242 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
243 :
244 0 : let request = if let Some(auth) = &self.authorization_header {
245 0 : request.header(reqwest::header::AUTHORIZATION, auth)
246 : } else {
247 0 : request
248 : };
249 :
250 : const RETRY_DELAY: Duration = Duration::from_secs(1);
251 0 : let mut attempt = 1;
252 :
253 : loop {
254 0 : if self.cancel.is_cancelled() {
255 0 : return Err(anyhow::anyhow!(
256 0 : "Shutting down while notifying cplane of import completion"
257 0 : ));
258 0 : }
259 :
260 0 : match request.try_clone().unwrap().send().await {
261 0 : Ok(response) if response.status().is_success() => {
262 0 : return Ok(());
263 : }
264 0 : Ok(response) => {
265 0 : tracing::warn!(
266 0 : "Import complete notification failed with status {}, attempt {}",
267 0 : response.status(),
268 : attempt
269 : );
270 : }
271 0 : Err(e) => {
272 0 : tracing::warn!(
273 0 : "Import complete notification failed with error: {}, attempt {}",
274 : e,
275 : attempt
276 : );
277 : }
278 : }
279 :
280 0 : tokio::select! {
281 0 : _ = tokio::time::sleep(RETRY_DELAY) => {}
282 0 : _ = self.cancel.cancelled() => {
283 0 : return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
284 : }
285 : }
286 0 : attempt += 1;
287 : }
288 0 : }
289 : }
|