use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};

use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
use safekeeper_client::mgmt_api;
use tokio::sync::{
    Semaphore,
    mpsc::{self, UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
    id::{NodeId, TenantId, TimelineId},
    logging::SecretString,
};

use crate::{
    persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper,
    safekeeper_client::SafekeeperClient,
};

use super::Service;

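/// Set of per-safekeeper reconcilers: one [`ReconcilerHandle`] per node, all
/// sharing a root cancellation token used for shutdown.
///
/// Usage sketch (hypothetical `node_id`, `service`, and `req` values):
/// ```ignore
/// let mut reconcilers = SafekeeperReconcilers::new(CancellationToken::new());
/// reconcilers.start_reconciler(node_id, &service);
/// reconcilers.schedule_request(req);
/// ```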
pub(crate) struct SafekeeperReconcilers {
    cancel: CancellationToken,
    reconcilers: HashMap<NodeId, ReconcilerHandle>,
}

impl SafekeeperReconcilers {
    pub fn new(cancel: CancellationToken) -> Self {
        SafekeeperReconcilers {
            cancel,
            reconcilers: HashMap::new(),
        }
    }
    /// Adds a safekeeper-specific reconciler.
    /// Can be called multiple times, but it needs to be called at least once
    /// for every new safekeeper added.
    pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc<Service>) {
        self.reconcilers.entry(node_id).or_insert_with(|| {
            SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
        });
    }
    /// Stops a safekeeper-specific reconciler, cancelling all of its ongoing tasks.
    pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) {
        if let Some(handle) = self.reconcilers.remove(&node_id) {
            handle.cancel.cancel();
        }
    }
    pub(crate) fn schedule_request_vec(&self, reqs: Vec<ScheduleRequest>) {
        tracing::info!(
            "Scheduling {} pending safekeeper ops loaded from db",
            reqs.len()
        );
        for req in reqs {
            self.schedule_request(req);
        }
    }
    pub(crate) fn schedule_request(&self, req: ScheduleRequest) {
        let node_id = req.safekeeper.get_id();
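        // A reconciler for this node must have been started via start_reconciler().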
        let reconciler_handle = self.reconcilers.get(&node_id).unwrap();
        reconciler_handle.schedule_reconcile(req);
    }
    /// Cancel ongoing reconciles for the given timeline
    ///
    /// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
    /// instead of doing this for all timelines of the tenant.
    ///
    /// Callers must remove the reconciles from the db manually
    pub(crate) fn cancel_reconciles_for_timeline(
        &mut self,
        node_id: NodeId,
        tenant_id: TenantId,
        timeline_id: Option<TimelineId>,
    ) {
        if let Some(handle) = self.reconcilers.get(&node_id) {
            handle.cancel_reconciliation(tenant_id, timeline_id);
        }
    }
}

/// Initial load of the pending operations from the db: turns each persisted
/// op into a [`ScheduleRequest`] ready to be handed to the reconcilers.
pub(crate) async fn load_schedule_requests(
    service: &Arc<Service>,
    safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
    let pending_ops_timelines = service
        .persistence
        .list_pending_ops_with_timelines()
        .await?;
    let mut res = Vec::with_capacity(pending_ops_timelines.len());
    for (op_persist, timeline_persist) in pending_ops_timelines {
        let node_id = NodeId(op_persist.sk_id as u64);
        let Some(sk) = safekeepers.get(&node_id) else {
            // This shouldn't happen: the safekeeper should at least exist as decommissioned.
            tracing::warn!(
                tenant_id = op_persist.tenant_id,
                timeline_id = op_persist.timeline_id,
                "couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers"
            );
            continue;
        };
        let sk = Box::new(sk.clone());
        let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
        let timeline_id = if !op_persist.timeline_id.is_empty() {
            Some(TimelineId::from_str(&op_persist.timeline_id)?)
        } else {
            None
        };
        let host_list = match op_persist.op_kind {
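            // Delete and exclude ops only contact the target safekeeper,
            // so no peer host list is needed.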
            SafekeeperTimelineOpKind::Delete => Vec::new(),
            SafekeeperTimelineOpKind::Exclude => Vec::new(),
            SafekeeperTimelineOpKind::Pull => {
                if timeline_id.is_none() {
                    // We only do this extra check (outside of the timeline_persist check) to give better error messages
                    anyhow::bail!(
                        "timeline_id is empty for `pull` schedule request for {tenant_id}"
                    );
                };
                let Some(timeline_persist) = timeline_persist else {
                    // This shouldn't happen: the timeline should still exist
                    tracing::warn!(
                        tenant_id = op_persist.tenant_id,
                        timeline_id = op_persist.timeline_id,
                        "couldn't find timeline for corresponding pull op"
                    );
                    continue;
                };
                timeline_persist
                    .sk_set
                    .iter()
                    .filter_map(|sk_id| {
                        let other_node_id = NodeId(*sk_id as u64);
                        if node_id == other_node_id {
                            // We obviously don't want to pull from ourselves
                            return None;
                        }
                        let Some(sk) = safekeepers.get(&other_node_id) else {
                            tracing::warn!(
                                "couldn't find safekeeper with pending op id {other_node_id}, not pulling from it"
                            );
                            return None;
                        };
                        Some((other_node_id, sk.base_url()))
                    })
                    .collect::<Vec<_>>()
            }
        };
        let req = ScheduleRequest {
            safekeeper: sk,
            host_list,
            tenant_id,
            timeline_id,
            generation: op_persist.generation as u32,
            kind: op_persist.op_kind,
        };
        res.push(req);
    }
    Ok(res)
}

pub(crate) struct ScheduleRequest {
    pub(crate) safekeeper: Box<Safekeeper>,
    pub(crate) host_list: Vec<(NodeId, String)>,
    pub(crate) tenant_id: TenantId,
    pub(crate) timeline_id: Option<TimelineId>,
    pub(crate) generation: u32,
    pub(crate) kind: SafekeeperTimelineOpKind,
}

/// Handle to a per-safekeeper reconciler.
struct ReconcilerHandle {
    tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
    ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
    cancel: CancellationToken,
}

impl ReconcilerHandle {
    /// Obtain a new token slot, cancelling any existing reconciliations for
    /// that timeline. It is not useful to have >1 operation per <tenant_id,
    /// timeline_id, safekeeper>, so scheduling a new op cancels the current
    /// one if it exists.
    fn new_token_slot(
        &self,
        tenant_id: TenantId,
        timeline_id: Option<TimelineId>,
    ) -> CancellationToken {
        let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
        if let Entry::Occupied(entry) = &entry {
            let cancel: &CancellationToken = entry.get();
            cancel.cancel();
        }
        entry.insert(self.cancel.child_token()).clone()
    }
    /// Cancel an ongoing reconciliation
    fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
        if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
            cancel.cancel();
        }
    }
    fn schedule_reconcile(&self, req: ScheduleRequest) {
        let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
        let hostname = req.safekeeper.skp.host.clone();
        if let Err(err) = self.tx.send((req, cancel)) {
            tracing::info!("scheduling request onto {hostname} returned error: {err}");
        }
    }
}

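/// Reconciler task for a single safekeeper: drains the request channel and
/// runs each request in its own task, gated by a concurrency limiter.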
pub(crate) struct SafekeeperReconciler {
    inner: SafekeeperReconcilerInner,
    concurrency_limiter: Arc<Semaphore>,
    rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
    cancel: CancellationToken,
}

/// Thin wrapper over `Service` to not clutter its inherent functions
#[derive(Clone)]
struct SafekeeperReconcilerInner {
    service: Arc<Service>,
}

impl SafekeeperReconciler {
    fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
        // We hold the ServiceInner lock, so sending to the reconciler channel must not block.
        let (tx, rx) = mpsc::unbounded_channel();
        let concurrency = service.config.safekeeper_reconciler_concurrency;
        let mut reconciler = SafekeeperReconciler {
            inner: SafekeeperReconcilerInner { service },
            rx,
            concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
            cancel: cancel.clone(),
        };
        let handle = ReconcilerHandle {
            tx,
            ongoing_tokens: Arc::new(ClashMap::new()),
            cancel,
        };
        tokio::spawn(async move { reconciler.run().await });
        handle
    }
    async fn run(&mut self) {
        loop {
            let req = tokio::select! {
                req = self.rx.recv() => req,
                _ = self.cancel.cancelled() => break,
            };
            let Some((req, req_cancel)) = req else { break };

            let permit_res = tokio::select! {
                permit = self.concurrency_limiter.clone().acquire_owned() => permit,
                _ = self.cancel.cancelled() => break,
            };
            let Ok(_permit) = permit_res else { return };

            let inner = self.inner.clone();
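            // The request may have been cancelled while we waited for a permit;
            // drop it here instead of spawning a task for it.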
            if req_cancel.is_cancelled() {
                continue;
            }

            tokio::task::spawn(async move {
                let kind = req.kind;
                let tenant_id = req.tenant_id;
                let timeline_id = req.timeline_id;
                let node_id = req.safekeeper.skp.id;
                inner
                    .reconcile_one(req, req_cancel)
                    .instrument(tracing::info_span!(
                        "reconcile_one",
                        ?kind,
                        %tenant_id,
                        ?timeline_id,
                        %node_id,
                    ))
                    .await;
            });
        }
    }
}

impl SafekeeperReconcilerInner {
    async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
        let req_host = req.safekeeper.skp.host.clone();
        match req.kind {
            SafekeeperTimelineOpKind::Pull => {
                let Some(timeline_id) = req.timeline_id else {
                    tracing::warn!(
                        "ignoring invalid schedule request: timeline_id is empty for `pull`"
                    );
                    return;
                };
                let our_id = req.safekeeper.get_id();
                let http_hosts = req
                    .host_list
                    .iter()
                    .filter(|(node_id, _hostname)| *node_id != our_id)
                    .map(|(_, hostname)| hostname.clone())
                    .collect::<Vec<_>>();
                let pull_req = PullTimelineRequest {
                    http_hosts,
                    tenant_id: req.tenant_id,
                    timeline_id,
                };
                self.reconcile_inner(
                    req,
                    async |client| client.pull_timeline(&pull_req).await,
                    |resp| {
                        if let Some(host) = resp.safekeeper_host {
                            tracing::info!("pulled timeline from {host} onto {req_host}");
                        } else {
                            tracing::info!("timeline already present on safekeeper on {req_host}");
                        }
                    },
                    req_cancel,
                )
                .await;
            }
            SafekeeperTimelineOpKind::Exclude => {
                // TODO actually exclude instead of delete here
                let tenant_id = req.tenant_id;
                let Some(timeline_id) = req.timeline_id else {
                    tracing::warn!(
                        "ignoring invalid schedule request: timeline_id is empty for `exclude`"
                    );
                    return;
                };
                self.reconcile_inner(
                    req,
                    async |client| client.delete_timeline(tenant_id, timeline_id).await,
                    |_resp| {
                        tracing::info!("deleted timeline from {req_host}");
                    },
                    req_cancel,
                )
                .await;
            }
            SafekeeperTimelineOpKind::Delete => {
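                // A specific timeline_id means deleting only that timeline;
                // `None` means deleting the whole tenant on this safekeeper.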
                let tenant_id = req.tenant_id;
                if let Some(timeline_id) = req.timeline_id {
                    let deleted = self
                        .reconcile_inner(
                            req,
                            async |client| client.delete_timeline(tenant_id, timeline_id).await,
                            |_resp| {
                                tracing::info!("deleted timeline from {req_host}");
                            },
                            req_cancel,
                        )
                        .await;
                    if deleted {
                        self.delete_timeline_from_db(tenant_id, timeline_id).await;
                    }
                } else {
                    let deleted = self
                        .reconcile_inner(
                            req,
                            async |client| client.delete_tenant(tenant_id).await,
                            |_resp| {
                                tracing::info!(%tenant_id, "deleted tenant from {req_host}");
                            },
                            req_cancel,
                        )
                        .await;
                    if deleted {
                        self.delete_tenant_timelines_from_db(tenant_id).await;
                    }
                }
            }
        }
    }
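    /// Delete the timeline from the db, but only once no pending ops remain for it.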
    async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
        match self
            .service
            .persistence
            .list_pending_ops_for_timeline(tenant_id, timeline_id)
            .await
        {
            Ok(list) => {
                if !list.is_empty() {
                    // duplicate the timeline_id here because it might be None in the reconcile context
                    tracing::info!(%timeline_id, "not deleting timeline from db as there are {} open reconciles", list.len());
                    return;
                }
            }
            Err(e) => {
                tracing::warn!(%timeline_id, "couldn't query pending ops: {e}");
                return;
            }
        }
        tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
        // In theory we could crash right after deleting the op from the db and right before reaching this,
        // but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
        if let Err(err) = self
            .service
            .persistence
            .delete_timeline(tenant_id, timeline_id)
            .await
        {
            tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
        }
    }
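    /// Delete all of the tenant's timelines from the db; each timeline is only
    /// removed once its own pending ops are gone.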
    async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
        let timeline_list = match self
            .service
            .persistence
            .list_timelines_for_tenant(tenant_id)
            .await
        {
            Ok(timeline_list) => timeline_list,
            Err(e) => {
                tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
                return;
            }
        };
        for timeline in timeline_list {
            let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
                tracing::warn!("invalid timeline ID in database: {}", timeline.timeline_id);
                continue;
            };
            self.delete_timeline_from_db(tenant_id, timeline_id).await;
        }
    }
    /// Returns whether the reconciliation completed successfully (`false` if it was cancelled).
    async fn reconcile_inner<T, F, U>(
        &self,
        req: ScheduleRequest,
        closure: impl Fn(SafekeeperClient) -> F,
        log_success: impl FnOnce(T) -> U,
        req_cancel: CancellationToken,
    ) -> bool
    where
        F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
    {
        let jwt = self
            .service
            .config
            .safekeeper_jwt_token
            .clone()
            .map(SecretString::from);
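        // Loop until the request succeeds or is cancelled; each iteration also
        // performs bounded retries inside `with_client_retries`.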
        loop {
            let res = req
                .safekeeper
                .with_client_retries(
                    |client| {
                        let closure = &closure;
                        async move { closure(client).await }
                    },
                    self.service.get_http_client(),
                    &jwt,
                    3,
                    10,
                    Duration::from_secs(10),
                    &req_cancel,
                )
                .await;
            match res {
                Ok(resp) => {
                    log_success(resp);
                    let res = self
                        .service
                        .persistence
                        .remove_pending_op(
                            req.tenant_id,
                            req.timeline_id,
                            req.safekeeper.get_id(),
                            req.generation,
                        )
                        .await;
                    if let Err(err) = res {
                        tracing::info!(
                            "couldn't remove reconciliation request for {} from persistence: {err:?}",
                            req.safekeeper.skp.host
                        );
                    }
                    return true;
                }
                Err(mgmt_api::Error::Cancelled) => {
                    // On cancellation, the code that issued it will take care of removing db entries (if needed)
                    return false;
                }
                Err(e) => {
                    tracing::info!(
                        "Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}",
                        req.safekeeper.skp.host
                    );
                    const SLEEP_TIME: Duration = Duration::from_secs(1);
                    tokio::time::sleep(SLEEP_TIME).await;
                }
            }
        }
    }
}