00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023
00024 #include "remote-database.h"
00025
00026 #include "safeerrno.h"
00027 #include <signal.h>
00028
00029 #include "autoptr.h"
00030 #include "emptypostlist.h"
00031 #include "inmemory_positionlist.h"
00032 #include "net_postlist.h"
00033 #include "net_termlist.h"
00034 #include "remote-document.h"
00035 #include "omassert.h"
00036 #include "realtime.h"
00037 #include "serialise.h"
00038 #include "serialise-double.h"
00039 #include "str.h"
00040 #include "stringutils.h"
00041 #include "weightinternal.h"
00042
00043 #include <string>
00044 #include <vector>
00045
00046 #include "xapian/error.h"
00047 #include "xapian/matchspy.h"
00048
00049 using namespace std;
00050
00051 RemoteDatabase::RemoteDatabase(int fd, double timeout_,
00052 const string & context_, bool writable)
00053 : link(fd, fd, context_),
00054 context(context_),
00055 cached_stats_valid(),
00056 mru_valstats(),
00057 mru_slot(Xapian::BAD_VALUENO),
00058 timeout(timeout_)
00059 {
00060 #ifndef __WIN32__
00061
00062
00063 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
00064 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00065 }
00066 #endif
00067
00068 if (!writable) {
00069
00070
00071
00072
00073
00074 transaction_state = TRANSACTION_UNIMPLEMENTED;
00075 }
00076
00077 string message;
00078 char type = get_message(message);
00079
00080 if (reply_type(type) != REPLY_GREETING || message.size() < 3) {
00081 if (type == 'O' && message.size() == size_t('M') && message[0] == ' ') {
00082
00083
00084
00085 throw Xapian::NetworkError("Server protocol version too old", context);
00086 }
00087 throw Xapian::NetworkError("Handshake failed - is this a Xapian server?", context);
00088 }
00089
00090 const char *p = message.c_str();
00091 const char *p_end = p + message.size();
00092
00093
00094
00095 int protocol_major = static_cast<unsigned char>(*p++);
00096 int protocol_minor = static_cast<unsigned char>(*p++);
00097 if (protocol_major != XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION ||
00098 protocol_minor < XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION) {
00099 string errmsg("Unknown protocol version ");
00100 errmsg += str(protocol_major);
00101 errmsg += '.';
00102 errmsg += str(protocol_minor);
00103 errmsg += " ("STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION)"."STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION)" supported)";
00104 throw Xapian::NetworkError(errmsg, context);
00105 }
00106
00107 apply_stats_update(p, p_end);
00108
00109 if (writable) update_stats(MSG_WRITEACCESS);
00110 }
00111
00112 RemoteDatabase *
00113 RemoteDatabase::as_remotedatabase()
00114 {
00115 return this;
00116 }
00117
00118 void
00119 RemoteDatabase::keep_alive()
00120 {
00121 send_message(MSG_KEEPALIVE, string());
00122 string message;
00123 get_message(message, REPLY_DONE);
00124 }
00125
00126 TermList *
00127 RemoteDatabase::open_metadata_keylist(const std::string &prefix) const
00128 {
00129
00130 if (!cached_stats_valid) update_stats();
00131
00132 send_message(MSG_METADATAKEYLIST, prefix);
00133
00134 string message;
00135 AutoPtr<NetworkTermList> tlist(
00136 new NetworkTermList(0, doccount,
00137 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00138 0));
00139 vector<NetworkTermListItem> & items = tlist->items;
00140
00141 char type;
00142 while ((type = get_message(message)) == REPLY_METADATAKEYLIST) {
00143 NetworkTermListItem item;
00144 item.tname = message;
00145 items.push_back(item);
00146 }
00147 if (type != REPLY_DONE) {
00148 throw Xapian::NetworkError("Bad message received", context);
00149 }
00150
00151 tlist->current_position = tlist->items.begin();
00152 return tlist.release();
00153 }
00154
00155 TermList *
00156 RemoteDatabase::open_term_list(Xapian::docid did) const
00157 {
00158 Assert(did);
00159
00160
00161 if (!cached_stats_valid) update_stats();
00162
00163 send_message(MSG_TERMLIST, encode_length(did));
00164
00165 string message;
00166 get_message(message, REPLY_DOCLENGTH);
00167 const char * p = message.c_str();
00168 const char * p_end = p + message.size();
00169 Xapian::termcount doclen = decode_length(&p, p_end, false);
00170 if (p != p_end) {
00171 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00172 }
00173
00174 AutoPtr<NetworkTermList> tlist(
00175 new NetworkTermList(doclen, doccount,
00176 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00177 did));
00178 vector<NetworkTermListItem> & items = tlist->items;
00179
00180 char type;
00181 while ((type = get_message(message)) == REPLY_TERMLIST) {
00182 NetworkTermListItem item;
00183 p = message.data();
00184 p_end = p + message.size();
00185 item.wdf = decode_length(&p, p_end, false);
00186 item.termfreq = decode_length(&p, p_end, false);
00187 item.tname.assign(p, p_end);
00188 items.push_back(item);
00189 }
00190 if (type != REPLY_DONE) {
00191 throw Xapian::NetworkError("Bad message received", context);
00192 }
00193
00194 tlist->current_position = tlist->items.begin();
00195 return tlist.release();
00196 }
00197
00198 TermList *
00199 RemoteDatabase::open_allterms(const string & prefix) const {
00200
00201 if (!cached_stats_valid) update_stats();
00202
00203 send_message(MSG_ALLTERMS, prefix);
00204
00205 AutoPtr<NetworkTermList> tlist(
00206 new NetworkTermList(0, doccount,
00207 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00208 0));
00209 vector<NetworkTermListItem> & items = tlist->items;
00210
00211 string message;
00212 char type;
00213 while ((type = get_message(message)) == REPLY_ALLTERMS) {
00214 NetworkTermListItem item;
00215 const char * p = message.data();
00216 const char * p_end = p + message.size();
00217 item.termfreq = decode_length(&p, p_end, false);
00218 item.tname.assign(p, p_end);
00219 items.push_back(item);
00220 }
00221 if (type != REPLY_DONE) {
00222 throw Xapian::NetworkError("Bad message received", context);
00223 }
00224
00225 tlist->current_position = tlist->items.begin();
00226 return tlist.release();
00227 }
00228
00229 LeafPostList *
00230 RemoteDatabase::open_post_list(const string &term) const
00231 {
00232 return new NetworkPostList(Xapian::Internal::RefCntPtr<const RemoteDatabase>(this), term);
00233 }
00234
00235 Xapian::doccount
00236 RemoteDatabase::read_post_list(const string &term, NetworkPostList & pl) const
00237 {
00238 send_message(MSG_POSTLIST, term);
00239
00240 string message;
00241 char type;
00242 get_message(message, REPLY_POSTLISTSTART);
00243
00244 const char * p = message.data();
00245 const char * p_end = p + message.size();
00246 Xapian::doccount termfreq = decode_length(&p, p_end, false);
00247
00248 while ((type = get_message(message)) == REPLY_POSTLISTITEM) {
00249 pl.append_posting(message);
00250 }
00251 if (type != REPLY_DONE) {
00252 throw Xapian::NetworkError("Bad message received", context);
00253 }
00254
00255 return termfreq;
00256 }
00257
00258 PositionList *
00259 RemoteDatabase::open_position_list(Xapian::docid did, const string &term) const
00260 {
00261 send_message(MSG_POSITIONLIST, encode_length(did) + term);
00262
00263 vector<Xapian::termpos> positions;
00264
00265 string message;
00266 char type;
00267 Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00268 while ((type = get_message(message)) == REPLY_POSITIONLIST) {
00269 const char * p = message.data();
00270 const char * p_end = p + message.size();
00271 lastpos += decode_length(&p, p_end, false) + 1;
00272 positions.push_back(lastpos);
00273 }
00274 if (type != REPLY_DONE) {
00275 throw Xapian::NetworkError("Bad message received", context);
00276 }
00277
00278 return new InMemoryPositionList(positions);
00279 }
00280
00281 bool
00282 RemoteDatabase::has_positions() const
00283 {
00284 if (!cached_stats_valid) update_stats();
00285 return has_positional_info;
00286 }
00287
00288 void
00289 RemoteDatabase::reopen()
00290 {
00291 update_stats(MSG_REOPEN);
00292 mru_slot = Xapian::BAD_VALUENO;
00293 }
00294
00295 void
00296 RemoteDatabase::close()
00297 {
00298 do_close();
00299 }
00300
00301
00302
00303
00304
00305
00306
00307
00308 Xapian::Document::Internal *
00309 RemoteDatabase::open_document(Xapian::docid did, bool ) const
00310 {
00311 Assert(did);
00312
00313 send_message(MSG_DOCUMENT, encode_length(did));
00314 string doc_data;
00315 map<Xapian::valueno, string> values;
00316 get_message(doc_data, REPLY_DOCDATA);
00317
00318 reply_type type;
00319 string message;
00320 while ((type = get_message(message)) == REPLY_VALUE) {
00321 const char * p = message.data();
00322 const char * p_end = p + message.size();
00323 Xapian::valueno slot = decode_length(&p, p_end, false);
00324 values.insert(make_pair(slot, string(p, p_end)));
00325 }
00326 if (type != REPLY_DONE) {
00327 throw Xapian::NetworkError("Bad message received", context);
00328 }
00329
00330 return new RemoteDocument(this, did, doc_data, values);
00331 }
00332
00333 void
00334 RemoteDatabase::update_stats(message_type msg_code) const
00335 {
00336 send_message(msg_code, string());
00337 string message;
00338 get_message(message, REPLY_UPDATE);
00339 const char * p = message.c_str();
00340 const char * p_end = p + message.size();
00341 apply_stats_update(p, p_end);
00342 }
00343
00344 void
00345 RemoteDatabase::apply_stats_update(const char * p, const char * p_end) const
00346 {
00347 doccount = decode_length(&p, p_end, false);
00348 lastdocid = decode_length(&p, p_end, false);
00349 doclen_lbound = decode_length(&p, p_end, false);
00350 doclen_ubound = decode_length(&p, p_end, false);
00351 if (p == p_end) {
00352 throw Xapian::NetworkError("Bad stats update message received", context);
00353 }
00354 has_positional_info = (*p++ == '1');
00355 total_length = decode_length(&p, p_end, false);
00356 uuid.assign(p, p_end);
00357 cached_stats_valid = true;
00358 }
00359
00360 Xapian::doccount
00361 RemoteDatabase::get_doccount() const
00362 {
00363 if (!cached_stats_valid) update_stats();
00364 return doccount;
00365 }
00366
00367 Xapian::docid
00368 RemoteDatabase::get_lastdocid() const
00369 {
00370 if (!cached_stats_valid) update_stats();
00371 return lastdocid;
00372 }
00373
00374 totlen_t
00375 RemoteDatabase::get_total_length() const
00376 {
00377 if (!cached_stats_valid) update_stats();
00378 return total_length;
00379 }
00380
00381 Xapian::doclength
00382 RemoteDatabase::get_avlength() const
00383 {
00384 if (!cached_stats_valid) update_stats();
00385 return Xapian::doclength(total_length) / doccount;
00386 }
00387
00388 bool
00389 RemoteDatabase::term_exists(const string & tname) const
00390 {
00391 Assert(!tname.empty());
00392 send_message(MSG_TERMEXISTS, tname);
00393 string message;
00394 reply_type type = get_message(message);
00395 if (type != REPLY_TERMEXISTS && type != REPLY_TERMDOESNTEXIST) {
00396 throw Xapian::NetworkError("Bad message received", context);
00397 }
00398 return (type == REPLY_TERMEXISTS);
00399 }
00400
00401 Xapian::doccount
00402 RemoteDatabase::get_termfreq(const string & tname) const
00403 {
00404 Assert(!tname.empty());
00405 send_message(MSG_TERMFREQ, tname);
00406 string message;
00407 get_message(message, REPLY_TERMFREQ);
00408 const char * p = message.data();
00409 const char * p_end = p + message.size();
00410 return decode_length(&p, p_end, false);
00411 }
00412
00413 Xapian::termcount
00414 RemoteDatabase::get_collection_freq(const string & tname) const
00415 {
00416 Assert(!tname.empty());
00417 send_message(MSG_COLLFREQ, tname);
00418 string message;
00419 get_message(message, REPLY_COLLFREQ);
00420 const char * p = message.data();
00421 const char * p_end = p + message.size();
00422 return decode_length(&p, p_end, false);
00423 }
00424
00425
00426 void
00427 RemoteDatabase::read_value_stats(Xapian::valueno slot) const
00428 {
00429 if (mru_slot != slot) {
00430 send_message(MSG_VALUESTATS, encode_length(slot));
00431 string message;
00432 get_message(message, REPLY_VALUESTATS);
00433 const char * p = message.data();
00434 const char * p_end = p + message.size();
00435 mru_slot = slot;
00436 mru_valstats.freq = decode_length(&p, p_end, false);
00437 size_t len = decode_length(&p, p_end, true);
00438 mru_valstats.lower_bound.assign(p, len);
00439 p += len;
00440 len = decode_length(&p, p_end, true);
00441 mru_valstats.upper_bound.assign(p, len);
00442 p += len;
00443 if (p != p_end) {
00444 throw Xapian::NetworkError("Bad REPLY_VALUESTATS message received", context);
00445 }
00446 }
00447 }
00448
00449 Xapian::doccount
00450 RemoteDatabase::get_value_freq(Xapian::valueno slot) const
00451 {
00452 read_value_stats(slot);
00453 return mru_valstats.freq;
00454 }
00455
00456 std::string
00457 RemoteDatabase::get_value_lower_bound(Xapian::valueno slot) const
00458 {
00459 read_value_stats(slot);
00460 return mru_valstats.lower_bound;
00461 }
00462
00463 std::string
00464 RemoteDatabase::get_value_upper_bound(Xapian::valueno slot) const
00465 {
00466 read_value_stats(slot);
00467 return mru_valstats.upper_bound;
00468 }
00469
00470 Xapian::termcount
00471 RemoteDatabase::get_doclength_lower_bound() const
00472 {
00473 return doclen_lbound;
00474 }
00475
00476 Xapian::termcount
00477 RemoteDatabase::get_doclength_upper_bound() const
00478 {
00479 return doclen_ubound;
00480 }
00481
00482 Xapian::termcount
00483 RemoteDatabase::get_wdf_upper_bound(const string &) const
00484 {
00485
00486
00487
00488
00489 return doclen_ubound;
00490 }
00491
00492 Xapian::termcount
00493 RemoteDatabase::get_doclength(Xapian::docid did) const
00494 {
00495 Assert(did != 0);
00496 send_message(MSG_DOCLENGTH, encode_length(did));
00497 string message;
00498 get_message(message, REPLY_DOCLENGTH);
00499 const char * p = message.c_str();
00500 const char * p_end = p + message.size();
00501 Xapian::termcount doclen = decode_length(&p, p_end, false);
00502 if (p != p_end) {
00503 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00504 }
00505 return doclen;
00506 }
00507
00508 reply_type
00509 RemoteDatabase::get_message(string &result, reply_type required_type) const
00510 {
00511 double end_time = RealTime::end_time(timeout);
00512 reply_type type = static_cast<reply_type>(link.get_message(result, end_time));
00513 if (type == REPLY_EXCEPTION) {
00514 unserialise_error(result, "REMOTE:", context);
00515 }
00516 if (required_type != REPLY_MAX && type != required_type) {
00517 string errmsg("Expecting reply type ");
00518 errmsg += str(int(required_type));
00519 errmsg += ", got ";
00520 errmsg += str(int(type));
00521 throw Xapian::NetworkError(errmsg);
00522 }
00523
00524 return type;
00525 }
00526
00527 void
00528 RemoteDatabase::send_message(message_type type, const string &message) const
00529 {
00530 double end_time = RealTime::end_time(timeout);
00531 link.send_message(static_cast<unsigned char>(type), message, end_time);
00532 }
00533
00534 void
00535 RemoteDatabase::do_close()
00536 {
00537
00538
00539
00540 bool writable = (transaction_state != TRANSACTION_UNIMPLEMENTED);
00541
00542
00543 if (writable) dtor_called();
00544
00545
00546
00547
00548
00549 link.do_close(writable);
00550 }
00551
00552 void
00553 RemoteDatabase::set_query(const Xapian::Query::Internal *query,
00554 Xapian::termcount qlen,
00555 Xapian::doccount collapse_max,
00556 Xapian::valueno collapse_key,
00557 Xapian::Enquire::docid_order order,
00558 Xapian::valueno sort_key,
00559 Xapian::Enquire::Internal::sort_setting sort_by,
00560 bool sort_value_forward,
00561 int percent_cutoff, Xapian::weight weight_cutoff,
00562 const Xapian::Weight *wtscheme,
00563 const Xapian::RSet &omrset,
00564 const vector<Xapian::MatchSpy *> & matchspies)
00565 {
00566 string tmp = query->serialise();
00567 string message = encode_length(tmp.size());
00568 message += tmp;
00569
00570
00571 message += encode_length(qlen);
00572 message += encode_length(collapse_max);
00573 if (collapse_max) message += encode_length(collapse_key);
00574 message += char('0' + order);
00575 message += encode_length(sort_key);
00576 message += char('0' + sort_by);
00577 message += char('0' + sort_value_forward);
00578 message += char(percent_cutoff);
00579 message += serialise_double(weight_cutoff);
00580
00581 tmp = wtscheme->name();
00582 message += encode_length(tmp.size());
00583 message += tmp;
00584
00585 tmp = wtscheme->serialise();
00586 message += encode_length(tmp.size());
00587 message += tmp;
00588
00589 tmp = serialise_rset(omrset);
00590 message += encode_length(tmp.size());
00591 message += tmp;
00592
00593 vector<Xapian::MatchSpy *>::const_iterator i;
00594 for (i = matchspies.begin(); i != matchspies.end(); ++i) {
00595 tmp = (*i)->name();
00596 if (tmp.empty()) {
00597 throw Xapian::UnimplementedError("MatchSpy not suitable for use with remote searches - name() method returned empty string");
00598 }
00599 message += encode_length(tmp.size());
00600 message += tmp;
00601
00602 tmp = (*i)->serialise();
00603 message += encode_length(tmp.size());
00604 message += tmp;
00605 }
00606
00607 send_message(MSG_QUERY, message);
00608 }
00609
00610 bool
00611 RemoteDatabase::get_remote_stats(bool nowait, Xapian::Weight::Internal &out)
00612 {
00613 if (nowait && !link.ready_to_read()) return false;
00614
00615 string message;
00616 get_message(message, REPLY_STATS);
00617 out = unserialise_stats(message);
00618
00619 return true;
00620 }
00621
00622 void
00623 RemoteDatabase::send_global_stats(Xapian::doccount first,
00624 Xapian::doccount maxitems,
00625 Xapian::doccount check_at_least,
00626 const Xapian::Weight::Internal &stats)
00627 {
00628 string message = encode_length(first);
00629 message += encode_length(maxitems);
00630 message += encode_length(check_at_least);
00631 message += serialise_stats(stats);
00632 send_message(MSG_GETMSET, message);
00633 }
00634
00635 void
00636 RemoteDatabase::get_mset(Xapian::MSet &mset,
00637 const vector<Xapian::MatchSpy *> & matchspies)
00638 {
00639 string message;
00640 get_message(message, REPLY_RESULTS);
00641 const char * p = message.data();
00642 const char * p_end = p + message.size();
00643
00644 vector<Xapian::MatchSpy *>::const_iterator i;
00645 for (i = matchspies.begin(); i != matchspies.end(); ++i) {
00646 if (p == p_end)
00647 throw Xapian::NetworkError("Expected serialised matchspy");
00648 size_t len = decode_length(&p, p_end, true);
00649 string spyresults(p, len);
00650 p += len;
00651 (*i)->merge_results(spyresults);
00652 }
00653 mset = unserialise_mset(p, p_end);
00654 }
00655
00656 void
00657 RemoteDatabase::commit()
00658 {
00659 send_message(MSG_COMMIT, string());
00660
00661
00662 string message;
00663 get_message(message, REPLY_DONE);
00664 }
00665
00666 void
00667 RemoteDatabase::cancel()
00668 {
00669 cached_stats_valid = false;
00670 mru_slot = Xapian::BAD_VALUENO;
00671
00672 send_message(MSG_CANCEL, string());
00673 }
00674
00675 Xapian::docid
00676 RemoteDatabase::add_document(const Xapian::Document & doc)
00677 {
00678 cached_stats_valid = false;
00679 mru_slot = Xapian::BAD_VALUENO;
00680
00681 send_message(MSG_ADDDOCUMENT, serialise_document(doc));
00682
00683 string message;
00684 get_message(message, REPLY_ADDDOCUMENT);
00685
00686 const char * p = message.data();
00687 const char * p_end = p + message.size();
00688 return decode_length(&p, p_end, false);
00689 }
00690
00691 void
00692 RemoteDatabase::delete_document(Xapian::docid did)
00693 {
00694 cached_stats_valid = false;
00695 mru_slot = Xapian::BAD_VALUENO;
00696
00697 send_message(MSG_DELETEDOCUMENT, encode_length(did));
00698 string dummy;
00699 get_message(dummy, REPLY_DONE);
00700 }
00701
00702 void
00703 RemoteDatabase::delete_document(const std::string & unique_term)
00704 {
00705 cached_stats_valid = false;
00706 mru_slot = Xapian::BAD_VALUENO;
00707
00708 send_message(MSG_DELETEDOCUMENTTERM, unique_term);
00709 }
00710
00711 void
00712 RemoteDatabase::replace_document(Xapian::docid did,
00713 const Xapian::Document & doc)
00714 {
00715 cached_stats_valid = false;
00716 mru_slot = Xapian::BAD_VALUENO;
00717
00718 string message = encode_length(did);
00719 message += serialise_document(doc);
00720
00721 send_message(MSG_REPLACEDOCUMENT, message);
00722 }
00723
00724 Xapian::docid
00725 RemoteDatabase::replace_document(const std::string & unique_term,
00726 const Xapian::Document & doc)
00727 {
00728 cached_stats_valid = false;
00729 mru_slot = Xapian::BAD_VALUENO;
00730
00731 string message = encode_length(unique_term.size());
00732 message += unique_term;
00733 message += serialise_document(doc);
00734
00735 send_message(MSG_REPLACEDOCUMENTTERM, message);
00736
00737 get_message(message, REPLY_ADDDOCUMENT);
00738
00739 const char * p = message.data();
00740 const char * p_end = p + message.size();
00741 return decode_length(&p, p_end, false);
00742 }
00743
00744 string
00745 RemoteDatabase::get_uuid() const
00746 {
00747 return uuid;
00748 }
00749
00750 string
00751 RemoteDatabase::get_metadata(const string & key) const
00752 {
00753 send_message(MSG_GETMETADATA, key);
00754 string metadata;
00755 get_message(metadata, REPLY_METADATA);
00756 return metadata;
00757 }
00758
00759 void
00760 RemoteDatabase::set_metadata(const string & key, const string & value)
00761 {
00762 string data = encode_length(key.size());
00763 data += key;
00764 data += value;
00765 send_message(MSG_SETMETADATA, data);
00766 }
00767
00768 void
00769 RemoteDatabase::add_spelling(const string & word,
00770 Xapian::termcount freqinc) const
00771 {
00772 string data = encode_length(freqinc);
00773 data += word;
00774 send_message(MSG_ADDSPELLING, data);
00775 }
00776
00777 void
00778 RemoteDatabase::remove_spelling(const string & word,
00779 Xapian::termcount freqdec) const
00780 {
00781 string data = encode_length(freqdec);
00782 data += word;
00783 send_message(MSG_REMOVESPELLING, data);
00784 }