Line data Source code
1 : use super::storage_layer::LayerName;
2 : use super::storage_layer::ResidentLayer;
3 : use crate::tenant::metadata::TimelineMetadata;
4 : use crate::tenant::remote_timeline_client::index::IndexPart;
5 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
6 : use std::collections::{HashMap, VecDeque};
7 : use std::fmt::Debug;
8 :
9 : use chrono::NaiveDateTime;
10 : use std::sync::Arc;
11 : use tracing::info;
12 : use utils::lsn::AtomicLsn;
13 :
14 : use std::sync::atomic::AtomicU32;
15 : use utils::lsn::Lsn;
16 :
17 : #[cfg(feature = "testing")]
18 : use utils::generation::Generation;
19 :
20 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
21 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
22 : // that many upload queues in a running pageserver, and most of them are initialized
23 : // anyway.
24 : #[allow(clippy::large_enum_variant)]
25 : pub(super) enum UploadQueue {
26 : Uninitialized,
27 : Initialized(UploadQueueInitialized),
28 : Stopped(UploadQueueStopped),
29 : }
30 :
31 : impl UploadQueue {
32 0 : pub fn as_str(&self) -> &'static str {
33 0 : match self {
34 0 : UploadQueue::Uninitialized => "Uninitialized",
35 0 : UploadQueue::Initialized(_) => "Initialized",
36 0 : UploadQueue::Stopped(_) => "Stopped",
37 : }
38 0 : }
39 : }
40 :
41 : /// This keeps track of queued and in-progress tasks.
42 : pub(crate) struct UploadQueueInitialized {
43 : /// Counter to assign task IDs
44 : pub(crate) task_counter: u64,
45 :
46 : /// The next uploaded index_part.json; assumed to be dirty.
47 : ///
48 : /// Should not be read, directly except for layer file updates. Instead you should add a
49 : /// projected field.
50 : pub(crate) dirty: IndexPart,
51 :
52 : /// The latest remote persisted IndexPart.
53 : ///
54 : /// Each completed metadata upload will update this. The second item is the task_id which last
55 : /// updated the value, used to ensure we never store an older value over a newer one.
56 : pub(crate) clean: (IndexPart, Option<u64>),
57 :
58 : /// How many file uploads or deletions been scheduled, since the
59 : /// last (scheduling of) metadata index upload?
60 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
61 :
62 : /// The Lsn is only updated after our generation has been validated with
63 : /// the control plane (unlesss a timeline's generation is None, in which case
64 : /// we skip validation)
65 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
66 :
67 : // Breakdown of different kinds of tasks currently in-progress
68 : pub(crate) num_inprogress_layer_uploads: usize,
69 : pub(crate) num_inprogress_metadata_uploads: usize,
70 : pub(crate) num_inprogress_deletions: usize,
71 :
72 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
73 : /// has been launched for it. An in-progress task can be busy uploading, but it can
74 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
75 : /// be waiting for retry in `exponential_backoff`.
76 : pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
77 :
78 : /// Queued operations that have not been launched yet. They might depend on previous
79 : /// tasks to finish. For example, metadata upload cannot be performed before all
80 : /// preceding layer file uploads have completed.
81 : pub(crate) queued_operations: VecDeque<UploadOp>,
82 :
83 : /// Files which have been unlinked but not yet had scheduled a deletion for. Only kept around
84 : /// for error logging.
85 : ///
86 : /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
87 : /// bug causing leaks, then it's better to not leave this enabled for production builds.
88 : #[cfg(feature = "testing")]
89 : pub(crate) dangling_files: HashMap<LayerName, Generation>,
90 :
91 : /// Deletions that are blocked by the tenant configuration
92 : pub(crate) blocked_deletions: Vec<Delete>,
93 :
94 : /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
95 : pub(crate) shutting_down: bool,
96 :
97 : /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
98 : /// wait on until one of them stops the queue. The semaphore is closed when
99 : /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
100 : pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
101 : }
102 :
103 : impl UploadQueueInitialized {
104 8 : pub(super) fn no_pending_work(&self) -> bool {
105 8 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
106 8 : }
107 :
108 0 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
109 0 : self.visible_remote_consistent_lsn.load()
110 0 : }
111 :
112 0 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
113 0 : let lsn = self.clean.0.metadata.disk_consistent_lsn();
114 0 : self.clean.1.map(|_| lsn)
115 0 : }
116 : }
117 :
118 : #[derive(Clone, Copy)]
119 : pub(super) enum SetDeletedFlagProgress {
120 : NotRunning,
121 : InProgress(NaiveDateTime),
122 : Successful(NaiveDateTime),
123 : }
124 :
125 : pub(super) struct UploadQueueStoppedDeletable {
126 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
127 : pub(super) deleted_at: SetDeletedFlagProgress,
128 : }
129 :
130 : pub(super) enum UploadQueueStopped {
131 : Deletable(UploadQueueStoppedDeletable),
132 : Uninitialized,
133 : }
134 :
135 0 : #[derive(thiserror::Error, Debug)]
136 : pub enum NotInitialized {
137 : #[error("queue is in state Uninitialized")]
138 : Uninitialized,
139 : #[error("queue is in state Stopped")]
140 : Stopped,
141 : #[error("queue is shutting down")]
142 : ShuttingDown,
143 : }
144 :
145 : impl NotInitialized {
146 0 : pub(crate) fn is_stopping(&self) -> bool {
147 : use NotInitialized::*;
148 0 : match self {
149 0 : Uninitialized => false,
150 0 : Stopped => true,
151 0 : ShuttingDown => true,
152 : }
153 0 : }
154 : }
155 :
156 : impl UploadQueue {
157 412 : pub(crate) fn initialize_empty_remote(
158 412 : &mut self,
159 412 : metadata: &TimelineMetadata,
160 412 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
161 412 : match self {
162 412 : UploadQueue::Uninitialized => (),
163 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
164 0 : anyhow::bail!("already initialized, state {}", self.as_str())
165 : }
166 : }
167 :
168 412 : info!("initializing upload queue for empty remote");
169 :
170 412 : let index_part = IndexPart::empty(metadata.clone());
171 412 :
172 412 : let state = UploadQueueInitialized {
173 412 : dirty: index_part.clone(),
174 412 : clean: (index_part, None),
175 412 : latest_files_changes_since_metadata_upload_scheduled: 0,
176 412 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
177 412 : // what follows are boring default initializations
178 412 : task_counter: 0,
179 412 : num_inprogress_layer_uploads: 0,
180 412 : num_inprogress_metadata_uploads: 0,
181 412 : num_inprogress_deletions: 0,
182 412 : inprogress_tasks: HashMap::new(),
183 412 : queued_operations: VecDeque::new(),
184 412 : #[cfg(feature = "testing")]
185 412 : dangling_files: HashMap::new(),
186 412 : blocked_deletions: Vec::new(),
187 412 : shutting_down: false,
188 412 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
189 412 : };
190 412 :
191 412 : *self = UploadQueue::Initialized(state);
192 412 : Ok(self.initialized_mut().expect("we just set it"))
193 412 : }
194 :
195 6 : pub(crate) fn initialize_with_current_remote_index_part(
196 6 : &mut self,
197 6 : index_part: &IndexPart,
198 6 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
199 6 : match self {
200 6 : UploadQueue::Uninitialized => (),
201 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
202 0 : anyhow::bail!("already initialized, state {}", self.as_str())
203 : }
204 : }
205 :
206 6 : info!(
207 0 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
208 0 : index_part.metadata.disk_consistent_lsn()
209 : );
210 :
211 6 : let state = UploadQueueInitialized {
212 6 : dirty: index_part.clone(),
213 6 : clean: (index_part.clone(), None),
214 6 : latest_files_changes_since_metadata_upload_scheduled: 0,
215 6 : visible_remote_consistent_lsn: Arc::new(
216 6 : index_part.metadata.disk_consistent_lsn().into(),
217 6 : ),
218 6 : // what follows are boring default initializations
219 6 : task_counter: 0,
220 6 : num_inprogress_layer_uploads: 0,
221 6 : num_inprogress_metadata_uploads: 0,
222 6 : num_inprogress_deletions: 0,
223 6 : inprogress_tasks: HashMap::new(),
224 6 : queued_operations: VecDeque::new(),
225 6 : #[cfg(feature = "testing")]
226 6 : dangling_files: HashMap::new(),
227 6 : blocked_deletions: Vec::new(),
228 6 : shutting_down: false,
229 6 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
230 6 : };
231 6 :
232 6 : *self = UploadQueue::Initialized(state);
233 6 : Ok(self.initialized_mut().expect("we just set it"))
234 6 : }
235 :
236 5257 : pub(crate) fn initialized_mut(
237 5257 : &mut self,
238 5257 : ) -> Result<&mut UploadQueueInitialized, NotInitialized> {
239 : use UploadQueue::*;
240 5257 : match self {
241 0 : Uninitialized => Err(NotInitialized::Uninitialized),
242 5257 : Initialized(x) => {
243 5257 : if x.shutting_down {
244 0 : Err(NotInitialized::ShuttingDown)
245 : } else {
246 5257 : Ok(x)
247 : }
248 : }
249 0 : Stopped(_) => Err(NotInitialized::Stopped),
250 : }
251 5257 : }
252 :
253 2 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
254 2 : match self {
255 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
256 0 : anyhow::bail!("queue is in state {}", self.as_str())
257 : }
258 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
259 0 : anyhow::bail!("queue is in state Stopped(Uninitialized)")
260 : }
261 2 : UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
262 : }
263 2 : }
264 : }
265 :
266 : /// An in-progress upload or delete task.
267 : #[derive(Debug)]
268 : pub(crate) struct UploadTask {
269 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
270 : pub(crate) task_id: u64,
271 : pub(crate) retries: AtomicU32,
272 :
273 : pub(crate) op: UploadOp,
274 : }
275 :
276 : /// A deletion of some layers within the lifetime of a timeline. This is not used
277 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
278 : #[derive(Debug, Clone)]
279 : pub(crate) struct Delete {
280 : pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
281 : }
282 :
283 : #[derive(Debug)]
284 : pub(crate) enum UploadOp {
285 : /// Upload a layer file
286 : UploadLayer(ResidentLayer, LayerFileMetadata),
287 :
288 : /// Upload a index_part.json file
289 : UploadMetadata {
290 : /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
291 : uploaded: Box<IndexPart>,
292 : },
293 :
294 : /// Delete layer files
295 : Delete(Delete),
296 :
297 : /// Barrier. When the barrier operation is reached, the channel is closed.
298 : Barrier(tokio::sync::watch::Sender<()>),
299 :
300 : /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
301 : /// this is the same as a Barrier.
302 : Shutdown,
303 : }
304 :
305 : impl std::fmt::Display for UploadOp {
306 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
307 0 : match self {
308 0 : UploadOp::UploadLayer(layer, metadata) => {
309 0 : write!(
310 0 : f,
311 0 : "UploadLayer({}, size={:?}, gen={:?})",
312 0 : layer, metadata.file_size, metadata.generation
313 0 : )
314 : }
315 0 : UploadOp::UploadMetadata { uploaded, .. } => {
316 0 : write!(
317 0 : f,
318 0 : "UploadMetadata(lsn: {})",
319 0 : uploaded.metadata.disk_consistent_lsn()
320 0 : )
321 : }
322 0 : UploadOp::Delete(delete) => {
323 0 : write!(f, "Delete({} layers)", delete.layers.len())
324 : }
325 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
326 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
327 : }
328 0 : }
329 : }
|