xapian-core  2.0.0
replication.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2008 Lemur Consulting Ltd
5  * Copyright (C) 2008,2009,2010,2011,2012,2013,2014,2015,2016,2017 Olly Betts
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, see
19  * <https://www.gnu.org/licenses/>.
20  */
21 
22 #include <config.h>
23 
24 #include "replication.h"
25 
26 #include "xapian/intrusive_ptr.h"
27 #include "xapian/constants.h"
28 #include "xapian/dbfactory.h"
29 #include "xapian/error.h"
30 #include "xapian/version.h"
31 
34 #include "debuglog.h"
35 #include "filetests.h"
36 #include "fileutils.h"
37 #include "io_utils.h"
38 #include "omassert.h"
39 #include "pack.h"
40 #include "realtime.h"
41 #include "net/remoteconnection.h"
42 #include "replicationprotocol.h"
43 #include "safesysstat.h"
44 #include "safeunistd.h"
45 #include "str.h"
47 
48 #include <cerrno>
49 #include <fstream>
50 #include <memory>
51 #include <string>
52 
53 using namespace std;
54 using namespace Xapian;
55 
/* Comment header written at the start of the replica's stub database file.
 *
 * This has to remain a macro expanding to string literals: it is joined to an
 * adjacent literal by compile-time string concatenation where it is used.
 */
