Line data Source code
1 : use std::time::Duration;
2 : use std::{collections::HashMap, str::FromStr};
3 :
4 : use http_utils::error::ApiError;
5 : use reqwest::Method;
6 : use serde::{Deserialize, Serialize};
7 :
8 : use pageserver_api::models::ShardImportStatus;
9 : use tokio_util::sync::CancellationToken;
10 : use utils::{
11 : id::{TenantId, TimelineId},
12 : shard::ShardIndex,
13 : };
14 :
15 : use crate::{persistence::TimelineImportPersistence, service::Config};
16 :
17 0 : #[derive(Serialize, Deserialize, Clone, Debug)]
18 : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
19 :
20 : impl ShardImportStatuses {
21 0 : pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
22 0 : ShardImportStatuses(
23 0 : shards
24 0 : .into_iter()
25 0 : .map(|ts_id| (ts_id, ShardImportStatus::InProgress))
26 0 : .collect(),
27 0 : )
28 0 : }
29 : }
30 :
31 : #[derive(Debug)]
32 : pub(crate) struct TimelineImport {
33 : pub(crate) tenant_id: TenantId,
34 : pub(crate) timeline_id: TimelineId,
35 : pub(crate) shard_statuses: ShardImportStatuses,
36 : }
37 :
38 : pub(crate) enum TimelineImportUpdateFollowUp {
39 : Persist,
40 : None,
41 : }
42 :
43 : pub(crate) enum TimelineImportUpdateError {
44 : ImportNotFound {
45 : tenant_id: TenantId,
46 : timeline_id: TimelineId,
47 : },
48 : MismatchedShards,
49 : UnexpectedUpdate,
50 : }
51 :
52 : impl From<TimelineImportUpdateError> for ApiError {
53 0 : fn from(err: TimelineImportUpdateError) -> ApiError {
54 0 : match err {
55 : TimelineImportUpdateError::ImportNotFound {
56 0 : tenant_id,
57 0 : timeline_id,
58 0 : } => ApiError::NotFound(
59 0 : anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
60 0 : ),
61 : TimelineImportUpdateError::MismatchedShards => {
62 0 : ApiError::InternalServerError(anyhow::anyhow!(
63 0 : "Import shards do not match update request, likely a shard split happened during import, this is a bug"
64 0 : ))
65 : }
66 : TimelineImportUpdateError::UnexpectedUpdate => {
67 0 : ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
68 : }
69 : }
70 0 : }
71 : }
72 :
73 : impl TimelineImport {
74 0 : pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
75 0 : let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
76 0 : let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
77 0 : let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
78 :
79 0 : Ok(TimelineImport {
80 0 : tenant_id,
81 0 : timeline_id,
82 0 : shard_statuses,
83 0 : })
84 0 : }
85 :
86 0 : pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
87 0 : TimelineImportPersistence {
88 0 : tenant_id: self.tenant_id.to_string(),
89 0 : timeline_id: self.timeline_id.to_string(),
90 0 : shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
91 0 : }
92 0 : }
93 :
94 0 : pub(crate) fn update(
95 0 : &mut self,
96 0 : shard: ShardIndex,
97 0 : status: ShardImportStatus,
98 0 : ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
99 : use std::collections::hash_map::Entry::*;
100 :
101 0 : match self.shard_statuses.0.entry(shard) {
102 0 : Occupied(mut occ) => {
103 0 : let crnt = occ.get_mut();
104 0 : if *crnt == status {
105 0 : Ok(TimelineImportUpdateFollowUp::None)
106 0 : } else if crnt.is_terminal() && *crnt != status {
107 0 : Err(TimelineImportUpdateError::UnexpectedUpdate)
108 : } else {
109 0 : *crnt = status;
110 0 : Ok(TimelineImportUpdateFollowUp::Persist)
111 : }
112 : }
113 0 : Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
114 : }
115 0 : }
116 :
117 0 : pub(crate) fn is_complete(&self) -> bool {
118 0 : self.shard_statuses
119 0 : .0
120 0 : .values()
121 0 : .all(|status| status.is_terminal())
122 0 : }
123 :
124 0 : pub(crate) fn completion_error(&self) -> Option<String> {
125 0 : assert!(self.is_complete());
126 :
127 0 : let shard_errors: HashMap<_, _> = self
128 0 : .shard_statuses
129 0 : .0
130 0 : .iter()
131 0 : .filter_map(|(shard, status)| {
132 0 : if let ShardImportStatus::Error(err) = status {
133 0 : Some((*shard, err.clone()))
134 : } else {
135 0 : None
136 : }
137 0 : })
138 0 : .collect();
139 0 :
140 0 : if shard_errors.is_empty() {
141 0 : None
142 : } else {
143 0 : Some(serde_json::to_string(&shard_errors).unwrap())
144 : }
145 0 : }
146 : }
147 :
148 : pub(crate) struct UpcallClient {
149 : authorization_header: Option<String>,
150 : client: reqwest::Client,
151 : cancel: CancellationToken,
152 : base_url: String,
153 : }
154 :
155 : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
156 :
157 0 : #[derive(Serialize, Deserialize, Debug)]
158 : struct ImportCompleteRequest {
159 : tenant_id: TenantId,
160 : timeline_id: TimelineId,
161 : error: Option<String>,
162 : }
163 :
164 : impl UpcallClient {
165 0 : pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
166 0 : let authorization_header = config
167 0 : .control_plane_jwt_token
168 0 : .clone()
169 0 : .map(|jwt| format!("Bearer {}", jwt));
170 0 :
171 0 : let client = reqwest::ClientBuilder::new()
172 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
173 0 : .build()
174 0 : .expect("Failed to construct HTTP client");
175 0 :
176 0 : let base_url = config
177 0 : .control_plane_url
178 0 : .clone()
179 0 : .expect("must be configured");
180 0 :
181 0 : Self {
182 0 : authorization_header,
183 0 : client,
184 0 : cancel,
185 0 : base_url,
186 0 : }
187 0 : }
188 :
189 : /// Notify control plane of a completed import
190 : ///
191 : /// This method guarantees at least once delivery semantics assuming
192 : /// eventual cplane availability. The cplane API is idempotent.
193 0 : pub(crate) async fn notify_import_complete(
194 0 : &self,
195 0 : import: &TimelineImport,
196 0 : ) -> anyhow::Result<()> {
197 0 : let endpoint = if self.base_url.ends_with('/') {
198 0 : format!("{}import_complete", self.base_url)
199 : } else {
200 0 : format!("{}/import_complete", self.base_url)
201 : };
202 :
203 0 : tracing::info!("Endpoint is {endpoint}");
204 :
205 0 : let request = self
206 0 : .client
207 0 : .request(Method::PUT, endpoint)
208 0 : .json(&ImportCompleteRequest {
209 0 : tenant_id: import.tenant_id,
210 0 : timeline_id: import.timeline_id,
211 0 : error: import.completion_error(),
212 0 : })
213 0 : .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
214 :
215 0 : let request = if let Some(auth) = &self.authorization_header {
216 0 : request.header(reqwest::header::AUTHORIZATION, auth)
217 : } else {
218 0 : request
219 : };
220 :
221 : const RETRY_DELAY: Duration = Duration::from_secs(1);
222 0 : let mut attempt = 1;
223 :
224 : loop {
225 0 : if self.cancel.is_cancelled() {
226 0 : return Err(anyhow::anyhow!(
227 0 : "Shutting down while notifying cplane of import completion"
228 0 : ));
229 0 : }
230 0 :
231 0 : match request.try_clone().unwrap().send().await {
232 0 : Ok(response) if response.status().is_success() => {
233 0 : return Ok(());
234 : }
235 0 : Ok(response) => {
236 0 : tracing::warn!(
237 0 : "Import complete notification failed with status {}, attempt {}",
238 0 : response.status(),
239 : attempt
240 : );
241 : }
242 0 : Err(e) => {
243 0 : tracing::warn!(
244 0 : "Import complete notification failed with error: {}, attempt {}",
245 : e,
246 : attempt
247 : );
248 : }
249 : }
250 :
251 0 : tokio::select! {
252 0 : _ = tokio::time::sleep(RETRY_DELAY) => {}
253 0 : _ = self.cancel.cancelled() => {
254 0 : return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
255 : }
256 : }
257 0 : attempt += 1;
258 : }
259 0 : }
260 : }
|