xapian-core  1.4.25
glass_databasereplicator.cc
Go to the documentation of this file.
1 
4 /* Copyright 2008 Lemur Consulting Ltd
5  * Copyright 2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License as
9  * published by the Free Software Foundation; either version 2 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
20  * USA
21  */
22 
23 #include <config.h>
24 
26 
27 #include "xapian/error.h"
28 
29 #include "../flint_lock.h"
30 #include "glass_defs.h"
32 #include "glass_version.h"
33 #include "compression_stream.h"
34 #include "debuglog.h"
35 #include "fd.h"
36 #include "internaltypes.h"
37 #include "io_utils.h"
38 #include "noreturn.h"
39 #include "pack.h"
40 #include "posixy_wrapper.h"
41 #include "net/remoteconnection.h"
42 #include "replicationprotocol.h"
43 #include "str.h"
44 #include "stringutils.h"
45 
46 #include <algorithm>
47 #include <cerrno>
48 
49 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
50 static void
52 {
53  throw Xapian::NetworkError("Connection closed unexpectedly");
54 }
55 
56 using namespace std;
57 using namespace Xapian;
58 
59 static const char * dbnames =
60  "/postlist." GLASS_TABLE_EXTENSION "\0"
61  "/docdata." GLASS_TABLE_EXTENSION "\0\0"
62  "/termlist." GLASS_TABLE_EXTENSION "\0"
63  "/position." GLASS_TABLE_EXTENSION "\0"
64  "/spelling." GLASS_TABLE_EXTENSION "\0"
65  "/synonym." GLASS_TABLE_EXTENSION;
66 
68  : db_dir(db_dir_)
69 {
70  std::fill_n(fds, sizeof(fds) / sizeof(fds[0]), -1);
71 }
72 
73 void
75 {
76  for (size_t i = 0; i != Glass::MAX_; ++i) {
77  int fd = fds[i];
78  if (fd >= 0) {
79  io_sync(fd);
80 #if 0 // FIXME: close or keep open?
81  close(fd);
82  fds[i] = -1;
83 #endif
84  }
85  }
86 }
87 
89 {
90  for (size_t i = 0; i != Glass::MAX_; ++i) {
91  int fd = fds[i];
92  if (fd >= 0) {
93  close(fd);
94  }
95  }
96 }
97 
98 bool
100  const string & target) const
101 {
102  LOGCALL(DB, bool, "GlassDatabaseReplicator::check_revision_at_least", rev | target);
103 
104  glass_revision_number_t rev_val;
105  glass_revision_number_t target_val;
106 
107  const char * ptr = rev.data();
108  const char * end = ptr + rev.size();
109  if (!unpack_uint(&ptr, end, &rev_val)) {
110  throw NetworkError("Invalid revision string supplied to check_revision_at_least");
111  }
112 
113  ptr = target.data();
114  end = ptr + target.size();
115  if (!unpack_uint(&ptr, end, &target_val)) {
116  throw NetworkError("Invalid revision string supplied to check_revision_at_least");
117  }
118 
119  RETURN(rev_val >= target_val);
120 }
121 
122 void
124  RemoteConnection & conn,
125  double end_time) const
126 {
127  const char *ptr = buf.data();
128  const char *end = ptr + buf.size();
129 
131  if (!unpack_uint(&ptr, end, &rev))
132  throw NetworkError("Invalid revision in changeset");
133 
134  string::size_type size;
135  if (!unpack_uint(&ptr, end, &size))
136  throw NetworkError("Invalid version file size in changeset");
137 
138  // Get the new version file into buf.
139  buf.erase(0, ptr - buf.data());
140  int res = conn.get_message_chunk(buf, size, end_time);
141  if (res <= 0) {
142  if (res < 0)
144  throw NetworkError("Unexpected end of changeset (6)");
145  }
146 
147  // Write size bytes from start of buf to new version file.
148  string tmpfile = db_dir;
149  tmpfile += "/v.rtmp";
150  int fd = posixy_open(tmpfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 0666);
151  if (fd == -1) {
152  string msg = "Failed to open ";
153  msg += tmpfile;
154  throw DatabaseError(msg, errno);
155  }
156  {
157  FD closer(fd);
158  io_write(fd, buf.data(), size);
159  io_sync(fd);
160  }
161  string version_file = db_dir;
162  version_file += "/iamglass";
163  if (!io_tmp_rename(tmpfile, version_file)) {
164  string msg("Couldn't create new version file ");
165  msg += version_file;
166  throw DatabaseError(msg, errno);
167  }
168 
169  buf.erase(0, size);
170 }
171 
172 void
174  unsigned v,
175  string & buf,
176  RemoteConnection & conn,
177  double end_time) const
178 {
179  const char *ptr = buf.data();
180  const char *end = ptr + buf.size();
181 
182  unsigned int changeset_blocksize = 2048 << v;
183  if (changeset_blocksize > 65536 ||
184  (changeset_blocksize & (changeset_blocksize - 1))) {
185  throw NetworkError("Invalid blocksize in changeset");
186  }
187  uint4 block_number;
188  if (!unpack_uint(&ptr, end, &block_number))
189  throw NetworkError("Invalid block number in changeset");
190 
191  buf.erase(0, ptr - buf.data());
192 
193  int fd = fds[table];
194  if (fd == -1) {
195  string db_path = db_dir;
196  db_path += dbnames + table * (11 + CONST_STRLEN(GLASS_TABLE_EXTENSION));
197  fd = posixy_open(db_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
198  if (fd == -1) {
199  string msg = "Failed to open ";
200  msg += db_path;
201  throw DatabaseError(msg, errno);
202  }
203  fds[table] = fd;
204  }
205 
206  int res = conn.get_message_chunk(buf, changeset_blocksize, end_time);
207  if (res <= 0) {
208  if (res < 0)
210  throw NetworkError("Unexpected end of changeset (4)");
211  }
212 
213  io_write_block(fd, buf.data(), changeset_blocksize, block_number);
214  buf.erase(0, changeset_blocksize);
215 }
216 
217 string
219  double end_time,
220  bool valid) const
221 {
222  LOGCALL(DB, string, "GlassDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
223 
224  // Lock the database to perform modifications.
225  FlintLock lock(db_dir);
226  string explanation;
227  FlintLock::reason why = lock.lock(true, false, explanation);
228  if (why != FlintLock::SUCCESS) {
229  lock.throw_databaselockerror(why, db_dir, explanation);
230  }
231 
232  int type = conn.get_message_chunked(end_time);
233  if (type < 0)
236 
237  string buf;
238  // Read enough to be certain that we've got the header part of the
239  // changeset.
240 
241  if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
243  const char *ptr = buf.data();
244  const char *end = ptr + buf.size();
245  // Check the magic string.
246  if (!startswith(buf, CHANGES_MAGIC_STRING)) {
247  throw NetworkError("Invalid ChangeSet magic string");
248  }
250  if (ptr == end)
251  throw NetworkError("Couldn't read a valid version number from changeset");
252  unsigned int changes_version = *ptr++;
253  if (changes_version != CHANGES_VERSION)
254  throw NetworkError("Unsupported changeset version");
255 
256  glass_revision_number_t startrev;
258 
259  if (!unpack_uint(&ptr, end, &startrev))
260  throw NetworkError("Couldn't read a valid start revision from changeset");
261  if (!unpack_uint(&ptr, end, &endrev))
262  throw NetworkError("Couldn't read a valid end revision from changeset");
263 
264  if (endrev <= startrev)
265  throw NetworkError("End revision in changeset is not later than start revision");
266 
267  if (ptr == end)
268  throw NetworkError("Unexpected end of changeset (1)");
269 
270  if (valid) {
271  // Check the revision number.
272  // If the database was not known to be valid, we cannot
273  // reliably determine its revision number, so must skip this
274  // check.
275  GlassVersion version_file(db_dir);
276  version_file.read();
277  if (startrev != version_file.get_revision())
278  throw NetworkError("Changeset supplied is for wrong revision number");
279  }
280 
281  unsigned char changes_type = *ptr++;
282  if (changes_type != 0) {
283  throw NetworkError("Unsupported changeset type: " + str(changes_type));
284  // FIXME - support changes of type 1, produced when DANGEROUS mode is
285  // on.
286  }
287 
288  // Clear the bits of the buffer which have been read.
289  buf.erase(0, ptr - buf.data());
290 
291  // Read the items from the changeset.
292  while (true) {
293  if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
295  ptr = buf.data();
296  end = ptr + buf.size();
297  if (ptr == end)
298  throw NetworkError("Unexpected end of changeset (3)");
299 
300  // Read the type of the next chunk of data
301  // chunk type can be (in binary):
302  //
303  // 11111111 - last chunk
304  // 11111110 - version file
305  // 00BBBTTT - table block:
306  // Block size = (2048<<BBB) BBB=0..5; Table TTT=0..(Glass::MAX_-1)
307  unsigned char chunk_type = *ptr++;
308  if (chunk_type == 0xff)
309  break;
310  if (chunk_type == 0xfe) {
311  // Version file.
312  buf.erase(0, ptr - buf.data());
313  process_changeset_chunk_version(buf, conn, end_time);
314  continue;
315  }
316  size_t table_code = (chunk_type & 0x07);
317  if (table_code >= Glass::MAX_)
318  throw NetworkError("Bad table code in changeset file");
319  Glass::table_type table = static_cast<Glass::table_type>(table_code);
320  unsigned char v = (chunk_type >> 3) & 0x0f;
321 
322  // Process the chunk
323  buf.erase(0, ptr - buf.data());
324  process_changeset_chunk_blocks(table, v, buf, conn, end_time);
325  }
326 
327  if (ptr != end)
328  throw NetworkError("Junk found at end of changeset");
329 
330  buf.resize(0);
331  pack_uint(buf, endrev);
332 
333  commit();
334 
335  RETURN(buf);
336 }
337 
338 string
340 {
341  LOGCALL(DB, string, "GlassDatabaseReplicator::get_uuid", NO_ARGS);
342  GlassVersion version_file(db_dir);
343  try {
344  version_file.read();
345  } catch (const Xapian::DatabaseError &) {
346  RETURN(string());
347  }
348  RETURN(version_file.get_uuid_string());
349 }
The Xapian namespace contains public interfaces for the Xapian library.
Definition: compactor.cc:80
int close(FD &fd)
Definition: fd.h:63
#define RETURN(A)
Definition: debuglog.h:493
void throw_databaselockerror(FlintLock::reason why, const std::string &db_dir, const std::string &explanation) const
Throw Xapian::DatabaseLockError.
Definition: flint_lock.cc:499
Define the XAPIAN_NORETURN macro.
GlassVersion class.
A RemoteConnection object provides a bidirectional connection to another RemoteConnection object on a...
RemoteConnection class used by the remote backend.
void io_write(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
Definition: io_utils.cc:145
bool check_revision_at_least(const std::string &rev, const std::string &target) const
Virtual methods of DatabaseReplicator.
#define AssertEq(A, B)
Definition: omassert.h:124
XAPIAN_REVISION_TYPE rev
Revision number of a database.
Definition: types.h:133
table_type
Definition: glass_defs.h:53
void process_changeset_chunk_blocks(Glass::table_type table, unsigned v, std::string &buf, RemoteConnection &conn, double end_time) const
Process a chunk which holds a list of changed blocks in the database.
Internal definitions for glass database replication.
uint4 glass_revision_number_t
The revision number of a glass database.
Definition: glass_defs.h:68
bool io_sync(int fd)
Ensure all data previously written to file descriptor fd has been written to disk.
Definition: io_utils.h:73
Provides wrappers with POSIXy semantics.
The GlassVersion class manages the revision files.
Definition: glass_version.h:94
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
int fds[Glass::MAX_]
File descriptors for writing to each table.
static const char * dbnames
STL namespace.
Definitions, types, etc for use inside glass.
Convert types to std::string.
#define O_CLOEXEC
Definition: safefcntl.h:90
std::string db_dir
Path of database.
#define GLASS_TABLE_EXTENSION
Glass table extension.
Definition: glass_defs.h:27
GlassDatabaseReplicator(const std::string &db_dir_)
#define REASONABLE_CHANGESET_SIZE
#define CHANGES_MAGIC_STRING
Hierarchy of classes which Xapian can throw as exceptions.
class wrapper around zlib
uint32_t uint4
Definition: internaltypes.h:32
std::string apply_changeset_from_conn(RemoteConnection &conn, double end_time, bool valid) const
Virtual methods of DatabaseReplicator.
Definition: fd.h:30
static void throw_connection_closed_unexpectedly()
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
void io_write_block(int fd, const char *p, size_t n, off_t b, off_t o)
Write block b size n bytes from buffer p to file descriptor fd, offset o.
Definition: io_utils.cc:228
string str(int value)
Convert int to std::string.
Definition: str.cc:90
Wrapper class around a file descriptor to avoid leaks.
bool io_tmp_rename(const std::string &tmp_file, const std::string &real_file)
Rename a temporary file to its final position.
Definition: io_utils.cc:271
#define CONST_STRLEN(S)
Returns the length of a string constant.
Definition: stringutils.h:43
bool startswith(const std::string &s, char pfx)
Definition: stringutils.h:51
void read()
Read the version file and check it&#39;s a version we understand.
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:382
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
Replication protocol version and message numbers.
Indicates a problem communicating with a remote database.
Definition: error.h:803
Pack types into strings and unpack them again.
Wrappers for low-level POSIX I/O routines.
Various handy helpers which std::string really should provide.
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
Definition: pack.h:413
#define posixy_open
std::string get_uuid() const
Virtual methods of DatabaseReplicator.
reason lock(bool exclusive, bool wait, std::string &explanation)
Attempt to obtain the lock.
Definition: flint_lock.cc:125
Support for glass database replication.
#define CHANGES_VERSION
void process_changeset_chunk_version(std::string &buf, RemoteConnection &conn, double end_time) const
Process a chunk which holds a version file.
DatabaseError indicates some sort of database related error.
Definition: error.h:367
glass_revision_number_t get_revision() const
std::string get_uuid_string() const
Return UUID in the standard 36 character string format.
Types used internally.
Debug logging macros.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:487