Line data Source code
1 : use std::time::Duration;
2 : use std::{collections::HashMap, str::FromStr};
3 :
4 : use http_utils::error::ApiError;
5 : use reqwest::Method;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
9 : use tokio_util::sync::CancellationToken;
10 : use utils::{
11 : id::{TenantId, TimelineId},
12 : shard::ShardIndex,
13 : };
14 :
15 : use crate::{persistence::TimelineImportPersistence, service::Config};
16 :
17 0 : #[derive(Deserialize, Serialize, PartialEq, Eq)]
18 : pub(crate) enum TimelineImportState {
19 : Importing,
20 : Idle,
21 : }
22 :
23 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
24 : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
25 :
26 : impl ShardImportStatuses {
27 0 : pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
28 0 : ShardImportStatuses(
29 0 : shards
30 0 : .into_iter()
31 0 : .map(|ts_id| {
32 0 : (
33 0 : ts_id,
34 0 : ShardImportStatus::InProgress(None::<ShardImportProgress>),
35 0 : )
36 0 : })
37 0 : .collect(),
38 0 : )
39 0 : }
40 : }
41 :
42 : #[derive(Debug)]
43 : pub(crate) struct TimelineImport {
44 : pub(crate) tenant_id: TenantId,
45 : pub(crate) timeline_id: TimelineId,
46 : pub(crate) shard_statuses: ShardImportStatuses,
47 : }
48 :
49 : pub(crate) enum TimelineImportUpdateFollowUp {
50 : Persist,
51 : None,
52 : }
53 :
54 : #[derive(thiserror::Error, Debug)]
55 : pub(crate) enum TimelineImportFinalizeError {
56 : #[error("Shut down interrupted import finalize")]
57 : ShuttingDown,
58 : #[error("Mismatched shard detected during import finalize: {0}")]
59 : MismatchedShards(ShardIndex),
60 : }
61 :
62 : pub(crate) enum TimelineImportUpdateError {
63 : ImportNotFound {
64 : tenant_id: TenantId,
65 : timeline_id: TimelineId,
66 : },
67 : MismatchedShards,
68 : UnexpectedUpdate,
69 : }
70 :
71 : impl From<TimelineImportUpdateError> for ApiError {
72 0 : fn from(err: TimelineImportUpdateError) -> ApiError {
73 0 : match err {
74 : TimelineImportUpdateError::ImportNotFound {
75 0 : tenant_id,
76 0 : timeline_id,
77 0 : } => ApiError::NotFound(
78 0 : anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
79 0 : ),
80 : TimelineImportUpdateError::MismatchedShards => {
81 0 : ApiError::InternalServerError(anyhow::anyhow!(
82 0 : "Import shards do not match update request, likely a shard split happened during import, this is a bug"
83 0 : ))
84 : }
85 : TimelineImportUpdateError::UnexpectedUpdate => {
86 0 : ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
87 : }
88 : }
89 0 : }
90 : }
91 :
92 : impl TimelineImport {
93 0 : pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
94 0 : let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
95 0 : let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
96 0 : let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
97 :
98 0 : Ok(TimelineImport {
99 0 : tenant_id,
100 0 : timeline_id,
101 0 : shard_statuses,
102 0 : })
103 0 : }
104 :
105 0 : pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
106 0 : TimelineImportPersistence {
107 0 : tenant_id: self.tenant_id.to_string(),
108 0 : timeline_id: self.timeline_id.to_string(),
109 0 : shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
110 0 : }
111 0 : }
112 :
113 0 : pub(crate) fn update(
114 0 : &mut self,
115 0 : shard: ShardIndex,
116 0 : status: ShardImportStatus,
117 0 : ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
118 : use std::collections::hash_map::Entry::*;
119 :
120 0 : match self.shard_statuses.0.entry(shard) {
121 0 : Occupied(mut occ) => {
122 0 : let crnt = occ.get_mut();
123 0 : if *crnt == status {
124 0 : Ok(TimelineImportUpdateFollowUp::None)
125 0 : } else if crnt.is_terminal() && *crnt != status {
126 0 : Err(TimelineImportUpdateError::UnexpectedUpdate)
127 : } else {
128 0 : *crnt = status;
129 0 : Ok(TimelineImportUpdateFollowUp::Persist)
130 : }
131 : }
132 0 : Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
133 : }
134 0 : }
135 :
136 0 : pub(crate) fn is_complete(&self) -> bool {
137 0 : self.shard_statuses
138 0 : .0
139 0 : .values()
140 0 : .all(|status| status.is_terminal())
141 0 : }
142 :
143 0 : pub(crate) fn completion_error(&self) -> Option<String> {
144 0 : assert!(self.is_complete());
145 :
146 0 : let shard_errors: HashMap<_, _> = self
147 0 : .shard_statuses
148 0 : .0
149 0 : .iter()
150 0 : .filter_map(|(shard, status)| {
151 0 : if let ShardImportStatus::Error(err) = status {
152 0 : Some((*shard, err.clone()))
153 : } else {
154 0 : None
155 : }
156 0 : })
157 0 : .collect();
158 0 :
159 0 : if shard_errors.is_empty() {
160 0 : None
161 : } else {
162 0 : Some(serde_json::to_string(&shard_errors).unwrap())
163 : }
164 0 : }
165 : }
166 :
167 : pub(crate) type ImportResult = Result<(), String>;
168 :
169 : pub(crate) struct UpcallClient {
170 : authorization_header: Option<String>,
171 : client: reqwest::Client,
172 : cancel: CancellationToken,
173 : base_url: String,
174 : }
175 :
176 : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
177 :
178 0 : #[derive(Serialize, Deserialize, Debug)]
179 : struct ImportCompleteRequest {
180 : tenant_id: TenantId,
181 : timeline_id: TimelineId,
182 : error: Option<String>,
183 : }
184 :
185 : impl UpcallClient {
186 0 : pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
187 0 : let authorization_header = config
188 0 : .control_plane_jwt_token
189 0 : .clone()
190 0 : .map(|jwt| format!("Bearer {}", jwt));
191 0 :
192 0 : let client = reqwest::ClientBuilder::new()
193 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
194 0 : .build()
195 0 : .expect("Failed to construct HTTP client");
196 0 :
197 0 : let base_url = config
198 0 : .control_plane_url
199 0 : .clone()
200 0 : .expect("must be configured");
201 0 :
202 0 : Self {
203 0 : authorization_header,
204 0 : client,
205 0 : cancel,
206 0 : base_url,
207 0 : }
208 0 : }
209 :
210 : /// Notify control plane of a completed import
211 : ///
212 : /// This method guarantees at least once delivery semantics assuming
213 : /// eventual cplane availability. The cplane API is idempotent.
214 0 : pub(crate) async fn notify_import_complete(
215 0 : &self,
216 0 : tenant_id: TenantId,
217 0 : timeline_id: TimelineId,
218 0 : import_result: ImportResult,
219 0 : ) -> anyhow::Result<()> {
220 0 : let endpoint = if self.base_url.ends_with('/') {
221 0 : format!("{}import_complete", self.base_url)
222 : } else {
223 0 : format!("{}/import_complete", self.base_url)
224 : };
225 :
226 0 : let request = self
227 0 : .client
228 0 : .request(Method::PUT, endpoint)
229 0 : .json(&ImportCompleteRequest {
230 0 : tenant_id,
231 0 : timeline_id,
232 0 : error: import_result.err(),
233 0 : })
234 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
235 :
236 0 : let request = if let Some(auth) = &self.authorization_header {
237 0 : request.header(reqwest::header::AUTHORIZATION, auth)
238 : } else {
239 0 : request
240 : };
241 :
242 : const RETRY_DELAY: Duration = Duration::from_secs(1);
243 0 : let mut attempt = 1;
244 :
245 : loop {
246 0 : if self.cancel.is_cancelled() {
247 0 : return Err(anyhow::anyhow!(
248 0 : "Shutting down while notifying cplane of import completion"
249 0 : ));
250 0 : }
251 0 :
252 0 : match request.try_clone().unwrap().send().await {
253 0 : Ok(response) if response.status().is_success() => {
254 0 : return Ok(());
255 : }
256 0 : Ok(response) => {
257 0 : tracing::warn!(
258 0 : "Import complete notification failed with status {}, attempt {}",
259 0 : response.status(),
260 : attempt
261 : );
262 : }
263 0 : Err(e) => {
264 0 : tracing::warn!(
265 0 : "Import complete notification failed with error: {}, attempt {}",
266 : e,
267 : attempt
268 : );
269 : }
270 : }
271 :
272 0 : tokio::select! {
273 0 : _ = tokio::time::sleep(RETRY_DELAY) => {}
274 0 : _ = self.cancel.cancelled() => {
275 0 : return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
276 : }
277 : }
278 0 : attempt += 1;
279 : }
280 0 : }
281 : }
|