xapian-core  1.4.25
replication.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2008 Lemur Consulting Ltd
5  * Copyright (C) 2008,2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <config.h>
23 
24 #include "replication.h"
25 
26 #include "xapian/intrusive_ptr.h"
27 #include "xapian/constants.h"
28 #include "xapian/dbfactory.h"
29 #include "xapian/error.h"
30 #include "xapian/version.h"
31 
32 #include "backends/database.h"
34 #include "debuglog.h"
35 #include "filetests.h"
36 #include "fileutils.h"
37 #include "io_utils.h"
38 #include "omassert.h"
39 #include "realtime.h"
40 #include "net/remoteconnection.h"
41 #include "noreturn.h"
42 #include "replicationprotocol.h"
43 #include "safesysstat.h"
44 #include "safeunistd.h"
45 #include "net/length.h"
46 #include "str.h"
48 
49 #include "autoptr.h"
50 #include <cerrno>
51 #include <fstream>
52 #include <string>
53 
54 using namespace std;
55 using namespace Xapian;
56 
// REPLICA_STUB_BANNER expands to two adjacent string literals (with
// XAPIAN_VERSION spliced in by literal concatenation), each a '#'-prefixed,
// newline-terminated line.  update_stub_database() streams it immediately
// ahead of the "auto replica_<id>" line of the stub file.
57 // The banner comment used at the top of the replica's stub database file.
58 #define REPLICA_STUB_BANNER \
59 "# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
60 "# Do not manually edit - replication operations may regenerate this file.\n"
61 
62 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
63 static void
65 {
66  throw Xapian::NetworkError("Connection closed unexpectedly");
67 }
68 
69 void
70 DatabaseMaster::write_changesets_to_fd(int fd,
71  const string & start_revision,
72  ReplicationInfo * info) const
73 {
74  LOGCALL_VOID(REPLICA, "DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
75  if (info != NULL)
76  info->clear();
77  Database db;
78  try {
79  db = Database(path);
80  } catch (const Xapian::DatabaseError & e) {
81  RemoteConnection conn(-1, fd);
83  "Can't open database: " + e.get_msg(),
84  0.0);
85  return;
86  }
87  if (db.internal.size() != 1) {
88  throw Xapian::InvalidOperationError("DatabaseMaster needs to be pointed at exactly one subdatabase");
89  }
90 
91  // Extract the UUID from start_revision and compare it to the database.
92  bool need_whole_db = false;
93  string revision;
94  if (start_revision.empty()) {
95  need_whole_db = true;
96  } else {
97  const char * ptr = start_revision.data();
98  const char * end = ptr + start_revision.size();
99  size_t uuid_length;
100  decode_length_and_check(&ptr, end, uuid_length);
101  string request_uuid(ptr, uuid_length);
102  ptr += uuid_length;
103  string db_uuid = db.internal[0]->get_uuid();
104  if (request_uuid != db_uuid) {
105  need_whole_db = true;
106  }
107  revision.assign(ptr, end - ptr);
108  }
109 
110  db.internal[0]->write_changesets_to_fd(fd, revision, need_whole_db, info);
111 }
112 
113 string
114 DatabaseMaster::get_description() const
115 {
116  string desc = "DatabaseMaster(";
117  description_append(desc, path);
118  desc += ")";
119  return desc;
120 }
121 
// Private part of the DatabaseReplica::Internal class body.
//
// NOTE(review): the class header and several data member declarations are
// absent from this listing (its embedded numbering jumps from 121 to 125 and
// has further gaps below).  The constructor's initialiser list names live_db,
// have_offline_db, need_copy_next, offline_revision, offline_needed_revision,
// last_live_changeset_time and conn, none of which are declared in the lines
// that survive here - restore them from the original source before building.

