00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023
00024 #include "replication.h"
00025
00026 #include "xapian/base.h"
00027 #include "xapian/dbfactory.h"
00028 #include "xapian/error.h"
00029 #include "xapian/version.h"
00030
00031 #include "database.h"
00032 #include "databasereplicator.h"
00033 #include "debuglog.h"
00034 #include "fileutils.h"
00035 #ifdef __WIN32__
00036 # include "msvc_posix_wrapper.h"
00037 #endif
00038 #include "omassert.h"
00039 #include "realtime.h"
00040 #include "remoteconnection.h"
00041 #include "replicationprotocol.h"
00042 #include "safeerrno.h"
00043 #include "safesysstat.h"
00044 #include "safeunistd.h"
00045 #include "serialise.h"
00046 #include "str.h"
00047 #include "utils.h"
00048
00049 #include "autoptr.h"
00050 #include <cstdio>
00051 #include <fstream>
00052 #include <string>
00053
00054 using namespace std;
00055 using namespace Xapian;
00056
00057
// Banner placed at the top of the stub database file which
// DatabaseReplica::Internal::update_stub_database() generates.
//
// The whitespace around XAPIAN_VERSION is required: from C++11 onwards an
// identifier which immediately follows a string literal is lexed as a
// user-defined literal suffix rather than being expanded as a macro.
#define REPLICA_STUB_BANNER \
"# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
"# Do not manually edit - replication operations may regenerate this file.\n"
00061
00062 void
00063 DatabaseMaster::write_changesets_to_fd(int fd,
00064 const string & start_revision,
00065 ReplicationInfo * info) const
00066 {
00067 LOGCALL_VOID(REPLICA, "DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
00068 if (info != NULL)
00069 info->clear();
00070 Database db;
00071 try {
00072 db = Database(path);
00073 } catch (const Xapian::DatabaseError & e) {
00074 RemoteConnection conn(-1, fd);
00075 conn.send_message(REPL_REPLY_FAIL,
00076 "Can't open database: " + e.get_msg(),
00077 0.0);
00078 return;
00079 }
00080 if (db.internal.size() != 1) {
00081 throw Xapian::InvalidOperationError("DatabaseMaster needs to be pointed at exactly one subdatabase");
00082 }
00083
00084
00085 bool need_whole_db = false;
00086 string revision;
00087 if (start_revision.empty()) {
00088 need_whole_db = true;
00089 } else {
00090 const char * ptr = start_revision.data();
00091 const char * end = ptr + start_revision.size();
00092 size_t uuid_length = decode_length(&ptr, end, true);
00093 string request_uuid(ptr, uuid_length);
00094 ptr += uuid_length;
00095 string db_uuid = db.internal[0]->get_uuid();
00096 if (request_uuid != db_uuid) {
00097 need_whole_db = true;
00098 }
00099 revision.assign(ptr, end - ptr);
00100 }
00101
00102 db.internal[0]->write_changesets_to_fd(fd, revision, need_whole_db, info);
00103 }
00104
00105 string
00106 DatabaseMaster::get_description() const
00107 {
00108 return "DatabaseMaster(" + path + ")";
00109 }
00110
00112 class DatabaseReplica::Internal : public Xapian::Internal::RefCntBase {
00114 void operator=(const Internal &);
00115
00117 Internal(const Internal &);
00118
00120 string path;
00121
00123 int live_id;
00124
00129 mutable WritableDatabase live_db;
00130
00136 bool have_offline_db;
00137
00140 bool need_copy_next;
00141
00144 string offline_revision;
00145
00148 string offline_uuid;
00149
00153 string offline_needed_revision;
00154
00159 double last_live_changeset_time;
00160
00162 RemoteConnection * conn;
00163
00170 void update_stub_database() const;
00171
00173 void remove_offline_db();
00174
00177 void apply_db_copy(double end_time);
00178
00183 void check_message_type(char type, char expected) const;
00184
00191 bool possibly_make_offline_live();
00192
00193 string get_replica_path(int id) const {
00194 string p = path;
00195 p += "/replica_";
00196 p += char('0' + id);
00197 return p;
00198 }
00199
00200 public:
00202 Internal(const string & path_);
00203
00205 ~Internal() { delete conn; }
00206
00208 string get_revision_info() const;
00209
00211 void set_read_fd(int fd);
00212
00214 bool apply_next_changeset(ReplicationInfo * info,
00215 double reader_close_time);
00216
00218 string get_description() const { return path; }
00219 };
00220
00221
00222
00223 DatabaseReplica::DatabaseReplica(const DatabaseReplica & other)
00224 : internal(other.internal)
00225 {
00226 LOGCALL_CTOR(REPLICA, "DatabaseReplica", other);
00227 }
00228
00229 void
00230 DatabaseReplica::operator=(const DatabaseReplica & other)
00231 {
00232 LOGCALL_VOID(REPLICA, "DatabaseReplica::operator=", other);
00233 internal = other.internal;
00234 }
00235
00236 DatabaseReplica::DatabaseReplica()
00237 : internal(0)
00238 {
00239 LOGCALL_CTOR(REPLICA, "DatabaseReplica", NO_ARGS);
00240 }
00241
00242 DatabaseReplica::DatabaseReplica(const string & path)
00243 : internal(new DatabaseReplica::Internal(path))
00244 {
00245 LOGCALL_CTOR(REPLICA, "DatabaseReplica", path);
00246 }
00247
00248 DatabaseReplica::~DatabaseReplica()
00249 {
00250 LOGCALL_DTOR(REPLICA, "DatabaseReplica");
00251 }
00252
00253 string
00254 DatabaseReplica::get_revision_info() const
00255 {
00256 LOGCALL(REPLICA, string, "DatabaseReplica::get_revision_info", NO_ARGS);
00257 if (internal.get() == NULL)
00258 throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::get_revision_info on a closed replica.");
00259 RETURN(internal->get_revision_info());
00260 }
00261
00262 void
00263 DatabaseReplica::set_read_fd(int fd)
00264 {
00265 LOGCALL_VOID(REPLICA, "DatabaseReplica::set_read_fd", fd);
00266 if (internal.get() == NULL)
00267 throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::set_read_fd on a closed replica.");
00268 internal->set_read_fd(fd);
00269 }
00270
00271 bool
00272 DatabaseReplica::apply_next_changeset(ReplicationInfo * info,
00273 double reader_close_time)
00274 {
00275 LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time);
00276 if (info != NULL)
00277 info->clear();
00278 if (internal.get() == NULL)
00279 throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::apply_next_changeset on a closed replica.");
00280 RETURN(internal->apply_next_changeset(info, reader_close_time));
00281 }
00282
00283 void
00284 DatabaseReplica::close()
00285 {
00286 LOGCALL(REPLICA, bool, "DatabaseReplica::close", NO_ARGS);
00287 internal = NULL;
00288 }
00289
00290 string
00291 DatabaseReplica::get_description() const
00292 {
00293 string desc("DatabaseReplica(");
00294 if (internal.get()) {
00295 desc += internal->get_description();
00296 }
00297 desc += ')';
00298 return desc;
00299 }
00300
00301
00302
00303 void
00304 DatabaseReplica::Internal::update_stub_database() const
00305 {
00306 string stub_path = path;
00307 stub_path += "/XAPIANDB";
00308 string tmp_path = stub_path;
00309 tmp_path += ".tmp";
00310 {
00311 ofstream stub(tmp_path.c_str());
00312 stub << REPLICA_STUB_BANNER
00313 "auto replica_" << live_id << endl;
00314 }
00315 int result;
00316 #ifdef __WIN32__
00317 result = msvc_posix_rename(tmp_path.c_str(), stub_path.c_str());
00318 #else
00319 result = rename(tmp_path.c_str(), stub_path.c_str());
00320 #endif
00321 if (result == -1) {
00322 string msg("Failed to update stub db file for replica: ");
00323 msg += path;
00324 throw Xapian::DatabaseOpeningError(msg);
00325 }
00326 }
00327
00328 DatabaseReplica::Internal::Internal(const string & path_)
00329 : path(path_), live_id(0), live_db(), have_offline_db(false),
00330 need_copy_next(false), offline_revision(), offline_needed_revision(),
00331 last_live_changeset_time(), conn(NULL)
00332 {
00333 LOGCALL_CTOR(REPLICA, "DatabaseReplica::Internal", path_);
00334 #if ! defined XAPIAN_HAS_FLINT_BACKEND && ! defined XAPIAN_HAS_CHERT_BACKEND
00335 throw FeatureUnavailableError("Replication requires the Flint or Chert backend to be enabled");
00336 #else
00337 if (mkdir(path, 0777) == 0) {
00338
00339
00340
00341
00342
00343
00344 live_db = WritableDatabase(get_replica_path(live_id),
00345 Xapian::DB_CREATE);
00346 update_stub_database();
00347 } else {
00348 if (errno != EEXIST) {
00349 throw DatabaseOpeningError("Couldn't create directory '" + path + "'", errno);
00350 }
00351 if (!dir_exists(path)) {
00352 throw DatabaseOpeningError("Replica path must be a directory");
00353 }
00354 string stub_path = path;
00355 stub_path += "/XAPIANDB";
00356 live_db = Auto::open_stub(stub_path, Xapian::DB_OPEN);
00357
00358 ifstream stub(stub_path.c_str());
00359 string line;
00360 while (getline(stub, line)) {
00361 if (!line.empty() && line[0] != '#') {
00362 live_id = line[line.size() - 1] - '0';
00363 break;
00364 }
00365 }
00366 }
00367 #endif
00368 }
00369
00370 string
00371 DatabaseReplica::Internal::get_revision_info() const
00372 {
00373 LOGCALL(REPLICA, string, "DatabaseReplica::Internal::get_revision_info", NO_ARGS);
00374 if (live_db.internal.empty())
00375 live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
00376 if (live_db.internal.size() != 1)
00377 throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase");
00378
00379 string uuid = (live_db.internal[0])->get_uuid();
00380 string buf = encode_length(uuid.size());
00381 buf += uuid;
00382 buf += (live_db.internal[0])->get_revision_info();
00383 RETURN(buf);
00384 }
00385
00386 void
00387 DatabaseReplica::Internal::remove_offline_db()
00388 {
00389
00390 removedir(get_replica_path(live_id ^ 1));
00391 have_offline_db = false;
00392 }
00393
00394 void
00395 DatabaseReplica::Internal::apply_db_copy(double end_time)
00396 {
00397 have_offline_db = true;
00398 last_live_changeset_time = 0;
00399 string offline_path = get_replica_path(live_id ^ 1);
00400
00401
00402
00403
00404
00405 removedir(offline_path);
00406 if (mkdir(offline_path, 0777)) {
00407 throw Xapian::DatabaseError("Cannot make directory '" +
00408 offline_path + "'", errno);
00409 }
00410
00411 {
00412 string buf;
00413 char type = conn->get_message(buf, end_time);
00414 check_message_type(type, REPL_REPLY_DB_HEADER);
00415 const char * ptr = buf.data();
00416 const char * end = ptr + buf.size();
00417 size_t uuid_length = decode_length(&ptr, end, true);
00418 offline_uuid.assign(ptr, uuid_length);
00419 offline_revision.assign(buf, ptr + uuid_length - buf.data(), buf.npos);
00420 }
00421
00422
00423 while (true) {
00424 string filename;
00425 char type = conn->sniff_next_message_type(end_time);
00426 if (type == REPL_REPLY_FAIL)
00427 return;
00428 if (type == REPL_REPLY_DB_FOOTER)
00429 break;
00430
00431 type = conn->get_message(filename, end_time);
00432 check_message_type(type, REPL_REPLY_DB_FILENAME);
00433
00434
00435
00436 if (filename.find("..") != string::npos) {
00437 throw NetworkError("Filename in database contains '..'");
00438 }
00439
00440 type = conn->sniff_next_message_type(end_time);
00441 if (type == REPL_REPLY_FAIL)
00442 return;
00443
00444 string filepath = offline_path + "/" + filename;
00445 type = conn->receive_file(filepath, end_time);
00446 check_message_type(type, REPL_REPLY_DB_FILEDATA);
00447 }
00448 char type = conn->get_message(offline_needed_revision, end_time);
00449 check_message_type(type, REPL_REPLY_DB_FOOTER);
00450 need_copy_next = false;
00451 }
00452
00453 void
00454 DatabaseReplica::Internal::check_message_type(char type, char expected) const
00455 {
00456 if (type != expected) {
00457 throw NetworkError("Unexpected replication protocol message type (got "
00458 + str(type) + ", expected "
00459 + str(expected) + ")");
00460 }
00461 }
00462
00463 bool
00464 DatabaseReplica::Internal::possibly_make_offline_live()
00465 {
00466 string replica_path(get_replica_path(live_id ^ 1));
00467 AutoPtr<DatabaseReplicator> replicator;
00468 try {
00469 replicator.reset(DatabaseReplicator::open(replica_path));
00470 } catch (const Xapian::DatabaseError &) {
00471 return false;
00472 }
00473 if (offline_needed_revision.empty()) {
00474 return false;
00475 }
00476 if (!replicator->check_revision_at_least(offline_revision,
00477 offline_needed_revision)) {
00478 return false;
00479 }
00480
00481 string replicated_uuid = replicator->get_uuid();
00482 if (replicated_uuid.empty()) {
00483 return false;
00484 }
00485
00486 if (replicated_uuid != offline_uuid) {
00487 return false;
00488 }
00489
00490 live_id ^= 1;
00491
00492
00493 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
00494 update_stub_database();
00495 remove_offline_db();
00496 return true;
00497 }
00498
00499 void
00500 DatabaseReplica::Internal::set_read_fd(int fd)
00501 {
00502 delete conn;
00503 conn = NULL;
00504 conn = new RemoteConnection(fd, -1);
00505 }
00506
00507 bool
00508 DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,
00509 double reader_close_time)
00510 {
00511 LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
00512 if (live_db.internal.empty())
00513 live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
00514 if (live_db.internal.size() != 1)
00515 throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase");
00516
00517 while (true) {
00518 char type = conn->sniff_next_message_type(0.0);
00519 switch (type) {
00520 case REPL_REPLY_END_OF_CHANGES: {
00521 string buf;
00522 (void)conn->get_message(buf, 0.0);
00523 RETURN(false);
00524 }
00525 case REPL_REPLY_DB_HEADER:
00526
00527 try {
00528 apply_db_copy(0.0);
00529 if (info != NULL)
00530 ++(info->fullcopy_count);
00531 string replica_uuid;
00532 {
00533 AutoPtr<DatabaseReplicator> replicator(
00534 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
00535 replica_uuid = replicator->get_uuid();
00536 }
00537 if (replica_uuid != offline_uuid) {
00538 remove_offline_db();
00539
00540
00541
00542
00543
00544
00545 need_copy_next = true;
00546 }
00547 } catch (...) {
00548 remove_offline_db();
00549 throw;
00550 }
00551 if (possibly_make_offline_live()) {
00552 if (info != NULL)
00553 info->changed = true;
00554 }
00555 break;
00556 case REPL_REPLY_CHANGESET:
00557 if (need_copy_next) {
00558 throw NetworkError("Needed a database copy next");
00559 }
00560 if (!have_offline_db) {
00561
00562 string replica_path(get_replica_path(live_id));
00563 live_db = WritableDatabase();
00564
00565 if (last_live_changeset_time != 0.0) {
00566
00567
00568
00569 double until;
00570 until = last_live_changeset_time + reader_close_time;
00571 RealTime::sleep(until);
00572 }
00573
00574
00575
00576 {
00577 AutoPtr<DatabaseReplicator> replicator(
00578 DatabaseReplicator::open(replica_path));
00579
00580
00581
00582
00583 replicator->apply_changeset_from_conn(*conn, 0.0, true);
00584 }
00585 last_live_changeset_time = RealTime::now();
00586
00587 if (info != NULL) {
00588 ++(info->changeset_count);
00589 info->changed = true;
00590 }
00591
00592 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
00593 RETURN(true);
00594 }
00595
00596 {
00597 AutoPtr<DatabaseReplicator> replicator(
00598 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
00599
00600 offline_revision = replicator->
00601 apply_changeset_from_conn(*conn, 0.0, false);
00602
00603 if (info != NULL) {
00604 ++(info->changeset_count);
00605 }
00606 }
00607 if (possibly_make_offline_live()) {
00608 if (info != NULL)
00609 info->changed = true;
00610 }
00611 RETURN(true);
00612 case REPL_REPLY_FAIL: {
00613 string buf;
00614 (void)conn->get_message(buf, 0.0);
00615 throw NetworkError("Unable to fully synchronise: " + buf);
00616 }
00617 default:
00618 throw NetworkError("Unknown replication protocol message ("
00619 + str(type) + ")");
00620 }
00621 }
00622 }