00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <config.h>
00025
00026 #include "flint_databasereplicator.h"
00027
00028 #include "xapian/error.h"
00029
00030 #include "../flint_lock.h"
00031 #include "flint_record.h"
00032 #include "flint_replicate_internal.h"
00033 #include "flint_types.h"
00034 #include "flint_utils.h"
00035 #include "flint_version.h"
00036 #include "debuglog.h"
00037 #include "io_utils.h"
00038 #include "remoteconnection.h"
00039 #include "replicate_utils.h"
00040 #include "replicationprotocol.h"
00041 #include "safeerrno.h"
00042 #include "str.h"
00043 #include "stringutils.h"
00044 #include "utils.h"
00045
00046 #ifdef __WIN32__
00047 # include "msvc_posix_wrapper.h"
00048 #endif
00049
00050 #include <cstdio>
00051
00052 using namespace std;
00053 using namespace Xapian;
00054
00055 FlintDatabaseReplicator::FlintDatabaseReplicator(const string & db_dir_)
00056 : db_dir(db_dir_),
00057 max_changesets(0)
00058 {
00059 const char *p = getenv("XAPIAN_MAX_CHANGESETS");
00060 if (p)
00061 max_changesets = atoi(p);
00062 }
00063
00064 bool
00065 FlintDatabaseReplicator::check_revision_at_least(const string & rev,
00066 const string & target) const
00067 {
00068 LOGCALL(DB, bool, "FlintDatabaseReplicator::check_revision_at_least", rev | target);
00069
00070 flint_revision_number_t rev_val;
00071 flint_revision_number_t target_val;
00072
00073 const char * ptr = rev.data();
00074 const char * end = ptr + rev.size();
00075 if (!F_unpack_uint(&ptr, end, &rev_val)) {
00076 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
00077 }
00078
00079 ptr = target.data();
00080 end = ptr + target.size();
00081 if (!F_unpack_uint(&ptr, end, &target_val)) {
00082 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
00083 }
00084
00085 RETURN(rev_val >= target_val);
00086 }
00087
00088 void
00089 FlintDatabaseReplicator::process_changeset_chunk_base(const string & tablename,
00090 string & buf,
00091 RemoteConnection & conn,
00092 double end_time,
00093 int changes_fd) const
00094 {
00095 const char *ptr = buf.data();
00096 const char *end = ptr + buf.size();
00097
00098
00099 char letter = ptr[0];
00100 if (letter != 'A' && letter != 'B')
00101 throw NetworkError("Invalid base file letter in changeset");
00102 ++ptr;
00103
00104
00105
00106 if (ptr == end)
00107 throw NetworkError("Unexpected end of changeset (5)");
00108 string::size_type base_size;
00109 if (!F_unpack_uint(&ptr, end, &base_size))
00110 throw NetworkError("Invalid base file size in changeset");
00111
00112
00113 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
00114 conn.get_message_chunk(buf, base_size, end_time);
00115
00116 if (buf.size() < base_size)
00117 throw NetworkError("Unexpected end of changeset (6)");
00118
00119
00120 string tmp_path = db_dir + "/" + tablename + "tmp";
00121 string base_path = db_dir + "/" + tablename + ".base" + letter;
00122 #ifdef __WIN32__
00123 int fd = msvc_posix_open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
00124 #else
00125 int fd = ::open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
00126 #endif
00127 if (fd == -1) {
00128 string msg = "Failed to open ";
00129 msg += tmp_path;
00130 throw DatabaseError(msg, errno);
00131 }
00132 {
00133 fdcloser closer(fd);
00134
00135 io_write(fd, buf.data(), base_size);
00136 io_sync(fd);
00137 }
00138
00139
00140 write_and_clear_changes(changes_fd, buf, base_size);
00141
00142 #if defined __WIN32__
00143 if (msvc_posix_rename(tmp_path.c_str(), base_path.c_str()) < 0) {
00144 #else
00145 if (rename(tmp_path.c_str(), base_path.c_str()) < 0) {
00146 #endif
00147
00148
00149
00150
00151
00152 int saved_errno = errno;
00153 if (unlink(tmp_path) == 0 || errno != ENOENT) {
00154 string msg("Couldn't update base file ");
00155 msg += tablename;
00156 msg += ".base";
00157 msg += letter;
00158 throw DatabaseError(msg, saved_errno);
00159 }
00160 }
00161 }
00162
00163 void
00164 FlintDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename,
00165 string & buf,
00166 RemoteConnection & conn,
00167 double end_time,
00168 int changes_fd) const
00169 {
00170 const char *ptr = buf.data();
00171 const char *end = ptr + buf.size();
00172
00173 unsigned int changeset_blocksize;
00174 if (!F_unpack_uint(&ptr, end, &changeset_blocksize))
00175 throw NetworkError("Invalid blocksize in changeset");
00176 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
00177
00178 string db_path = db_dir + "/" + tablename + ".DB";
00179 #ifdef __WIN32__
00180 int fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_BINARY);
00181 #else
00182 int fd = ::open(db_path.c_str(), O_WRONLY | O_BINARY, 0666);
00183 #endif
00184 if (fd == -1) {
00185 if (file_exists(db_path)) {
00186 string msg = "Failed to open ";
00187 msg += db_path;
00188 throw DatabaseError(msg, errno);
00189 }
00190 #ifdef __WIN32__
00191 fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
00192 #else
00193 fd = ::open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
00194 #endif
00195 if (fd == -1) {
00196 string msg = "Failed to create and open ";
00197 msg += db_path;
00198 throw DatabaseError(msg, errno);
00199 }
00200 }
00201 {
00202 fdcloser closer(fd);
00203
00204 while (true) {
00205 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
00206 ptr = buf.data();
00207 end = ptr + buf.size();
00208
00209 uint4 block_number;
00210 if (!F_unpack_uint(&ptr, end, &block_number))
00211 throw NetworkError("Invalid block number in changeset");
00212 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
00213 if (block_number == 0)
00214 break;
00215 --block_number;
00216
00217 conn.get_message_chunk(buf, changeset_blocksize, end_time);
00218 if (buf.size() < changeset_blocksize)
00219 throw NetworkError("Incomplete block in changeset");
00220
00221
00222
00223 if (lseek(fd, off_t(changeset_blocksize) * block_number, SEEK_SET) == -1) {
00224 string msg = "Failed to seek to block ";
00225 msg += str(block_number);
00226 throw DatabaseError(msg, errno);
00227 }
00228 io_write(fd, buf.data(), changeset_blocksize);
00229
00230 write_and_clear_changes(changes_fd, buf, changeset_blocksize);
00231 }
00232 io_sync(fd);
00233 }
00234 }
00235
00236 string
00237 FlintDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
00238 double end_time,
00239 bool valid) const
00240 {
00241 LOGCALL(DB, string, "FlintDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
00242
00243
00244 FlintLock lock(db_dir);
00245 string explanation;
00246 FlintLock::reason why = lock.lock(true, explanation);
00247 if (why != FlintLock::SUCCESS) {
00248 lock.throw_databaselockerror(why, db_dir, explanation);
00249 }
00250
00251 char type = conn.get_message_chunked(end_time);
00252 (void) type;
00253 AssertEq(type, REPL_REPLY_CHANGESET);
00254
00255 string buf;
00256
00257
00258
00259 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
00260
00261 if (!startswith(buf, CHANGES_MAGIC_STRING)) {
00262 throw NetworkError("Invalid ChangeSet magic string");
00263 }
00264 const char *ptr = buf.data();
00265 const char *end = ptr + buf.size();
00266 ptr += CONST_STRLEN(CHANGES_MAGIC_STRING);
00267
00268 unsigned int changes_version;
00269 if (!F_unpack_uint(&ptr, end, &changes_version))
00270 throw NetworkError("Couldn't read a valid version number from changeset");
00271 if (changes_version != CHANGES_VERSION)
00272 throw NetworkError("Unsupported changeset version");
00273
00274 flint_revision_number_t startrev;
00275 flint_revision_number_t endrev;
00276
00277 if (!F_unpack_uint(&ptr, end, &startrev))
00278 throw NetworkError("Couldn't read a valid start revision from changeset");
00279 if (!F_unpack_uint(&ptr, end, &endrev))
00280 throw NetworkError("Couldn't read a valid end revision from changeset");
00281
00282 if (endrev <= startrev)
00283 throw NetworkError("End revision in changeset is not later than start revision");
00284
00285 if (ptr == end)
00286 throw NetworkError("Unexpected end of changeset (1)");
00287
00288 int changes_fd = -1;
00289 string changes_name;
00290 if (max_changesets > 0) {
00291 changes_fd = create_changeset_file(db_dir, "changes" + str(startrev),
00292 changes_name);
00293 }
00294 fdcloser closer(changes_fd);
00295
00296 if (valid) {
00297
00298
00299
00300
00301 FlintRecordTable record_table(db_dir, true);
00302 record_table.open();
00303 if (startrev != record_table.get_open_revision_number())
00304 throw NetworkError("Changeset supplied is for wrong revision number");
00305 }
00306
00307 unsigned char changes_type = ptr[0];
00308 if (changes_type != 0) {
00309 throw NetworkError("Unsupported changeset type: " + str(changes_type));
00310
00311
00312 }
00313
00314
00315 write_and_clear_changes(changes_fd, buf, ptr + 1 - buf.data());
00316
00317
00318 while (true) {
00319 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
00320 ptr = buf.data();
00321 end = ptr + buf.size();
00322
00323
00324 if (ptr == end)
00325 throw NetworkError("Unexpected end of changeset (2)");
00326 unsigned char chunk_type = ptr[0];
00327 ++ptr;
00328 if (chunk_type == 0)
00329 break;
00330
00331
00332 string tablename;
00333 if (!F_unpack_string(&ptr, end, tablename))
00334 throw NetworkError("Unexpected end of changeset (3)");
00335 if (tablename.empty())
00336 throw NetworkError("Missing tablename in changeset");
00337 if (tablename.find_first_not_of("abcdefghijklmnopqrstuvwxyz") !=
00338 tablename.npos)
00339 throw NetworkError("Invalid character in tablename in changeset");
00340
00341
00342 if (ptr == end)
00343 throw NetworkError("Unexpected end of changeset (4)");
00344 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
00345
00346 switch (chunk_type) {
00347 case 1:
00348 process_changeset_chunk_base(tablename, buf, conn, end_time,
00349 changes_fd);
00350 break;
00351 case 2:
00352 process_changeset_chunk_blocks(tablename, buf, conn, end_time,
00353 changes_fd);
00354 break;
00355 default:
00356 throw NetworkError("Unrecognised item type in changeset");
00357 }
00358 }
00359 flint_revision_number_t reqrev;
00360 if (!F_unpack_uint(&ptr, end, &reqrev))
00361 throw NetworkError("Couldn't read a valid required revision from changeset");
00362 if (reqrev < endrev)
00363 throw NetworkError("Required revision in changeset is earlier than end revision");
00364 if (ptr != end)
00365 throw NetworkError("Junk found at end of changeset");
00366
00367 write_and_clear_changes(changes_fd, buf, buf.size());
00368 buf = F_pack_uint(reqrev);
00369 RETURN(buf);
00370 }
00371
00372 string
00373 FlintDatabaseReplicator::get_uuid() const
00374 {
00375 LOGCALL(DB, string, "FlintDatabaseReplicator::get_uuid", NO_ARGS);
00376 FlintVersion version_file(db_dir);
00377 try {
00378 version_file.read_and_check(true);
00379 } catch (const DatabaseError &) {
00380 RETURN(string());
00381 }
00382 RETURN(version_file.get_uuid_string());
00383 }