TLA Line data Source code
1 : use super::storage_layer::LayerFileName;
2 : use super::Generation;
3 : use crate::tenant::metadata::TimelineMetadata;
4 : use crate::tenant::remote_timeline_client::index::IndexPart;
5 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
6 : use std::collections::{HashMap, VecDeque};
7 : use std::fmt::Debug;
8 :
9 : use chrono::NaiveDateTime;
10 : use std::sync::Arc;
11 : use tracing::info;
12 : use utils::lsn::AtomicLsn;
13 :
14 : use std::sync::atomic::AtomicU32;
15 : use utils::lsn::Lsn;
16 :
17 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
18 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
19 : // that many upload queues in a running pageserver, and most of them are initialized
20 : // anyway.
21 : #[allow(clippy::large_enum_variant)]
22 : pub(super) enum UploadQueue {
23 : Uninitialized,
24 : Initialized(UploadQueueInitialized),
25 : Stopped(UploadQueueStopped),
26 : }
27 :
28 : impl UploadQueue {
29 UBC 0 : pub fn as_str(&self) -> &'static str {
30 0 : match self {
31 0 : UploadQueue::Uninitialized => "Uninitialized",
32 0 : UploadQueue::Initialized(_) => "Initialized",
33 0 : UploadQueue::Stopped(_) => "Stopped",
34 : }
35 0 : }
36 : }
37 :
38 : /// This keeps track of queued and in-progress tasks.
39 : pub(crate) struct UploadQueueInitialized {
40 : /// Counter to assign task IDs
41 : pub(crate) task_counter: u64,
42 :
43 : /// All layer files stored in the remote storage, taking into account all
44 : /// in-progress and queued operations
45 : pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
46 :
47 : /// How many file uploads or deletions been scheduled, since the
48 : /// last (scheduling of) metadata index upload?
49 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
50 :
51 : /// Metadata stored in the remote storage, taking into account all
52 : /// in-progress and queued operations.
53 : /// DANGER: do not return to outside world, e.g., safekeepers.
54 : pub(crate) latest_metadata: TimelineMetadata,
55 :
56 : /// `disk_consistent_lsn` from the last metadata file that was successfully
57 : /// uploaded. `Lsn(0)` if nothing was uploaded yet.
58 : /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
59 : /// Safekeeper can rely on it to make decisions for WAL storage.
60 : ///
61 : /// visible_remote_consistent_lsn is only updated after our generation has been validated with
62 : /// the control plane (unlesss a timeline's generation is None, in which case
63 : /// we skip validation)
64 : pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
65 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
66 :
67 : // Breakdown of different kinds of tasks currently in-progress
68 : pub(crate) num_inprogress_layer_uploads: usize,
69 : pub(crate) num_inprogress_metadata_uploads: usize,
70 : pub(crate) num_inprogress_deletions: usize,
71 :
72 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
73 : /// has been launched for it. An in-progress task can be busy uploading, but it can
74 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
75 : /// be waiting for retry in `exponential_backoff`.
76 : pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
77 :
78 : /// Queued operations that have not been launched yet. They might depend on previous
79 : /// tasks to finish. For example, metadata upload cannot be performed before all
80 : /// preceding layer file uploads have completed.
81 : pub(crate) queued_operations: VecDeque<UploadOp>,
82 : }
83 :
84 : impl UploadQueueInitialized {
85 CBC 214 : pub(super) fn no_pending_work(&self) -> bool {
86 214 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
87 214 : }
88 :
89 779573 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
90 779573 : self.visible_remote_consistent_lsn.load()
91 779573 : }
92 :
93 2671 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
94 2671 : self.projected_remote_consistent_lsn
95 2671 : }
96 : }
97 :
98 UBC 0 : #[derive(Clone, Copy)]
99 : pub(super) enum SetDeletedFlagProgress {
100 : NotRunning,
101 : InProgress(NaiveDateTime),
102 : Successful(NaiveDateTime),
103 : }
104 :
105 : pub(super) struct UploadQueueStopped {
106 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
107 : pub(super) deleted_at: SetDeletedFlagProgress,
108 : }
109 :
110 : impl UploadQueue {
111 CBC 967 : pub(crate) fn initialize_empty_remote(
112 967 : &mut self,
113 967 : metadata: &TimelineMetadata,
114 967 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
115 967 : match self {
116 967 : UploadQueue::Uninitialized => (),
117 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
118 UBC 0 : anyhow::bail!("already initialized, state {}", self.as_str())
119 : }
120 : }
121 :
122 CBC 967 : info!("initializing upload queue for empty remote");
123 :
124 967 : let state = UploadQueueInitialized {
125 967 : // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
126 967 : latest_files: HashMap::new(),
127 967 : latest_files_changes_since_metadata_upload_scheduled: 0,
128 967 : latest_metadata: metadata.clone(),
129 967 : projected_remote_consistent_lsn: None,
130 967 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
131 967 : // what follows are boring default initializations
132 967 : task_counter: 0,
133 967 : num_inprogress_layer_uploads: 0,
134 967 : num_inprogress_metadata_uploads: 0,
135 967 : num_inprogress_deletions: 0,
136 967 : inprogress_tasks: HashMap::new(),
137 967 : queued_operations: VecDeque::new(),
138 967 : };
139 967 :
140 967 : *self = UploadQueue::Initialized(state);
141 967 : Ok(self.initialized_mut().expect("we just set it"))
142 967 : }
143 :
144 335 : pub(crate) fn initialize_with_current_remote_index_part(
145 335 : &mut self,
146 335 : index_part: &IndexPart,
147 335 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
148 335 : match self {
149 335 : UploadQueue::Uninitialized => (),
150 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
151 UBC 0 : anyhow::bail!("already initialized, state {}", self.as_str())
152 : }
153 : }
154 :
155 CBC 335 : let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
156 9113 : for (layer_name, layer_metadata) in &index_part.layer_metadata {
157 8778 : files.insert(
158 8778 : layer_name.to_owned(),
159 8778 : LayerFileMetadata::from(layer_metadata),
160 8778 : );
161 8778 : }
162 :
163 335 : info!(
164 335 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
165 335 : index_part.metadata.disk_consistent_lsn()
166 335 : );
167 :
168 335 : let state = UploadQueueInitialized {
169 335 : latest_files: files,
170 335 : latest_files_changes_since_metadata_upload_scheduled: 0,
171 335 : latest_metadata: index_part.metadata.clone(),
172 335 : projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
173 335 : visible_remote_consistent_lsn: Arc::new(
174 335 : index_part.metadata.disk_consistent_lsn().into(),
175 335 : ),
176 335 : // what follows are boring default initializations
177 335 : task_counter: 0,
178 335 : num_inprogress_layer_uploads: 0,
179 335 : num_inprogress_metadata_uploads: 0,
180 335 : num_inprogress_deletions: 0,
181 335 : inprogress_tasks: HashMap::new(),
182 335 : queued_operations: VecDeque::new(),
183 335 : };
184 335 :
185 335 : *self = UploadQueue::Initialized(state);
186 335 : Ok(self.initialized_mut().expect("we just set it"))
187 335 : }
188 :
189 30195 : pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
190 30195 : match self {
191 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
192 UBC 0 : anyhow::bail!("queue is in state {}", self.as_str())
193 : }
194 CBC 30195 : UploadQueue::Initialized(x) => Ok(x),
195 : }
196 30195 : }
197 :
198 647 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
199 647 : match self {
200 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
201 UBC 0 : anyhow::bail!("queue is in state {}", self.as_str())
202 : }
203 CBC 647 : UploadQueue::Stopped(stopped) => Ok(stopped),
204 : }
205 647 : }
206 :
207 591 : pub(crate) fn get_layer_metadata(
208 591 : &self,
209 591 : name: &LayerFileName,
210 591 : ) -> anyhow::Result<Option<LayerFileMetadata>> {
211 591 : match self {
212 : UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
213 UBC 0 : anyhow::bail!("queue is in state {}", self.as_str())
214 : }
215 CBC 591 : UploadQueue::Initialized(inner) => Ok(inner.latest_files.get(name).cloned()),
216 : }
217 591 : }
218 : }
219 :
220 : /// An in-progress upload or delete task.
221 UBC 0 : #[derive(Debug)]
222 : pub(crate) struct UploadTask {
223 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
224 : pub(crate) task_id: u64,
225 : pub(crate) retries: AtomicU32,
226 :
227 : pub(crate) op: UploadOp,
228 : }
229 :
230 : /// A deletion of some layers within the lifetime of a timeline. This is not used
231 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
232 0 : #[derive(Debug)]
233 : pub(crate) struct Delete {
234 : pub(crate) layers: Vec<(LayerFileName, Generation)>,
235 : }
236 :
237 0 : #[derive(Debug)]
238 : pub(crate) enum UploadOp {
239 : /// Upload a layer file
240 : UploadLayer(LayerFileName, LayerFileMetadata),
241 :
242 : /// Upload the metadata file
243 : UploadMetadata(IndexPart, Lsn),
244 :
245 : /// Delete layer files
246 : Delete(Delete),
247 :
248 : /// Barrier. When the barrier operation is reached,
249 : Barrier(tokio::sync::watch::Sender<()>),
250 : }
251 :
252 : impl std::fmt::Display for UploadOp {
253 CBC 5146 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
254 5146 : match self {
255 3701 : UploadOp::UploadLayer(path, metadata) => {
256 3701 : write!(
257 3701 : f,
258 3701 : "UploadLayer({}, size={:?}, gen={:?})",
259 3701 : path.file_name(),
260 3701 : metadata.file_size(),
261 3701 : metadata.generation,
262 3701 : )
263 : }
264 1445 : UploadOp::UploadMetadata(_, lsn) => {
265 1445 : write!(f, "UploadMetadata(lsn: {})", lsn)
266 : }
267 UBC 0 : UploadOp::Delete(delete) => {
268 0 : write!(f, "Delete({} layers)", delete.layers.len())
269 : }
270 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
271 : }
272 CBC 5146 : }
273 : }
|