mod downloader;
pub mod heatmap;
mod heatmap_uploader;
mod scheduler;

use std::{sync::Arc, time::SystemTime};

use crate::{
    config::PageServerConf,
    disk_usage_eviction_task::DiskUsageEvictionInfo,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    virtual_file::MaybeFatalIo,
};

use self::{
    downloader::{downloader_task, SecondaryDetail},
    heatmap_uploader::heatmap_uploader_task,
};

use super::{
    config::{SecondaryLocationConfig, TenantConfOpt},
    mgr::TenantManager,
    span::debug_assert_current_span_has_tenant_id,
    storage_layer::LayerFileName,
};

use pageserver_api::{
    models,
    shard::{ShardIdentity, TenantShardId},
};
use remote_storage::GenericRemoteStorage;

use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};

enum DownloadCommand {
    Download(TenantShardId),
}
enum UploadCommand {
    Upload(TenantShardId),
}

impl UploadCommand {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        match self {
            Self::Upload(id) => id,
        }
    }
}

impl DownloadCommand {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        match self {
            Self::Download(id) => id,
        }
    }
}

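/// Envelope for commands sent to the background tasks: the payload travels on
/// an mpsc channel together with a oneshot sender, on which the handling task
/// reports completion back to [`SecondaryController::dispatch`].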
struct CommandRequest<T> {
    payload: T,
    response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}

struct CommandResponse {
    result: anyhow::Result<()>,
}

// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
    /// Carrying a tenant shard ID simplifies callers such as the downloader
    /// which need to organize many of these objects by ID.
    tenant_shard_id: TenantShardId,

    /// Cancellation token indicates to SecondaryDownloader that it should stop doing
    /// any work for this tenant at the next opportunity.
    pub(crate) cancel: CancellationToken,

    pub(crate) gate: Gate,

    // Secondary mode does not need the full shard identity or the TenantConfOpt. However,
    // storing these enables us to report our full LocationConf, enabling convenient reconciliation
    // by the control plane (see [`Self::get_location_conf`])
    shard_identity: ShardIdentity,
    tenant_conf: std::sync::Mutex<TenantConfOpt>,

    detail: std::sync::Mutex<SecondaryDetail>,
}

impl SecondaryTenant {
    pub(crate) fn new(
        tenant_shard_id: TenantShardId,
        shard_identity: ShardIdentity,
        tenant_conf: TenantConfOpt,
        config: &SecondaryLocationConfig,
    ) -> Arc<Self> {
        Arc::new(Self {
            tenant_shard_id,
            // todo: shall we make this a descendant of the
            // main cancellation token, or is it sufficient that
            // on shutdown we walk the tenants and fire their
            // individual cancellations?
            cancel: CancellationToken::new(),
            gate: Gate::default(),

            shard_identity,
            tenant_conf: std::sync::Mutex::new(tenant_conf),

            detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
        })
    }

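    /// Shut down this secondary tenant: fire the cancellation token, then wait
    /// for any in-flight work (downloads, evictions) to drain through the gate.
    ///
    /// A minimal sketch of the worker side of this handshake; `do_work` is a
    /// hypothetical stand-in for downloader or eviction work:
    ///
    /// ```ignore
    /// let Ok(_guard) = tenant.gate.enter() else {
    ///     return; // gate is closed: shutdown already in progress
    /// };
    /// tokio::select! {
    ///     _ = tenant.cancel.cancelled() => {}, // stop promptly once cancelled
    ///     _ = do_work() => {},
    /// }
    /// ```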
    pub(crate) async fn shutdown(&self) {
        self.cancel.cancel();

        // Wait for any secondary downloader work to complete
        self.gate.close().await;
    }

    pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
        self.detail.lock().unwrap().config = config.clone();
    }

    pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) {
        *(self.tenant_conf.lock().unwrap()) = config.clone();
    }

    /// For API access: generate a LocationConfig equivalent to the one that would be used to
    /// create a Tenant in the same state. Do not use this in hot paths: it's for relatively
    /// rare external API calls, like a reconciliation at startup.
    pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
        let conf = self.detail.lock().unwrap().config.clone();

        let conf = models::LocationConfigSecondary { warm: conf.warm };

        let tenant_conf = self.tenant_conf.lock().unwrap().clone();
        models::LocationConfig {
            mode: models::LocationConfigMode::Secondary,
            generation: None,
            secondary_conf: Some(conf),
            shard_number: self.tenant_shard_id.shard_number.0,
            shard_count: self.tenant_shard_id.shard_count.literal(),
            shard_stripe_size: self.shard_identity.stripe_size.0,
            tenant_conf: tenant_conf.into(),
        }
    }

    pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
        &self.tenant_shard_id
    }

    pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
        self.detail.lock().unwrap().get_layers_for_eviction(self)
    }

    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
    pub(crate) async fn evict_layer(
        &self,
        conf: &PageServerConf,
        timeline_id: TimelineId,
        name: LayerFileName,
    ) {
        debug_assert_current_span_has_tenant_id();

        let _guard = match self.gate.enter() {
            Ok(g) => g,
            Err(_) => {
                tracing::debug!("Dropping layer evictions, secondary tenant shutting down");
                return;
            }
        };

        let now = SystemTime::now();

        let path = conf
            .timeline_path(&self.tenant_shard_id, &timeline_id)
            .join(name.file_name());

        // We tolerate ENOENT, because between planning eviction and executing
        // it, the secondary downloader could have seen an updated heatmap that
        // resulted in a layer being deleted.
        // Other local I/O errors are process-fatal: these should never happen.
        tokio::fs::remove_file(path)
            .await
            .or_else(fs_ext::ignore_not_found)
            .fatal_err("Deleting layer during eviction");

        // Update the timeline's state. This does not have to be synchronized with
        // the download process, because:
        // - If the downloader is racing with us to remove a file (e.g. because it is
        //   removed from the heatmap), then our mutual .remove() operations will both
        //   succeed.
        // - If the downloader is racing with us to download the object (this would require
        //   multiple eviction iterations to race with multiple download iterations), then
        //   if we remove it from the state, the worst that happens is the downloader
        //   downloads it again before re-inserting, or we delete the file but it remains
        //   in the state map (in which case it will be downloaded if this secondary
        //   tenant transitions to attached and tries to access it).
        //
        // The important assumption here is that the secondary timeline state does not
        // have to 100% match what is on disk, because it's a best-effort warming
        // of the cache.
        let mut detail = self.detail.lock().unwrap();
        if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
            timeline_detail.on_disk_layers.remove(&name);
            timeline_detail.evicted_at.insert(name, now);
        }
    }
}

/// The SecondaryController is a pseudo-RPC client for administrative control of secondary mode
/// downloads and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
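///
/// A minimal usage sketch (hypothetical test harness; the arguments to
/// [`spawn_tasks`] are assumed to come from pageserver startup):
///
/// ```ignore
/// let controller = spawn_tasks(tenant_manager, remote_storage, barrier, cancel);
/// controller.upload_tenant(tenant_shard_id).await?;
/// controller.download_tenant(tenant_shard_id).await?;
/// ```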
pub struct SecondaryController {
    upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
    download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}

impl SecondaryController {
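    /// Queue a command on one of the background tasks and await its completion:
    /// the payload is sent over the task's mpsc channel together with a oneshot
    /// sender, and the task reports the command's result back on the oneshot.
    /// An error means either that the task has shut down, or that the command
    /// itself failed.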
    async fn dispatch<T>(
        &self,
        queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
        payload: T,
    ) -> anyhow::Result<()> {
        let (response_tx, response_rx) = tokio::sync::oneshot::channel();

        queue
            .send(CommandRequest {
                payload,
                response_tx,
            })
            .await
            .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;

        let response = response_rx
            .await
            .map_err(|_| anyhow::anyhow!("Request dropped"))?;

        response.result
    }

    pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
            .await
    }

    pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        self.dispatch(
            &self.download_req_tx,
            DownloadCommand::Download(tenant_shard_id),
        )
        .await
    }
}

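/// Launch the background tasks for secondary mode (one for tenant downloads, one
/// for heatmap uploads) on the background runtime, and return a
/// [`SecondaryController`] wired to both of them via command channels.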
pub fn spawn_tasks(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) -> SecondaryController {
    let mgr_clone = tenant_manager.clone();
    let storage_clone = remote_storage.clone();
    let cancel_clone = cancel.clone();
    let bg_jobs_clone = background_jobs_can_start.clone();

    let (download_req_tx, download_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::SecondaryDownloads,
        None,
        None,
        "secondary tenant downloads",
        false,
        async move {
            downloader_task(
                mgr_clone,
                storage_clone,
                download_req_rx,
                bg_jobs_clone,
                cancel_clone,
            )
            .await;

            Ok(())
        },
    );

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::SecondaryUploads,
        None,
        None,
        "heatmap uploads",
        false,
        async move {
            heatmap_uploader_task(
                tenant_manager,
                remote_storage,
                upload_req_rx,
                background_jobs_can_start,
                cancel,
            )
            .await;

            Ok(())
        },
    );

    SecondaryController {
        download_req_tx,
        upload_req_tx,
    }
}

/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
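/// Since the receiver ends of its channels are dropped immediately, any command
/// dispatched through this controller fails with "Receiver shut down".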
pub fn null_controller() -> SecondaryController {
    let (download_req_tx, _download_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, _upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
    SecondaryController {
        upload_req_tx,
        download_req_tx,
    }
}