// Copy assignment: declared but not defined in this file - presumably to
// forbid assignment; confirm.
125  void operator=(const Internal &);
126 
// Copy constructor: likewise declared only, so instances can't be copied.
128  Internal(const Internal &);
129 
// Path to the replica directory (the constructor calls mkdir() on it, and it
// is the parent of the "XAPIANDB" stub file and the replica_<id> databases).
131  string path;
132 
// Id of the live database within the replica; get_replica_path(live_id) is
// the live database and get_replica_path(live_id ^ 1) the offline one.
134  int live_id;
135 
141 
148 
152 
156 
// UUID of the offline database, taken from the first message read by
// apply_db_copy().
159  string offline_uuid;
160 
165 
171 
174 
// Rewrite the "XAPIANDB" stub file so it points at replica_<live_id>.
181  void update_stub_database() const;
182 
// Delete the offline database.
184  void remove_offline_db();
185 
// Read a full database copy from the connection into the offline database
// directory; end_time is passed to each read on the connection.
188  void apply_db_copy(double end_time);
189 
// Throw NetworkError unless type == expected.
194  void check_message_type(int type, int expected) const;
195 
// Switch to the offline database if it passes the revision and UUID checks;
// returns true if the switch was made.
202  bool possibly_make_offline_live();
203 
204  string get_replica_path(int id) const {
205  string p = path;
206  p += "/replica_";
207  p += char('0' + id);
208  return p;
209  }
210 
// Public interface of DatabaseReplica::Internal; the DatabaseReplica methods
// below forward to these.
211  public:
// Open the replica at path_, creating the directory if it doesn't exist.
213  explicit Internal(const string & path_);
214 
// Destroy the connection object owned by this replica (deleting NULL is a
// no-op, so this is safe before set_read_fd() has been called).
216  ~Internal() { delete conn; }
217 
// Return the length-prefixed UUID of the live database followed by its
// revision info, or an empty string when the live database entry is NULL.
219  string get_revision_info() const;
220 
// Replace the connection with one reading from fd.
222  void set_read_fd(int fd);
223 
// Read and act on the next replication message from the connection.
225  bool apply_next_changeset(ReplicationInfo * info,
226  double reader_close_time);
227 
// The description of the internals is just the replica path.
229  string get_description() const { return path; }
230 };
231 
232 // Methods of DatabaseReplica
233 
234 DatabaseReplica::DatabaseReplica(const string & path)
235  : internal(new DatabaseReplica::Internal(path))
236 {
237  LOGCALL_CTOR(REPLICA, "DatabaseReplica", path);
238 }
239 
241 {
242  LOGCALL_DTOR(REPLICA, "DatabaseReplica");
243  delete internal;
244 }
245 
246 string
248 {
249  LOGCALL(REPLICA, string, "DatabaseReplica::get_revision_info", NO_ARGS);
251 }
252 
253 void
255 {
256  LOGCALL_VOID(REPLICA, "DatabaseReplica::set_read_fd", fd);
257  internal->set_read_fd(fd);
258 }
259 
260 bool
262  double reader_close_time)
263 {
264  LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time);
265  if (info != NULL)
266  info->clear();
267  RETURN(internal->apply_next_changeset(info, reader_close_time));
268 }
269 
270 string
272 {
273  string desc("DatabaseReplica(");
274  desc += internal->get_description();
275  desc += ')';
276  return desc;
277 }
278 
279 // Methods of DatabaseReplica::Internal
280 
281 void
283 {
284  string stub_path = path;
285  stub_path += "/XAPIANDB";
286  string tmp_path = stub_path;
287  tmp_path += ".tmp";
288  {
289  ofstream stub(tmp_path.c_str());
290  stub << REPLICA_STUB_BANNER
291  "auto replica_" << live_id << endl;
292  }
293  if (!io_tmp_rename(tmp_path, stub_path)) {
294  string msg("Failed to update stub db file for replica: ");
295  msg += path;
296  throw Xapian::DatabaseOpeningError(msg, errno);
297  }
298 }
299 
// Constructor of DatabaseReplica::Internal (see the LOGCALL_CTOR tag below).
//
// Either creates the replica directory at path_, or opens the replica that
// already exists there and recovers live_id from its stub file.  Throws
// FeatureUnavailableError when neither the chert nor the glass backend is
// compiled in, and DatabaseOpeningError when the directory can't be created
// or the path exists but isn't a directory.
//
// NOTE(review): the declarator line is missing from this listing (embedded
// number 300 is skipped), as are lines 316-318 and 330 - see the notes below.
301  : path(path_), live_id(0), live_db(), have_offline_db(false),
302  need_copy_next(false), offline_revision(), offline_needed_revision(),
303  last_live_changeset_time(), conn(NULL)
304 {
305  LOGCALL_CTOR(REPLICA, "DatabaseReplica::Internal", path_);
306 #if !defined XAPIAN_HAS_CHERT_BACKEND && !defined XAPIAN_HAS_GLASS_BACKEND
307  throw FeatureUnavailableError("Replication requires the chert or glass backends to be enabled");
308 #else
// mkdir() succeeding means the replica didn't exist before this call.
309  if (mkdir(path.c_str(), 0777) == 0) {
310  // The database doesn't already exist - make a directory, containing a
311  // stub database, and point it to a new database.
312  //
313  // Create an empty database - the backend doesn't matter as if the
314  // master is a different type, then the replica will become that type
315  // automatically.
// NOTE(review): listing lines 316-318, which would hold the statements the
// comment above describes, are missing here.
319  } else {
// Any failure other than "already exists" is fatal.
320  if (errno != EEXIST) {
321  throw DatabaseOpeningError("Couldn't create directory '" + path + "'", errno);
322  }
// EEXIST is also reported when the path exists but is not a directory.
323  if (!dir_exists(path)) {
324  throw DatabaseOpeningError("Replica path must be a directory");
325  }
326  string stub_path = path;
327  stub_path += "/XAPIANDB";
328  try {
329  live_db = WritableDatabase(stub_path,
// NOTE(review): listing line 330 (the remaining constructor arguments and the
// closing parenthesis) is missing here.
331  } catch (const Xapian::DatabaseCorruptError &) {
332  // If the database is too corrupt to open, force a full copy so we
333  // auto-heal from this condition. Instance seen in the wild was
334  // that the replica had all files truncated to size 0.
// The NULL entry is what get_revision_info() tests for before returning an
// empty revision string.
335  live_db.internal.push_back(NULL);
336  }
337  // FIXME: simplify all this?
// Recover live_id from the stub file: take the first line that is neither
// empty nor a '#' comment and read its last character as a decimal digit.
// This matches the "auto replica_<live_id>" line update_stub_database()
// writes.
338  ifstream stub(stub_path.c_str());
339  string line;
340  while (getline(stub, line)) {
341  if (!line.empty() && line[0] != '#') {
342  live_id = line[line.size() - 1] - '0';
343  break;
344  }
345  }
346  }
347 #endif
348 }
349 
// DatabaseReplica::Internal::get_revision_info (see the LOGCALL tag below).
//
// Returns the revision string for the live database: the UUID prefixed by its
// encoded length, followed by the subdatabase's own revision info.  Returns
// an empty string when the single subdatabase entry is NULL, and throws
// InvalidOperationError when live_db doesn't hold exactly one subdatabase.
// DatabaseMaster::write_changesets_to_fd() decodes the same layout from its
// start_revision argument.
//
// NOTE(review): the declarator line (listing line 351) is missing here.
350 string
352 {
353  LOGCALL(REPLICA, string, "DatabaseReplica::Internal::get_revision_info", NO_ARGS);
354  if (live_db.internal.empty())
// NOTE(review): listing line 355, the statement controlled by the `if` above,
// is missing; as listed, the `if` would instead guard the size check below.
356  if (live_db.internal.size() != 1)
357  throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase");
358 
// A NULL entry is what the constructor stores when the existing replica was
// too corrupt to open.
359  if (live_db.internal[0].get() == NULL) RETURN(string());
360 
361  string uuid = (live_db.internal[0])->get_uuid();
362  string buf = encode_length(uuid.size());
363  buf += uuid;
364  buf += (live_db.internal[0])->get_revision_info();
365  RETURN(buf);
366 }
367 
368 void
370 {
371  // Delete the offline database.
373  have_offline_db = false;
374 }
375 
// DatabaseReplica::Internal::apply_db_copy(double end_time).
//
// Reads a complete database copy from conn into the offline database
// directory (get_replica_path(live_id ^ 1)), replacing whatever was there.
// end_time is handed to every read on the connection.  The messages consumed
// are: one carrying a length-prefixed UUID plus revision data, then
// (filename, file contents) pairs until REPL_REPLY_DB_FOOTER is sniffed, and
// finally one message stored in offline_needed_revision.
//
// Returns early - leaving the partly populated directory in place - if the
// connection reports a negative type or REPL_REPLY_FAIL while sniffing.
// Throws DatabaseError if the directory can't be created and NetworkError if
// a filename contains "..".
//
// NOTE(review): the declarator (listing line 377) and lines 380, 396, 415,
// 430-431 and 434 are missing from this listing; see the notes at each gap.
376 void
378 {
379  have_offline_db = true;
// NOTE(review): listing line 380 is missing here.
381  string offline_path = get_replica_path(live_id ^ 1);
382  // If there's already an offline database, discard it. This happens if one
383  // copy of the database was sent, but further updates were needed before it
384  // could be made live, and the remote end was then unable to send those
385  // updates (probably due to not having changesets available, or the remote
386  // database being replaced by a new database).
387  removedir(offline_path);
388  if (mkdir(offline_path.c_str(), 0777)) {
389  throw Xapian::DatabaseError("Cannot make directory '" +
390  offline_path + "'", errno);
391  }
392 
// First message: the UUID and revision of the database being copied.
393  {
394  string buf;
395  int type = conn->get_message(buf, end_time);
// NOTE(review): listing line 396 is missing; `type` is not examined in the
// lines that remain - presumably a message type check belongs here.
397  const char * ptr = buf.data();
398  const char * end = ptr + buf.size();
399  size_t uuid_length;
400  decode_length_and_check(&ptr, end, uuid_length);
401  offline_uuid.assign(ptr, uuid_length);
// Everything after the UUID is the revision: the offset is measured from the
// start of buf to just past the UUID.
402  offline_revision.assign(buf, ptr + uuid_length - buf.data(), buf.npos);
403  }
404 
405  // Now, read the files for the database from the connection and create it.
406  while (true) {
407  string filename;
408  int type = conn->sniff_next_message_type(end_time);
409  if (type < 0 || type == REPL_REPLY_FAIL)
410  return;
411  if (type == REPL_REPLY_DB_FOOTER)
412  break;
413 
414  type = conn->get_message(filename, end_time);
// NOTE(review): listing line 415 is missing here.
416 
417  // Check that the filename doesn't contain '..'. No valid database
418  // file contains .., so we don't need to check that the .. is a path.
// The name comes from the remote end and is appended to offline_path below,
// so this check stops it climbing out of that directory.
419  if (filename.find("..") != string::npos) {
420  throw NetworkError("Filename in database contains '..'");
421  }
422 
423  type = conn->sniff_next_message_type(end_time);
424  if (type < 0 || type == REPL_REPLY_FAIL)
425  return;
426 
427  string filepath = offline_path + "/" + filename;
428  type = conn->receive_file(filepath, end_time);
429  if (type < 0)
// NOTE(review): listing lines 430-431 are missing, including the statement
// controlled by the `if` above.
432  }
// Footer message: the revision the offline copy must reach before
// possibly_make_offline_live() will switch to it.
433  int type = conn->get_message(offline_needed_revision, end_time);
// NOTE(review): listing line 434 is missing here.
// A complete copy has arrived, so changesets may be applied again.
435  need_copy_next = false;
436 }
437 
438 void
440 {
441  if (type != expected) {
442  if (type < 0)
444  string m = "Expected replication protocol message type #";
445  m += str(expected);
446  m += ", got #";
447  m += str(type);
448  throw NetworkError(m);
449  }
450 }
451 
// DatabaseReplica::Internal::possibly_make_offline_live().
//
// Decides whether the offline database (id live_id ^ 1) can become the live
// one.  Returns false, changing nothing, if it can't be opened, if
// offline_needed_revision is empty, if the revision check fails, or if its
// UUID is empty or differs from offline_uuid.  Otherwise flips live_id,
// reopens live_db on the new live path and returns true.
//
// NOTE(review): the declarator (listing line 453) and lines 466 and 483-484
// are missing from this listing; see the notes at each gap.
452 bool
454 {
455  string replica_path(get_replica_path(live_id ^ 1));
456  AutoPtr<DatabaseReplicator> replicator;
457  try {
458  replicator.reset(DatabaseReplicator::open(replica_path));
459  } catch (const Xapian::DatabaseError &) {
460  return false;
461  }
462  if (offline_needed_revision.empty()) {
463  return false;
464  }
465  if (!replicator->check_revision_at_least(offline_revision,
// NOTE(review): listing line 466 (the second argument and the end of the
// condition) is missing - presumably offline_needed_revision; confirm.
467  return false;
468  }
469 
470  string replicated_uuid = replicator->get_uuid();
471  if (replicated_uuid.empty()) {
472  return false;
473  }
474 
// The copy on disk must be the one whose header apply_db_copy() recorded.
475  if (replicated_uuid != offline_uuid) {
476  return false;
477  }
478 
// NOTE(review): live_id is flipped before the open below, so if that open
// throws, live_id has already changed - check this against the comment that
// follows.
479  live_id ^= 1;
480  // Open the database first, so that if there's a problem, an exception
481  // will be thrown before we make the new database live.
482  live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
// NOTE(review): listing lines 483-484 are missing here.
485  return true;
486 }
487 
488 void
490 {
491  delete conn;
492  conn = NULL;
493  conn = new RemoteConnection(fd, -1);
494 }
495 
// DatabaseReplica::Internal::apply_next_changeset (see the LOGCALL tag below).
//
// Sniffs the type of the next message on conn and dispatches on it, looping
// until a branch returns or throws.  Every read uses an end_time of 0.0.  The
// branches visible here:
//  - one reads a message and returns false;
//  - one applies a full database copy via apply_db_copy(), counts it in
//    info->fullcopy_count, and sets need_copy_next if the copied database's
//    UUID isn't offline_uuid; it ends in `break`, so the loop continues;
//  - one applies a changeset, to the live database when there is no offline
//    database and otherwise to a second replicator whose returned revision is
//    stored in offline_revision, then returns true;
//  - REPL_REPLY_FAIL reads the failure text and throws NetworkError;
//  - any other type throws NetworkError("Unknown replication protocol
//    message (...)").
//
// info, when non-NULL, has its counters and `changed` flag updated.
//
// NOTE(review): this listing has lost the first declarator line (497), the
// case labels at 504, 510 and 541, and lines 507, 519, 523, 533, 536, 548,
// 570, 583, 592, 600 and 604, so which message type selects each of the first
// three branches can't be told from here.
496 bool
498  double reader_close_time)
499 {
500  LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
501  while (true) {
502  int type = conn->sniff_next_message_type(0.0);
503  switch (type) {
// NOTE(review): case label (listing line 504) missing.
505  string buf;
506  type = conn->get_message(buf, 0.0);
// NOTE(review): listing line 507 missing.
508  RETURN(false);
509  }
// NOTE(review): case label (listing line 510) missing.
511  // Apply the copy - remove offline db in case of any error.
512  try {
513  apply_db_copy(0.0);
514  if (info != NULL)
515  ++(info->fullcopy_count);
516  string replica_uuid;
517  {
518  AutoPtr<DatabaseReplicator> replicator(
// NOTE(review): listing line 519 (the replicator's initialiser) missing.
520  replica_uuid = replicator->get_uuid();
521  }
522  if (replica_uuid != offline_uuid) {
// NOTE(review): listing line 523 missing.
524  // We've been sent a database with the wrong uuid,
525  // which only happens if the database at the server
526  // got changed during the copy, so the only safe
527  // action next is a new copy. Set a flag to ensure
528  // that this happens, or we're at risk of database
529  // corruption.
530  need_copy_next = true;
531  }
532  } catch (...) {
// NOTE(review): listing line 533 missing - presumably the cleanup promised by
// the comment at the top of this branch.
534  throw;
535  }
// NOTE(review): listing line 536 missing; the `if` below appears to sit
// inside a block opened on that line.
537  if (info != NULL)
538  info->changed = true;
539  }
540  break;
// NOTE(review): case label (listing line 541) missing.
// A changeset must not be applied while a full copy is still required.
542  if (need_copy_next) {
543  throw NetworkError("Needed a database copy next");
544  }
// With no offline database the changeset goes straight onto the live one.
545  if (!have_offline_db) {
546  // Close the live db.
547  string replica_path(get_replica_path(live_id));
// NOTE(review): listing line 548 missing - presumably where live_db is
// actually closed.
549 
550  if (last_live_changeset_time != 0.0) {
551  // Wait until at least "reader_close_time" seconds have
552  // passed since the last changeset was applied, to
553  // allow any active readers to finish and be reopened.
554  double until;
555  until = last_live_changeset_time + reader_close_time;
556  RealTime::sleep(until);
557  }
558 
559  // Open a replicator for the live path, and apply the
560  // changeset.
561  {
562  AutoPtr<DatabaseReplicator> replicator(
563  DatabaseReplicator::open(replica_path));
564 
565  // Ignore the returned revision number, since we are
566  // live so the changeset must be safe to apply to a
567  // live DB.
568  replicator->apply_changeset_from_conn(*conn, 0.0, true);
569  }
// NOTE(review): listing line 570 missing.
571 
572  if (info != NULL) {
573  ++(info->changeset_count);
574  info->changed = true;
575  }
576  // Now the replicator is closed, open the live db again.
577  live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
578  RETURN(true);
579  }
580 
// Otherwise the changeset is applied to a second replicator, and the revision
// it reaches is remembered in offline_revision.
581  {
582  AutoPtr<DatabaseReplicator> replicator(
// NOTE(review): listing line 583 (the replicator's initialiser) missing.
584 
585  offline_revision = replicator->
586  apply_changeset_from_conn(*conn, 0.0, false);
587 
588  if (info != NULL) {
589  ++(info->changeset_count);
590  }
591  }
// NOTE(review): listing line 592 missing; the `if` below appears to sit
// inside a block opened on that line.
593  if (info != NULL)
594  info->changed = true;
595  }
596  RETURN(true);
// The remote end reported a failure; its message text goes into the exception.
597  case REPL_REPLY_FAIL: {
598  string buf;
599  if (conn->get_message(buf, 0.0) < 0)
// NOTE(review): listing line 600, the statement controlled by the `if` above,
// is missing.
601  throw NetworkError("Unable to fully synchronise: " + buf);
602  }
603  case -1:
// NOTE(review): listing line 604, the body of `case -1`, is missing.
605  default:
606  throw NetworkError("Unknown replication protocol message (" +
607  str(type) + ")");
608  }
609  }
610 }
void check_message_type(int type, int expected) const
Check that a message type is as expected.
Definition: replication.cc:439
The Xapian namespace contains public interfaces for the Xapian library.
Definition: compactor.cc:80
#define RETURN(A)
Definition: debuglog.h:493
Define the XAPIAN_NORETURN macro.
A RemoteConnection object provides a bidirectional connection to another RemoteConnection object on a...
RemoteConnection class used by the remote backend.
bool possibly_make_offline_live()
Check if the offline database has reached the required version.
Definition: replication.cc:453
length encoded as a string
int get_message(std::string &result, double end_time)
Read one message from fdin.
This class is used to access a database, or a group of databases.
Definition: database.h:68
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
Definition: replication.cc:254
int fullcopy_count
Number of times a full database copy was performed.
Definition: replication.h:38
Internal(const Internal &)
Don't allow copying.
const int DB_CREATE
Create a new database.
Definition: constants.h:44
Internal * internal
Internals.
Definition: replication.h:116
InvalidOperationError indicates the API was used in an invalid way.
Definition: error.h:283
string path
The path to the replica directory.
Definition: replication.cc:131
DatabaseOpeningError indicates failure to open a database.
Definition: error.h:581
void apply_db_copy(double end_time)
Apply a set of DB copy messages from the connection.
Definition: replication.cc:377
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
Definition: replication.cc:489
Access to a database replica, for applying replication to it.
Definition: replication.h:114
Constants in the Xapian namespace.
const std::string & get_msg() const
Message giving details of the error, intended for human consumption.
Definition: error.h:122
void sleep(double t)
Sleep until the time represented by this object.
Definition: realtime.h:127
#define LOGCALL_DTOR(CATEGORY, CLASS)
Definition: debuglog.h:490
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
WritableDatabase open()
Construct a WritableDatabase object for a new, empty InMemory database.
Definition: dbfactory.h:104
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Definition: debuglog.h:488
STL namespace.
Convert types to std::string.
string get_description() const
Return a string describing this object.
Definition: replication.cc:229
static void throw_connection_closed_unexpectedly()
Definition: replication.cc:64
#define REPLICA_STUB_BANNER
Definition: replication.cc:58
bool have_offline_db
Do we have an offline database currently?
Definition: replication.cc:147
int revision()
Report the revision of the library which the program is linked with.
Definition: xapian.h:142
Utility functions for testing files.
std::string encode_length(T len)
Encode a length as a variable-length string.
Definition: length.h:36
#define false
Definition: header.h:9
std::vector< Xapian::Internal::intrusive_ptr< Internal > > internal
Definition: database.h:81
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
Definition: replication.cc:497
std::string get_revision_info() const
Get a string describing the current revision of the replica.
Definition: replication.cc:247
include <sys/stat.h> with portability enhancements
string offline_revision
The revision that the secondary database has been updated to.
Definition: replication.cc:155
string get_revision_info() const
Get a string describing the current revision of the replica.
Definition: replication.cc:351
Hierarchy of classes which Xapian can throw as exceptions.
int live_id
The id of the currently live database in the replica (0 or 1).
Definition: replication.cc:134
std::string get_description() const
Return a string describing this object.
Definition: replication.cc:271
Class to manage replication of databases.
void description_append(std::string &desc, const std::string &s)
Definition: unittest.cc:102
const int DB_OPEN
Open an existing database.
Definition: constants.h:50
This class provides read/write access to a database.
Definition: database.h:789
Indicates an attempt to use a feature which is unavailable.
Definition: error.h:719
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
Definition: replication.cc:261
bool need_copy_next
Flag to indicate that the only valid operation next is a full copy.
Definition: replication.cc:151
void remove_offline_db()
Delete the offline database.
Definition: replication.cc:369
string str(int value)
Convert int to std::string.
Definition: str.cc:90
Information about the steps involved in performing a replication.
Definition: replication.h:33
int changeset_count
Number of changesets applied.
Definition: replication.h:35
bool io_tmp_rename(const std::string &tmp_file, const std::string &real_file)
Rename a temporary file to its final position.
Definition: io_utils.cc:271
void update_stub_database() const
Update the stub database which points to a single database.
Definition: replication.cc:282
string get_replica_path(int id) const
Definition: replication.cc:204
Base class for objects managed by intrusive_ptr.
Definition: intrusive_ptr.h:49
#define LOGCALL_CTOR(CATEGORY, CLASS, PARAMS)
Definition: debuglog.h:489
bool dir_exists(const char *path)
Test if a directory exists.
Definition: filetests.h:136
Define preprocessor symbols for the library version.
bool changed
True if and only if the replication corresponds to a change in the live version of the database...
Definition: replication.h:45
size_t size() const
Return number of shards in this Database object.
Definition: database.h:93
DatabaseCorruptError indicates database corruption was detected.
Definition: error.h:409
Append a string to an object description, escaping invalid UTF-8.
string offline_needed_revision
The revision that the secondary database must reach before it can be made live.
Definition: replication.cc:164
Internal implementation of DatabaseReplica.
Definition: replication.cc:123
const int DB_BACKEND_STUB
Open a stub database file.
Definition: constants.h:179
void decode_length_and_check(const char **p, const char *end, unsigned &out)
Decode a length encoded by encode_length.
Definition: length.cc:112
Replication protocol version and message numbers.
Replication support for Xapian databases.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
double now()
Return the current time.
Definition: realtime.h:49
RemoteConnection * conn
The remote connection we're using.
Definition: replication.cc:173
Indicates a problem communicating with a remote database.
Definition: error.h:803
Wrappers for low-level POSIX I/O routines.
void send_message(char type, const std::string &s, double end_time)
Send a message.
<unistd.h>, but with compat.
void removedir(const string &dirname)
Remove a directory, and its contents.
Definition: fileutils.cc:50
Various assertion macros.
Functions for handling a time or time interval in a double.
DatabaseError indicates some sort of database related error.
Definition: error.h:367
int sniff_next_message_type(double end_time)
Check what the next message type is.
Factory functions for constructing Database and WritableDatabase objects.
string offline_uuid
The UUID of the secondary database.
Definition: replication.cc:159
File and path manipulation routines.
Wrapper around standard unique_ptr template.
Debug logging macros.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:487
WritableDatabase live_db
The live database being replicated.
Definition: replication.cc:140
~DatabaseReplica()
Destructor.
Definition: replication.cc:240
double last_live_changeset_time
The time at which a changeset was last applied to the live database.
Definition: replication.cc:170