58 #define REPLICA_STUB_BANNER \
59 "# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
60 "# Do not manually edit - replication operations may regenerate this file.\n"
70 DatabaseMaster::write_changesets_to_fd(
int fd,
71 const string & start_revision,
74 LOGCALL_VOID(REPLICA,
"DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
83 "Can't open database: " + e.
get_msg(),
92 bool need_whole_db =
false;
94 if (start_revision.empty()) {
97 const char * ptr = start_revision.data();
98 const char * end = ptr + start_revision.size();
101 string request_uuid(ptr, uuid_length);
103 string db_uuid = db.
internal[0]->get_uuid();
104 if (request_uuid != db_uuid) {
105 need_whole_db =
true;
114 DatabaseMaster::get_description()
const
116 string desc =
"DatabaseMaster(";
181 void update_stub_database()
const;
184 void remove_offline_db();
188 void apply_db_copy(
double end_time);
194 void check_message_type(
int type,
int expected)
const;
202 bool possibly_make_offline_live();
213 explicit Internal(
const string & path_);
219 string get_revision_info()
const;
222 void set_read_fd(
int fd);
226 double reader_close_time);
234 DatabaseReplica::DatabaseReplica(
const string & path)
249 LOGCALL(REPLICA,
string,
"DatabaseReplica::get_revision_info", NO_ARGS);
256 LOGCALL_VOID(REPLICA,
"DatabaseReplica::set_read_fd", fd);
257 internal->set_read_fd(fd);
262 double reader_close_time)
264 LOGCALL(REPLICA,
bool,
"DatabaseReplica::apply_next_changeset", info | reader_close_time);
273 string desc(
"DatabaseReplica(");
274 desc +=
internal->get_description();
284 string stub_path =
path;
285 stub_path +=
"/XAPIANDB";
286 string tmp_path = stub_path;
289 ofstream stub(tmp_path.c_str());
291 "auto replica_" <<
live_id << endl;
294 string msg(
"Failed to update stub db file for replica: ");
301 : path(path_), live_id(0), live_db(), have_offline_db(
false),
302 need_copy_next(
false), offline_revision(), offline_needed_revision(),
303 last_live_changeset_time(), conn(NULL)
305 LOGCALL_CTOR(REPLICA,
"DatabaseReplica::Internal", path_);
306 #if !defined XAPIAN_HAS_CHERT_BACKEND && !defined XAPIAN_HAS_GLASS_BACKEND
309 if (mkdir(
path.c_str(), 0777) == 0) {
320 if (errno != EEXIST) {
326 string stub_path =
path;
327 stub_path +=
"/XAPIANDB";
338 ifstream stub(stub_path.c_str());
340 while (getline(stub, line)) {
341 if (!line.empty() && line[0] !=
'#') {
342 live_id = line[line.size() - 1] -
'0';
353 LOGCALL(REPLICA,
string,
"DatabaseReplica::Internal::get_revision_info", NO_ARGS);
354 if (live_db.internal.empty())
356 if (live_db.internal.size() != 1)
359 if (live_db.internal[0].get() == NULL)
RETURN(
string());
361 string uuid = (live_db.internal[0])->get_uuid();
364 buf += (live_db.internal[0])->get_revision_info();
372 removedir(get_replica_path(live_id ^ 1));
373 have_offline_db =
false;
379 have_offline_db =
true;
380 last_live_changeset_time = 0;
381 string offline_path = get_replica_path(live_id ^ 1);
388 if (mkdir(offline_path.c_str(), 0777)) {
390 offline_path +
"'", errno);
395 int type = conn->get_message(buf,
end_time);
397 const char * ptr = buf.data();
398 const char * end = ptr + buf.size();
401 offline_uuid.assign(ptr, uuid_length);
402 offline_revision.assign(buf, ptr + uuid_length - buf.data(), buf.npos);
408 int type = conn->sniff_next_message_type(
end_time);
414 type = conn->get_message(filename,
end_time);
419 if (filename.find(
"..") != string::npos) {
420 throw NetworkError(
"Filename in database contains '..'");
423 type = conn->sniff_next_message_type(
end_time);
427 string filepath = offline_path +
"/" + filename;
428 type = conn->receive_file(filepath,
end_time);
433 int type = conn->get_message(offline_needed_revision,
end_time);
435 need_copy_next =
false;
441 if (type != expected) {
444 string m =
"Expected replication protocol message type #";
455 string replica_path(get_replica_path(live_id ^ 1));
456 AutoPtr<DatabaseReplicator> replicator;
462 if (offline_needed_revision.empty()) {
465 if (!replicator->check_revision_at_least(offline_revision,
466 offline_needed_revision)) {
470 string replicated_uuid = replicator->get_uuid();
471 if (replicated_uuid.empty()) {
475 if (replicated_uuid != offline_uuid) {
483 update_stub_database();
498 double reader_close_time)
500 LOGCALL(REPLICA,
bool,
"DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
502 int type = conn->sniff_next_message_type(0.0);
506 type = conn->get_message(buf, 0.0);
518 AutoPtr<DatabaseReplicator> replicator(
520 replica_uuid = replicator->get_uuid();
522 if (replica_uuid != offline_uuid) {
530 need_copy_next =
true;
536 if (possibly_make_offline_live()) {
542 if (need_copy_next) {
545 if (!have_offline_db) {
547 string replica_path(get_replica_path(live_id));
550 if (last_live_changeset_time != 0.0) {
555 until = last_live_changeset_time + reader_close_time;
562 AutoPtr<DatabaseReplicator> replicator(
568 replicator->apply_changeset_from_conn(*conn, 0.0,
true);
582 AutoPtr<DatabaseReplicator> replicator(
585 offline_revision = replicator->
586 apply_changeset_from_conn(*conn, 0.0,
false);
592 if (possibly_make_offline_live()) {
599 if (conn->get_message(buf, 0.0) < 0)
601 throw NetworkError(
"Unable to fully synchronise: " + buf);
606 throw NetworkError(
"Unknown replication protocol message (" +
Wrapper around standard unique_ptr template.
database class declarations
Internal implementation of DatabaseReplica.
void update_stub_database() const
Update the stub database which points to a single database.
bool possibly_make_offline_live()
Check if the offline database has reached the required version.
string path
The path to the replica directory.
double last_live_changeset_time
The time at which a changeset was last applied to the live database.
string offline_needed_revision
The revision that the secondary database must reach before it can be made live.
WritableDatabase live_db
The live database being replicated.
string offline_uuid
The UUID of the secondary database.
Internal(const Internal &)
Don't allow copying.
void apply_db_copy(double end_time)
Apply a set of DB copy messages from the connection.
void check_message_type(int type, int expected) const
Check that a message type is as expected.
void operator=(const Internal &)
Don't allow assignment.
int live_id
The id of the currently live database in the replica (0 or 1).
string get_replica_path(int id) const
bool have_offline_db
Do we have an offline database currently?
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
void remove_offline_db()
Delete the offline database.
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
bool need_copy_next
Flag to indicate that the only valid operation next is a full copy.
string offline_revision
The revision that the secondary database has been updated to.
string get_description() const
Return a string describing this object.
RemoteConnection * conn
The remote connection we're using.
string get_revision_info() const
Get a string describing the current revision of the replica.
A RemoteConnection object provides a bidirectional connection to another RemoteConnection object on a...
void send_message(char type, const std::string &s, double end_time)
Send a message.
DatabaseCorruptError indicates database corruption was detected.
DatabaseError indicates some sort of database related error.
DatabaseOpeningError indicates failure to open a database.
Access to a database replica, for applying replication to it.
~DatabaseReplica()
Destructor.
std::string get_description() const
Return a string describing this object.
Internal * internal
Internals.
bool apply_next_changeset(ReplicationInfo *info, double reader_close_time)
Read and apply the next changeset.
void set_read_fd(int fd)
Set the file descriptor to read changesets from.
std::string get_revision_info() const
Get a string describing the current revision of the replica.
This class is used to access a database, or a group of databases.
std::vector< Xapian::Internal::intrusive_ptr< Internal > > internal
const std::string & get_msg() const
Message giving details of the error, intended for human consumption.
Indicates an attempt to use a feature which is unavailable.
Base class for objects managed by intrusive_ptr.
InvalidOperationError indicates the API was used in an invalid way.
Indicates a problem communicating with a remote database.
This class provides read/write access to a database.
Constants in the Xapian namespace.
Class to manage replication of databases.
Factory functions for constructing Database and WritableDatabase objects.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
#define LOGCALL_CTOR(CATEGORY, CLASS, PARAMS)
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
#define LOGCALL_DTOR(CATEGORY, CLASS)
Append a string to an object description, escaping invalid UTF-8.
Hierarchy of classes which Xapian can throw as exceptions.
Utility functions for testing files.
bool dir_exists(const char *path)
Test if a directory exists.
void removedir(const string &dirname)
Remove a directory, and its contents.
File and path manipulation routines.
bool io_tmp_rename(const std::string &tmp_file, const std::string &real_file)
Rename a temporary file to its final position.
Wrappers for low-level POSIX I/O routines.
void decode_length_and_check(const char **p, const char *end, unsigned &out)
Decode a length encoded by encode_length.
length encoded as a string
std::string encode_length(T len)
Encode a length as a variable-length string.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
double now()
Return the current time.
void sleep(double t)
Sleep until the time represented by this object.
WritableDatabase open()
Construct a WritableDatabase object for a new, empty InMemory database.
string str(int value)
Convert int to std::string.
The Xapian namespace contains public interfaces for the Xapian library.
const int DB_CREATE
Create a new database.
int revision()
Report the revision of the library which the program is linked with.
const int DB_BACKEND_STUB
Open a stub database file.
const int DB_OPEN
Open an existing database.
Define the XAPIAN_NORETURN macro.
Various assertion macros.
Functions for handling a time or time interval in a double.
RemoteConnection class used by the remote backend.
#define REPLICA_STUB_BANNER
static void throw_connection_closed_unexpectedly()
Replication support for Xapian databases.
Replication protocol version and message numbers.
@ REPL_REPLY_END_OF_CHANGES
include <sys/stat.h> with portability enhancements
<unistd.h>, but with compat.
Convert types to std::string.
Information about the steps involved in performing a replication.
bool changed
True if and only if the replication corresponds to a change in the live version of the database.
int fullcopy_count
Number of times a full database copy was performed.
int changeset_count
Number of changesets applied.
void description_append(std::string &desc, const std::string &s)
Define preprocessor symbols for the library version.