Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::{Result, bail};
4 : use camino::Utf8PathBuf;
5 : use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
6 : use remote_storage::GenericRemoteStorage;
7 : use safekeeper_api::membership::Configuration;
8 : use tokio::fs::OpenOptions;
9 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
10 : use tracing::{info, warn};
11 : use utils::id::TenantTimelineId;
12 : use utils::lsn::Lsn;
13 :
14 : use crate::GlobalTimelines;
15 : use crate::control_file::FileStorage;
16 : use crate::state::TimelinePersistentState;
17 : use crate::timeline::{TimelineError, WalResidentTimeline};
18 : use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
19 : use crate::wal_backup::copy_s3_segments;
20 : use crate::wal_storage::{WalReader, wal_file_paths};
21 :
22 : // we don't want to have more than 10 segments on disk after copy, because they take space
23 : const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
24 :
/// Parameters of a timeline copy request handled by `handle_request`.
pub struct Request {
    /// Timeline whose WAL is copied.
    pub source_ttid: TenantTimelineId,
    /// WAL is copied up to this LSN; it must lie within the source's
    /// `[timeline_start_lsn, flush_lsn]` range, which `handle_request` checks.
    pub until_lsn: Lsn,
    /// Timeline to create; the request is a no-op if it already exists.
    pub destination_ttid: TenantTimelineId,
}
30 :
31 0 : pub async fn handle_request(
32 0 : request: Request,
33 0 : global_timelines: Arc<GlobalTimelines>,
34 0 : storage: Arc<GenericRemoteStorage>,
35 0 : ) -> Result<()> {
36 : // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
37 : // if LSN will point to the middle of a WAL record, timeline will be in "broken" state
38 :
39 0 : match global_timelines.get(request.destination_ttid) {
40 : // timeline already exists. would be good to check that this timeline is the copy
41 : // of the source timeline, but it isn't obvious how to do that
42 0 : Ok(_) => return Ok(()),
43 : // timeline not found, we are going to create it
44 0 : Err(TimelineError::NotFound(_)) => {}
45 : // error, probably timeline was deleted
46 0 : res => {
47 0 : res?;
48 : }
49 : }
50 :
51 0 : let source = global_timelines.get(request.source_ttid)?;
52 0 : let source_tli = source.wal_residence_guard().await?;
53 :
54 0 : let conf = &global_timelines.get_global_config();
55 0 : let ttid = request.destination_ttid;
56 :
57 0 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
58 :
59 0 : let (mem_state, state) = source_tli.get_state().await;
60 0 : let start_lsn = state.timeline_start_lsn;
61 0 : if start_lsn == Lsn::INVALID {
62 0 : bail!("timeline is not initialized");
63 0 : }
64 0 : let backup_lsn = mem_state.backup_lsn;
65 :
66 : {
67 0 : let commit_lsn = mem_state.commit_lsn;
68 0 : let flush_lsn = source_tli.get_flush_lsn().await;
69 :
70 0 : info!(
71 0 : "collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
72 : start_lsn, backup_lsn, commit_lsn, flush_lsn
73 : );
74 :
75 0 : assert!(backup_lsn >= start_lsn);
76 0 : assert!(commit_lsn >= start_lsn);
77 0 : assert!(flush_lsn >= start_lsn);
78 :
79 0 : if request.until_lsn > flush_lsn {
80 0 : bail!(format!(
81 0 : "requested LSN {} is beyond the end of the timeline {}",
82 : request.until_lsn, flush_lsn
83 : ));
84 0 : }
85 0 : if request.until_lsn < start_lsn {
86 0 : bail!(format!(
87 0 : "requested LSN {} is before the start of the timeline {}",
88 : request.until_lsn, start_lsn
89 : ));
90 0 : }
91 :
92 0 : if request.until_lsn > commit_lsn {
93 0 : warn!("copy_timeline WAL is not fully committed");
94 0 : }
95 :
96 0 : if backup_lsn < request.until_lsn && request.until_lsn.0 - backup_lsn.0 > MAX_BACKUP_LAG {
97 : // we have a lot of segments that are not backed up. we can try to wait here until
98 : // segments will be backed up to remote storage, but it's not clear how long to wait
99 0 : bail!("too many segments are not backed up");
100 0 : }
101 : }
102 :
103 0 : let wal_seg_size = state.server.wal_seg_size as usize;
104 0 : if wal_seg_size == 0 {
105 0 : bail!("wal_seg_size is not set");
106 0 : }
107 :
108 0 : let first_segment = start_lsn.segment_number(wal_seg_size);
109 0 : let last_segment = request.until_lsn.segment_number(wal_seg_size);
110 :
111 0 : let new_backup_lsn = {
112 : // we can't have new backup_lsn greater than existing backup_lsn or start of the last segment
113 0 : let max_backup_lsn = backup_lsn.min(Lsn(last_segment * wal_seg_size as u64));
114 :
115 0 : if max_backup_lsn <= start_lsn {
116 : // probably we are starting from the first segment, which was not backed up yet.
117 : // note that start_lsn can be in the middle of the segment
118 0 : start_lsn
119 : } else {
120 : // we have some segments backed up, so we will assume all WAL below max_backup_lsn is backed up
121 0 : assert!(max_backup_lsn.segment_offset(wal_seg_size) == 0);
122 0 : max_backup_lsn
123 : }
124 : };
125 :
126 : // all previous segments will be copied inside S3
127 0 : let first_ondisk_segment = new_backup_lsn.segment_number(wal_seg_size);
128 0 : assert!(first_ondisk_segment <= last_segment);
129 0 : assert!(first_ondisk_segment >= first_segment);
130 :
131 0 : copy_s3_segments(
132 0 : &storage,
133 0 : wal_seg_size,
134 0 : &request.source_ttid,
135 0 : &request.destination_ttid,
136 0 : first_segment,
137 0 : first_ondisk_segment,
138 0 : )
139 0 : .await?;
140 :
141 0 : copy_disk_segments(
142 0 : &source_tli,
143 0 : wal_seg_size,
144 0 : new_backup_lsn,
145 0 : request.until_lsn,
146 0 : &tli_dir_path,
147 0 : )
148 0 : .await?;
149 :
150 0 : let mut new_state = TimelinePersistentState::new(
151 0 : &request.destination_ttid,
152 0 : Configuration::empty(),
153 0 : state.server.clone(),
154 0 : start_lsn,
155 0 : request.until_lsn,
156 0 : )?;
157 0 : new_state.timeline_start_lsn = start_lsn;
158 0 : new_state.peer_horizon_lsn = request.until_lsn;
159 0 : new_state.backup_lsn = new_backup_lsn;
160 :
161 0 : FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
162 :
163 : // now we have a ready timeline in a temp directory
164 0 : validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
165 0 : global_timelines
166 0 : .load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
167 0 : .await?;
168 :
169 0 : Ok(())
170 0 : }
171 :
172 0 : async fn copy_disk_segments(
173 0 : tli: &WalResidentTimeline,
174 0 : wal_seg_size: usize,
175 0 : start_lsn: Lsn,
176 0 : end_lsn: Lsn,
177 0 : tli_dir_path: &Utf8PathBuf,
178 0 : ) -> Result<()> {
179 0 : let mut wal_reader = tli.get_walreader(start_lsn).await?;
180 :
181 0 : let mut buf = vec![0u8; MAX_SEND_SIZE];
182 :
183 0 : let first_segment = start_lsn.segment_number(wal_seg_size);
184 0 : let last_segment = end_lsn.segment_number(wal_seg_size);
185 :
186 0 : for segment in first_segment..=last_segment {
187 0 : let segment_start = segment * wal_seg_size as u64;
188 0 : let segment_end = segment_start + wal_seg_size as u64;
189 :
190 0 : let copy_start = segment_start.max(start_lsn.0);
191 0 : let copy_end = segment_end.min(end_lsn.0);
192 :
193 0 : let copy_start = copy_start - segment_start;
194 0 : let copy_end = copy_end - segment_start;
195 :
196 0 : let wal_file_path = {
197 0 : let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size);
198 :
199 0 : if segment == last_segment {
200 0 : partial
201 : } else {
202 0 : normal
203 : }
204 : };
205 :
206 0 : write_segment(
207 0 : &mut buf,
208 0 : &wal_file_path,
209 0 : wal_seg_size as u64,
210 0 : copy_start,
211 0 : copy_end,
212 0 : &mut wal_reader,
213 0 : )
214 0 : .await?;
215 : }
216 :
217 0 : Ok(())
218 0 : }
219 :
220 0 : async fn write_segment(
221 0 : buf: &mut [u8],
222 0 : file_path: &Utf8PathBuf,
223 0 : wal_seg_size: u64,
224 0 : from: u64,
225 0 : to: u64,
226 0 : reader: &mut WalReader,
227 0 : ) -> Result<()> {
228 0 : assert!(from <= to);
229 0 : assert!(to <= wal_seg_size);
230 :
231 : #[allow(clippy::suspicious_open_options)]
232 0 : let mut file = OpenOptions::new()
233 0 : .create(true)
234 0 : .write(true)
235 0 : .open(&file_path)
236 0 : .await?;
237 :
238 : // maybe fill with zeros, as in wal_storage.rs?
239 0 : file.set_len(wal_seg_size).await?;
240 0 : file.seek(std::io::SeekFrom::Start(from)).await?;
241 :
242 0 : let mut bytes_left = to - from;
243 0 : while bytes_left > 0 {
244 0 : let len = bytes_left as usize;
245 0 : let len = len.min(buf.len());
246 0 : let len = reader.read(&mut buf[..len]).await?;
247 0 : file.write_all(&buf[..len]).await?;
248 0 : bytes_left -= len as u64;
249 : }
250 :
251 0 : file.flush().await?;
252 0 : file.sync_all().await?;
253 0 : Ok(())
254 0 : }
|