Line data Source code
1 : //! Common traits and structs for layers
2 :
3 : pub mod batch_split_writer;
4 : pub mod delta_layer;
5 : pub mod filter_iterator;
6 : pub mod image_layer;
7 : pub mod inmemory_layer;
8 : pub(crate) mod layer;
9 : mod layer_desc;
10 : mod layer_name;
11 : pub mod merge_iterator;
12 :
13 : use std::cmp::Ordering;
14 : use std::collections::hash_map::Entry;
15 : use std::collections::{BinaryHeap, HashMap};
16 : use std::ops::Range;
17 : use std::pin::Pin;
18 : use std::sync::Arc;
19 : use std::sync::atomic::AtomicUsize;
20 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
21 :
22 : use crate::PERF_TRACE_TARGET;
23 : pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
24 : use bytes::Bytes;
25 : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
26 : use futures::StreamExt;
27 : use futures::stream::FuturesUnordered;
28 : pub use image_layer::{ImageLayer, ImageLayerWriter};
29 : pub use inmemory_layer::InMemoryLayer;
30 : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
31 : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
32 : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
33 : use pageserver_api::key::Key;
34 : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
35 : use pageserver_api::record::NeonWalRecord;
36 : use pageserver_api::value::Value;
37 : use tracing::{Instrument, info_span, trace};
38 : use utils::lsn::Lsn;
39 : use utils::sync::gate::GateGuard;
40 :
41 : use self::inmemory_layer::InMemoryLayerFileId;
42 : use super::PageReconstructError;
43 : use super::layer_map::InMemoryLayerDesc;
44 : use super::timeline::{GetVectoredError, ReadPath};
45 : use crate::config::PageServerConf;
46 : use crate::context::{
47 : AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
48 : };
49 :
50 0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
51 0 : where
52 0 : T: PartialOrd<T>,
53 0 : {
54 0 : if a.start < b.start {
55 0 : a.end > b.start
56 : } else {
57 0 : b.end > a.start
58 : }
59 0 : }
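// A minimal usage sketch (illustrative only, not part of the module): ranges are
// treated as half-open, so ranges that merely touch at an endpoint do not overlap.
//
//     assert!(range_overlaps(&(1..5), &(4..6)));  // 4 lies in both ranges
//     assert!(!range_overlaps(&(1..5), &(5..8))); // touching endpoints only
//     assert!(range_overlaps(&(3..4), &(0..10))); // containment counts as overlap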
60 :
61 : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
62 : ///
63 : /// Before the first call, you can fill in 'img' if you have an older cached
64 : /// version of the page available. That can save work in
65 : /// 'get_value_reconstruct_data', as it can stop searching for page versions
66 : /// when all the WAL records going back to the cached image have been collected.
67 : ///
68 : /// When get_value_reconstruct_data returns Complete, 'img' is set to an image
69 : /// of the page, or the oldest WAL record in 'records' is a will_init-type
70 : /// record that initializes the page without requiring a previous image.
71 : ///
72 : /// If 'get_page_reconstruct_data' returns Continue, some 'records' may have
73 : /// been collected, but there are more records outside the current layer. Pass
74 : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
75 : /// call, to collect more records.
76 : ///
77 : #[derive(Debug, Default)]
78 : pub(crate) struct ValueReconstructState {
79 : pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
80 : pub(crate) img: Option<(Lsn, Bytes)>,
81 : }
82 :
83 : impl ValueReconstructState {
84 : /// Returns the number of page deltas applied to the page image.
85 8785576 : pub fn num_deltas(&self) -> usize {
86 8785576 : match self.img {
87 8455960 : Some(_) => self.records.len(),
88 329616 : None => self.records.len() - 1, // omit will_init record
89 : }
90 8785576 : }
91 : }
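// Worked example (illustrative): with `img = Some((lsn, bytes))` and three WAL records
// collected, `num_deltas()` returns 3. With `img = None`, the oldest record must be a
// will_init record that recreates the page from scratch, so it is not counted as a
// delta and three records yield `num_deltas() == 2`.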
92 :
93 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
94 : pub(crate) enum ValueReconstructSituation {
95 : Complete,
96 : #[default]
97 : Continue,
98 : }
99 :
100 : /// On disk representation of a value loaded in a buffer
101 : #[derive(Debug)]
102 : pub(crate) enum OnDiskValue {
103 : /// Unencoded [`Value::Image`]
104 : RawImage(Bytes),
105 : /// Encoded [`Value`]. Can deserialize into an image or a WAL record
106 : WalRecordOrImage(Bytes),
107 : }
108 :
109 : /// Reconstruct data accumulated for a single key during a vectored get
110 : #[derive(Debug, Default)]
111 : pub(crate) struct VectoredValueReconstructState {
112 : pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
113 :
114 : pub(crate) situation: ValueReconstructSituation,
115 : }
116 :
117 : #[derive(Debug)]
118 : pub(crate) struct OnDiskValueIoWaiter {
119 : rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
120 : }
121 :
122 : #[derive(Debug)]
123 : #[must_use]
124 : pub(crate) enum OnDiskValueIo {
125 : /// Traversal identified this IO as required to complete the vectored get.
126 : Required {
127 : num_active_ios: Arc<AtomicUsize>,
128 : tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>,
129 : },
130 : /// Sparse keyspace reads always read all the values for a given key,
131 : /// even though only the first value is needed.
132 : ///
133 : /// This variant represents the unnecessary IOs for those values at lower LSNs
134 : /// that aren't needed, but are currently still being done.
135 : ///
136 : /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
137 : /// We added this explicit representation here so that we can drop
138 : /// unnecessary IO results immediately, instead of buffering them in
139 : /// `oneshot` channels inside [`VectoredValueReconstructState`] until
140 : /// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
141 : Unnecessary,
142 : }
143 :
144 : type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
145 :
146 : impl OnDiskValueIo {
147 21501713 : pub(crate) fn complete(self, res: OnDiskValueIoResult) {
148 21501713 : match self {
149 21067454 : OnDiskValueIo::Required { num_active_ios, tx } => {
150 21067454 : num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
151 21067454 : let _ = tx.send(res);
152 21067454 : }
153 434259 : OnDiskValueIo::Unnecessary => {
154 434259 : // Nobody cared, see variant doc comment.
155 434259 : }
156 : }
157 21501713 : }
158 : }
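// Protocol sketch (descriptive): `ValuesReconstructState::update_key` hands out one
// `OnDiskValueIo` per planned value read, and the IO future must call `complete(res)`
// exactly once. For `Required` IOs this decrements `num_active_ios` and delivers the
// result to the paired `OnDiskValueIoWaiter` stored in `on_disk_values`; for
// `Unnecessary` IOs the result is simply dropped.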
159 :
160 : #[derive(Debug, thiserror::Error)]
161 : pub(crate) enum WaitCompletionError {
162 : #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
163 : IoDropped,
164 : }
165 :
166 : impl OnDiskValueIoWaiter {
167 21067448 : pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
168 21067448 : // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
169 21067448 : self.rx.await.map_err(|_| WaitCompletionError::IoDropped)
170 21067448 : }
171 : }
172 :
173 : impl VectoredValueReconstructState {
174 : /// # Cancel-Safety
175 : ///
176 : /// Technically it is fine to stop polling this future, but the IOs will still
177 : /// be executed to completion by the sidecar task and hold on to / consume resources.
178 : /// Better not to do it, to make reasoning about the system easier.
179 4393208 : pub(crate) async fn collect_pending_ios(
180 4393208 : self,
181 4393208 : ) -> Result<ValueReconstructState, PageReconstructError> {
182 : use utils::bin_ser::BeSer;
183 :
184 4393208 : let mut res = Ok(ValueReconstructState::default());
185 :
186 : // We should try hard not to bail early, so that by the time we return from this
187 : // function, all IO for this value is done. It's not required -- we could totally
188 : // stop polling the IO futures in the sidecar task (they need to support that),
189 : // but merely not polling doesn't reduce the IO load on the disk. It's easier
190 : // to reason about the system if we just wait for all IO to complete, even if
191 : // we're no longer interested in the result.
192 : //
193 : // Revisit this when IO futures are replaced with a more sophisticated IO system
194 : // and an IO scheduler, where we know which IOs were submitted and which ones
195 : // just queued. Cf the comment on IoConcurrency::spawn_io.
196 25460656 : for (lsn, waiter) in self.on_disk_values {
197 21067448 : let value_recv_res = waiter
198 21067448 : .wait_completion()
199 21067448 : // we rely on the caller to poll us to completion, so this is not a bail point
200 21067448 : .await;
201 : // Force not bailing early by wrapping the code into a closure.
202 : #[allow(clippy::redundant_closure_call)]
203 21067448 : let _: () = (|| {
204 21067448 : match (&mut res, value_recv_res) {
205 0 : (Err(_), _) => {
206 0 : // We've already failed, no need to process more.
207 0 : }
208 0 : (Ok(_), Err(wait_err)) => {
209 0 : // This shouldn't happen - likely the sidecar task panicked.
210 0 : res = Err(PageReconstructError::Other(wait_err.into()));
211 0 : }
212 0 : (Ok(_), Ok(Err(err))) => {
213 0 : let err: std::io::Error = err;
214 0 : // TODO: returning IO error here will fail a compute query.
215 0 : // Probably not what we want, we're not doing `maybe_fatal_err`
216 0 : // in the IO futures.
217 0 : // But it's been like that for a long time, not changing it
218 0 : // as part of concurrent IO.
219 0 : // => https://github.com/neondatabase/neon/issues/10454
220 0 : res = Err(PageReconstructError::Other(err.into()));
221 0 : }
222 334853 : (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
223 334853 : assert!(ok.img.is_none());
224 334853 : ok.img = Some((lsn, img));
225 : }
226 20732595 : (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
227 20732595 : match Value::des(&buf) {
228 16839048 : Ok(Value::WalRecord(rec)) => {
229 16839048 : ok.records.push((lsn, rec));
230 16839048 : }
231 3893547 : Ok(Value::Image(img)) => {
232 3893547 : assert!(ok.img.is_none());
233 3893547 : ok.img = Some((lsn, img));
234 : }
235 0 : Err(err) => {
236 0 : res = Err(PageReconstructError::Other(err.into()));
237 0 : }
238 : }
239 : }
240 : }
241 21067448 : })();
242 21067448 : }
243 :
244 4393208 : res
245 4393208 : }
246 : }
247 :
248 : /// Bag of data accumulated during a vectored get.
249 : pub(crate) struct ValuesReconstructState {
250 : /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
251 : /// should not expect to get anything from this hashmap.
252 : pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
253 : /// The keys which have already been retrieved
254 : keys_done: KeySpaceRandomAccum,
255 :
256 : /// The keys covered by the image layers
257 : keys_with_image_coverage: Option<Range<Key>>,
258 :
259 : // Statistics that are still accessible as a caller of `get_vectored_impl`.
260 : layers_visited: u32,
261 : delta_layers_visited: u32,
262 :
263 : pub(crate) io_concurrency: IoConcurrency,
264 : num_active_ios: Arc<AtomicUsize>,
265 :
266 : pub(crate) read_path: Option<ReadPath>,
267 : }
268 :
269 : /// The level of IO concurrency to be used on the read path
270 : ///
271 : /// The desired end state is that we always do parallel IO.
272 : /// This struct and the dispatching in the impl will be removed once
273 : /// we've built enough confidence.
274 : pub(crate) enum IoConcurrency {
275 : Sequential,
276 : SidecarTask {
277 : task_id: usize,
278 : ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
279 : },
280 : }
281 :
282 : type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
283 :
284 : pub(crate) enum SelectedIoConcurrency {
285 : Sequential,
286 : SidecarTask(GateGuard),
287 : }
288 :
289 : impl std::fmt::Debug for IoConcurrency {
290 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291 0 : match self {
292 0 : IoConcurrency::Sequential => write!(f, "Sequential"),
293 0 : IoConcurrency::SidecarTask { .. } => write!(f, "SidecarTask"),
294 : }
295 0 : }
296 : }
297 :
298 : impl std::fmt::Debug for SelectedIoConcurrency {
299 192 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 192 : match self {
301 96 : SelectedIoConcurrency::Sequential => write!(f, "Sequential"),
302 96 : SelectedIoConcurrency::SidecarTask(_) => write!(f, "SidecarTask"),
303 : }
304 192 : }
305 : }
306 :
307 : impl IoConcurrency {
308 : /// Force sequential IO. This is a temporary workaround until we have
309 : /// moved plumbing-through-the-call-stack
310 : /// of IoConcurrency into `RequestContextq.
311 : ///
312 : /// DO NOT USE for new code.
313 : ///
314 : /// Tracking issue: <https://github.com/neondatabase/neon/issues/10460>.
315 3646016 : pub(crate) fn sequential() -> Self {
316 3646016 : Self::spawn(SelectedIoConcurrency::Sequential)
317 3646016 : }
318 :
319 3180 : pub(crate) fn spawn_from_conf(
320 3180 : conf: &'static PageServerConf,
321 3180 : gate_guard: GateGuard,
322 3180 : ) -> IoConcurrency {
323 : use pageserver_api::config::GetVectoredConcurrentIo;
324 3180 : let selected = match conf.get_vectored_concurrent_io {
325 3180 : GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
326 0 : GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
327 : };
328 3180 : Self::spawn(selected)
329 3180 : }
330 :
331 3649388 : pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
332 3649388 : match io_concurrency {
333 3649292 : SelectedIoConcurrency::Sequential => IoConcurrency::Sequential,
334 96 : SelectedIoConcurrency::SidecarTask(gate_guard) => {
335 96 : let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
336 : static TASK_ID: AtomicUsize = AtomicUsize::new(0);
337 96 : let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
338 : // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
339 96 : let span =
340 96 : tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
341 96 : trace!(task_id, "spawning sidecar task");
342 96 : tokio::spawn(async move {
343 96 : trace!("start");
344 96 : scopeguard::defer!{ trace!("end") };
345 : type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
346 : enum State {
347 : Waiting {
348 : // invariant: is_empty(), but we recycle the allocation
349 : empty_futures: FuturesUnordered<IoFuture>,
350 : ios_rx: IosRx,
351 : },
352 : Executing {
353 : futures: FuturesUnordered<IoFuture>,
354 : ios_rx: IosRx,
355 : },
356 : ShuttingDown {
357 : futures: FuturesUnordered<IoFuture>,
358 : },
359 : }
360 96 : let mut state = State::Waiting {
361 96 : empty_futures: FuturesUnordered::new(),
362 96 : ios_rx,
363 96 : };
364 : loop {
365 117314 : match state {
366 : State::Waiting {
367 55674 : empty_futures,
368 55674 : mut ios_rx,
369 55674 : } => {
370 55674 : assert!(empty_futures.is_empty());
371 55674 : tokio::select! {
372 55674 : fut = ios_rx.recv() => {
373 55578 : if let Some(fut) = fut {
374 55578 : trace!("received new io future");
375 55578 : empty_futures.push(fut);
376 55578 : state = State::Executing { futures: empty_futures, ios_rx };
377 : } else {
378 0 : state = State::ShuttingDown { futures: empty_futures }
379 : }
380 : }
381 : }
382 : }
383 : State::Executing {
384 61640 : mut futures,
385 61640 : mut ios_rx,
386 61640 : } => {
387 61640 : tokio::select! {
388 61640 : res = futures.next() => {
389 58609 : trace!("io future completed");
390 58609 : assert!(res.is_some());
391 58609 : if futures.is_empty() {
392 55578 : state = State::Waiting { empty_futures: futures, ios_rx};
393 55578 : } else {
394 3031 : state = State::Executing { futures, ios_rx };
395 3031 : }
396 : }
397 61640 : fut = ios_rx.recv() => {
398 3031 : if let Some(fut) = fut {
399 3031 : trace!("received new io future");
400 3031 : futures.push(fut);
401 3031 : state = State::Executing { futures, ios_rx};
402 0 : } else {
403 0 : state = State::ShuttingDown { futures };
404 0 : }
405 : }
406 : }
407 : }
408 : State::ShuttingDown {
409 0 : mut futures,
410 0 : } => {
411 0 : trace!("shutting down");
412 0 : while let Some(()) = futures.next().await {
413 0 : trace!("io future completed (shutdown)");
414 : // drain
415 : }
416 0 : trace!("shutdown complete");
417 0 : break;
418 0 : }
419 0 : }
420 0 : }
421 0 : drop(gate_guard); // drop it right before we exit
422 96 : }.instrument(span));
423 96 : IoConcurrency::SidecarTask { task_id, ios_tx }
424 : }
425 : }
426 3649388 : }
427 :
428 229446 : pub(crate) fn clone(&self) -> Self {
429 229446 : match self {
430 118812 : IoConcurrency::Sequential => IoConcurrency::Sequential,
431 110634 : IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
432 110634 : task_id: *task_id,
433 110634 : ios_tx: ios_tx.clone(),
434 110634 : },
435 : }
436 229446 : }
437 :
438 : /// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
439 : ///
440 : /// The IO is represented as an opaque future.
441 : /// IO completion must be handled inside the future, e.g., through a oneshot channel.
442 : ///
443 : /// The API seems simple but there are multiple **pitfalls** involving
444 : /// DEADLOCK RISK.
445 : ///
446 : /// First, there are no guarantees about the execution of the IO.
447 : /// It may be `await`ed in-place before this function returns.
448 : /// It may be polled partially by this task and handed off to another task to be finished.
449 : /// It may be polled and then dropped before returning ready.
450 : ///
451 : /// This means that submitted IOs must not be interdependent.
452 : /// Interdependence may be through shared limited resources, e.g.,
453 : /// - VirtualFile file descriptor cache slot acquisition
454 : /// - tokio-epoll-uring slot
455 : ///
456 : /// # Why current usage is safe from deadlocks
457 : ///
458 : /// Textbook condition for a deadlock is that _all_ of the following be given
459 : /// - Mutual exclusion
460 : /// - Hold and wait
461 : /// - No preemption
462 : /// - Circular wait
463 : ///
464 : /// The current usage is safe because:
465 : /// - Mutual exclusion: IO futures definitely use mutexes, no way around that for now
466 : /// - Hold and wait: IO futures currently hold two kinds of locks/resources while waiting
467 : /// for acquisition of other resources:
468 : /// - VirtualFile file descriptor cache slot tokio mutex
469 : /// - tokio-epoll-uring slot (uses tokio notify => wait queue, much like mutex)
470 : /// - No preemption: there's no taking-away of acquired locks/resources => given
471 : /// - Circular wait: this is the part of the condition that isn't met: all IO futures
472 : /// first acquire VirtualFile mutex, then tokio-epoll-uring slot.
473 : /// There is no IO future that acquires slot before VirtualFile.
474 : /// Hence there can be no circular waiting.
475 : /// Hence there cannot be a deadlock.
476 : ///
477 : /// This is a very fragile situation and must be revisited whenever any code called from
478 : /// inside the IO futures is changed.
479 : ///
480 : /// We will move away from opaque IO futures towards well-defined IOs at some point in
481 : /// the future when we have shipped this first version of concurrent IO to production
482 : /// and are ready to retire the Sequential mode which runs the futures in place.
483 : /// Right now, while brittle, the opaque IO approach allows us to ship the feature
484 : /// with minimal changes to the code and minimal changes to existing behavior in Sequential mode.
485 : ///
486 : /// Also read the comment in `collect_pending_ios`.
487 4946232 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
488 4946232 : where
489 4946232 : F: std::future::Future<Output = ()> + Send + 'static,
490 4946232 : {
491 4946232 : match self {
492 4887623 : IoConcurrency::Sequential => fut.await,
493 58609 : IoConcurrency::SidecarTask { ios_tx, .. } => {
494 58609 : let fut = Box::pin(fut);
495 58609 : // NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
496 58609 : // while insignificant for latency.
497 58609 : // It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
498 58609 : // a submission here, but never poll the future. That way, io_uring can make progress while
499 58609 : // the future sits in the ios_tx queue.
500 58609 : match ios_tx.send(fut) {
501 58609 : Ok(()) => {}
502 : Err(_) => {
503 0 : unreachable!("the io task must have exited, likely it panicked")
504 : }
505 : }
506 : }
507 : }
508 4946232 : }
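// A minimal caller sketch (hypothetical; `read_value_somehow` is not part of this
// module): the submitted future owns everything it needs and signals completion
// through a oneshot channel, so correctness never depends on when or where the
// future ends up being polled.
//
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     io_concurrency
//         .spawn_io(async move {
//             let res: Result<Bytes, std::io::Error> = read_value_somehow().await;
//             let _ = tx.send(res); // completion is signalled from inside the future
//         })
//         .await;
//     // Later, independently of any other submitted IO:
//     let _value = rx.await;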
509 :
510 : #[cfg(test)]
511 192 : pub(crate) fn spawn_for_test() -> impl std::ops::DerefMut<Target = Self> {
512 : use std::ops::{Deref, DerefMut};
513 :
514 : use tracing::info;
515 : use utils::sync::gate::Gate;
516 :
517 : // Spawn needs a Gate, give it one.
518 : struct Wrapper {
519 : inner: IoConcurrency,
520 : #[allow(dead_code)]
521 : gate: Box<Gate>,
522 : }
523 : impl Deref for Wrapper {
524 : type Target = IoConcurrency;
525 :
526 110922 : fn deref(&self) -> &Self::Target {
527 110922 : &self.inner
528 110922 : }
529 : }
530 : impl DerefMut for Wrapper {
531 0 : fn deref_mut(&mut self) -> &mut Self::Target {
532 0 : &mut self.inner
533 0 : }
534 : }
535 192 : let gate = Box::new(Gate::default());
536 :
537 : // The default behavior when running Rust unit tests without any further
538 : // flags is to use the new behavior.
539 : // The CI uses the following environment variable to unit test both old
540 : // and new behavior.
541 : // NB: the Python regression & perf tests take the `else` branch
542 : // below and have their own defaults management.
543 192 : let selected = {
544 : // The pageserver_api::config type is unsuitable because it's internally tagged.
545 192 : #[derive(serde::Deserialize)]
546 : #[serde(rename_all = "kebab-case")]
547 : enum TestOverride {
548 : Sequential,
549 : SidecarTask,
550 : }
551 : use once_cell::sync::Lazy;
552 192 : static TEST_OVERRIDE: Lazy<TestOverride> = Lazy::new(|| {
553 192 : utils::env::var_serde_json_string(
554 192 : "NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO",
555 192 : )
556 192 : .unwrap_or(TestOverride::SidecarTask)
557 192 : });
558 :
559 192 : match *TEST_OVERRIDE {
560 96 : TestOverride::Sequential => SelectedIoConcurrency::Sequential,
561 : TestOverride::SidecarTask => {
562 96 : SelectedIoConcurrency::SidecarTask(gate.enter().expect("just created it"))
563 : }
564 : }
565 : };
566 :
567 192 : info!(?selected, "get_vectored_concurrent_io test");
568 :
569 192 : Wrapper {
570 192 : inner: Self::spawn(selected),
571 192 : gate,
572 192 : }
573 192 : }
574 : }
575 :
576 : /// Make noise in case the [`ValuesReconstructState`] gets dropped while
577 : /// there are still IOs in flight.
578 : /// Refer to `collect_pending_ios` for why we prefer not to do that.
579 : ///
580 : /// We log from here instead of from the sidecar task because the [`ValuesReconstructState`]
581 : /// gets dropped in a tracing span with more context.
582 : /// We repeat the sidecar task's `task_id` so we can correlate what we emit here with
583 : /// the logs / panic handler logs from the sidecar task, which also logs the `task_id`.
584 : impl Drop for ValuesReconstructState {
585 3778850 : fn drop(&mut self) {
586 3778850 : let num_active_ios = self
587 3778850 : .num_active_ios
588 3778850 : .load(std::sync::atomic::Ordering::Acquire);
589 3778850 : if num_active_ios == 0 {
590 3778844 : return;
591 6 : }
592 6 : let sidecar_task_id = match &self.io_concurrency {
593 0 : IoConcurrency::Sequential => None,
594 6 : IoConcurrency::SidecarTask { task_id, .. } => Some(*task_id),
595 : };
596 6 : tracing::warn!(
597 : num_active_ios,
598 : ?sidecar_task_id,
599 0 : backtrace=%std::backtrace::Backtrace::force_capture(),
600 0 : "dropping ValuesReconstructState while some IOs have not been completed",
601 : );
602 3778850 : }
603 : }
604 :
605 : impl ValuesReconstructState {
606 3778850 : pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
607 3778850 : Self {
608 3778850 : keys: HashMap::new(),
609 3778850 : keys_done: KeySpaceRandomAccum::new(),
610 3778850 : keys_with_image_coverage: None,
611 3778850 : layers_visited: 0,
612 3778850 : delta_layers_visited: 0,
613 3778850 : io_concurrency,
614 3778850 : num_active_ios: Arc::new(AtomicUsize::new(0)),
615 3778850 : read_path: None,
616 3778850 : }
617 3778850 : }
618 :
619 : /// Absolutely read [`IoConcurrency::spawn_io`] to learn about assumptions & pitfalls.
620 4946232 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
621 4946232 : where
622 4946232 : F: std::future::Future<Output = ()> + Send + 'static,
623 4946232 : {
624 4946232 : self.io_concurrency.spawn_io(fut).await;
625 4946232 : }
626 :
627 5316626 : pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
628 5316626 : self.layers_visited += 1;
629 5316626 : if let ReadableLayer::PersistentLayer(layer) = layer {
630 1599415 : if layer.layer_desc().is_delta() {
631 1417975 : self.delta_layers_visited += 1;
632 1417975 : }
633 3717211 : }
634 5316626 : }
635 :
636 1440 : pub(crate) fn get_delta_layers_visited(&self) -> u32 {
637 1440 : self.delta_layers_visited
638 1440 : }
639 :
640 3778676 : pub(crate) fn get_layers_visited(&self) -> u32 {
641 3778676 : self.layers_visited
642 3778676 : }
643 :
644 : /// On hitting an image layer, we can mark all keys in this range as done, because
645 : /// if the image layer does not contain a key, it is deleted/never added.
646 181512 : pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
647 181512 : let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
648 181512 : assert_eq!(
649 : prev_val, None,
650 0 : "should consume the keyspace before the next iteration"
651 : );
652 181512 : }
653 :
654 : /// Update the state collected for a given key.
655 : /// Returns true if this was the last value needed for the key and false otherwise.
656 : ///
657 : /// If the key is done after the update, mark it as such.
658 : ///
659 : /// If the key is in the sparse keyspace (i.e., aux files), we do not track it in
660 : /// `keys_done`.
661 : // TODO: rename this method & update description.
662 21501713 : pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
663 21501713 : let state = self.keys.entry(*key).or_default();
664 21501713 :
665 21501713 : let is_sparse_key = key.is_sparse();
666 :
667 21501713 : let required_io = match state.situation {
668 : ValueReconstructSituation::Complete => {
669 434259 : if is_sparse_key {
670 : // Sparse keyspace might be visited multiple times because
671 : // we don't track unmapped keyspaces.
672 434259 : return OnDiskValueIo::Unnecessary;
673 : } else {
674 0 : unreachable!()
675 : }
676 : }
677 : ValueReconstructSituation::Continue => {
678 21067454 : self.num_active_ios
679 21067454 : .fetch_add(1, std::sync::atomic::Ordering::Release);
680 21067454 : let (tx, rx) = tokio::sync::oneshot::channel();
681 21067454 : state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
682 21067454 : OnDiskValueIo::Required {
683 21067454 : tx,
684 21067454 : num_active_ios: Arc::clone(&self.num_active_ios),
685 21067454 : }
686 21067454 : }
687 21067454 : };
688 21067454 :
689 21067454 : if completes && state.situation == ValueReconstructSituation::Continue {
690 4393214 : state.situation = ValueReconstructSituation::Complete;
691 4393214 : if !is_sparse_key {
692 4010642 : self.keys_done.add_key(*key);
693 4010642 : }
694 16674240 : }
695 :
696 21067454 : required_io
697 21501713 : }
698 :
699 : /// Returns the key space describing the keys that have
700 : /// been marked as completed since the last call to this function.
701 : /// Returns individual keys done, and the image layer coverage.
702 5316626 : pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
703 5316626 : (
704 5316626 : self.keys_done.consume_keyspace(),
705 5316626 : self.keys_with_image_coverage.take(),
706 5316626 : )
707 5316626 : }
708 : }
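// End-to-end sketch of the per-key flow (within-crate, simplified; error handling and
// the real layer traversal are omitted):
//
//     let mut state = ValuesReconstructState::new(IoConcurrency::sequential());
//     // Traversal plans a read of `key` at `lsn`; `true` marks it as the last value needed.
//     let io = state.update_key(&key, lsn, true);
//     state
//         .spawn_io(async move {
//             // the actual layer read happens here
//             io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
//         })
//         .await;
//     // After traversal, wait for the key's IOs and assemble its ValueReconstructState.
//     let per_key = state.keys.remove(&key).expect("planned above");
//     let value_state = per_key.collect_pending_ios().await?;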
709 :
710 : /// A key that uniquely identifies a layer in a timeline
711 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
712 : pub(crate) enum LayerId {
713 : PersitentLayerId(PersistentLayerKey),
714 : InMemoryLayerId(InMemoryLayerFileId),
715 : }
716 :
717 : /// Uniquely identify a layer visit by the layer
718 : /// and LSN range of the reads. Note that the end of the range is exclusive.
719 : ///
720 : /// The layer itself is not enough since we may have different LSN lower
721 : /// bounds for delta layer reads. Scenarios where this can happen are:
722 : ///
723 : /// 1. Layer overlaps: imagine an image layer inside an in-memory layer
724 : /// and a query that only partially hits the image layer. Part of the query
725 : /// needs to read the whole in-memory layer and the other part needs to read
726 : /// only up to the image layer. Hence, they'll have different LSN floor values
727 : /// for the read.
728 : ///
729 : /// 2. Scattered reads: the read path supports starting at different LSNs. Imagine
730 : /// The start LSN for one range is inside a layer and the start LSN for another range
731 : /// Is above the layer (includes all of it). Both ranges need to read the layer all the
732 : /// Way to the end but starting at different points. Hence, they'll have different LSN
733 : /// Ceil values.
734 : ///
735 : /// The implication is that we might visit the same layer multiple times
736 : /// in order to read different LSN ranges from it. In practice, this isn't very concerning
737 : /// because:
738 : /// 1. Layer overlaps are rare and generally not intended
739 : /// 2. Scattered reads will stabilise after the first few layers provided their starting LSNs
740 : /// are grouped tightly enough (likely the case).
741 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
742 : struct LayerToVisitId {
743 : layer_id: LayerId,
744 : lsn_floor: Lsn,
745 : lsn_ceil: Lsn,
746 : }
747 :
748 : #[derive(Debug, PartialEq, Eq, Hash)]
749 : pub enum ReadableLayerWeak {
750 : PersistentLayer(Arc<PersistentLayerDesc>),
751 : InMemoryLayer(InMemoryLayerDesc),
752 : }
753 :
754 : /// Layer wrapper for the read path. Note that it is valid
755 : /// to use these layers even after external operations have
756 : /// been performed on them (compaction, freeze, etc.).
757 : #[derive(Debug)]
758 : pub(crate) enum ReadableLayer {
759 : PersistentLayer(Layer),
760 : InMemoryLayer(Arc<InMemoryLayer>),
761 : }
762 :
763 : /// A partial description of a read to be done.
764 : #[derive(Debug, Clone)]
765 : struct LayerVisit {
766 : /// An id used to resolve the readable layer within the fringe
767 : layer_to_visit_id: LayerToVisitId,
768 : /// Lsn range for the read, used for selecting the next read
769 : lsn_range: Range<Lsn>,
770 : }
771 :
772 : /// Data structure which maintains a fringe of layers for the
773 : /// read path. The fringe is the set of layers which intersects
774 : /// the current keyspace that the search is descending on.
775 : /// Each layer tracks the keyspace that intersects it.
776 : ///
777 : /// The fringe must appear sorted by Lsn. Hence, it uses
778 : /// a two-layer indexing scheme.
779 : #[derive(Debug)]
780 : pub(crate) struct LayerFringe {
781 : planned_visits_by_lsn: BinaryHeap<LayerVisit>,
782 : visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
783 : }
784 :
785 : #[derive(Debug)]
786 : struct LayerVisitReads {
787 : layer: ReadableLayer,
788 : target_keyspace: KeySpaceRandomAccum,
789 : }
790 :
791 : impl LayerFringe {
792 5131668 : pub(crate) fn new() -> Self {
793 5131668 : LayerFringe {
794 5131668 : planned_visits_by_lsn: BinaryHeap::new(),
795 5131668 : visit_reads: HashMap::new(),
796 5131668 : }
797 5131668 : }
798 :
799 10448294 : pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
800 10448294 : let read_desc = self.planned_visits_by_lsn.pop()?;
801 :
802 5316626 : let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
803 5316626 :
804 5316626 : match removed {
805 : Some((
806 : _,
807 : LayerVisitReads {
808 5316626 : layer,
809 5316626 : mut target_keyspace,
810 5316626 : },
811 5316626 : )) => Some((
812 5316626 : layer,
813 5316626 : target_keyspace.consume_keyspace(),
814 5316626 : read_desc.lsn_range,
815 5316626 : )),
816 0 : None => unreachable!("fringe internals are always consistent"),
817 : }
818 10448294 : }
819 :
820 5488046 : pub(crate) fn update(
821 5488046 : &mut self,
822 5488046 : layer: ReadableLayer,
823 5488046 : keyspace: KeySpace,
824 5488046 : lsn_range: Range<Lsn>,
825 5488046 : ) {
826 5488046 : let layer_to_visit_id = LayerToVisitId {
827 5488046 : layer_id: layer.id(),
828 5488046 : lsn_floor: lsn_range.start,
829 5488046 : lsn_ceil: lsn_range.end,
830 5488046 : };
831 5488046 :
832 5488046 : let entry = self.visit_reads.entry(layer_to_visit_id.clone());
833 5488046 : match entry {
834 171420 : Entry::Occupied(mut entry) => {
835 171420 : entry.get_mut().target_keyspace.add_keyspace(keyspace);
836 171420 : }
837 5316626 : Entry::Vacant(entry) => {
838 5316626 : self.planned_visits_by_lsn.push(LayerVisit {
839 5316626 : lsn_range,
840 5316626 : layer_to_visit_id: layer_to_visit_id.clone(),
841 5316626 : });
842 5316626 : let mut accum = KeySpaceRandomAccum::new();
843 5316626 : accum.add_keyspace(keyspace);
844 5316626 : entry.insert(LayerVisitReads {
845 5316626 : layer,
846 5316626 : target_keyspace: accum,
847 5316626 : });
848 5316626 : }
849 : }
850 5488046 : }
851 : }
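// Usage sketch (simplified read path): the traversal seeds the fringe with the layers
// intersecting the current keyspace and then drains it in descending end-LSN order,
// pushing further layers as each visit narrows what still needs to be searched.
//
//     fringe.update(layer, keyspace, lsn_floor..cont_lsn);
//     while let Some((layer, keyspace, lsn_range)) = fringe.next_layer() {
//         // read `keyspace` from `layer` over `lsn_range`, then update the fringe with
//         // whatever remains to be searched below `lsn_range.start`
//     }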
852 :
853 : impl Default for LayerFringe {
854 0 : fn default() -> Self {
855 0 : Self::new()
856 0 : }
857 : }
858 :
859 : impl Ord for LayerVisit {
860 1071342 : fn cmp(&self, other: &Self) -> Ordering {
861 1071342 : let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
862 1071342 : if ord == std::cmp::Ordering::Equal {
863 87160 : self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
864 : } else {
865 984182 : ord
866 : }
867 1071342 : }
868 : }
869 :
870 : impl PartialOrd for LayerVisit {
871 1071342 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
872 1071342 : Some(self.cmp(other))
873 1071342 : }
874 : }
875 :
876 : impl PartialEq for LayerVisit {
877 0 : fn eq(&self, other: &Self) -> bool {
878 0 : self.lsn_range == other.lsn_range
879 0 : }
880 : }
881 :
882 : impl Eq for LayerVisit {}
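// Ordering note (illustrative): the fringe keeps `LayerVisit`s in a max-heap, so the
// visit with the highest `lsn_range.end` pops first. Among visits with equal end LSNs,
// the reversed comparison on `lsn_range.start` makes the visit with the lowest start
// LSN (the widest read) pop first: given visits over 0x20..0x40 and 0x30..0x40,
// `next_layer()` returns the 0x20..0x40 visit before the 0x30..0x40 one.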
883 :
884 : impl ReadableLayer {
885 5488046 : pub(crate) fn id(&self) -> LayerId {
886 5488046 : match self {
887 1724779 : Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
888 3763267 : Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
889 : }
890 5488046 : }
891 :
892 5316626 : pub(crate) async fn get_values_reconstruct_data(
893 5316626 : &self,
894 5316626 : keyspace: KeySpace,
895 5316626 : lsn_range: Range<Lsn>,
896 5316626 : reconstruct_state: &mut ValuesReconstructState,
897 5316626 : ctx: &RequestContext,
898 5316626 : ) -> Result<(), GetVectoredError> {
899 5316626 : match self {
900 1599415 : ReadableLayer::PersistentLayer(layer) => {
901 1599415 : let ctx = RequestContextBuilder::from(ctx)
902 1599415 : .perf_span(|crnt_perf_span| {
903 0 : info_span!(
904 : target: PERF_TRACE_TARGET,
905 0 : parent: crnt_perf_span,
906 : "PLAN_LAYER",
907 : layer = %layer
908 : )
909 1599415 : })
910 1599415 : .attached_child();
911 1599415 :
912 1599415 : layer
913 1599415 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
914 1599415 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
915 1599415 : .await
916 : }
917 3717211 : ReadableLayer::InMemoryLayer(layer) => {
918 3717211 : let ctx = RequestContextBuilder::from(ctx)
919 3717211 : .perf_span(|crnt_perf_span| {
920 0 : info_span!(
921 : target: PERF_TRACE_TARGET,
922 0 : parent: crnt_perf_span,
923 : "PLAN_LAYER",
924 : layer = %layer
925 : )
926 3717211 : })
927 3717211 : .attached_child();
928 3717211 :
929 3717211 : layer
930 3717211 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
931 3717211 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
932 3717211 : .await
933 : }
934 : }
935 5316626 : }
936 : }
937 :
938 : /// Layers contain a hint indicating whether they are likely to be used for reads.
939 : ///
940 : /// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
941 : /// when changing the visibility of layers (for example when creating a branch that makes some previously
942 : /// covered layers visible). It should be used for cache management but not for correctness-critical checks.
943 : #[derive(Debug, Clone, PartialEq, Eq)]
944 : pub enum LayerVisibilityHint {
945 : /// A Visible layer might be read while serving a read, because there is not an image layer between it
946 : /// and a readable LSN (the tip of the branch or a child's branch point)
947 : Visible,
948 : /// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
949 : /// a branch or ephemeral endpoint at an LSN below the layer that covers this.
950 : Covered,
951 : }
952 :
953 : pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
954 :
955 0 : #[derive(Clone, Copy, strum_macros::EnumString)]
956 : pub(crate) enum LayerAccessStatsReset {
957 : NoReset,
958 : AllStats,
959 : }
960 :
961 : impl Default for LayerAccessStats {
962 11724 : fn default() -> Self {
963 11724 : // Default value is to assume resident since creation time, and visible.
964 11724 : let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now());
965 11724 : value |= 0x1 << Self::VISIBILITY_SHIFT;
966 11724 :
967 11724 : Self(std::sync::atomic::AtomicU64::new(value))
968 11724 : }
969 : }
970 :
971 : // Efficient store of two very-low-resolution timestamps and some bits. Used for storing last access time and
972 : // last residence change time.
973 : impl LayerAccessStats {
974 : // How many high bits to drop from a u32 timestamp?
975 : // - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use
976 : // after that, this software has been very successful!)
977 : // - Dropping the top bit is implicitly safe because unix timestamps are meant to be
978 : // stored in an i32, so they never used it.
979 : // - Dropping the next two bits is safe because this code is only running on systems in
980 : // years >= 2024, and these bits have been 1 since 2021
981 : //
982 : // Therefore we may store only 29 bits for a timestamp with one-second resolution. We do
983 : // this truncation to make space for some flags in the high bits of our u64.
984 : const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1;
985 : const TS_MASK: u32 = 0x1f_ff_ff_ff;
986 : const TS_ONES: u32 = 0x60_00_00_00;
987 :
988 : const ATIME_SHIFT: u32 = 0;
989 : const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS;
990 : const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS;
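// Resulting bit layout of the inner u64 (derived from the constants above):
// bits 0..=28 hold the access time (ATIME_SHIFT = 0), bits 29..=57 hold the residence
// change time (RTIME_SHIFT = 29), bit 58 is the visibility flag (VISIBILITY_SHIFT = 58),
// and bits 59..=63 are unused. Each timestamp is the low 29 bits of the unix time in
// seconds; TS_ONES is OR-ed back in when reading.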
991 :
992 1600375 : fn write_bits(&self, mask: u64, value: u64) -> u64 {
993 1600375 : self.0
994 1600375 : .fetch_update(
995 1600375 : // TODO: decide what orderings are correct
996 1600375 : std::sync::atomic::Ordering::Relaxed,
997 1600375 : std::sync::atomic::Ordering::Relaxed,
998 1600375 : |v| Some((v & !mask) | (value & mask)),
999 1600375 : )
1000 1600375 : .expect("Inner function is infallible")
1001 1600375 : }
1002 :
1003 1609831 : fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) {
1004 1609831 : // Keep only the low 29 bits of the timestamp (TS_MASK), at one-second resolution; the dropped high bits are restored on read
1005 1609831 : let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64);
1006 1609831 :
1007 1609831 : ((Self::TS_MASK as u64) << shift, timestamp << shift)
1008 1609831 : }
1009 :
1010 876 : fn read_low_res_timestamp(&self, shift: u32) -> Option<SystemTime> {
1011 876 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
1012 876 :
1013 876 : let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift;
1014 876 : if ts_bits == 0 {
1015 396 : None
1016 : } else {
1017 480 : Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64)))
1018 : }
1019 876 : }
1020 :
1021 : /// Record a change in layer residency.
1022 : ///
1023 : /// Recording the event must happen while holding the layer map lock to
1024 : /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
1025 : /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
1026 : ///
1027 : /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
1028 : /// the following race could happen:
1029 : ///
1030 : /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
1031 : /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
1032 : /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
1033 : /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
1034 300 : pub(crate) fn record_residence_event_at(&self, now: SystemTime) {
1035 300 : let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now);
1036 300 : self.write_bits(mask, value);
1037 300 : }
1038 :
1039 288 : pub(crate) fn record_residence_event(&self) {
1040 288 : self.record_residence_event_at(SystemTime::now())
1041 288 : }
1042 :
1043 1597807 : fn record_access_at(&self, now: SystemTime) -> bool {
1044 1597807 : let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
1045 1597807 :
1046 1597807 : // A layer which is accessed must be visible.
1047 1597807 : mask |= 0x1 << Self::VISIBILITY_SHIFT;
1048 1597807 : value |= 0x1 << Self::VISIBILITY_SHIFT;
1049 1597807 :
1050 1597807 : let old_bits = self.write_bits(mask, value);
1051 12 : !matches!(
1052 1597807 : self.decode_visibility(old_bits),
1053 : LayerVisibilityHint::Visible
1054 : )
1055 1597807 : }
1056 :
1057 : /// Returns true if we modified the layer's visibility to set it to Visible implicitly
1058 : /// as a result of this access
1059 1599487 : pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
1060 1599487 : if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
1061 1716 : return false;
1062 1597771 : }
1063 1597771 :
1064 1597771 : self.record_access_at(SystemTime::now())
1065 1599487 : }
1066 :
1067 0 : fn as_api_model(
1068 0 : &self,
1069 0 : reset: LayerAccessStatsReset,
1070 0 : ) -> pageserver_api::models::LayerAccessStats {
1071 0 : let ret = pageserver_api::models::LayerAccessStats {
1072 0 : access_time: self
1073 0 : .read_low_res_timestamp(Self::ATIME_SHIFT)
1074 0 : .unwrap_or(UNIX_EPOCH),
1075 0 : residence_time: self
1076 0 : .read_low_res_timestamp(Self::RTIME_SHIFT)
1077 0 : .unwrap_or(UNIX_EPOCH),
1078 0 : visible: matches!(self.visibility(), LayerVisibilityHint::Visible),
1079 : };
1080 0 : match reset {
1081 0 : LayerAccessStatsReset::NoReset => {}
1082 0 : LayerAccessStatsReset::AllStats => {
1083 0 : self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0);
1084 0 : self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0);
1085 0 : }
1086 : }
1087 0 : ret
1088 0 : }
1089 :
1090 : /// Get the latest access timestamp, falling back to latest residence event. The latest residence event
1091 : /// will be this Layer's construction time, if its residence hasn't changed since then.
1092 252 : pub(crate) fn latest_activity(&self) -> SystemTime {
1093 252 : if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) {
1094 36 : t
1095 : } else {
1096 216 : self.read_low_res_timestamp(Self::RTIME_SHIFT)
1097 216 : .expect("Residence time is set on construction")
1098 : }
1099 252 : }
1100 :
1101 : /// Whether this layer has been accessed (excluding in [`AccessStatsBehavior::Skip`]).
1102 : ///
1103 : /// This indicates whether the layer has been used for some purpose that would motivate
1104 : /// us to keep it on disk, such as for serving a getpage request.
1105 204 : fn accessed(&self) -> bool {
1106 204 : // Consider it accessed if the most recent access is more recent than
1107 204 : // the most recent change in residence status.
1108 204 : match (
1109 204 : self.read_low_res_timestamp(Self::ATIME_SHIFT),
1110 204 : self.read_low_res_timestamp(Self::RTIME_SHIFT),
1111 : ) {
1112 180 : (None, _) => false,
1113 0 : (Some(_), None) => true,
1114 24 : (Some(a), Some(r)) => a >= r,
1115 : }
1116 204 : }
1117 :
1118 : /// Helper for extracting the visibility hint from the literal value of our inner u64
1119 1604855 : fn decode_visibility(&self, bits: u64) -> LayerVisibilityHint {
1120 1604855 : match (bits >> Self::VISIBILITY_SHIFT) & 0x1 {
1121 1604711 : 1 => LayerVisibilityHint::Visible,
1122 144 : 0 => LayerVisibilityHint::Covered,
1123 0 : _ => unreachable!(),
1124 : }
1125 1604855 : }
1126 :
1127 : /// Returns the old value which has been replaced
1128 2268 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) -> LayerVisibilityHint {
1129 2268 : let value = match visibility {
1130 1956 : LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
1131 312 : LayerVisibilityHint::Covered => 0x0,
1132 : };
1133 :
1134 2268 : let old_bits = self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
1135 2268 : self.decode_visibility(old_bits)
1136 2268 : }
1137 :
1138 4780 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
1139 4780 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
1140 4780 : self.decode_visibility(read)
1141 4780 : }
1142 : }
1143 :
1144 : /// Get a layer descriptor from a layer.
1145 : pub(crate) trait AsLayerDesc {
1146 : /// Get the layer descriptor.
1147 : fn layer_desc(&self) -> &PersistentLayerDesc;
1148 : }
1149 :
1150 : pub mod tests {
1151 : use pageserver_api::shard::TenantShardId;
1152 : use utils::id::TimelineId;
1153 :
1154 : use super::*;
1155 :
1156 : impl From<DeltaLayerName> for PersistentLayerDesc {
1157 132 : fn from(value: DeltaLayerName) -> Self {
1158 132 : PersistentLayerDesc::new_delta(
1159 132 : TenantShardId::from([0; 18]),
1160 132 : TimelineId::from_array([0; 16]),
1161 132 : value.key_range,
1162 132 : value.lsn_range,
1163 132 : 233,
1164 132 : )
1165 132 : }
1166 : }
1167 :
1168 : impl From<ImageLayerName> for PersistentLayerDesc {
1169 144 : fn from(value: ImageLayerName) -> Self {
1170 144 : PersistentLayerDesc::new_img(
1171 144 : TenantShardId::from([0; 18]),
1172 144 : TimelineId::from_array([0; 16]),
1173 144 : value.key_range,
1174 144 : value.lsn,
1175 144 : 233,
1176 144 : )
1177 144 : }
1178 : }
1179 :
1180 : impl From<LayerName> for PersistentLayerDesc {
1181 276 : fn from(value: LayerName) -> Self {
1182 276 : match value {
1183 132 : LayerName::Delta(d) => Self::from(d),
1184 144 : LayerName::Image(i) => Self::from(i),
1185 : }
1186 276 : }
1187 : }
1188 : }
1189 :
1190 : /// Range wrapping newtype, which uses display to render Debug.
1191 : ///
1192 : /// Useful with `Key`, whose `{:?}` output is too verbose when printing multiple layers.
1193 : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
1194 :
1195 : impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
1196 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1197 0 : write!(f, "{}..{}", self.0.start, self.0.end)
1198 0 : }
1199 : }
1200 :
1201 : #[cfg(test)]
1202 : mod tests2 {
1203 : use pageserver_api::key::DBDIR_KEY;
1204 : use tracing::info;
1205 :
1206 : use super::*;
1207 : use crate::tenant::storage_layer::IoConcurrency;
1208 :
1209 : /// TODO: currently this test relies on manual visual inspection of the --no-capture output.
1210 : /// Should look like so:
1211 : /// ```text
1212 : /// RUST_LOG=trace cargo nextest run --features testing --no-capture test_io_concurrency_noise
1213 : /// running 1 test
1214 : /// 2025-01-21T17:42:01.335679Z INFO get_vectored_concurrent_io test selected=SidecarTask
1215 : /// 2025-01-21T17:42:01.335680Z TRACE spawning sidecar task task_id=0
1216 : /// 2025-01-21T17:42:01.335937Z TRACE IoConcurrency_sidecar{task_id=0}: start
1217 : /// 2025-01-21T17:42:01.335972Z TRACE IoConcurrency_sidecar{task_id=0}: received new io future
1218 : /// 2025-01-21T17:42:01.335999Z INFO IoConcurrency_sidecar{task_id=0}: waiting for signal to complete IO
1219 : /// 2025-01-21T17:42:01.336229Z WARN dropping ValuesReconstructState while some IOs have not been completed num_active_ios=1 sidecar_task_id=Some(0) backtrace= 0: <pageserver::tenant::storage_layer::ValuesReconstructState as core::ops::drop::Drop>::drop
1220 : /// at ./src/tenant/storage_layer.rs:553:24
1221 : /// 1: core::ptr::drop_in_place<pageserver::tenant::storage_layer::ValuesReconstructState>
1222 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:521:1
1223 : /// 2: core::mem::drop
1224 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:942:24
1225 : /// 3: pageserver::tenant::storage_layer::tests2::test_io_concurrency_noise::{{closure}}
1226 : /// at ./src/tenant/storage_layer.rs:1159:9
1227 : /// ...
1228 : /// 49: <unknown>
1229 : /// 2025-01-21T17:42:01.452293Z INFO IoConcurrency_sidecar{task_id=0}: completing IO
1230 : /// 2025-01-21T17:42:01.452357Z TRACE IoConcurrency_sidecar{task_id=0}: io future completed
1231 : /// 2025-01-21T17:42:01.452473Z TRACE IoConcurrency_sidecar{task_id=0}: end
1232 : /// test tenant::storage_layer::tests2::test_io_concurrency_noise ... ok
1233 : ///
1234 : /// ```
1235 : #[tokio::test]
1236 12 : async fn test_io_concurrency_noise() {
1237 12 : crate::tenant::harness::setup_logging();
1238 12 :
1239 12 : let io_concurrency = IoConcurrency::spawn_for_test();
1240 12 : match *io_concurrency {
1241 12 : IoConcurrency::Sequential => {
1242 12 : // This test asserts behavior in sidecar mode, doesn't make sense in sequential mode.
1243 12 : return;
1244 12 : }
1245 12 : IoConcurrency::SidecarTask { .. } => {}
1246 6 : }
1247 6 : let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
1248 6 :
1249 6 : let (io_fut_is_waiting_tx, io_fut_is_waiting) = tokio::sync::oneshot::channel();
1250 6 : let (do_complete_io, should_complete_io) = tokio::sync::oneshot::channel();
1251 6 : let (io_fut_exiting_tx, io_fut_exiting) = tokio::sync::oneshot::channel();
1252 6 :
1253 6 : let io = reconstruct_state.update_key(&DBDIR_KEY, Lsn(8), true);
1254 6 : reconstruct_state
1255 6 : .spawn_io(async move {
1256 6 : info!("waiting for signal to complete IO");
1257 12 : io_fut_is_waiting_tx.send(()).unwrap();
1258 6 : should_complete_io.await.unwrap();
1259 6 : info!("completing IO");
1260 12 : io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
1261 6 : io_fut_exiting_tx.send(()).unwrap();
1262 6 : })
1263 6 : .await;
1264 12 :
1265 12 : io_fut_is_waiting.await.unwrap();
1266 6 :
1267 6 : // this is what makes the noise
1268 6 : drop(reconstruct_state);
1269 6 :
1270 6 : do_complete_io.send(()).unwrap();
1271 6 :
1272 6 : io_fut_exiting.await.unwrap();
1273 12 : }
1274 : }