#define REPLICA_STUB_BANNER \
"# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
"# Do not manually edit - replication operations may regenerate this file.\n"
60 
61 [[noreturn]]
62 static void
64 {
65  throw Xapian::NetworkError("Connection closed unexpectedly");
66 }
67 
68 void
69 DatabaseMaster::write_changesets_to_fd(int fd,
70  const string & start_revision,
71  ReplicationInfo * info) const
72 {
73  LOGCALL_VOID(REPLICA, "DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
74  if (info != NULL)
75  info->clear();
76  Database db;
77  try {
78  db = Database(path);
79  } catch (const Xapian::DatabaseError & e) {
80  RemoteConnection conn(-1, fd);
82  "Can't open database: " + e.get_msg(),
83  0.0);
84  return;
85  }
86  if (db.internal->size() != 1) {
87  throw Xapian::InvalidOperationError("DatabaseMaster needs to be pointed at exactly one subdatabase");
88  }
89 
90  // Extract the UUID from start_revision and compare it to the database.
91  bool need_whole_db = false;
92  string revision;
93  if (start_revision.empty()) {
94  need_whole_db = true;
95  } else {
96  const char * ptr = start_revision.data();
97  const char * end = ptr + start_revision.size();
98  string request_uuid;
99  if (!unpack_string(&ptr, end, request_uuid)) {
101  }
102  string db_uuid = db.internal->get_uuid();
103  if (request_uuid != db_uuid) {
104  need_whole_db = true;
105  }
106  revision.assign(ptr, end - ptr);
107  }
108 
109  db.internal->write_changesets_to_fd(fd, revision, need_whole_db, info);
110 }
111 
112 string
113 DatabaseMaster::get_description() const
114 {
115  string desc = "DatabaseMaster(";
116  description_append(desc, path);
117  desc += ")";
118  return desc;
119 }
120 
124  void operator=(const Internal &);
125 
127  Internal(const Internal &);
128 
130  string path;
131 
133  int live_id;
134 
140 
142  bool live_db_corrupt = false;
143 
150 
154 
158 
161  string offline_uuid;
162 
167 
173 
176 
183  void update_stub_database() const;
184 
186  void remove_offline_db();
187 
190  void apply_db_copy(double end_time);
191 
196  void check_message_type(int type, int expected) const;
197 
204  bool possibly_make_offline_live();
205 
206  string get_replica_path(int id) const {
207  string p = path;
208  p += "/replica_";
209  p += char('0' + id);
210  return p;
211  }
212 
213  public:
215  explicit Internal(const string & path_);
216 
218  ~Internal() { delete conn; }
219 
221  string get_revision_info() const;
222 
224  void set_read_fd(int fd);
225 
227  bool apply_next_changeset(ReplicationInfo * info,
228  double reader_close_time);
229 
231  string get_description() const { return path; }
232 };
233 
234 // Methods of DatabaseReplica
235 
236 DatabaseReplica::DatabaseReplica(const string & path)
237  : internal(new DatabaseReplica::Internal(path))
238 {
239  LOGCALL_CTOR(REPLICA, "DatabaseReplica", path);
240 }
241 
243 {
244  LOGCALL_DTOR(REPLICA, "DatabaseReplica");
245  delete internal;
246 }
247 
248 string
250 {
251  LOGCALL(REPLICA, string, "DatabaseReplica::get_revision_info", NO_ARGS);
253 }
254 
255 void
257 {
258  LOGCALL_VOID(REPLICA, "DatabaseReplica::set_read_fd", fd);
259  internal->set_read_fd(fd);
260 }
261 
262 bool
264  double reader_close_time)
265 {
266  LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time);
267  if (info != NULL)
268  info->clear();
269  RETURN(internal->apply_next_changeset(info, reader_close_time));
270 }
271 
272 string
274 {
275  string desc("DatabaseReplica(");
276  desc += internal->get_description();
277  desc += ')';
278  return desc;
279 }
280 
281 // Methods of DatabaseReplica::Internal
282 
283 void
285 {
286  string stub_path = path;
287  stub_path += "/XAPIANDB";
288  string tmp_path = stub_path;
289  tmp_path += ".tmp";
290  {
291  ofstream stub(tmp_path.c_str());
292  stub << REPLICA_STUB_BANNER
293  "auto replica_" << live_id << endl;
294  }
295  if (!io_tmp_rename(tmp_path, stub_path)) {
296  string msg("Failed to update stub db file for replica: ");
297  msg += path;
298  throw Xapian::DatabaseOpeningError(msg, errno);
299  }
300 }
301 
303  : path(path_), live_id(0), live_db(), have_offline_db(false),
304  need_copy_next(false), offline_revision(), offline_needed_revision(),
305  last_live_changeset_time(), conn(NULL)
306 {
307  LOGCALL_CTOR(REPLICA, "DatabaseReplica::Internal", path_);
308 #ifndef XAPIAN_HAS_GLASS_BACKEND
309  throw FeatureUnavailableError("Replication requires the glass backend to be enabled");
310 #else
311  if (mkdir(path.c_str(), 0777) == 0) {
312  // The database doesn't already exist - make a directory, containing a
313  // stub database, and point it to a new database.
314  //
315  // Create an empty database - the backend doesn't matter as if the
316  // master is a different type, then the replica will become that type
317  // automatically.
321  } else {
322  if (errno != EEXIST) {
323  throw DatabaseOpeningError("Couldn't create directory '" + path + "'", errno);
324  }
325  if (!dir_exists(path)) {
326  throw DatabaseOpeningError("Replica path must be a directory");
327  }
328  string stub_path = path;
329  stub_path += "/XAPIANDB";
330  try {
331  live_db = WritableDatabase(stub_path,
333  } catch (const Xapian::DatabaseCorruptError &) {
334  // If the database is too corrupt to open, force a full copy so we
335  // auto-heal from this condition. Instance seen in the wild was
336  // that the replica had all files truncated to size 0.
337  live_db_corrupt = true;
338  }
339  // FIXME: simplify all this?
340  ifstream stub(stub_path.c_str());
341  string line;
342  while (getline(stub, line)) {
343  if (!line.empty() && line[0] != '#') {
344  live_id = line[line.size() - 1] - '0';
345  break;
346  }
347  }
348  }
349 #endif
350 }
351 
352 string
354 {
355  LOGCALL(REPLICA, string, "DatabaseReplica::Internal::get_revision_info", NO_ARGS);
356  if (live_db_corrupt) {
357  RETURN(string());
358  }
359 
360  switch (live_db.internal->size()) {
361  case 0:
362  live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
363  break;
364  case 1:
365  // OK
366  break;
367  default:
368  throw Xapian::InvalidOperationError("DatabaseReplica needs to be "
369  "pointed at exactly one "
370  "subdatabase");
371  }
372 
373  string buf;
374  pack_string(buf, live_db.get_uuid());
375  pack_uint(buf, live_db.get_revision());
376  RETURN(buf);
377 }
378 
379 void
381 {
382  // Delete the offline database.
383  removedir(get_replica_path(live_id ^ 1));
384  have_offline_db = false;
385 }
386 
387 void
389 {
390  have_offline_db = true;
391  last_live_changeset_time = 0;
392  string offline_path = get_replica_path(live_id ^ 1);
393  // If there's already an offline database, discard it. This happens if one
394  // copy of the database was sent, but further updates were needed before it
395  // could be made live, and the remote end was then unable to send those
396  // updates (probably due to not having changesets available, or the remote
397  // database being replaced by a new database).
398  removedir(offline_path);
399  if (mkdir(offline_path.c_str(), 0777)) {
400  throw Xapian::DatabaseError("Cannot make directory '" +
401  offline_path + "'", errno);
402  }
403 
404  {
405  string buf;
406  int type = conn->get_message(buf, end_time);
407  check_message_type(type, REPL_REPLY_DB_HEADER);
408  const char * ptr = buf.data();
409  const char * end = ptr + buf.size();
410  if (!unpack_string(&ptr, end, offline_uuid)) {
412  }
413  offline_revision.assign(ptr, end - ptr);
414  }
415 
416  // Now, read the files for the database from the connection and create it.
417  while (true) {
418  string filename;
419  int type = conn->sniff_next_message_type(end_time);
420  if (type < 0 || type == REPL_REPLY_FAIL)
421  return;
422  if (type == REPL_REPLY_DB_FOOTER)
423  break;
424 
425  type = conn->get_message(filename, end_time);
426  check_message_type(type, REPL_REPLY_DB_FILENAME);
427 
428  // Check that the filename doesn't contain '..'. No valid database
429  // file contains .., so we don't need to check that the .. is a path.
430  if (filename.find("..") != string::npos) {
431  throw NetworkError("Filename in database contains '..'");
432  }
433 
434  type = conn->sniff_next_message_type(end_time);
435  if (type < 0 || type == REPL_REPLY_FAIL)
436  return;
437 
438  string filepath = offline_path + "/" + filename;
439  type = conn->receive_file(filepath, end_time);
440  if (type < 0)
442  check_message_type(type, REPL_REPLY_DB_FILEDATA);
443  }
444  int type = conn->get_message(offline_needed_revision, end_time);
445  check_message_type(type, REPL_REPLY_DB_FOOTER);
446  need_copy_next = false;
447 }
448 
449 void
451 {
452  if (type != expected) {
453  if (type < 0)
455  string m = "Expected replication protocol message type #";
456  m += str(expected);
457  m += ", got #";
458  m += str(type);
459  throw NetworkError(m);
460  }
461 }
462 
463 bool
465 {
466  string replica_path(get_replica_path(live_id ^ 1));
467  unique_ptr<DatabaseReplicator> replicator;
468  try {
469  replicator.reset(DatabaseReplicator::open(replica_path));
470  } catch (const Xapian::DatabaseError &) {
471  return false;
472  }
473  if (offline_needed_revision.empty()) {
474  return false;
475  }
476  if (!replicator->check_revision_at_least(offline_revision,
477  offline_needed_revision)) {
478  return false;
479  }
480 
481  string replicated_uuid = replicator->get_uuid();
482  if (replicated_uuid.empty()) {
483  return false;
484  }
485 
486  if (replicated_uuid != offline_uuid) {
487  return false;
488  }
489 
490  live_id ^= 1;
491  // Open the database first, so that if there's a problem, an exception
492  // will be thrown before we make the new database live.
493  live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
494  live_db_corrupt = false;
495  update_stub_database();
496  remove_offline_db();
497  return true;
498 }
499 
500 void
502 {
503  delete conn;
504  conn = NULL;
505  conn = new RemoteConnection(fd, -1);
506 }
507 
508 bool
510  double reader_close_time)
511 {
512  LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
513  while (true) {
514  int type = conn->sniff_next_message_type(0.0);
515  switch (type) {
517  string buf;
518  type = conn->get_message(buf, 0.0);
519  check_message_type(type, REPL_REPLY_END_OF_CHANGES);
520  RETURN(false);
521  }
523  // Apply the copy - remove offline db in case of any error.
524  try {
525  apply_db_copy(0.0);
526  if (info != NULL)
527  ++(info->fullcopy_count);
528  string replica_uuid;
529  {
530  unique_ptr<DatabaseReplicator> replicator(
531  DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
532  replica_uuid = replicator->get_uuid();
533  }
534  if (replica_uuid != offline_uuid) {
535  remove_offline_db();
536  // We've been sent an database with the wrong uuid,
537  // which only happens if the database at the server
538  // got changed during the copy, so the only safe
539  // action next is a new copy. Set a flag to ensure
540  // that this happens, or we're at risk of database
541  // corruption.
542  need_copy_next = true;
543  }
544  } catch (...) {
545  remove_offline_db();
546  throw;
547  }
548  if (possibly_make_offline_live()) {
549  if (info != NULL)
550  info->changed = true;
551  }
552  break;
554  if (need_copy_next) {
555  throw NetworkError("Needed a database copy next");
556  }
557  if (!have_offline_db) {
558  // Close the live db.
559  string replica_path(get_replica_path(live_id));
560  live_db = WritableDatabase();
561 
562  if (last_live_changeset_time != 0.0) {
563  // Wait until at least "reader_close_time" seconds have
564  // passed since the last changeset was applied, to
565  // allow any active readers to finish and be reopened.
566  double until;
567  until = last_live_changeset_time + reader_close_time;
568  RealTime::sleep(until);
569  }
570 
571  // Open a replicator for the live path, and apply the
572  // changeset.
573  {
574  unique_ptr<DatabaseReplicator> replicator(
575  DatabaseReplicator::open(replica_path));
576 
577  // Ignore the returned revision number, since we are
578  // live so the changeset must be safe to apply to a
579  // live DB.
580  replicator->apply_changeset_from_conn(*conn, 0.0, true);
581  }
582  last_live_changeset_time = RealTime::now();
583 
584  if (info != NULL) {
585  ++(info->changeset_count);
586  info->changed = true;
587  }
588  // Now the replicator is closed, open the live db again.
589  live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
590  live_db_corrupt = false;
591  RETURN(true);
592  }
593 
594  {
595  unique_ptr<DatabaseReplicator> replicator(
596  DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
597 
598  offline_revision = replicator->
599  apply_changeset_from_conn(*conn, 0.0, false);
600 
601  if (info != NULL) {
602  ++(info->changeset_count);
603  }
604  }
605  if (possibly_make_offline_live()) {
606  if (info != NULL)
607  info->changed = true;
608  }
609  RETURN(true);
610  case REPL_REPLY_FAIL: {
611  string buf;
612  if (conn->get_message(buf, 0.0) < 0)
614  throw NetworkError("Unable to fully synchronise: " + buf);
615  }
616  case -1:
618  default:
619  throw NetworkError("Unknown replication protocol message (" +
620  str(type) + ")");
621  }
622  }
623 }
Internal implementation of DatabaseReplica.
Definition: replication.cc:122
void update_stub_database() const
Update the stub database which points to a single database.
Definition: replication.cc:284
bool possibly_make_offline_live()
Check if the offline database has reached the required version.
Definition: replication.cc:464
string path
The path to the replica directory.
Definition: replication.cc:130
double last_live_changeset_time
The time at which a changeset was last applied to the live database.
Definition: replication.cc:172
string offline_needed_revision
The revision that the secondary database must reach before it can be made live.
Definition: replication.cc:166
WritableDatabase live_db
The live database being replicated.
Definition: replication.cc:139
string offline_uuid
The UUID of the secondary database.
Definition: replication.cc:161
Internal(const Internal &)
Don't allow copying.
void apply_db_copy(double end_time)
Apply a set of DB copy messages from the connection.
Definition: replication.cc:388
void check_message_type(int type, int expected) const
Check that a message type is as expected.
Definition: replication.cc:450
void operator=(const Internal &)
Don't allow assignment.
int live_id
The id of the currently live database in the replica (0 or 1).
Definition: replication.cc:133
string get_replica_path(int id) const
Definition: replication.cc:206
bool have_offline_db
Do we have an offline database currently?
Definition: replication.cc:149
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
Definition: replication.cc:509
void remove_offline_db()
Delete the offline database.
Definition: replication.cc:380
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
Definition: replication.cc:501
bool need_copy_next
Flag to indicate that the only valid operation next is a full copy.
Definition: replication.cc:153
string offline_revision
The revision that the secondary database has been updated to.
Definition: replication.cc:157
string get_description() const
Return a string describing this object.
Definition: replication.cc:231
bool live_db_corrupt
Do we need to heal the replica?
Definition: replication.cc:142
RemoteConnection * conn
The remote connection we're using.
Definition: replication.cc:175
string get_revision_info() const
Get a string describing the current revision of the replica.
Definition: replication.cc:353
A RemoteConnection object provides a bidirectional connection to another RemoteConnection object on a remote machine.
void send_message(char type, std::string_view s, double end_time)
Send a message.
DatabaseCorruptError indicates database corruption was detected.
Definition: error.h:397
DatabaseError indicates some sort of database related error.
Definition: error.h:355
DatabaseOpeningError indicates failure to open a database.
Definition: error.h:569
Access to a database replica, for applying replication to it.
Definition: replication.h:109
~DatabaseReplica()
Destructor.
Definition: replication.cc:242
std::string get_description() const
Return a string describing this object.
Definition: replication.cc:273
Internal * internal
Internals.
Definition: replication.h:111
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
Definition: replication.cc:263
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
Definition: replication.cc:256
std::string get_revision_info() const
Get a string describing the current revision of the replica.
Definition: replication.cc:249
An indexed database of documents.
Definition: database.h:75
Xapian::Internal::intrusive_ptr_nonnull< Internal > internal
Definition: database.h:95
const std::string & get_msg() const noexcept
Message giving details of the error, intended for human consumption.
Definition: error.h:111
Indicates an attempt to use a feature which is unavailable.
Definition: error.h:707
Base class for objects managed by intrusive_ptr.
Definition: intrusive_ptr.h:50
InvalidOperationError indicates the API was used in an invalid way.
Definition: error.h:271
Indicates a problem communicating with a remote database.
Definition: error.h:791
This class provides read/write access to a database.
Definition: database.h:964
Constants in the Xapian namespace.
PositionList * p
Virtual base class for Database internals.
Class to manage replication of databases.
Factory functions for constructing Database and WritableDatabase objects.
Debug logging macros.
#define RETURN(...)
Definition: debuglog.h:484
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:478
#define LOGCALL_CTOR(CATEGORY, CLASS, PARAMS)
Definition: debuglog.h:480
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Definition: debuglog.h:479
#define LOGCALL_DTOR(CATEGORY, CLASS)
Definition: debuglog.h:481
Append a string to an object description, escaping invalid UTF-8.
Hierarchy of classes which Xapian can throw as exceptions.
Utility functions for testing files.
bool dir_exists(const char *path)
Test if a directory exists.
Definition: filetests.h:145
void removedir(const string &dirname)
Remove a directory, and its contents.
Definition: fileutils.cc:50
File and path manipulation routines.
#define false
Definition: header.h:9
bool io_tmp_rename(const std::string &tmp_file, const std::string &real_file)
Rename a temporary file to its final position.
Definition: io_utils.cc:573
Wrappers for low-level POSIX I/O routines.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
double now()
Return the current time.
Definition: realtime.h:49
void sleep(double t)
Sleep until the time represented by this object.
Definition: realtime.h:127
string str(int value)
Convert int to std::string.
Definition: str.cc:91
Database open(std::string_view host, unsigned int port, unsigned timeout=10000, unsigned connect_timeout=10000)
Construct a Database object for read-only access to a remote database accessed via a TCP connection.
The Xapian namespace contains public interfaces for the Xapian library.
Definition: compactor.cc:82
const int DB_CREATE
Create a new database.
Definition: constants.h:43
int revision()
Report the revision of the library which the program is linked with.
Definition: xapian.h:146
const int DB_BACKEND_STUB
Open a stub database file.
Definition: constants.h:166
const int DB_OPEN
Open an existing database.
Definition: constants.h:49
Various assertion macros.
void unpack_throw_serialisation_error(const char *p)
Throw appropriate SerialisationError.
Definition: pack.cc:29
Pack types into strings and unpack them again.
bool unpack_string(const char **p, const char *end, std::string &result)
Decode a std::string from a string.
Definition: pack.h:468
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:315
void pack_string(std::string &s, std::string_view value)
Append an encoded std::string to a string.
Definition: pack.h:442
Functions for handling a time or time interval in a double.
RemoteConnection class used by the remote backend.
#define REPLICA_STUB_BANNER
Definition: replication.cc:57
static void throw_connection_closed_unexpectedly()
Definition: replication.cc:63
Replication support for Xapian databases.
Replication protocol version and message numbers.
@ REPL_REPLY_FAIL
@ REPL_REPLY_DB_FOOTER
@ REPL_REPLY_CHANGESET
@ REPL_REPLY_DB_FILEDATA
@ REPL_REPLY_DB_HEADER
@ REPL_REPLY_DB_FILENAME
@ REPL_REPLY_END_OF_CHANGES
include <sys/stat.h> with portability enhancements
<unistd.h>, but with compat.
Convert types to std::string.
Information about the steps involved in performing a replication.
Definition: replication.h:32
bool changed
True if and only if the replication corresponds to a change in the live version of the database.
Definition: replication.h:44
int fullcopy_count
Number of times a full database copy was performed.
Definition: replication.h:37
int changeset_count
Number of changesets applied.
Definition: replication.h:34
void description_append(std::string &desc, std::string_view s)
Definition: unittest.cc:105
Define preprocessor symbols for the library version.