Line data Source code
1 : use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
2 :
3 : use clashmap::{ClashMap, Entry};
4 : use safekeeper_api::models::PullTimelineRequest;
5 : use safekeeper_client::mgmt_api;
6 : use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
7 : use tokio_util::sync::CancellationToken;
8 : use tracing::Instrument;
9 : use utils::{
10 : id::{NodeId, TenantId, TimelineId},
11 : logging::SecretString,
12 : };
13 :
14 : use crate::{
15 : persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper,
16 : safekeeper_client::SafekeeperClient,
17 : };
18 :
19 : use super::Service;
20 :
21 : pub(crate) struct SafekeeperReconcilers {
22 : cancel: CancellationToken,
23 : reconcilers: HashMap<NodeId, ReconcilerHandle>,
24 : }
25 :
26 : impl SafekeeperReconcilers {
27 0 : pub fn new(cancel: CancellationToken) -> Self {
28 0 : SafekeeperReconcilers {
29 0 : cancel,
30 0 : reconcilers: HashMap::new(),
31 0 : }
32 0 : }
33 0 : pub(crate) fn schedule_request_vec(
34 0 : &mut self,
35 0 : service: &Arc<Service>,
36 0 : reqs: Vec<ScheduleRequest>,
37 0 : ) {
38 0 : tracing::info!(
39 0 : "Scheduling {} pending safekeeper ops loaded from db",
40 0 : reqs.len()
41 : );
42 0 : for req in reqs {
43 0 : self.schedule_request(service, req);
44 0 : }
45 0 : }
46 0 : pub(crate) fn schedule_request(&mut self, service: &Arc<Service>, req: ScheduleRequest) {
47 0 : let node_id = req.safekeeper.get_id();
48 0 : let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| {
49 0 : SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
50 0 : });
51 0 : reconciler_handle.schedule_reconcile(req);
52 0 : }
53 0 : pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) {
54 0 : if let Some(handle) = self.reconcilers.remove(&node_id) {
55 0 : handle.cancel.cancel();
56 0 : }
57 0 : }
58 : /// Cancel ongoing reconciles for the given timeline
59 : ///
60 : /// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
61 : /// instead of doing this for all timelines of the tenant.
62 : ///
63 : /// Callers must remove the reconciles from the db manually
64 0 : pub(crate) fn cancel_reconciles_for_timeline(
65 0 : &mut self,
66 0 : node_id: NodeId,
67 0 : tenant_id: TenantId,
68 0 : timeline_id: Option<TimelineId>,
69 0 : ) {
70 0 : if let Some(handle) = self.reconcilers.get(&node_id) {
71 0 : handle.cancel_reconciliation(tenant_id, timeline_id);
72 0 : }
73 0 : }
74 : }
75 :
76 : /// Initial load of the pending operations from the db
77 0 : pub(crate) async fn load_schedule_requests(
78 0 : service: &Arc<Service>,
79 0 : safekeepers: &HashMap<NodeId, Safekeeper>,
80 0 : ) -> anyhow::Result<Vec<ScheduleRequest>> {
81 0 : let pending_ops = service.persistence.list_pending_ops().await?;
82 0 : let mut res = Vec::with_capacity(pending_ops.len());
83 0 : for op_persist in pending_ops {
84 0 : let node_id = NodeId(op_persist.sk_id as u64);
85 0 : let Some(sk) = safekeepers.get(&node_id) else {
86 : // This shouldn't happen, at least the safekeeper should exist as decomissioned.
87 0 : tracing::warn!(
88 : tenant_id = op_persist.tenant_id,
89 : timeline_id = op_persist.timeline_id,
90 0 : "couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers"
91 : );
92 0 : continue;
93 : };
94 0 : let sk = Box::new(sk.clone());
95 0 : let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
96 0 : let timeline_id = if !op_persist.timeline_id.is_empty() {
97 0 : Some(TimelineId::from_str(&op_persist.timeline_id)?)
98 : } else {
99 0 : None
100 : };
101 0 : let host_list = match op_persist.op_kind {
102 0 : SafekeeperTimelineOpKind::Delete => Vec::new(),
103 0 : SafekeeperTimelineOpKind::Exclude => Vec::new(),
104 : SafekeeperTimelineOpKind::Pull => {
105 : // TODO this code is super hacky, it doesn't take migrations into account
106 0 : let Some(timeline_id) = timeline_id else {
107 0 : anyhow::bail!(
108 0 : "timeline_id is empty for `pull` schedule request for {tenant_id}"
109 0 : );
110 : };
111 0 : let timeline_persist = service
112 0 : .persistence
113 0 : .get_timeline(tenant_id, timeline_id)
114 0 : .await?;
115 0 : let Some(timeline_persist) = timeline_persist else {
116 : // This shouldn't happen, the timeline should still exist
117 0 : tracing::warn!(
118 : tenant_id = op_persist.tenant_id,
119 : timeline_id = op_persist.timeline_id,
120 0 : "couldn't find timeline for corresponding pull op"
121 : );
122 0 : continue;
123 : };
124 0 : timeline_persist
125 0 : .sk_set
126 0 : .iter()
127 0 : .filter_map(|sk_id| {
128 0 : let other_node_id = NodeId(*sk_id as u64);
129 0 : if node_id == other_node_id {
130 : // We obviously don't want to pull from ourselves
131 0 : return None;
132 0 : }
133 0 : let Some(sk) = safekeepers.get(&other_node_id) else {
134 0 : tracing::warn!(
135 0 : "couldnt find safekeeper with pending op id {other_node_id}, not pulling from it"
136 : );
137 0 : return None;
138 : };
139 0 : Some((other_node_id, sk.base_url()))
140 0 : })
141 0 : .collect::<Vec<_>>()
142 : }
143 : };
144 0 : let req = ScheduleRequest {
145 0 : safekeeper: sk,
146 0 : host_list,
147 0 : tenant_id,
148 0 : timeline_id,
149 0 : generation: op_persist.generation as u32,
150 0 : kind: op_persist.op_kind,
151 0 : };
152 0 : res.push(req);
153 : }
154 0 : Ok(res)
155 0 : }
156 :
/// One pending safekeeper operation, as handed to the reconciler of its target safekeeper.
157 : pub(crate) struct ScheduleRequest {
/// The safekeeper the operation is executed against.
158 : pub(crate) safekeeper: Box<Safekeeper>,
/// `(node id, base URL)` pairs of other safekeepers. Only read for `Pull` operations, where
/// the URLs are sent as the hosts to pull from; requests loaded from the db leave it empty
/// for `Delete` and `Exclude`.
159 : pub(crate) host_list: Vec<(NodeId, String)>,
/// Tenant the operation applies to.
160 : pub(crate) tenant_id: TenantId,
/// Timeline the operation applies to. `None` addresses the tenant as a whole: a `Delete`
/// then deletes the tenant, while `Pull` and `Exclude` requests without a timeline are
/// ignored with a warning.
161 : pub(crate) timeline_id: Option<TimelineId>,
/// Generation of the pending op; passed along when the op is removed from persistence
/// after it succeeded.
162 : pub(crate) generation: u32,
/// Which operation to perform.
163 : pub(crate) kind: SafekeeperTimelineOpKind,
164 : }
165 :
166 : struct ReconcilerHandle {
167 : tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
168 : ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
169 : cancel: CancellationToken,
170 : }
171 :
172 : impl ReconcilerHandle {
173 : /// Obtain a new token slot, cancelling any existing reconciliations for that timeline
174 0 : fn new_token_slot(
175 0 : &self,
176 0 : tenant_id: TenantId,
177 0 : timeline_id: Option<TimelineId>,
178 0 : ) -> CancellationToken {
179 0 : let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
180 0 : if let Entry::Occupied(entry) = &entry {
181 0 : let cancel: &CancellationToken = entry.get();
182 0 : cancel.cancel();
183 0 : }
184 0 : entry.insert(self.cancel.child_token()).clone()
185 0 : }
186 : /// Cancel an ongoing reconciliation
187 0 : fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
188 0 : if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
189 0 : cancel.cancel();
190 0 : }
191 0 : }
192 0 : fn schedule_reconcile(&self, req: ScheduleRequest) {
193 0 : let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
194 0 : let hostname = req.safekeeper.skp.host.clone();
195 0 : if let Err(err) = self.tx.send((req, cancel)) {
196 0 : tracing::info!("scheduling request onto {hostname} returned error: {err}");
197 0 : }
198 0 : }
199 : }
200 :
/// State of the background task that executes the requests queued through one
/// `ReconcilerHandle`.
201 : pub(crate) struct SafekeeperReconciler {
/// Used to reach persistence, configuration and the http client.
202 : service: Arc<Service>,
/// Receiving end of the request queue; each request arrives with its own cancellation token.
203 : rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
/// Cancelling this token ends the `run` loop.
204 : cancel: CancellationToken,
205 : }
206 :
207 : impl SafekeeperReconciler {
208 0 : fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
209 0 : // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
210 0 : let (tx, rx) = mpsc::unbounded_channel();
211 0 : let mut reconciler = SafekeeperReconciler {
212 0 : service,
213 0 : rx,
214 0 : cancel: cancel.clone(),
215 0 : };
216 0 : let handle = ReconcilerHandle {
217 0 : tx,
218 0 : ongoing_tokens: Arc::new(ClashMap::new()),
219 0 : cancel,
220 0 : };
221 0 : tokio::spawn(async move { reconciler.run().await });
222 0 : handle
223 0 : }
224 0 : async fn run(&mut self) {
225 : loop {
226 : // TODO add parallelism with semaphore here
227 0 : let req = tokio::select! {
228 0 : req = self.rx.recv() => req,
229 0 : _ = self.cancel.cancelled() => break,
230 : };
231 0 : let Some((req, req_cancel)) = req else { break };
232 0 : if req_cancel.is_cancelled() {
233 0 : continue;
234 0 : }
235 0 :
236 0 : let kind = req.kind;
237 0 : let tenant_id = req.tenant_id;
238 0 : let timeline_id = req.timeline_id;
239 0 : let node_id = req.safekeeper.skp.id;
240 0 : self.reconcile_one(req, req_cancel)
241 0 : .instrument(tracing::info_span!(
242 0 : "reconcile_one",
243 0 : ?kind,
244 0 : %tenant_id,
245 0 : ?timeline_id,
246 0 : %node_id,
247 0 : ))
248 0 : .await;
249 : }
250 0 : }
251 0 : async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
252 0 : let req_host = req.safekeeper.skp.host.clone();
253 0 : match req.kind {
254 : SafekeeperTimelineOpKind::Pull => {
255 0 : let Some(timeline_id) = req.timeline_id else {
256 0 : tracing::warn!(
257 0 : "ignoring invalid schedule request: timeline_id is empty for `pull`"
258 : );
259 0 : return;
260 : };
261 0 : let our_id = req.safekeeper.get_id();
262 0 : let http_hosts = req
263 0 : .host_list
264 0 : .iter()
265 0 : .filter(|(node_id, _hostname)| *node_id != our_id)
266 0 : .map(|(_, hostname)| hostname.clone())
267 0 : .collect::<Vec<_>>();
268 0 : let pull_req = PullTimelineRequest {
269 0 : http_hosts,
270 0 : tenant_id: req.tenant_id,
271 0 : timeline_id,
272 0 : };
273 0 : self.reconcile_inner(
274 0 : req,
275 0 : async |client| client.pull_timeline(&pull_req).await,
276 0 : |resp| {
277 0 : tracing::info!(
278 0 : "pulled timeline from {} onto {req_host}",
279 : resp.safekeeper_host,
280 : );
281 0 : },
282 0 : req_cancel,
283 0 : )
284 0 : .await;
285 : }
286 : SafekeeperTimelineOpKind::Exclude => {
287 : // TODO actually exclude instead of delete here
288 0 : let tenant_id = req.tenant_id;
289 0 : let Some(timeline_id) = req.timeline_id else {
290 0 : tracing::warn!(
291 0 : "ignoring invalid schedule request: timeline_id is empty for `exclude`"
292 : );
293 0 : return;
294 : };
295 0 : self.reconcile_inner(
296 0 : req,
297 0 : async |client| client.delete_timeline(tenant_id, timeline_id).await,
298 0 : |_resp| {
299 0 : tracing::info!("deleted timeline from {req_host}");
300 0 : },
301 0 : req_cancel,
302 0 : )
303 0 : .await;
304 : }
305 : SafekeeperTimelineOpKind::Delete => {
306 0 : let tenant_id = req.tenant_id;
307 0 : if let Some(timeline_id) = req.timeline_id {
308 0 : let deleted = self.reconcile_inner(
309 0 : req,
310 0 : async |client| client.delete_timeline(tenant_id, timeline_id).await,
311 0 : |_resp| {
312 0 : tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
313 0 : },
314 0 : req_cancel,
315 0 : )
316 0 : .await;
317 0 : if deleted {
318 0 : self.delete_timeline_from_db(tenant_id, timeline_id).await;
319 0 : }
320 : } else {
321 0 : let deleted = self
322 0 : .reconcile_inner(
323 0 : req,
324 0 : async |client| client.delete_tenant(tenant_id).await,
325 0 : |_resp| {
326 0 : tracing::info!(%tenant_id, "deleted tenant from {req_host}");
327 0 : },
328 0 : req_cancel,
329 0 : )
330 0 : .await;
331 0 : if deleted {
332 0 : self.delete_tenant_timelines_from_db(tenant_id).await;
333 0 : }
334 : }
335 : }
336 : }
337 0 : }
338 0 : async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
339 0 : match self
340 0 : .service
341 0 : .persistence
342 0 : .list_pending_ops_for_timeline(tenant_id, timeline_id)
343 0 : .await
344 : {
345 0 : Ok(list) => {
346 0 : if !list.is_empty() {
347 0 : tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
348 0 : return;
349 0 : }
350 : }
351 0 : Err(e) => {
352 0 : tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
353 0 : return;
354 : }
355 : }
356 0 : tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
357 : // In theory we could crash right after deleting the op from the db and right before reaching this,
358 : // but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
359 0 : if let Err(err) = self
360 0 : .service
361 0 : .persistence
362 0 : .delete_timeline(tenant_id, timeline_id)
363 0 : .await
364 : {
365 0 : tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
366 0 : }
367 0 : }
368 0 : async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
369 0 : let timeline_list = match self
370 0 : .service
371 0 : .persistence
372 0 : .list_timelines_for_tenant(tenant_id)
373 0 : .await
374 : {
375 0 : Ok(timeline_list) => timeline_list,
376 0 : Err(e) => {
377 0 : tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
378 0 : return;
379 : }
380 : };
381 0 : for timeline in timeline_list {
382 0 : let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
383 0 : tracing::warn!("Invalid timeline ID in database {}", timeline.timeline_id);
384 0 : continue;
385 : };
386 0 : self.delete_timeline_from_db(tenant_id, timeline_id).await;
387 : }
388 0 : }
389 : /// Returns whether the reconciliation happened successfully
390 0 : async fn reconcile_inner<T, F, U>(
391 0 : &self,
392 0 : req: ScheduleRequest,
393 0 : closure: impl Fn(SafekeeperClient) -> F,
394 0 : log_success: impl FnOnce(T) -> U,
395 0 : req_cancel: CancellationToken,
396 0 : ) -> bool
397 0 : where
398 0 : F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
399 0 : {
400 0 : let jwt = self
401 0 : .service
402 0 : .config
403 0 : .safekeeper_jwt_token
404 0 : .clone()
405 0 : .map(SecretString::from);
406 : loop {
407 0 : let res = req
408 0 : .safekeeper
409 0 : .with_client_retries(
410 0 : |client| {
411 0 : let closure = &closure;
412 0 : async move { closure(client).await }
413 0 : },
414 0 : self.service.get_http_client(),
415 0 : &jwt,
416 0 : 3,
417 0 : 10,
418 0 : Duration::from_secs(10),
419 0 : &req_cancel,
420 0 : )
421 0 : .await;
422 0 : match res {
423 0 : Ok(resp) => {
424 0 : log_success(resp);
425 0 : let res = self
426 0 : .service
427 0 : .persistence
428 0 : .remove_pending_op(
429 0 : req.tenant_id,
430 0 : req.timeline_id,
431 0 : req.safekeeper.get_id(),
432 0 : req.generation,
433 0 : )
434 0 : .await;
435 0 : if let Err(err) = res {
436 0 : tracing::info!(
437 0 : "couldn't remove reconciliation request onto {} from persistence: {err:?}",
438 : req.safekeeper.skp.host
439 : );
440 0 : }
441 0 : return true;
442 : }
443 : Err(mgmt_api::Error::Cancelled) => {
444 : // On cancellation, the code that issued it will take care of removing db entries (if needed)
445 0 : return false;
446 : }
447 0 : Err(e) => {
448 0 : tracing::info!(
449 0 : "Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}",
450 : req.safekeeper.skp.host
451 : );
452 : const SLEEP_TIME: Duration = Duration::from_secs(1);
453 0 : tokio::time::sleep(SLEEP_TIME).await;
454 : }
455 : }
456 : }
457 0 : }
458 : }
|