00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <config.h>
00024
00025 #include "brass_databasereplicator.h"
00026
00027 #include "xapian/error.h"
00028
00029 #include "../flint_lock.h"
00030 #include "brass_record.h"
00031 #include "brass_replicate_internal.h"
00032 #include "brass_types.h"
00033 #include "brass_version.h"
00034 #include "debuglog.h"
00035 #include "io_utils.h"
00036 #include "pack.h"
00037 #include "remoteconnection.h"
00038 #include "replicationprotocol.h"
00039 #include "safeerrno.h"
00040 #include "str.h"
00041 #include "stringutils.h"
00042 #include "utils.h"
00043
00044 #ifdef __WIN32__
00045 # include "msvc_posix_wrapper.h"
00046 #endif
00047
00048 #include <cstdio>
00049
00050 using namespace std;
00051 using namespace Xapian;
00052
00053 BrassDatabaseReplicator::BrassDatabaseReplicator(const string & db_dir_)
00054 : db_dir(db_dir_)
00055 {
00056 }
00057
00058 bool
00059 BrassDatabaseReplicator::check_revision_at_least(const string & rev,
00060 const string & target) const
00061 {
00062 LOGCALL(DB, bool, "BrassDatabaseReplicator::check_revision_at_least", rev | target);
00063
00064 brass_revision_number_t rev_val;
00065 brass_revision_number_t target_val;
00066
00067 const char * ptr = rev.data();
00068 const char * end = ptr + rev.size();
00069 if (!unpack_uint(&ptr, end, &rev_val)) {
00070 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
00071 }
00072
00073 ptr = target.data();
00074 end = ptr + target.size();
00075 if (!unpack_uint(&ptr, end, &target_val)) {
00076 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
00077 }
00078
00079 RETURN(rev_val >= target_val);
00080 }
00081
00082 void
00083 BrassDatabaseReplicator::process_changeset_chunk_base(const string & tablename,
00084 string & buf,
00085 RemoteConnection & conn,
00086 double end_time) const
00087 {
00088 const char *ptr = buf.data();
00089 const char *end = ptr + buf.size();
00090
00091
00092 char letter = ptr[0];
00093 if (letter != 'A' && letter != 'B')
00094 throw NetworkError("Invalid base file letter in changeset");
00095 ++ptr;
00096
00097
00098
00099 if (ptr == end)
00100 throw NetworkError("Unexpected end of changeset (5)");
00101 string::size_type base_size;
00102 if (!unpack_uint(&ptr, end, &base_size))
00103 throw NetworkError("Invalid base file size in changeset");
00104
00105
00106 buf.erase(0, ptr - buf.data());
00107 conn.get_message_chunk(buf, base_size, end_time);
00108
00109 if (buf.size() < base_size)
00110 throw NetworkError("Unexpected end of changeset (6)");
00111
00112
00113 string tmp_path = db_dir + "/" + tablename + "tmp";
00114 string base_path = db_dir + "/" + tablename + ".base" + letter;
00115 #ifdef __WIN32__
00116 int fd = msvc_posix_open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
00117 #else
00118 int fd = ::open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
00119 #endif
00120 if (fd == -1) {
00121 string msg = "Failed to open ";
00122 msg += tmp_path;
00123 throw DatabaseError(msg, errno);
00124 }
00125 {
00126 fdcloser closer(fd);
00127
00128 io_write(fd, buf.data(), base_size);
00129 io_sync(fd);
00130 }
00131 #if defined __WIN32__
00132 if (msvc_posix_rename(tmp_path.c_str(), base_path.c_str()) < 0) {
00133 #else
00134 if (rename(tmp_path.c_str(), base_path.c_str()) < 0) {
00135 #endif
00136
00137
00138
00139
00140
00141 int saved_errno = errno;
00142 if (unlink(tmp_path) == 0 || errno != ENOENT) {
00143 string msg("Couldn't update base file ");
00144 msg += tablename;
00145 msg += ".base";
00146 msg += letter;
00147 throw DatabaseError(msg, saved_errno);
00148 }
00149 }
00150
00151 buf.erase(0, base_size);
00152 }
00153
00154 void
00155 BrassDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename,
00156 string & buf,
00157 RemoteConnection & conn,
00158 double end_time) const
00159 {
00160 const char *ptr = buf.data();
00161 const char *end = ptr + buf.size();
00162
00163 unsigned int changeset_blocksize;
00164 if (!unpack_uint(&ptr, end, &changeset_blocksize))
00165 throw NetworkError("Invalid blocksize in changeset");
00166 buf.erase(0, ptr - buf.data());
00167
00168 string db_path = db_dir + "/" + tablename + ".DB";
00169 #ifdef __WIN32__
00170 int fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_CREAT | O_BINARY);
00171 #else
00172 int fd = ::open(db_path.c_str(), O_WRONLY | O_CREAT | O_BINARY, 0666);
00173 #endif
00174 if (fd == -1) {
00175 string msg = "Failed to open ";
00176 msg += db_path;
00177 throw DatabaseError(msg, errno);
00178 }
00179 {
00180 fdcloser closer(fd);
00181
00182 while (true) {
00183 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
00184 ptr = buf.data();
00185 end = ptr + buf.size();
00186
00187 uint4 block_number;
00188 if (!unpack_uint(&ptr, end, &block_number))
00189 throw NetworkError("Invalid block number in changeset");
00190 buf.erase(0, ptr - buf.data());
00191 if (block_number == 0)
00192 break;
00193 --block_number;
00194
00195 conn.get_message_chunk(buf, changeset_blocksize, end_time);
00196 if (buf.size() < changeset_blocksize)
00197 throw NetworkError("Incomplete block in changeset");
00198
00199
00200
00201 if (lseek(fd, off_t(changeset_blocksize) * block_number, SEEK_SET) == -1) {
00202 string msg = "Failed to seek to block ";
00203 msg += str(block_number);
00204 throw DatabaseError(msg, errno);
00205 }
00206 io_write(fd, buf.data(), changeset_blocksize);
00207
00208 buf.erase(0, changeset_blocksize);
00209 }
00210 io_sync(fd);
00211 }
00212 }
00213
00214 string
00215 BrassDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
00216 double end_time,
00217 bool valid) const
00218 {
00219 LOGCALL(DB, string, "BrassDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
00220
00221
00222 FlintLock lock(db_dir);
00223 string explanation;
00224 FlintLock::reason why = lock.lock(true, explanation);
00225 if (why != FlintLock::SUCCESS) {
00226 lock.throw_databaselockerror(why, db_dir, explanation);
00227 }
00228
00229 char type = conn.get_message_chunked(end_time);
00230 (void) type;
00231 AssertEq(type, REPL_REPLY_CHANGESET);
00232
00233 string buf;
00234
00235
00236
00237 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
00238
00239 if (!startswith(buf, CHANGES_MAGIC_STRING)) {
00240 throw NetworkError("Invalid ChangeSet magic string");
00241 }
00242 buf.erase(0, 12);
00243 const char *ptr = buf.data();
00244 const char *end = ptr + buf.size();
00245
00246 unsigned int changes_version;
00247 if (!unpack_uint(&ptr, end, &changes_version))
00248 throw NetworkError("Couldn't read a valid version number from changeset");
00249 if (changes_version != CHANGES_VERSION)
00250 throw NetworkError("Unsupported changeset version");
00251
00252 brass_revision_number_t startrev;
00253 brass_revision_number_t endrev;
00254
00255 if (!unpack_uint(&ptr, end, &startrev))
00256 throw NetworkError("Couldn't read a valid start revision from changeset");
00257 if (!unpack_uint(&ptr, end, &endrev))
00258 throw NetworkError("Couldn't read a valid end revision from changeset");
00259
00260 if (endrev <= startrev)
00261 throw NetworkError("End revision in changeset is not later than start revision");
00262
00263 if (ptr == end)
00264 throw NetworkError("Unexpected end of changeset (1)");
00265
00266 if (valid) {
00267
00268
00269
00270
00271 BrassRecordTable record_table(db_dir, true);
00272 record_table.open();
00273 if (startrev != record_table.get_open_revision_number())
00274 throw NetworkError("Changeset supplied is for wrong revision number");
00275 }
00276
00277 unsigned char changes_type = ptr[0];
00278 if (changes_type != 0) {
00279 throw NetworkError("Unsupported changeset type: " + str(changes_type));
00280
00281
00282 }
00283
00284
00285 buf.erase(0, ptr + 1 - buf.data());
00286
00287
00288 while (true) {
00289 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
00290 ptr = buf.data();
00291 end = ptr + buf.size();
00292
00293
00294 if (ptr == end)
00295 throw NetworkError("Unexpected end of changeset (2)");
00296 unsigned char chunk_type = ptr[0];
00297 ++ptr;
00298 if (chunk_type == 0)
00299 break;
00300
00301
00302 string tablename;
00303 if (!unpack_string(&ptr, end, tablename))
00304 throw NetworkError("Unexpected end of changeset (3)");
00305 if (tablename.empty())
00306 throw NetworkError("Missing tablename in changeset");
00307 if (tablename.find_first_not_of("abcdefghijklmnopqrstuvwxyz") !=
00308 tablename.npos)
00309 throw NetworkError("Invalid character in tablename in changeset");
00310
00311
00312 if (ptr == end)
00313 throw NetworkError("Unexpected end of changeset (4)");
00314 buf.erase(0, ptr - buf.data());
00315
00316 switch (chunk_type) {
00317 case 1:
00318 process_changeset_chunk_base(tablename, buf, conn, end_time);
00319 break;
00320 case 2:
00321 process_changeset_chunk_blocks(tablename, buf, conn, end_time);
00322 break;
00323 default:
00324 throw NetworkError("Unrecognised item type in changeset");
00325 }
00326 }
00327 brass_revision_number_t reqrev;
00328 if (!unpack_uint(&ptr, end, &reqrev))
00329 throw NetworkError("Couldn't read a valid required revision from changeset");
00330 if (reqrev < endrev)
00331 throw NetworkError("Required revision in changeset is earlier than end revision");
00332 if (ptr != end)
00333 throw NetworkError("Junk found at end of changeset");
00334
00335 buf.resize(0);
00336 pack_uint(buf, reqrev);
00337 RETURN(buf);
00338 }
00339
00340 string
00341 BrassDatabaseReplicator::get_uuid() const
00342 {
00343 LOGCALL(DB, string, "BrassDatabaseReplicator::get_uuid", NO_ARGS);
00344 BrassVersion version_file(db_dir);
00345 try {
00346 version_file.read_and_check();
00347 } catch (const Xapian::DatabaseError &) {
00348 RETURN(string());
00349 }
00350 RETURN(version_file.get_uuid_string());
00351 }