Line data Source code
1 : use std::time::Duration;
2 : use std::{collections::HashMap, str::FromStr};
3 :
4 : use http_utils::error::ApiError;
5 : use reqwest::Method;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use pageserver_api::models::ShardImportStatus;
9 : use tokio_util::sync::CancellationToken;
10 : use utils::{
11 : id::{TenantId, TimelineId},
12 : shard::ShardIndex,
13 : };
14 :
15 : use crate::{persistence::TimelineImportPersistence, service::Config};
16 :
17 0 : #[derive(Deserialize, Serialize, PartialEq, Eq)]
18 : pub(crate) enum TimelineImportState {
19 : Importing,
20 : Idle,
21 : }
22 :
23 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
24 : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
25 :
26 : impl ShardImportStatuses {
27 0 : pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
28 0 : ShardImportStatuses(
29 0 : shards
30 0 : .into_iter()
31 0 : .map(|ts_id| (ts_id, ShardImportStatus::InProgress))
32 0 : .collect(),
33 0 : )
34 0 : }
35 : }
36 :
37 : #[derive(Debug)]
38 : pub(crate) struct TimelineImport {
39 : pub(crate) tenant_id: TenantId,
40 : pub(crate) timeline_id: TimelineId,
41 : pub(crate) shard_statuses: ShardImportStatuses,
42 : }
43 :
44 : pub(crate) enum TimelineImportUpdateFollowUp {
45 : Persist,
46 : None,
47 : }
48 :
49 : pub(crate) enum TimelineImportUpdateError {
50 : ImportNotFound {
51 : tenant_id: TenantId,
52 : timeline_id: TimelineId,
53 : },
54 : MismatchedShards,
55 : UnexpectedUpdate,
56 : }
57 :
58 : impl From<TimelineImportUpdateError> for ApiError {
59 0 : fn from(err: TimelineImportUpdateError) -> ApiError {
60 0 : match err {
61 : TimelineImportUpdateError::ImportNotFound {
62 0 : tenant_id,
63 0 : timeline_id,
64 0 : } => ApiError::NotFound(
65 0 : anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
66 0 : ),
67 : TimelineImportUpdateError::MismatchedShards => {
68 0 : ApiError::InternalServerError(anyhow::anyhow!(
69 0 : "Import shards do not match update request, likely a shard split happened during import, this is a bug"
70 0 : ))
71 : }
72 : TimelineImportUpdateError::UnexpectedUpdate => {
73 0 : ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
74 : }
75 : }
76 0 : }
77 : }
78 :
79 : impl TimelineImport {
80 0 : pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
81 0 : let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
82 0 : let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
83 0 : let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
84 :
85 0 : Ok(TimelineImport {
86 0 : tenant_id,
87 0 : timeline_id,
88 0 : shard_statuses,
89 0 : })
90 0 : }
91 :
92 0 : pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
93 0 : TimelineImportPersistence {
94 0 : tenant_id: self.tenant_id.to_string(),
95 0 : timeline_id: self.timeline_id.to_string(),
96 0 : shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
97 0 : }
98 0 : }
99 :
100 0 : pub(crate) fn update(
101 0 : &mut self,
102 0 : shard: ShardIndex,
103 0 : status: ShardImportStatus,
104 0 : ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
105 : use std::collections::hash_map::Entry::*;
106 :
107 0 : match self.shard_statuses.0.entry(shard) {
108 0 : Occupied(mut occ) => {
109 0 : let crnt = occ.get_mut();
110 0 : if *crnt == status {
111 0 : Ok(TimelineImportUpdateFollowUp::None)
112 0 : } else if crnt.is_terminal() && *crnt != status {
113 0 : Err(TimelineImportUpdateError::UnexpectedUpdate)
114 : } else {
115 0 : *crnt = status;
116 0 : Ok(TimelineImportUpdateFollowUp::Persist)
117 : }
118 : }
119 0 : Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
120 : }
121 0 : }
122 :
123 0 : pub(crate) fn is_complete(&self) -> bool {
124 0 : self.shard_statuses
125 0 : .0
126 0 : .values()
127 0 : .all(|status| status.is_terminal())
128 0 : }
129 :
130 0 : pub(crate) fn completion_error(&self) -> Option<String> {
131 0 : assert!(self.is_complete());
132 :
133 0 : let shard_errors: HashMap<_, _> = self
134 0 : .shard_statuses
135 0 : .0
136 0 : .iter()
137 0 : .filter_map(|(shard, status)| {
138 0 : if let ShardImportStatus::Error(err) = status {
139 0 : Some((*shard, err.clone()))
140 : } else {
141 0 : None
142 : }
143 0 : })
144 0 : .collect();
145 0 :
146 0 : if shard_errors.is_empty() {
147 0 : None
148 : } else {
149 0 : Some(serde_json::to_string(&shard_errors).unwrap())
150 : }
151 0 : }
152 : }
153 :
154 : pub(crate) struct UpcallClient {
155 : authorization_header: Option<String>,
156 : client: reqwest::Client,
157 : cancel: CancellationToken,
158 : base_url: String,
159 : }
160 :
161 : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
162 :
163 0 : #[derive(Serialize, Deserialize, Debug)]
164 : struct ImportCompleteRequest {
165 : tenant_id: TenantId,
166 : timeline_id: TimelineId,
167 : error: Option<String>,
168 : }
169 :
170 : impl UpcallClient {
171 0 : pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
172 0 : let authorization_header = config
173 0 : .control_plane_jwt_token
174 0 : .clone()
175 0 : .map(|jwt| format!("Bearer {}", jwt));
176 0 :
177 0 : let client = reqwest::ClientBuilder::new()
178 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
179 0 : .build()
180 0 : .expect("Failed to construct HTTP client");
181 0 :
182 0 : let base_url = config
183 0 : .control_plane_url
184 0 : .clone()
185 0 : .expect("must be configured");
186 0 :
187 0 : Self {
188 0 : authorization_header,
189 0 : client,
190 0 : cancel,
191 0 : base_url,
192 0 : }
193 0 : }
194 :
195 : /// Notify control plane of a completed import
196 : ///
197 : /// This method guarantees at least once delivery semantics assuming
198 : /// eventual cplane availability. The cplane API is idempotent.
199 0 : pub(crate) async fn notify_import_complete(
200 0 : &self,
201 0 : import: &TimelineImport,
202 0 : ) -> anyhow::Result<()> {
203 0 : let endpoint = if self.base_url.ends_with('/') {
204 0 : format!("{}import_complete", self.base_url)
205 : } else {
206 0 : format!("{}/import_complete", self.base_url)
207 : };
208 :
209 0 : tracing::info!("Endpoint is {endpoint}");
210 :
211 0 : let request = self
212 0 : .client
213 0 : .request(Method::PUT, endpoint)
214 0 : .json(&ImportCompleteRequest {
215 0 : tenant_id: import.tenant_id,
216 0 : timeline_id: import.timeline_id,
217 0 : error: import.completion_error(),
218 0 : })
219 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
220 :
221 0 : let request = if let Some(auth) = &self.authorization_header {
222 0 : request.header(reqwest::header::AUTHORIZATION, auth)
223 : } else {
224 0 : request
225 : };
226 :
227 : const RETRY_DELAY: Duration = Duration::from_secs(1);
228 0 : let mut attempt = 1;
229 :
230 : loop {
231 0 : if self.cancel.is_cancelled() {
232 0 : return Err(anyhow::anyhow!(
233 0 : "Shutting down while notifying cplane of import completion"
234 0 : ));
235 0 : }
236 0 :
237 0 : match request.try_clone().unwrap().send().await {
238 0 : Ok(response) if response.status().is_success() => {
239 0 : return Ok(());
240 : }
241 0 : Ok(response) => {
242 0 : tracing::warn!(
243 0 : "Import complete notification failed with status {}, attempt {}",
244 0 : response.status(),
245 : attempt
246 : );
247 : }
248 0 : Err(e) => {
249 0 : tracing::warn!(
250 0 : "Import complete notification failed with error: {}, attempt {}",
251 : e,
252 : attempt
253 : );
254 : }
255 : }
256 :
257 0 : tokio::select! {
258 0 : _ = tokio::time::sleep(RETRY_DELAY) => {}
259 0 : _ = self.cancel.cancelled() => {
260 0 : return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
261 : }
262 : }
263 0 : attempt += 1;
264 : }
265 0 : }
266 : }
|