Line data Source code
1 : use super::storage_layer::LayerName;
2 : use super::storage_layer::ResidentLayer;
3 : use crate::tenant::metadata::TimelineMetadata;
4 : use crate::tenant::remote_timeline_client::index::IndexPart;
5 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
6 : use std::collections::HashSet;
7 : use std::collections::{HashMap, VecDeque};
8 : use std::fmt::Debug;
9 :
10 : use chrono::NaiveDateTime;
11 : use std::sync::Arc;
12 : use tracing::info;
13 : use utils::lsn::AtomicLsn;
14 :
15 : use std::sync::atomic::AtomicU32;
16 : use utils::lsn::Lsn;
17 :
18 : use utils::generation::Generation;
19 :
20 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
21 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
22 : // that many upload queues in a running pageserver, and most of them are initialized
23 : // anyway.
24 : #[allow(clippy::large_enum_variant)]
25 : pub(super) enum UploadQueue {
26 : Uninitialized,
27 : Initialized(UploadQueueInitialized),
28 : Stopped(UploadQueueStopped),
29 : }
30 :
31 : impl UploadQueue {
32 0 : pub fn as_str(&self) -> &'static str {
33 0 : match self {
34 0 : UploadQueue::Uninitialized => "Uninitialized",
35 0 : UploadQueue::Initialized(_) => "Initialized",
36 0 : UploadQueue::Stopped(_) => "Stopped",
37 : }
38 0 : }
39 : }
40 :
/// Tag that can be attached to an `UploadOp::UploadLayer` operation.
// NOTE(review): the precise scheduling semantics of each variant are defined by the code
// that consumes this tag, which is not part of this file; confirm there before relying
// on the variant names alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum OpType {
    MayReorder,
    FlushDeletion,
}
46 :
47 : /// This keeps track of queued and in-progress tasks.
48 : pub(crate) struct UploadQueueInitialized {
49 : /// Counter to assign task IDs
50 : pub(crate) task_counter: u64,
51 :
52 : /// The next uploaded index_part.json; assumed to be dirty.
53 : ///
54 : /// Should not be read, directly except for layer file updates. Instead you should add a
55 : /// projected field.
56 : pub(crate) dirty: IndexPart,
57 :
58 : /// The latest remote persisted IndexPart.
59 : ///
60 : /// Each completed metadata upload will update this. The second item is the task_id which last
61 : /// updated the value, used to ensure we never store an older value over a newer one.
62 : pub(crate) clean: (IndexPart, Option<u64>),
63 :
64 : /// How many file uploads or deletions been scheduled, since the
65 : /// last (scheduling of) metadata index upload?
66 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
67 :
68 : /// The Lsn is only updated after our generation has been validated with
69 : /// the control plane (unlesss a timeline's generation is None, in which case
70 : /// we skip validation)
71 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
72 :
73 : // Breakdown of different kinds of tasks currently in-progress
74 : pub(crate) num_inprogress_layer_uploads: usize,
75 : pub(crate) num_inprogress_metadata_uploads: usize,
76 : pub(crate) num_inprogress_deletions: usize,
77 :
78 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
79 : /// has been launched for it. An in-progress task can be busy uploading, but it can
80 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
81 : /// be waiting for retry in `exponential_backoff`.
82 : pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
83 :
84 : /// Queued operations that have not been launched yet. They might depend on previous
85 : /// tasks to finish. For example, metadata upload cannot be performed before all
86 : /// preceding layer file uploads have completed.
87 : pub(crate) queued_operations: VecDeque<UploadOp>,
88 :
89 : /// Files which have been unlinked but not yet had scheduled a deletion for. Only kept around
90 : /// for error logging.
91 : ///
92 : /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
93 : /// bug causing leaks, then it's better to not leave this enabled for production builds.
94 : #[cfg(feature = "testing")]
95 : pub(crate) dangling_files: HashMap<LayerName, Generation>,
96 :
97 : /// Ensure we order file operations correctly.
98 : pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
99 :
100 : /// Deletions that are blocked by the tenant configuration
101 : pub(crate) blocked_deletions: Vec<Delete>,
102 :
103 : /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
104 : pub(crate) shutting_down: bool,
105 :
106 : /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
107 : /// wait on until one of them stops the queue. The semaphore is closed when
108 : /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
109 : pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
110 : }
111 :
112 : impl UploadQueueInitialized {
113 8 : pub(super) fn no_pending_work(&self) -> bool {
114 8 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
115 8 : }
116 :
117 0 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
118 0 : self.visible_remote_consistent_lsn.load()
119 0 : }
120 :
121 0 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
122 0 : let lsn = self.clean.0.metadata.disk_consistent_lsn();
123 0 : self.clean.1.map(|_| lsn)
124 0 : }
125 : }
126 :
127 : #[derive(Clone, Copy)]
128 : pub(super) enum SetDeletedFlagProgress {
129 : NotRunning,
130 : InProgress(NaiveDateTime),
131 : Successful(NaiveDateTime),
132 : }
133 :
134 : pub(super) struct UploadQueueStoppedDeletable {
135 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
136 : pub(super) deleted_at: SetDeletedFlagProgress,
137 : }
138 :
139 : pub(super) enum UploadQueueStopped {
140 : Deletable(UploadQueueStoppedDeletable),
141 : Uninitialized,
142 : }
143 :
144 0 : #[derive(thiserror::Error, Debug)]
145 : pub enum NotInitialized {
146 : #[error("queue is in state Uninitialized")]
147 : Uninitialized,
148 : #[error("queue is in state Stopped")]
149 : Stopped,
150 : #[error("queue is shutting down")]
151 : ShuttingDown,
152 : }
153 :
154 : impl NotInitialized {
155 0 : pub(crate) fn is_stopping(&self) -> bool {
156 : use NotInitialized::*;
157 0 : match self {
158 0 : Uninitialized => false,
159 0 : Stopped => true,
160 0 : ShuttingDown => true,
161 : }
162 0 : }
163 : }
164 :
165 : impl UploadQueue {
166 412 : pub(crate) fn initialize_empty_remote(
167 412 : &mut self,
168 412 : metadata: &TimelineMetadata,
169 412 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
170 412 : match self {
171 412 : UploadQueue::Uninitialized => (),
172 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
173 0 : anyhow::bail!("already initialized, state {}", self.as_str())
174 : }
175 : }
176 :
177 412 : info!("initializing upload queue for empty remote");
178 :
179 412 : let index_part = IndexPart::empty(metadata.clone());
180 412 :
181 412 : let state = UploadQueueInitialized {
182 412 : dirty: index_part.clone(),
183 412 : clean: (index_part, None),
184 412 : latest_files_changes_since_metadata_upload_scheduled: 0,
185 412 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
186 412 : // what follows are boring default initializations
187 412 : task_counter: 0,
188 412 : num_inprogress_layer_uploads: 0,
189 412 : num_inprogress_metadata_uploads: 0,
190 412 : num_inprogress_deletions: 0,
191 412 : inprogress_tasks: HashMap::new(),
192 412 : queued_operations: VecDeque::new(),
193 412 : #[cfg(feature = "testing")]
194 412 : dangling_files: HashMap::new(),
195 412 : recently_deleted: HashSet::new(),
196 412 : blocked_deletions: Vec::new(),
197 412 : shutting_down: false,
198 412 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
199 412 : };
200 412 :
201 412 : *self = UploadQueue::Initialized(state);
202 412 : Ok(self.initialized_mut().expect("we just set it"))
203 412 : }
204 :
205 6 : pub(crate) fn initialize_with_current_remote_index_part(
206 6 : &mut self,
207 6 : index_part: &IndexPart,
208 6 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
209 6 : match self {
210 6 : UploadQueue::Uninitialized => (),
211 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
212 0 : anyhow::bail!("already initialized, state {}", self.as_str())
213 : }
214 : }
215 :
216 6 : info!(
217 0 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
218 0 : index_part.metadata.disk_consistent_lsn()
219 : );
220 :
221 6 : let state = UploadQueueInitialized {
222 6 : dirty: index_part.clone(),
223 6 : clean: (index_part.clone(), None),
224 6 : latest_files_changes_since_metadata_upload_scheduled: 0,
225 6 : visible_remote_consistent_lsn: Arc::new(
226 6 : index_part.metadata.disk_consistent_lsn().into(),
227 6 : ),
228 6 : // what follows are boring default initializations
229 6 : task_counter: 0,
230 6 : num_inprogress_layer_uploads: 0,
231 6 : num_inprogress_metadata_uploads: 0,
232 6 : num_inprogress_deletions: 0,
233 6 : inprogress_tasks: HashMap::new(),
234 6 : queued_operations: VecDeque::new(),
235 6 : #[cfg(feature = "testing")]
236 6 : dangling_files: HashMap::new(),
237 6 : recently_deleted: HashSet::new(),
238 6 : blocked_deletions: Vec::new(),
239 6 : shutting_down: false,
240 6 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
241 6 : };
242 6 :
243 6 : *self = UploadQueue::Initialized(state);
244 6 : Ok(self.initialized_mut().expect("we just set it"))
245 6 : }
246 :
247 5257 : pub(crate) fn initialized_mut(
248 5257 : &mut self,
249 5257 : ) -> Result<&mut UploadQueueInitialized, NotInitialized> {
250 : use UploadQueue::*;
251 5257 : match self {
252 0 : Uninitialized => Err(NotInitialized::Uninitialized),
253 5257 : Initialized(x) => {
254 5257 : if x.shutting_down {
255 0 : Err(NotInitialized::ShuttingDown)
256 : } else {
257 5257 : Ok(x)
258 : }
259 : }
260 0 : Stopped(_) => Err(NotInitialized::Stopped),
261 : }
262 5257 : }
263 :
264 2 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
265 2 : match self {
266 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
267 0 : anyhow::bail!("queue is in state {}", self.as_str())
268 : }
269 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
270 0 : anyhow::bail!("queue is in state Stopped(Uninitialized)")
271 : }
272 2 : UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
273 : }
274 2 : }
275 : }
276 :
277 : /// An in-progress upload or delete task.
278 : #[derive(Debug)]
279 : pub(crate) struct UploadTask {
280 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
281 : pub(crate) task_id: u64,
282 : pub(crate) retries: AtomicU32,
283 :
284 : pub(crate) op: UploadOp,
285 : }
286 :
287 : /// A deletion of some layers within the lifetime of a timeline. This is not used
288 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
289 : #[derive(Debug, Clone)]
290 : pub(crate) struct Delete {
291 : pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
292 : }
293 :
294 : #[derive(Debug)]
295 : pub(crate) enum UploadOp {
296 : /// Upload a layer file. The last field indicates the last operation for thie file.
297 : UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
298 :
299 : /// Upload a index_part.json file
300 : UploadMetadata {
301 : /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
302 : uploaded: Box<IndexPart>,
303 : },
304 :
305 : /// Delete layer files
306 : Delete(Delete),
307 :
308 : /// Barrier. When the barrier operation is reached, the channel is closed.
309 : Barrier(tokio::sync::watch::Sender<()>),
310 :
311 : /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
312 : /// this is the same as a Barrier.
313 : Shutdown,
314 : }
315 :
316 : impl std::fmt::Display for UploadOp {
317 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
318 0 : match self {
319 0 : UploadOp::UploadLayer(layer, metadata, mode) => {
320 0 : write!(
321 0 : f,
322 0 : "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
323 0 : layer, metadata.file_size, metadata.generation, mode
324 0 : )
325 : }
326 0 : UploadOp::UploadMetadata { uploaded, .. } => {
327 0 : write!(
328 0 : f,
329 0 : "UploadMetadata(lsn: {})",
330 0 : uploaded.metadata.disk_consistent_lsn()
331 0 : )
332 : }
333 0 : UploadOp::Delete(delete) => {
334 0 : write!(f, "Delete({} layers)", delete.layers.len())
335 : }
336 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
337 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
338 : }
339 0 : }
340 : }
|