57 #define REPLICA_STUB_BANNER \
58 "# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
59 "# Do not manually edit - replication operations may regenerate this file.\n"
69 DatabaseMaster::write_changesets_to_fd(
int fd,
70 const string & start_revision,
73 LOGCALL_VOID(REPLICA,
"DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
82 "Can't open database: " + e.
get_msg(),
91 bool need_whole_db =
false;
93 if (start_revision.empty()) {
96 const char * ptr = start_revision.data();
97 const char * end = ptr + start_revision.size();
102 string db_uuid = db.
internal->get_uuid();
103 if (request_uuid != db_uuid) {
104 need_whole_db =
true;
113 DatabaseMaster::get_description()
const
115 string desc =
"DatabaseMaster(";
142 bool live_db_corrupt =
false;
183 void update_stub_database()
const;
186 void remove_offline_db();
190 void apply_db_copy(
double end_time);
196 void check_message_type(
int type,
int expected)
const;
204 bool possibly_make_offline_live();
215 explicit Internal(
const string & path_);
221 string get_revision_info()
const;
224 void set_read_fd(
int fd);
228 double reader_close_time);
236 DatabaseReplica::DatabaseReplica(
const string & path)
251 LOGCALL(REPLICA,
string,
"DatabaseReplica::get_revision_info", NO_ARGS);
258 LOGCALL_VOID(REPLICA,
"DatabaseReplica::set_read_fd", fd);
259 internal->set_read_fd(fd);
264 double reader_close_time)
266 LOGCALL(REPLICA,
bool,
"DatabaseReplica::apply_next_changeset", info | reader_close_time);
275 string desc(
"DatabaseReplica(");
276 desc +=
internal->get_description();
286 string stub_path =
path;
287 stub_path +=
"/XAPIANDB";
288 string tmp_path = stub_path;
291 ofstream stub(tmp_path.c_str());
293 "auto replica_" <<
live_id << endl;
296 string msg(
"Failed to update stub db file for replica: ");
303 : path(path_), live_id(0), live_db(), have_offline_db(
false),
304 need_copy_next(
false), offline_revision(), offline_needed_revision(),
305 last_live_changeset_time(), conn(NULL)
307 LOGCALL_CTOR(REPLICA,
"DatabaseReplica::Internal", path_);
308 #ifndef XAPIAN_HAS_GLASS_BACKEND
311 if (mkdir(
path.c_str(), 0777) == 0) {
322 if (errno != EEXIST) {
328 string stub_path =
path;
329 stub_path +=
"/XAPIANDB";
340 ifstream stub(stub_path.c_str());
342 while (getline(stub, line)) {
343 if (!line.empty() && line[0] !=
'#') {
344 live_id = line[line.size() - 1] -
'0';
355 LOGCALL(REPLICA,
string,
"DatabaseReplica::Internal::get_revision_info", NO_ARGS);
356 if (live_db_corrupt) {
360 switch (live_db.internal->size()) {
369 "pointed at exactly one "
383 removedir(get_replica_path(live_id ^ 1));
384 have_offline_db =
false;
390 have_offline_db =
true;
391 last_live_changeset_time = 0;
392 string offline_path = get_replica_path(live_id ^ 1);
399 if (mkdir(offline_path.c_str(), 0777)) {
401 offline_path +
"'", errno);
406 int type = conn->get_message(buf,
end_time);
408 const char * ptr = buf.data();
409 const char * end = ptr + buf.size();
413 offline_revision.assign(ptr, end - ptr);
419 int type = conn->sniff_next_message_type(
end_time);
425 type = conn->get_message(filename,
end_time);
430 if (filename.find(
"..") != string::npos) {
431 throw NetworkError(
"Filename in database contains '..'");
434 type = conn->sniff_next_message_type(
end_time);
438 string filepath = offline_path +
"/" + filename;
439 type = conn->receive_file(filepath,
end_time);
444 int type = conn->get_message(offline_needed_revision,
end_time);
446 need_copy_next =
false;
452 if (type != expected) {
455 string m =
"Expected replication protocol message type #";
466 string replica_path(get_replica_path(live_id ^ 1));
467 unique_ptr<DatabaseReplicator> replicator;
473 if (offline_needed_revision.empty()) {
476 if (!replicator->check_revision_at_least(offline_revision,
477 offline_needed_revision)) {
481 string replicated_uuid = replicator->get_uuid();
482 if (replicated_uuid.empty()) {
486 if (replicated_uuid != offline_uuid) {
494 live_db_corrupt =
false;
495 update_stub_database();
510 double reader_close_time)
512 LOGCALL(REPLICA,
bool,
"DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
514 int type = conn->sniff_next_message_type(0.0);
518 type = conn->get_message(buf, 0.0);
530 unique_ptr<DatabaseReplicator> replicator(
532 replica_uuid = replicator->get_uuid();
534 if (replica_uuid != offline_uuid) {
542 need_copy_next =
true;
548 if (possibly_make_offline_live()) {
554 if (need_copy_next) {
557 if (!have_offline_db) {
559 string replica_path(get_replica_path(live_id));
562 if (last_live_changeset_time != 0.0) {
567 until = last_live_changeset_time + reader_close_time;
574 unique_ptr<DatabaseReplicator> replicator(
580 replicator->apply_changeset_from_conn(*conn, 0.0,
true);
590 live_db_corrupt =
false;
595 unique_ptr<DatabaseReplicator> replicator(
598 offline_revision = replicator->
599 apply_changeset_from_conn(*conn, 0.0,
false);
605 if (possibly_make_offline_live()) {
612 if (conn->get_message(buf, 0.0) < 0)
614 throw NetworkError(
"Unable to fully synchronise: " + buf);
619 throw NetworkError(
"Unknown replication protocol message (" +
Internal implementation of DatabaseReplica.
void update_stub_database() const
Update the stub database which points to a single database.
bool possibly_make_offline_live()
Check if the offline database has reached the required version.
string path
The path to the replica directory.
double last_live_changeset_time
The time at which a changeset was last applied to the live database.
string offline_needed_revision
The revision that the secondary database must reach before it can be made live.
WritableDatabase live_db
The live database being replicated.
string offline_uuid
The UUID of the secondary database.
Internal(const Internal &)
Don't allow copying.
void apply_db_copy(double end_time)
Apply a set of DB copy messages from the connection.
void check_message_type(int type, int expected) const
Check that a message type is as expected.
void operator=(const Internal &)
Don't allow assignment.
int live_id
The id of the currently live database in the replica (0 or 1).
string get_replica_path(int id) const
bool have_offline_db
Do we have an offline database currently?
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
void remove_offline_db()
Delete the offline database.
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
bool need_copy_next
Flag to indicate that the only valid operation next is a full copy.
string offline_revision
The revision that the secondary database has been updated to.
string get_description() const
Return a string describing this object.
bool live_db_corrupt
Do we need to heal the replica?
RemoteConnection * conn
The remote connection we're using.
string get_revision_info() const
Get a string describing the current revision of the replica.
A RemoteConnection object provides a bidirectional connection to another RemoteConnection object on a...
void send_message(char type, std::string_view s, double end_time)
Send a message.
DatabaseCorruptError indicates database corruption was detected.
DatabaseError indicates some sort of database related error.
DatabaseOpeningError indicates failure to open a database.
Access to a database replica, for applying replication to it.
~DatabaseReplica()
Destructor.
std::string get_description() const
Return a string describing this object.
Internal * internal
Internals.
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
std::string get_revision_info() const
Get a string describing the current revision of the replica.
An indexed database of documents.
Xapian::Internal::intrusive_ptr_nonnull< Internal > internal
const std::string & get_msg() const noexcept
Message giving details of the error, intended for human consumption.
Indicates an attempt to use a feature which is unavailable.
Base class for objects managed by intrusive_ptr.
InvalidOperationError indicates the API was used in an invalid way.
Indicates a problem communicating with a remote database.
This class provides read/write access to a database.
Constants in the Xapian namespace.
Virtual base class for Database internals.
Class to manage replication of databases.
Factory functions for constructing Database and WritableDatabase objects.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
#define LOGCALL_CTOR(CATEGORY, CLASS, PARAMS)
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
#define LOGCALL_DTOR(CATEGORY, CLASS)
Append a string to an object description, escaping invalid UTF-8.
Hierarchy of classes which Xapian can throw as exceptions.
Utility functions for testing files.
bool dir_exists(const char *path)
Test if a directory exists.
void removedir(const string &dirname)
Remove a directory, and its contents.
File and path manipulation routines.
bool io_tmp_rename(const std::string &tmp_file, const std::string &real_file)
Rename a temporary file to its final position.
Wrappers for low-level POSIX I/O routines.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
double now()
Return the current time.
void sleep(double t)
Sleep until the time represented by this object.
string str(int value)
Convert int to std::string.
Database open(std::string_view host, unsigned int port, unsigned timeout=10000, unsigned connect_timeout=10000)
Construct a Database object for read-only access to a remote database accessed via a TCP connection.
The Xapian namespace contains public interfaces for the Xapian library.
const int DB_CREATE
Create a new database.
int revision()
Report the revision of the library which the program is linked with.
const int DB_BACKEND_STUB
Open a stub database file.
const int DB_OPEN
Open an existing database.
Various assertion macros.
void unpack_throw_serialisation_error(const char *p)
Throw appropriate SerialisationError.
Pack types into strings and unpack them again.
bool unpack_string(const char **p, const char *end, std::string &result)
Decode a std::string from a string.
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
void pack_string(std::string &s, std::string_view value)
Append an encoded std::string to a string.
Functions for handling a time or time interval in a double.
RemoteConnection class used by the remote backend.
#define REPLICA_STUB_BANNER
static void throw_connection_closed_unexpectedly()
Replication support for Xapian databases.
Replication protocol version and message numbers.
@ REPL_REPLY_END_OF_CHANGES
include <sys/stat.h> with portability enhancements
<unistd.h>, but with compat.
Convert types to std::string.
Information about the steps involved in performing a replication.
bool changed
True if and only if the replication corresponds to a change in the live version of the database.
int fullcopy_count
Number of times a full database copy was performed.
int changeset_count
Number of changesets applied.
void description_append(std::string &desc, std::string_view s)
Define preprocessor symbols for the library version.