xapian-core  2.0.0
glass_databasereplicator.cc
Go to the documentation of this file.
1 
4 /* Copyright 2008 Lemur Consulting Ltd
5  * Copyright 2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License as
9  * published by the Free Software Foundation; either version 2 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, see
19  * <https://www.gnu.org/licenses/>.
20  */
21 
#include <config.h>

#include "glass_databasereplicator.h"

#include "xapian/error.h"

#include "../flint_lock.h"
#include "glass_defs.h"
#include "glass_replicate_internal.h"
#include "glass_version.h"
#include "compression_stream.h"
#include "debuglog.h"
#include "fd.h"
#include "internaltypes.h"
#include "io_utils.h"
#include "pack.h"
#include "posixy_wrapper.h"
#include "net/remoteconnection.h"
#include "replicationprotocol.h"
#include "str.h"
#include "stringutils.h"

#include <algorithm>
#include <cerrno>
46 
47 [[noreturn]]
48 static void
50 {
51  throw Xapian::NetworkError("Connection closed unexpectedly");
52 }
53 
54 using namespace std;
55 using namespace Xapian;
56 
57 static const char * dbnames =
58  "/postlist." GLASS_TABLE_EXTENSION "\0"
59  "/docdata." GLASS_TABLE_EXTENSION "\0\0"
60  "/termlist." GLASS_TABLE_EXTENSION "\0"
61  "/position." GLASS_TABLE_EXTENSION "\0"
62  "/spelling." GLASS_TABLE_EXTENSION "\0"
63  "/synonym." GLASS_TABLE_EXTENSION;
64 
66  : db_dir(db_dir_)
67 {
68  std::fill_n(fds, sizeof(fds) / sizeof(fds[0]), -1);
69 }
70 
71 void
73 {
74  for (size_t i = 0; i != Glass::MAX_; ++i) {
75  int fd = fds[i];
76  if (fd >= 0) {
77  io_sync(fd);
78 #if 0 // FIXME: close or keep open?
79  close(fd);
80  fds[i] = -1;
81 #endif
82  }
83  }
84 }
85 
87 {
88  for (size_t i = 0; i != Glass::MAX_; ++i) {
89  int fd = fds[i];
90  if (fd >= 0) {
91  close(fd);
92  }
93  }
94 }
95 
96 bool
98  const string & target) const
99 {
100  LOGCALL(DB, bool, "GlassDatabaseReplicator::check_revision_at_least", rev | target);
101 
102  glass_revision_number_t rev_val;
103  glass_revision_number_t target_val;
104 
105  const char * ptr = rev.data();
106  const char * end = ptr + rev.size();
107  if (!unpack_uint(&ptr, end, &rev_val)) {
108  throw NetworkError("Invalid revision string supplied to check_revision_at_least");
109  }
110 
111  ptr = target.data();
112  end = ptr + target.size();
113  if (!unpack_uint(&ptr, end, &target_val)) {
114  throw NetworkError("Invalid revision string supplied to check_revision_at_least");
115  }
116 
117  RETURN(rev_val >= target_val);
118 }
119 
120 void
122  RemoteConnection & conn,
123  double end_time) const
124 {
125  const char *ptr = buf.data();
126  const char *end = ptr + buf.size();
127 
129  if (!unpack_uint(&ptr, end, &rev))
130  throw NetworkError("Invalid revision in changeset");
131 
132  string::size_type size;
133  if (!unpack_uint(&ptr, end, &size))
134  throw NetworkError("Invalid version file size in changeset");
135 
136  // Get the new version file into buf.
137  buf.erase(0, ptr - buf.data());
138  int res = conn.get_message_chunk(buf, size, end_time);
139  if (res <= 0) {
140  if (res < 0)
142  throw NetworkError("Unexpected end of changeset (6)");
143  }
144 
145  // Write size bytes from start of buf to new version file.
146  string tmpfile = db_dir;
147  tmpfile += "/v.rtmp";
148  int fd = posixy_open(tmpfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 0666);
149  if (fd == -1) {
150  string msg = "Failed to open ";
151  msg += tmpfile;
152  throw DatabaseError(msg, errno);
153  }
154  {
155  FD closer(fd);
156  io_write(fd, buf.data(), size);
157  io_sync(fd);
158  }
159  string version_file = db_dir;
160  version_file += "/iamglass";
161  if (!io_tmp_rename(tmpfile, version_file)) {
162  string msg("Couldn't create new version file ");
163  msg += version_file;
164  throw DatabaseError(msg, errno);
165  }
166 
167  buf.erase(0, size);
168 }
169 
170 void
172  unsigned v,
173  string & buf,
174  RemoteConnection & conn,
175  double end_time) const
176 {
177  const char *ptr = buf.data();
178  const char *end = ptr + buf.size();
179 
180  unsigned int changeset_blocksize = GLASS_MIN_BLOCKSIZE << v;
181  if (changeset_blocksize > 65536 ||
182  (changeset_blocksize & (changeset_blocksize - 1))) {
183  throw NetworkError("Invalid blocksize in changeset");
184  }
185  uint4 block_number;
186  if (!unpack_uint(&ptr, end, &block_number))
187  throw NetworkError("Invalid block number in changeset");
188 
189  buf.erase(0, ptr - buf.data());
190 
191  int fd = fds[table];
192  if (fd == -1) {
193  string db_path = db_dir;
194  db_path += dbnames + table * (11 + CONST_STRLEN(GLASS_TABLE_EXTENSION));
195  fd = posixy_open(db_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
196  if (fd == -1) {
197  string msg = "Failed to open ";
198  msg += db_path;
199  throw DatabaseError(msg, errno);
200  }
201  fds[table] = fd;
202  }
203 
204  int res = conn.get_message_chunk(buf, changeset_blocksize, end_time);
205  if (res <= 0) {
206  if (res < 0)
208  throw NetworkError("Unexpected end of changeset (4)");
209  }
210 
211  io_write_block(fd, buf.data(), changeset_blocksize, block_number);
212  buf.erase(0, changeset_blocksize);
213 }
214 
215 string
217  double end_time,
218  bool valid) const
219 {
220  LOGCALL(DB, string, "GlassDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
221 
222  // Lock the database to perform modifications.
223  FlintLock lock(db_dir);
224  string explanation;
225  FlintLock::reason why = lock.lock(true, false, explanation);
226  if (why != FlintLock::SUCCESS) {
227  lock.throw_databaselockerror(why, db_dir, explanation);
228  }
229 
230  int type = conn.get_message_chunked(end_time);
231  if (type < 0)
234 
235  string buf;
236  // Read enough to be certain that we've got the header part of the
237  // changeset.
238 
241  const char *ptr = buf.data();
242  const char *end = ptr + buf.size();
243  // Check the magic string.
244  if (!startswith(buf, CHANGES_MAGIC_STRING)) {
245  throw NetworkError("Invalid ChangeSet magic string");
246  }
248  if (ptr == end)
249  throw NetworkError("Couldn't read a valid version number from changeset");
250  unsigned int changes_version = *ptr++;
251  if (changes_version != CHANGES_VERSION)
252  throw NetworkError("Unsupported changeset version");
253 
254  glass_revision_number_t startrev;
256 
257  if (!unpack_uint(&ptr, end, &startrev))
258  throw NetworkError("Couldn't read a valid start revision from changeset");
259  if (!unpack_uint(&ptr, end, &endrev))
260  throw NetworkError("Couldn't read a valid end revision from changeset");
261 
262  if (endrev <= startrev)
263  throw NetworkError("End revision in changeset is not later than start revision");
264 
265  if (ptr == end)
266  throw NetworkError("Unexpected end of changeset (1)");
267 
268  if (valid) {
269  // Check the revision number.
270  // If the database was not known to be valid, we cannot
271  // reliably determine its revision number, so must skip this
272  // check.
273  GlassVersion version_file(db_dir);
274  version_file.read();
275  if (startrev != version_file.get_revision())
276  throw NetworkError("Changeset supplied is for wrong revision number");
277  }
278 
279  unsigned char changes_type = *ptr++;
280  if (changes_type != 0) {
281  throw NetworkError("Unsupported changeset type: " + str(changes_type));
282  // FIXME - support changes of type 1, produced when DANGEROUS mode is
283  // on.
284  }
285 
286  // Clear the bits of the buffer which have been read.
287  buf.erase(0, ptr - buf.data());
288 
289  // Read the items from the changeset.
290  while (true) {
293  ptr = buf.data();
294  end = ptr + buf.size();
295  if (ptr == end)
296  throw NetworkError("Unexpected end of changeset (3)");
297 
298  // Read the type of the next chunk of data
299  // chunk type can be (in binary):
300  //
301  // 11111111 - last chunk
302  // 11111110 - version file
303  // 00BBBTTT - table block:
304  // Block size = (GLASS_MIN_BLOCKSIZE<<BBB) BBB=0..5
305  // Table TTT=0..(Glass::MAX_-1)
306  unsigned char chunk_type = *ptr++;
307  if (chunk_type == 0xff)
308  break;
309  if (chunk_type == 0xfe) {
310  // Version file.
311  buf.erase(0, ptr - buf.data());
313  continue;
314  }
315  size_t table_code = (chunk_type & 0x07);
316  if (table_code >= Glass::MAX_)
317  throw NetworkError("Bad table code in changeset file");
318  Glass::table_type table = static_cast<Glass::table_type>(table_code);
319  unsigned char v = (chunk_type >> 3) & 0x0f;
320 
321  // Process the chunk
322  buf.erase(0, ptr - buf.data());
323  process_changeset_chunk_blocks(table, v, buf, conn, end_time);
324  }
325 
326  if (ptr != end)
327  throw NetworkError("Junk found at end of changeset");
328 
329  buf.resize(0);
330  pack_uint(buf, endrev);
331 
332  commit();
333 
334  RETURN(buf);
335 }
336 
337 string
339 {
340  LOGCALL(DB, string, "GlassDatabaseReplicator::get_uuid", NO_ARGS);
341  GlassVersion version_file(db_dir);
342  try {
343  version_file.read();
344  } catch (const Xapian::DatabaseError &) {
345  RETURN(string());
346  }
347  RETURN(version_file.get_uuid_string());
348 }
Definition: fd.h:30
reason lock(bool exclusive, bool wait, std::string &explanation)
Attempt to obtain the lock.
Definition: flint_lock.cc:124
void throw_databaselockerror(FlintLock::reason why, const std::string &db_dir, const std::string &explanation) const
Throw Xapian::DatabaseLockError.
Definition: flint_lock.cc:494
void process_changeset_chunk_blocks(Glass::table_type table, unsigned v, std::string &buf, RemoteConnection &conn, double end_time) const
Process a chunk which holds a list of changed blocks in the database.
std::string apply_changeset_from_conn(RemoteConnection &conn, double end_time, bool valid) const
Read and apply the next changeset.
bool check_revision_at_least(const std::string &rev, const std::string &target) const
Virtual methods of DatabaseReplicator.
std::string get_uuid() const
Get a UUID for the replica.
void process_changeset_chunk_version(std::string &buf, RemoteConnection &conn, double end_time) const
Process a chunk which holds a version file.
int fds[Glass::MAX_]
File descriptors for writing to each table.
GlassDatabaseReplicator(const std::string &db_dir_)
std::string db_dir
Path of database.
The GlassVersion class manages the revision files.
Definition: glass_version.h:96
glass_revision_number_t get_revision() const
std::string get_uuid_string() const
Return UUID in the standard 36 character string format.
void read()
Read the version file and check it's a version we understand.
A RemoteConnection object provides a bidirectional connection to another RemoteConnection object on a...
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
DatabaseError indicates some sort of database related error.
Definition: error.h:355
Indicates a problem communicating with a remote database.
Definition: error.h:791
class wrapper around zlib
Debug logging macros.
#define RETURN(...)
Definition: debuglog.h:484
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:478
Hierarchy of classes which Xapian can throw as exceptions.
Wrapper class around a file descriptor to avoid leaks.
int close(FD &fd)
Definition: fd.h:63
static void throw_connection_closed_unexpectedly()
static const char * dbnames
Support for glass database replication.
Definitions, types, etc for use inside glass.
#define GLASS_MIN_BLOCKSIZE
Minimum B-tree block size.
Definition: glass_defs.h:33
uint4 glass_revision_number_t
The revision number of a glass database.
Definition: glass_defs.h:68
#define GLASS_TABLE_EXTENSION
Glass table extension.
Definition: glass_defs.h:27
Internal definitions for glass database replication.
#define CHANGES_MAGIC_STRING
#define REASONABLE_CHANGESET_SIZE
#define CHANGES_VERSION
GlassVersion class.
Types used internally.
uint32_t uint4
Definition: internaltypes.h:31
void io_write(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
Definition: io_utils.cc:263
bool io_tmp_rename(const std::string &tmp_file, const std::string &real_file)
Rename a temporary file to its final position.
Definition: io_utils.cc:573
void io_write_block(int fd, const char *p, size_t n, off_t b, off_t o)
Write block b size n bytes from buffer p to file descriptor fd, offset o.
Definition: io_utils.cc:507
Wrappers for low-level POSIX I/O routines.
bool io_sync(int fd)
Ensure all data previously written to file descriptor fd has been written to disk.
Definition: io_utils.h:107
table_type
Definition: glass_defs.h:53
@ MAX_
Definition: glass_defs.h:60
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
string str(int value)
Convert int to std::string.
Definition: str.cc:91
The Xapian namespace contains public interfaces for the Xapian library.
Definition: compactor.cc:82
XAPIAN_REVISION_TYPE rev
Revision number of a database.
Definition: types.h:108
#define AssertEq(A, B)
Definition: omassert.h:124
Pack types into strings and unpack them again.
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
Definition: pack.h:346
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:315
Provides wrappers with POSIXy semantics.
#define posixy_open
RemoteConnection class used by the remote backend.
Replication protocol version and message numbers.
@ REPL_REPLY_CHANGESET
#define O_CLOEXEC
Definition: safefcntl.h:89
Convert types to std::string.
Various handy string-related helpers.
#define CONST_STRLEN(S)
Returns the length of a string constant.
Definition: stringutils.h:48
bool startswith(std::string_view s, char pfx)
Definition: stringutils.h:56