xapian-core  1.4.27
chert_databasereplicator.cc
Go to the documentation of this file.
1 
4 /* Copyright 2008 Lemur Consulting Ltd
5  * Copyright 2009,2010,2011,2012,2014,2015,2016 Olly Betts
6  * Copyright 2010 Richard Boulton
7  *
8  * This program is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU General Public License as
10  * published by the Free Software Foundation; either version 2 of the
11  * License, or (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
21  * USA
22  */
23 
24 #include <config.h>
25 
27 
28 #include "xapian/error.h"
29 
30 #include "../flint_lock.h"
31 #include "chert_record.h"
33 #include "chert_types.h"
34 #include "chert_version.h"
35 #include "debuglog.h"
36 #include "fd.h"
37 #include "filetests.h"
38 #include "io_utils.h"
39 #include "noreturn.h"
40 #include "pack.h"
41 #include "posixy_wrapper.h"
42 #include "net/remoteconnection.h"
43 #include "replicate_utils.h"
44 #include "replicationprotocol.h"
45 #include "str.h"
46 #include "stringutils.h"
47 
48 #include <cerrno>
49 #include <cstdlib>
50 
51 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
52 static void
54 {
55  throw Xapian::NetworkError("Connection closed unexpectedly");
56 }
57 
58 using namespace std;
59 using namespace Xapian;
60 
62  : db_dir(db_dir_),
63  max_changesets(0)
64 {
65  const char *p = getenv("XAPIAN_MAX_CHANGESETS");
66  if (p)
67  max_changesets = atoi(p);
68 }
69 
70 bool
72  const string & target) const
73 {
74  LOGCALL(DB, bool, "ChertDatabaseReplicator::check_revision_at_least", rev | target);
75 
77  chert_revision_number_t target_val;
78 
79  const char * ptr = rev.data();
80  const char * end = ptr + rev.size();
81  if (!unpack_uint(&ptr, end, &rev_val)) {
82  throw NetworkError("Invalid revision string supplied to check_revision_at_least");
83  }
84 
85  ptr = target.data();
86  end = ptr + target.size();
87  if (!unpack_uint(&ptr, end, &target_val)) {
88  throw NetworkError("Invalid revision string supplied to check_revision_at_least");
89  }
90 
91  RETURN(rev_val >= target_val);
92 }
93 
94 void
96  string & buf,
97  RemoteConnection & conn,
98  double end_time,
99  int changes_fd) const
100 {
101  const char *ptr = buf.data();
102  const char *end = ptr + buf.size();
103 
104  // Get the letter
105  char letter = ptr[0];
106  if (letter != 'A' && letter != 'B')
107  throw NetworkError("Invalid base file letter in changeset");
108  ++ptr;
109 
110 
111  // Get the base size
112  if (ptr == end)
113  throw NetworkError("Unexpected end of changeset (5)");
114  string::size_type base_size;
115  if (!unpack_uint(&ptr, end, &base_size))
116  throw NetworkError("Invalid base file size in changeset");
117 
118  // Get the new base file into buf.
119  write_and_clear_changes(changes_fd, buf, ptr - buf.data());
120  int res = conn.get_message_chunk(buf, base_size, end_time);
121  if (res <= 0) {
122  if (res < 0)
124  throw NetworkError("Unexpected end of changeset (6)");
125  }
126 
127  // Write base_size bytes from start of buf to base file for tablename
128  string tmp_path = db_dir + "/" + tablename + "tmp";
129  string base_path = db_dir + "/" + tablename + ".base" + letter;
130  int fd = posixy_open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 0666);
131  if (fd == -1) {
132  string msg = "Failed to open ";
133  msg += tmp_path;
134  throw DatabaseError(msg, errno);
135  }
136  {
137  FD closer(fd);
138 
139  io_write(fd, buf.data(), base_size);
140  io_sync(fd);
141  }
142 
143  // Finish writing the changeset before moving the base file into place.
144  write_and_clear_changes(changes_fd, buf, base_size);
145 
146  if (!io_tmp_rename(tmp_path, base_path)) {
147  string msg("Couldn't update base file ");
148  msg += tablename;
149  msg += ".base";
150  msg += letter;
151  throw DatabaseError(msg, errno);
152  }
153 }
154 
155 void
157  string & buf,
158  RemoteConnection & conn,
159  double end_time,
160  int changes_fd) const
161 {
162  const char *ptr = buf.data();
163  const char *end = ptr + buf.size();
164 
165  unsigned int changeset_blocksize;
166  if (!unpack_uint(&ptr, end, &changeset_blocksize))
167  throw NetworkError("Invalid blocksize in changeset");
168  write_and_clear_changes(changes_fd, buf, ptr - buf.data());
169 
170  string db_path = db_dir + "/" + tablename + ".DB";
171  int fd = posixy_open(db_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
172  if (fd == -1) {
173  string msg = "Failed to open ";
174  msg += db_path;
175  throw DatabaseError(msg, errno);
176  }
177  {
178  FD closer(fd);
179 
180  while (true) {
181  if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
183 
184  ptr = buf.data();
185  end = ptr + buf.size();
186 
187  uint4 block_number;
188  if (!unpack_uint(&ptr, end, &block_number))
189  throw NetworkError("Invalid block number in changeset");
190  write_and_clear_changes(changes_fd, buf, ptr - buf.data());
191  if (block_number == 0)
192  break;
193  --block_number;
194 
195  int res = conn.get_message_chunk(buf, changeset_blocksize, end_time);
196  if (res <= 0) {
197  if (res < 0)
199  throw NetworkError("Incomplete block in changeset");
200  }
201 
202  io_write_block(fd, buf.data(), changeset_blocksize, block_number);
203 
204  write_and_clear_changes(changes_fd, buf, changeset_blocksize);
205  }
206  io_sync(fd);
207  }
208 }
209 
210 string
212  double end_time,
213  bool valid) const
214 {
215  LOGCALL(DB, string, "ChertDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
216 
217  // Lock the database to perform modifications.
218  FlintLock lock(db_dir);
219  string explanation;
220  FlintLock::reason why = lock.lock(true, false, explanation);
221  if (why != FlintLock::SUCCESS) {
222  lock.throw_databaselockerror(why, db_dir, explanation);
223  }
224 
225  int type = conn.get_message_chunked(end_time);
226  if (type < 0)
229 
230  string buf;
231  // Read enough to be certain that we've got the header part of the
232  // changeset.
233 
234  if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
236  // Check the magic string.
237  if (!startswith(buf, CHANGES_MAGIC_STRING)) {
238  throw NetworkError("Invalid ChangeSet magic string");
239  }
240  const char *ptr = buf.data();
241  const char *end = ptr + buf.size();
243 
244  unsigned int changes_version;
245  if (!unpack_uint(&ptr, end, &changes_version))
246  throw NetworkError("Couldn't read a valid version number from changeset");
247  if (changes_version != CHANGES_VERSION)
248  throw NetworkError("Unsupported changeset version");
249 
250  chert_revision_number_t startrev;
252 
253  if (!unpack_uint(&ptr, end, &startrev))
254  throw NetworkError("Couldn't read a valid start revision from changeset");
255  if (!unpack_uint(&ptr, end, &endrev))
256  throw NetworkError("Couldn't read a valid end revision from changeset");
257 
258  if (endrev <= startrev)
259  throw NetworkError("End revision in changeset is not later than start revision");
260 
261  if (ptr == end)
262  throw NetworkError("Unexpected end of changeset (1)");
263 
264  FD changes_fd;
265  string changes_name;
266  if (max_changesets > 0) {
267  changes_fd = create_changeset_file(db_dir, "changes" + str(startrev),
268  changes_name);
269  }
270 
271  if (valid) {
272  // Check the revision number.
273  // If the database was not known to be valid, we cannot
274  // reliably determine its revision number, so must skip this
275  // check.
276  ChertRecordTable record_table(db_dir, true);
277  record_table.open();
278  if (startrev != record_table.get_open_revision_number())
279  throw NetworkError("Changeset supplied is for wrong revision number");
280  }
281 
282  unsigned char changes_type = ptr[0];
283  if (changes_type != 0) {
284  throw NetworkError("Unsupported changeset type: " + str(changes_type));
285  // FIXME - support changes of type 1, produced when DANGEROUS mode is
286  // on.
287  }
288 
289  // Write and clear the bits of the buffer which have been read.
290  write_and_clear_changes(changes_fd, buf, ptr + 1 - buf.data());
291 
292  // Read the items from the changeset.
293  while (true) {
294  if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
296  ptr = buf.data();
297  end = ptr + buf.size();
298 
299  // Read the type of the next chunk of data
300  if (ptr == end)
301  throw NetworkError("Unexpected end of changeset (2)");
302  unsigned char chunk_type = ptr[0];
303  ++ptr;
304  if (chunk_type == 0)
305  break;
306 
307  // Get the tablename.
308  string tablename;
309  if (!unpack_string(&ptr, end, tablename))
310  throw NetworkError("Unexpected end of changeset (3)");
311  if (tablename.empty())
312  throw NetworkError("Missing tablename in changeset");
313  if (tablename.find_first_not_of("abcdefghijklmnopqrstuvwxyz") !=
314  tablename.npos)
315  throw NetworkError("Invalid character in tablename in changeset");
316 
317  // Process the chunk
318  if (ptr == end)
319  throw NetworkError("Unexpected end of changeset (4)");
320  write_and_clear_changes(changes_fd, buf, ptr - buf.data());
321 
322  switch (chunk_type) {
323  case 1:
324  process_changeset_chunk_base(tablename, buf, conn, end_time,
325  changes_fd);
326  break;
327  case 2:
328  process_changeset_chunk_blocks(tablename, buf, conn, end_time,
329  changes_fd);
330  break;
331  default:
332  throw NetworkError("Unrecognised item type in changeset");
333  }
334  }
336  if (!unpack_uint(&ptr, end, &reqrev))
337  throw NetworkError("Couldn't read a valid required revision from changeset");
338  if (reqrev < endrev)
339  throw NetworkError("Required revision in changeset is earlier than end revision");
340  if (ptr != end)
341  throw NetworkError("Junk found at end of changeset");
342 
343  write_and_clear_changes(changes_fd, buf, buf.size());
344  pack_uint(buf, reqrev);
345  RETURN(buf);
346 }
347 
348 string
350 {
351  LOGCALL(DB, string, "ChertDatabaseReplicator::get_uuid", NO_ARGS);
352  ChertVersion version_file(db_dir);
353  try {
354  version_file.read_and_check();
355  } catch (const Xapian::DatabaseError &) {
356  RETURN(string());
357  }
358  RETURN(version_file.get_uuid_string());
359 }
The Xapian namespace contains public interfaces for the Xapian library.
Definition: compactor.cc:80
#define RETURN(A)
Definition: debuglog.h:493
void throw_databaselockerror(FlintLock::reason why, const std::string &db_dir, const std::string &explanation) const
Throw Xapian::DatabaseLockError.
Definition: flint_lock.cc:495
Support for chert database replication.
unsigned int max_changesets
The maximum number of changesets which should be kept in the database.
Define the XAPIAN_NORETURN macro.
A RemoteConnection object provides a bidirectional connection to another RemoteConnection object on a...
RemoteConnection class used by the remote backend.
void io_write(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
Definition: io_utils.cc:145
Records in chert databases.
static void throw_connection_closed_unexpectedly()
#define AssertEq(A, B)
Definition: omassert.h:124
void read_and_check()
Read the version file and check it's a version we understand.
XAPIAN_REVISION_TYPE rev
Revision number of a database.
Definition: types.h:133
bool io_sync(int fd)
Ensure all data previously written to file descriptor fd has been written to disk.
Definition: io_utils.h:73
Provides wrappers with POSIXy semantics.
Internal definitions for chert database replication.
A record in a chert database.
Definition: chert_record.h:37
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
STL namespace.
Convert types to std::string.
void open()
Open the btree at the latest revision.
bool check_revision_at_least(const std::string &rev, const std::string &target) const
Virtual methods of DatabaseReplicator.
Utility functions for testing files.
#define O_CLOEXEC
Definition: safefcntl.h:90
std::string db_dir
Path of database.
Types used by chert backend and the Btree manager.
unsigned int chert_revision_number_t
A type used to store a revision number for a table.
Definition: chert_types.h:40
std::string get_uuid_string() const
Return UUID in the standard 36 character string format.
Definition: chert_version.h:61
#define REASONABLE_CHANGESET_SIZE
#define CHANGES_MAGIC_STRING
Hierarchy of classes which Xapian can throw as exceptions.
The ChertVersion class manages the "iamchert" file.
Definition: chert_version.h:34
int create_changeset_file(const string &changeset_dir, const string &filename, string &changes_name)
Create a new changeset file, and return an open fd for writing to it.
uint32_t uint4
Definition: internaltypes.h:32
chert_revision_number_t get_open_revision_number() const
Get the revision number at which this table is currently open.
Definition: chert_table.h:610
Definition: fd.h:30
void process_changeset_chunk_blocks(const std::string &tablename, std::string &buf, RemoteConnection &conn, double end_time, int changes_fd) const
Process a chunk which holds a list of changed blocks in the database.
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
void io_write_block(int fd, const char *p, size_t n, off_t b, off_t o)
Write block b size n bytes from buffer p to file descriptor fd, offset o.
Definition: io_utils.cc:228
string str(int value)
Convert int to std::string.
Definition: str.cc:90
Wrapper class around a file descriptor to avoid leaks.
bool io_tmp_rename(const std::string &tmp_file, const std::string &real_file)
Rename a temporary file to its final position.
Definition: io_utils.cc:271
#define CONST_STRLEN(S)
Returns the length of a string constant.
Definition: stringutils.h:43
void process_changeset_chunk_base(const std::string &tablename, std::string &buf, RemoteConnection &conn, double end_time, int changes_fd) const
Process a chunk which holds a base block.
bool startswith(const std::string &s, char pfx)
Definition: stringutils.h:51
ChertVersion class.
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:382
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
Replication protocol version and message numbers.
Utility functions for replication implementations.
Indicates a problem communicating with a remote database.
Definition: error.h:803
Pack types into strings and unpack them again.
Wrappers for low-level POSIX I/O routines.
Various handy helpers which std::string really should provide.
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
Definition: pack.h:413
std::string get_uuid() const
Virtual methods of DatabaseReplicator.
#define posixy_open
reason lock(bool exclusive, bool wait, std::string &explanation)
Attempt to obtain the lock.
Definition: flint_lock.cc:125
#define CHANGES_VERSION
bool unpack_string(const char **p, const char *end, std::string &result)
Decode a std::string from a string.
Definition: pack.h:504
DatabaseError indicates some sort of database related error.
Definition: error.h:367
ChertDatabaseReplicator(const std::string &db_dir_)
Debug logging macros.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:487
std::string apply_changeset_from_conn(RemoteConnection &conn, double end_time, bool valid) const
Virtual methods of DatabaseReplicator.
void write_and_clear_changes(int changes_fd, string &buf, size_t bytes)
Write some changes from a buffer, and then drop them from the buffer.