00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023
00024 #include "safeerrno.h"
00025 #include <signal.h>
00026
00027 #include <xapian/error.h>
00028
00029 #include "autoptr.h"
00030 #include "emptypostlist.h"
00031 #include "inmemory_positionlist.h"
00032 #include "net_postlist.h"
00033 #include "net_termlist.h"
00034 #include "net_document.h"
00035 #include "omassert.h"
00036 #include "serialise.h"
00037 #include "serialise-double.h"
00038 #include "stats.h"
00039 #include "stringutils.h"
00040 #include "utils.h"
00041
00042 #include <string>
00043 #include <vector>
00044
00045 using namespace std;
00046
00047 RemoteDatabase::RemoteDatabase(int fd, Xapian::timeout timeout_,
00048 const string & context_, bool writable)
00049 : link(fd, fd, context_),
00050 context(context_),
00051 cached_stats_valid(),
00052 timeout(timeout_)
00053 {
00054 #ifndef __WIN32__
00055
00056
00057 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
00058 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00059 }
00060 #endif
00061
00062 if (!writable) {
00063
00064
00065
00066
00067
00068 transaction_state = TRANSACTION_UNIMPLEMENTED;
00069 }
00070
00071 string message;
00072 char type = get_message(message);
00073
00074 if (reply_type(type) != REPLY_GREETING || message.size() < 3) {
00075 if (type == 'O' && message.size() == size_t('M') && message[0] == ' ') {
00076
00077
00078
00079 throw Xapian::NetworkError("Server protocol version too old", context);
00080 }
00081 throw Xapian::NetworkError("Handshake failed - is this a Xapian server?", context);
00082 }
00083
00084 const char *p = message.c_str();
00085 const char *p_end = p + message.size();
00086
00087
00088
00089 int protocol_major = static_cast<unsigned char>(*p++);
00090 int protocol_minor = static_cast<unsigned char>(*p++);
00091 if (protocol_major != XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION ||
00092 protocol_minor < XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION) {
00093 string errmsg("Unknown protocol version ");
00094 errmsg += om_tostring(protocol_major);
00095 errmsg += '.';
00096 errmsg += om_tostring(protocol_minor);
00097 errmsg += " ("STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION)"."STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION)" supported)";
00098 throw Xapian::NetworkError(errmsg, context);
00099 }
00100
00101 doccount = decode_length(&p, p_end, false);
00102 lastdocid = decode_length(&p, p_end, false);
00103 if (p == p_end) {
00104 throw Xapian::NetworkError("Bad greeting message received (bool)", context);
00105 }
00106 has_positional_info = (*p++ == '1');
00107 avlength = unserialise_double(&p, p_end);
00108 if (p != p_end || avlength < 0) {
00109 throw Xapian::NetworkError("Bad greeting message received (double)", context);
00110 }
00111 }
00112
00113 RemoteDatabase *
00114 RemoteDatabase::as_remotedatabase()
00115 {
00116 return this;
00117 }
00118
00119 void
00120 RemoteDatabase::keep_alive()
00121 {
00122 send_message(MSG_KEEPALIVE, "");
00123 string message;
00124 get_message(message, REPLY_DONE);
00125 }
00126
00127 TermList *
00128 RemoteDatabase::open_term_list(Xapian::docid did) const
00129 {
00130 if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00131
00132
00133 if (!cached_stats_valid) update_stats();
00134
00135 send_message(MSG_TERMLIST, encode_length(did));
00136
00137 string message;
00138 get_message(message, REPLY_DOCLENGTH);
00139 const char * p = message.c_str();
00140 const char * p_end = p + message.size();
00141 Xapian::doclength doclen = unserialise_double(&p, p_end);
00142 if (p != p_end || doclen < 0) {
00143 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00144 }
00145
00146 AutoPtr<NetworkTermList> tlist;
00147 tlist = new NetworkTermList(doclen, doccount,
00148 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00149 did);
00150 vector<NetworkTermListItem> & items = tlist->items;
00151
00152 char type;
00153 while ((type = get_message(message)) == REPLY_TERMLIST) {
00154 NetworkTermListItem item;
00155 p = message.data();
00156 p_end = p + message.size();
00157 item.wdf = decode_length(&p, p_end, false);
00158 item.termfreq = decode_length(&p, p_end, false);
00159 item.tname.assign(p, p_end);
00160 items.push_back(item);
00161 }
00162 if (type != REPLY_DONE) {
00163 throw Xapian::NetworkError("Bad message received", context);
00164 }
00165
00166 tlist->current_position = tlist->items.begin();
00167 return tlist.release();
00168 }
00169
00170 TermList *
00171 RemoteDatabase::open_allterms(const string & prefix) const {
00172
00173 if (!cached_stats_valid) update_stats();
00174
00175 send_message(MSG_ALLTERMS, prefix);
00176
00177 AutoPtr<NetworkTermList> tlist;
00178 tlist = new NetworkTermList(0.0, doccount,
00179 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00180 0);
00181 vector<NetworkTermListItem> & items = tlist->items;
00182
00183 string message;
00184 char type;
00185 while ((type = get_message(message)) == REPLY_ALLTERMS) {
00186 NetworkTermListItem item;
00187 const char * p = message.data();
00188 const char * p_end = p + message.size();
00189 item.termfreq = decode_length(&p, p_end, false);
00190 item.tname.assign(p, p_end);
00191 items.push_back(item);
00192 }
00193 if (type != REPLY_DONE) {
00194 throw Xapian::NetworkError("Bad message received", context);
00195 }
00196
00197 tlist->current_position = tlist->items.begin();
00198 return tlist.release();
00199 }
00200
00201 LeafPostList *
00202 RemoteDatabase::open_post_list(const string &term) const
00203 {
00204 return new NetworkPostList(Xapian::Internal::RefCntPtr<const RemoteDatabase>(this), term);
00205 }
00206
00207 Xapian::doccount
00208 RemoteDatabase::read_post_list(const string &term, NetworkPostList & pl) const
00209 {
00210 send_message(MSG_POSTLIST, term);
00211
00212 string message;
00213 char type;
00214 get_message(message, REPLY_POSTLISTSTART);
00215
00216 const char * p = message.data();
00217 const char * p_end = p + message.size();
00218 Xapian::doccount termfreq = decode_length(&p, p_end, false);
00219
00220 while ((type = get_message(message)) == REPLY_POSTLISTITEM) {
00221 pl.append_posting(message);
00222 }
00223 if (type != REPLY_DONE) {
00224 throw Xapian::NetworkError("Bad message received", context);
00225 }
00226
00227 return termfreq;
00228 }
00229
00230 PositionList *
00231 RemoteDatabase::open_position_list(Xapian::docid did, const string &term) const
00232 {
00233 send_message(MSG_POSITIONLIST, encode_length(did) + term);
00234
00235 vector<Xapian::termpos> positions;
00236
00237 string message;
00238 char type;
00239 Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00240 while ((type = get_message(message)) == REPLY_POSITIONLIST) {
00241 const char * p = message.data();
00242 const char * p_end = p + message.size();
00243 lastpos += decode_length(&p, p_end, false) + 1;
00244 positions.push_back(lastpos);
00245 }
00246 if (type != REPLY_DONE) {
00247 throw Xapian::NetworkError("Bad message received", context);
00248 }
00249
00250 return new InMemoryPositionList(positions);
00251 }
00252
00253 bool
00254 RemoteDatabase::has_positions() const
00255 {
00256 if (!cached_stats_valid) update_stats();
00257 return has_positional_info;
00258 }
00259
00260 void
00261 RemoteDatabase::reopen()
00262 {
00263 update_stats(MSG_REOPEN);
00264 }
00265
00266
00267
00268
00269
00270
00271
00272
00273 Xapian::Document::Internal *
00274 RemoteDatabase::open_document(Xapian::docid did, bool ) const
00275 {
00276 if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00277
00278 send_message(MSG_DOCUMENT, encode_length(did));
00279 string doc_data;
00280 map<Xapian::valueno, string> values;
00281 get_message(doc_data, REPLY_DOCDATA);
00282
00283 reply_type type;
00284 string message;
00285 while ((type = get_message(message)) == REPLY_VALUE) {
00286 const char * p = message.data();
00287 const char * p_end = p + message.size();
00288 Xapian::valueno valueno = decode_length(&p, p_end, false);
00289 values.insert(make_pair(valueno, string(p, p_end)));
00290 }
00291 if (type != REPLY_DONE) {
00292 throw Xapian::NetworkError("Bad message received", context);
00293 }
00294
00295 return new NetworkDocument(this, did, doc_data, values);
00296 }
00297
00298 void
00299 RemoteDatabase::update_stats(message_type msg_code) const
00300 {
00301 send_message(msg_code, "");
00302 string message;
00303 get_message(message, REPLY_UPDATE);
00304 const char * p = message.c_str();
00305 const char * p_end = p + message.size();
00306 doccount = decode_length(&p, p_end, false);
00307 lastdocid = decode_length(&p, p_end, false);
00308 if (p == p_end) {
00309 throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00310 }
00311 has_positional_info = (*p++ == '1');
00312 avlength = unserialise_double(&p, p_end);
00313 if (p != p_end || avlength < 0) {
00314 throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00315 }
00316 cached_stats_valid = true;
00317 }
00318
00319 Xapian::doccount
00320 RemoteDatabase::get_doccount() const
00321 {
00322 if (!cached_stats_valid) update_stats();
00323 return doccount;
00324 }
00325
00326 Xapian::docid
00327 RemoteDatabase::get_lastdocid() const
00328 {
00329 if (!cached_stats_valid) update_stats();
00330 return lastdocid;
00331 }
00332
00333 Xapian::doclength
00334 RemoteDatabase::get_avlength() const
00335 {
00336 if (!cached_stats_valid) update_stats();
00337 return avlength;
00338 }
00339
00340 bool
00341 RemoteDatabase::term_exists(const string & tname) const
00342 {
00343 Assert(!tname.empty());
00344 send_message(MSG_TERMEXISTS, tname);
00345 string message;
00346 reply_type type = get_message(message);
00347 if (type != REPLY_TERMEXISTS && type != REPLY_TERMDOESNTEXIST) {
00348 throw Xapian::NetworkError("Bad message received", context);
00349 }
00350 return (type == REPLY_TERMEXISTS);
00351 }
00352
00353 Xapian::doccount
00354 RemoteDatabase::get_termfreq(const string & tname) const
00355 {
00356 Assert(!tname.empty());
00357 send_message(MSG_TERMFREQ, tname);
00358 string message;
00359 get_message(message, REPLY_TERMFREQ);
00360 const char * p = message.data();
00361 const char * p_end = p + message.size();
00362 return decode_length(&p, p_end, false);
00363 }
00364
00365 Xapian::termcount
00366 RemoteDatabase::get_collection_freq(const string & tname) const
00367 {
00368 Assert(!tname.empty());
00369 send_message(MSG_COLLFREQ, tname);
00370 string message;
00371 get_message(message, REPLY_COLLFREQ);
00372 const char * p = message.data();
00373 const char * p_end = p + message.size();
00374 return decode_length(&p, p_end, false);
00375 }
00376
00377 Xapian::doclength
00378 RemoteDatabase::get_doclength(Xapian::docid did) const
00379 {
00380 Assert(did != 0);
00381 send_message(MSG_DOCLENGTH, encode_length(did));
00382 string message;
00383 get_message(message, REPLY_DOCLENGTH);
00384 const char * p = message.c_str();
00385 const char * p_end = p + message.size();
00386 Xapian::doclength doclen = unserialise_double(&p, p_end);
00387 if (p != p_end || doclen < 0) {
00388 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00389 }
00390 return doclen;
00391 }
00392
00393 reply_type
00394 RemoteDatabase::get_message(string &result, reply_type required_type) const
00395 {
00396 OmTime end_time;
00397 if (timeout) end_time = OmTime::now() + timeout;
00398
00399 reply_type type = static_cast<reply_type>(link.get_message(result, end_time));
00400 if (type == REPLY_EXCEPTION) {
00401 unserialise_error(result, "REMOTE:", context);
00402 }
00403 if (required_type != REPLY_MAX && type != required_type) {
00404 string errmsg("Expecting reply type ");
00405 errmsg += om_tostring(int(required_type));
00406 errmsg += ", got ";
00407 errmsg += om_tostring(int(type));
00408 throw Xapian::NetworkError(errmsg);
00409 }
00410
00411 return type;
00412 }
00413
00414 void
00415 RemoteDatabase::send_message(message_type type, const string &message) const
00416 {
00417 OmTime end_time;
00418 if (timeout) end_time = OmTime::now() + timeout;
00419
00420 link.send_message(static_cast<unsigned char>(type), message, end_time);
00421 }
00422
00423 void
00424 RemoteDatabase::do_close()
00425 {
00426
00427
00428
00429 bool writable = (transaction_state != TRANSACTION_UNIMPLEMENTED);
00430
00431
00432 if (writable) dtor_called();
00433
00434
00435
00436
00437
00438 link.do_close(writable);
00439 }
00440
00441 void
00442 RemoteDatabase::set_query(const Xapian::Query::Internal *query,
00443 Xapian::termcount qlen,
00444 Xapian::valueno collapse_key,
00445 Xapian::Enquire::docid_order order,
00446 Xapian::valueno sort_key,
00447 Xapian::Enquire::Internal::sort_setting sort_by,
00448 bool sort_value_forward,
00449 int percent_cutoff, Xapian::weight weight_cutoff,
00450 const Xapian::Weight *wtscheme,
00451 const Xapian::RSet &omrset)
00452 {
00453 string tmp = query->serialise();
00454 string message = encode_length(tmp.size());
00455 message += tmp;
00456
00457
00458 message += encode_length(qlen);
00459 message += encode_length(collapse_key);
00460 message += char('0' + order);
00461 message += encode_length(sort_key);
00462 message += char('0' + sort_by);
00463 message += char('0' + sort_value_forward);
00464 message += char(percent_cutoff);
00465 message += serialise_double(weight_cutoff);
00466
00467 tmp = wtscheme->name();
00468 message += encode_length(tmp.size());
00469 message += tmp;
00470
00471 tmp = wtscheme->serialise();
00472 message += encode_length(tmp.size());
00473 message += tmp;
00474
00475 message += serialise_rset(omrset);
00476
00477 send_message(MSG_QUERY, message);
00478 }
00479
00480 bool
00481 RemoteDatabase::get_remote_stats(bool nowait, Stats &out)
00482 {
00483 if (nowait && !link.ready_to_read()) return false;
00484
00485 string message;
00486 get_message(message, REPLY_STATS);
00487 out = unserialise_stats(message);
00488
00489 return true;
00490 }
00491
00492 void
00493 RemoteDatabase::send_global_stats(Xapian::doccount first,
00494 Xapian::doccount maxitems,
00495 Xapian::doccount check_at_least,
00496 const Stats &stats)
00497 {
00498 string message = encode_length(first);
00499 message += encode_length(maxitems);
00500 message += encode_length(check_at_least);
00501 message += serialise_stats(stats);
00502 send_message(MSG_GETMSET, message);
00503 }
00504
00505 void
00506 RemoteDatabase::get_mset(Xapian::MSet &mset)
00507 {
00508 string message;
00509 get_message(message, REPLY_RESULTS);
00510 mset = unserialise_mset(message);
00511 }
00512
00513 void
00514 RemoteDatabase::flush()
00515 {
00516 send_message(MSG_FLUSH, "");
00517
00518
00519 string message;
00520 get_message(message, REPLY_DONE);
00521 }
00522
00523 void
00524 RemoteDatabase::cancel()
00525 {
00526 cached_stats_valid = false;
00527
00528 send_message(MSG_CANCEL, "");
00529 }
00530
00531 Xapian::docid
00532 RemoteDatabase::add_document(const Xapian::Document & doc)
00533 {
00534 cached_stats_valid = false;
00535
00536 send_message(MSG_ADDDOCUMENT, serialise_document(doc));
00537
00538 string message;
00539 get_message(message, REPLY_ADDDOCUMENT);
00540
00541 const char * p = message.data();
00542 const char * p_end = p + message.size();
00543 return decode_length(&p, p_end, false);
00544 }
00545
00546 void
00547 RemoteDatabase::delete_document(Xapian::docid did)
00548 {
00549 cached_stats_valid = false;
00550
00551
00552 send_message(MSG_DELETEDOCUMENT, encode_length(did));
00553 string dummy;
00554 get_message(dummy, REPLY_DONE);
00555 }
00556
00557 void
00558 RemoteDatabase::delete_document(const std::string & unique_term)
00559 {
00560 cached_stats_valid = false;
00561
00562 send_message(MSG_DELETEDOCUMENTTERM, unique_term);
00563 }
00564
00565 void
00566 RemoteDatabase::replace_document(Xapian::docid did,
00567 const Xapian::Document & doc)
00568 {
00569 cached_stats_valid = false;
00570
00571 string message = encode_length(did);
00572 message += serialise_document(doc);
00573
00574 send_message(MSG_REPLACEDOCUMENT, message);
00575 }
00576
00577 Xapian::docid
00578 RemoteDatabase::replace_document(const std::string & unique_term,
00579 const Xapian::Document & doc)
00580 {
00581 cached_stats_valid = false;
00582
00583 string message = encode_length(unique_term.size());
00584 message += unique_term;
00585 message += serialise_document(doc);
00586
00587 send_message(MSG_REPLACEDOCUMENTTERM, message);
00588
00589 get_message(message, REPLY_ADDDOCUMENT);
00590
00591 const char * p = message.data();
00592 const char * p_end = p + message.size();
00593 return decode_length(&p, p_end, false);
00594 }