00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023 #include "remoteserver.h"
00024
00025 #include "xapian/database.h"
00026 #include "xapian/enquire.h"
00027 #include "xapian/error.h"
00028 #include "xapian/matchspy.h"
00029 #include "xapian/valueiterator.h"
00030
00031 #include "safeerrno.h"
00032 #include <signal.h>
00033 #include <cstdlib>
00034
00035 #include "autoptr.h"
00036 #include "multimatch.h"
00037 #include "omassert.h"
00038 #include "realtime.h"
00039 #include "serialise.h"
00040 #include "serialise-double.h"
00041 #include "str.h"
00042 #include "weightinternal.h"
00043
/// Exception thrown by RemoteServer::get_message() when the client sends
/// MSG_SHUTDOWN; RemoteServer::run() catches it and returns normally.
struct ConnectionClosed {};
00046
00047 RemoteServer::RemoteServer(const std::vector<std::string> &dbpaths,
00048 int fdin_, int fdout_,
00049 double active_timeout_, double idle_timeout_,
00050 bool writable_)
00051 : RemoteConnection(fdin_, fdout_, std::string()),
00052 db(NULL), wdb(NULL), writable(writable_),
00053 active_timeout(active_timeout_), idle_timeout(idle_timeout_)
00054 {
00055
00056 try {
00057 Assert(!dbpaths.empty());
00058
00059
00060
00061 db = new Xapian::Database(dbpaths[0]);
00062
00063
00064
00065 context = dbpaths[0];
00066
00067 if (!writable) {
00068 vector<std::string>::const_iterator i(dbpaths.begin());
00069 for (++i; i != dbpaths.end(); ++i) {
00070 db->add_database(Xapian::Database(*i));
00071 context += ' ';
00072 context += *i;
00073 }
00074 } else {
00075 AssertEq(dbpaths.size(), 1);
00076 }
00077 } catch (const Xapian::Error &err) {
00078
00079 send_message(REPLY_EXCEPTION, serialise_error(err));
00080
00081 throw;
00082 }
00083
00084 #ifndef __WIN32__
00085
00086
00087 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
00088 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00089 #endif
00090
00091
00092 string message;
00093 message += char(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION);
00094 message += char(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION);
00095 message += encode_length(db->get_doccount());
00096 message += encode_length(db->get_lastdocid());
00097 message += encode_length(db->get_doclength_lower_bound());
00098 message += encode_length(db->get_doclength_upper_bound());
00099 message += (db->has_positions() ? '1' : '0');
00100
00101 totlen_t total_len = totlen_t(db->get_avlength() * db->get_doccount() + .5);
00102 message += encode_length(total_len);
00103
00104 string uuid = db->get_uuid();
00105 message += uuid;
00106 send_message(REPLY_GREETING, message);
00107 }
00108
00109 RemoteServer::~RemoteServer()
00110 {
00111 delete db;
00112
00113 }
00114
00115 message_type
00116 RemoteServer::get_message(double timeout, string & result,
00117 message_type required_type)
00118 {
00119 double end_time = RealTime::end_time(timeout);
00120 unsigned int type = RemoteConnection::get_message(result, end_time);
00121
00122
00123 if (type == MSG_SHUTDOWN) throw ConnectionClosed();
00124 if (type >= MSG_MAX) {
00125 string errmsg("Invalid message type ");
00126 errmsg += str(type);
00127 throw Xapian::NetworkError(errmsg);
00128 }
00129 if (required_type != MSG_MAX && type != unsigned(required_type)) {
00130 string errmsg("Expecting message type ");
00131 errmsg += str(int(required_type));
00132 errmsg += ", got ";
00133 errmsg += str(int(type));
00134 throw Xapian::NetworkError(errmsg);
00135 }
00136 return static_cast<message_type>(type);
00137 }
00138
00139 void
00140 RemoteServer::send_message(reply_type type, const string &message)
00141 {
00142 double end_time = RealTime::end_time(active_timeout);
00143 unsigned char type_as_char = static_cast<unsigned char>(type);
00144 RemoteConnection::send_message(type_as_char, message, end_time);
00145 }
00146
00147 typedef void (RemoteServer::* dispatch_func)(const string &);
00148
00149 void
00150 RemoteServer::run()
00151 {
00152 while (true) {
00153 try {
00154
00155
00156
00157
00158
00159 static const dispatch_func dispatch[] = {
00160 &RemoteServer::msg_allterms,
00161 &RemoteServer::msg_collfreq,
00162 &RemoteServer::msg_document,
00163 &RemoteServer::msg_termexists,
00164 &RemoteServer::msg_termfreq,
00165 &RemoteServer::msg_valuestats,
00166 &RemoteServer::msg_keepalive,
00167 &RemoteServer::msg_doclength,
00168 &RemoteServer::msg_query,
00169 &RemoteServer::msg_termlist,
00170 &RemoteServer::msg_positionlist,
00171 &RemoteServer::msg_postlist,
00172 &RemoteServer::msg_reopen,
00173 &RemoteServer::msg_update,
00174 &RemoteServer::msg_adddocument,
00175 &RemoteServer::msg_cancel,
00176 &RemoteServer::msg_deletedocumentterm,
00177 &RemoteServer::msg_commit,
00178 &RemoteServer::msg_replacedocument,
00179 &RemoteServer::msg_replacedocumentterm,
00180 &RemoteServer::msg_deletedocument,
00181 &RemoteServer::msg_writeaccess,
00182 &RemoteServer::msg_getmetadata,
00183 &RemoteServer::msg_setmetadata,
00184 &RemoteServer::msg_addspelling,
00185 &RemoteServer::msg_removespelling,
00186 0,
00187 0,
00188 &RemoteServer::msg_openmetadatakeylist,
00189 };
00190
00191 string message;
00192 size_t type = get_message(idle_timeout, message);
00193 if (type >= sizeof(dispatch)/sizeof(dispatch[0]) || !dispatch[type]) {
00194 string errmsg("Unexpected message type ");
00195 errmsg += str(type);
00196 throw Xapian::InvalidArgumentError(errmsg);
00197 }
00198 (this->*(dispatch[type]))(message);
00199 } catch (const Xapian::NetworkTimeoutError & e) {
00200 try {
00201
00202
00203
00204 send_message(REPLY_EXCEPTION, serialise_error(e), 1.0);
00205 } catch (...) {
00206 }
00207
00208
00209 throw;
00210 } catch (const Xapian::NetworkError &) {
00211
00212
00213
00214
00215
00216 throw;
00217 } catch (const Xapian::Error &e) {
00218
00219
00220 send_message(REPLY_EXCEPTION, serialise_error(e));
00221 } catch (ConnectionClosed &) {
00222 return;
00223 } catch (...) {
00224
00225 send_message(REPLY_EXCEPTION, string());
00226
00227
00228 throw;
00229 }
00230 }
00231 }
00232
00233 void
00234 RemoteServer::msg_allterms(const string &message)
00235 {
00236 const string & prefix = message;
00237
00238 const Xapian::TermIterator end = db->allterms_end(prefix);
00239 for (Xapian::TermIterator t = db->allterms_begin(prefix); t != end; ++t) {
00240 string item = encode_length(t.get_termfreq());
00241 item += *t;
00242 send_message(REPLY_ALLTERMS, item);
00243 }
00244
00245 send_message(REPLY_DONE, string());
00246 }
00247
00248 void
00249 RemoteServer::msg_termlist(const string &message)
00250 {
00251 const char *p = message.data();
00252 const char *p_end = p + message.size();
00253 Xapian::docid did = decode_length(&p, p_end, false);
00254
00255 send_message(REPLY_DOCLENGTH, encode_length(db->get_doclength(did)));
00256 const Xapian::TermIterator end = db->termlist_end(did);
00257 for (Xapian::TermIterator t = db->termlist_begin(did); t != end; ++t) {
00258 string item = encode_length(t.get_wdf());
00259 item += encode_length(t.get_termfreq());
00260 item += *t;
00261 send_message(REPLY_TERMLIST, item);
00262 }
00263
00264 send_message(REPLY_DONE, string());
00265 }
00266
00267 void
00268 RemoteServer::msg_positionlist(const string &message)
00269 {
00270 const char *p = message.data();
00271 const char *p_end = p + message.size();
00272 Xapian::docid did = decode_length(&p, p_end, false);
00273 string term(p, p_end - p);
00274
00275 Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00276 const Xapian::PositionIterator end = db->positionlist_end(did, term);
00277 for (Xapian::PositionIterator i = db->positionlist_begin(did, term);
00278 i != end; ++i) {
00279 Xapian::termpos pos = *i;
00280 send_message(REPLY_POSITIONLIST, encode_length(pos - lastpos - 1));
00281 lastpos = pos;
00282 }
00283
00284 send_message(REPLY_DONE, string());
00285 }
00286
00287 void
00288 RemoteServer::msg_postlist(const string &message)
00289 {
00290 const string & term = message;
00291
00292 Xapian::doccount termfreq = db->get_termfreq(term);
00293 Xapian::termcount collfreq = db->get_collection_freq(term);
00294 send_message(REPLY_POSTLISTSTART, encode_length(termfreq) + encode_length(collfreq));
00295
00296 Xapian::docid lastdocid = 0;
00297 const Xapian::PostingIterator end = db->postlist_end(term);
00298 for (Xapian::PostingIterator i = db->postlist_begin(term);
00299 i != end; ++i) {
00300
00301 Xapian::docid newdocid = *i;
00302 string reply = encode_length(newdocid - lastdocid - 1);
00303 reply += encode_length(i.get_wdf());
00304
00305 send_message(REPLY_POSTLISTITEM, reply);
00306 lastdocid = newdocid;
00307 }
00308
00309 send_message(REPLY_DONE, string());
00310 }
00311
00312 void
00313 RemoteServer::msg_writeaccess(const string & msg)
00314 {
00315 if (!writable)
00316 throw Xapian::InvalidOperationError("Server is read-only");
00317
00318 wdb = new Xapian::WritableDatabase(context, Xapian::DB_OPEN);
00319 delete db;
00320 db = wdb;
00321 msg_update(msg);
00322 }
00323
00324 void
00325 RemoteServer::msg_reopen(const string & msg)
00326 {
00327 db->reopen();
00328 msg_update(msg);
00329 }
00330
00331 void
00332 RemoteServer::msg_update(const string &)
00333 {
00334 string message = encode_length(db->get_doccount());
00335 message += encode_length(db->get_lastdocid());
00336 message += encode_length(db->get_doclength_lower_bound());
00337 message += encode_length(db->get_doclength_upper_bound());
00338 message += (db->has_positions() ? '1' : '0');
00339
00340 totlen_t total_len = totlen_t(db->get_avlength() * db->get_doccount() + .5);
00341 message += encode_length(total_len);
00342
00343 string uuid = db->get_uuid();
00344 message += uuid;
00345 send_message(REPLY_UPDATE, message);
00346 }
00347
00353 struct MatchSpyList {
00354 vector<Xapian::MatchSpy *> spies;
00355
00356 ~MatchSpyList() {
00357 vector<Xapian::MatchSpy *>::const_iterator i;
00358 for (i = spies.begin(); i != spies.end(); ++i) {
00359 delete *i;
00360 }
00361 }
00362 };
00363
00364 void
00365 RemoteServer::msg_query(const string &message_in)
00366 {
00367 const char *p = message_in.c_str();
00368 const char *p_end = p + message_in.size();
00369 size_t len;
00370
00371
00372 len = decode_length(&p, p_end, true);
00373 AutoPtr<Xapian::Query::Internal> query(Xapian::Query::Internal::unserialise(string(p, len), reg));
00374 p += len;
00375
00376
00377 Xapian::termcount qlen = decode_length(&p, p_end, false);
00378
00379 Xapian::valueno collapse_max = decode_length(&p, p_end, false);
00380
00381 Xapian::valueno collapse_key = Xapian::BAD_VALUENO;
00382 if (collapse_max) collapse_key = decode_length(&p, p_end, false);
00383
00384 if (p_end - p < 4 || *p < '0' || *p > '2') {
00385 throw Xapian::NetworkError("bad message (docid_order)");
00386 }
00387 Xapian::Enquire::docid_order order;
00388 order = static_cast<Xapian::Enquire::docid_order>(*p++ - '0');
00389
00390 Xapian::valueno sort_key = decode_length(&p, p_end, false);
00391
00392 if (*p < '0' || *p > '3') {
00393 throw Xapian::NetworkError("bad message (sort_by)");
00394 }
00395 Xapian::Enquire::Internal::sort_setting sort_by;
00396 sort_by = static_cast<Xapian::Enquire::Internal::sort_setting>(*p++ - '0');
00397
00398 if (*p < '0' || *p > '1') {
00399 throw Xapian::NetworkError("bad message (sort_value_forward)");
00400 }
00401 bool sort_value_forward(*p++ != '0');
00402
00403 int percent_cutoff = *p++;
00404 if (percent_cutoff < 0 || percent_cutoff > 100) {
00405 throw Xapian::NetworkError("bad message (percent_cutoff)");
00406 }
00407
00408 Xapian::weight weight_cutoff = unserialise_double(&p, p_end);
00409 if (weight_cutoff < 0) {
00410 throw Xapian::NetworkError("bad message (weight_cutoff)");
00411 }
00412
00413
00414 len = decode_length(&p, p_end, true);
00415 string wtname(p, len);
00416 p += len;
00417
00418 const Xapian::Weight * wttype = reg.get_weighting_scheme(wtname);
00419 if (wttype == NULL) {
00420
00421
00422
00423 throw Xapian::InvalidArgumentError("Weighting scheme " +
00424 wtname + " not registered");
00425 }
00426
00427 len = decode_length(&p, p_end, true);
00428 AutoPtr<Xapian::Weight> wt(wttype->unserialise(string(p, len)));
00429 p += len;
00430
00431
00432 len = decode_length(&p, p_end, true);
00433 Xapian::RSet rset = unserialise_rset(string(p, len));
00434 p += len;
00435
00436
00437 MatchSpyList matchspies;
00438 while (p != p_end) {
00439 len = decode_length(&p, p_end, true);
00440 string spytype(p, len);
00441 const Xapian::MatchSpy * spyclass = reg.get_match_spy(spytype);
00442 if (spyclass == NULL) {
00443 throw Xapian::InvalidArgumentError("Match spy " + spytype +
00444 " not registered");
00445 }
00446 p += len;
00447
00448 len = decode_length(&p, p_end, true);
00449 matchspies.spies.push_back(spyclass->unserialise(string(p, len), reg));
00450 p += len;
00451 }
00452
00453 Xapian::Weight::Internal local_stats;
00454 MultiMatch match(*db, query.get(), qlen, &rset, collapse_max, collapse_key,
00455 percent_cutoff, weight_cutoff, order,
00456 sort_key, sort_by, sort_value_forward, NULL,
00457 local_stats, wt.get(), matchspies.spies, false, false);
00458
00459 send_message(REPLY_STATS, serialise_stats(local_stats));
00460
00461 string message;
00462 get_message(active_timeout, message, MSG_GETMSET);
00463 p = message.c_str();
00464 p_end = p + message.size();
00465
00466 Xapian::termcount first = decode_length(&p, p_end, false);
00467 Xapian::termcount maxitems = decode_length(&p, p_end, false);
00468
00469 Xapian::termcount check_at_least = 0;
00470 check_at_least = decode_length(&p, p_end, false);
00471
00472 message.erase(0, message.size() - (p_end - p));
00473 Xapian::Weight::Internal total_stats(unserialise_stats(message));
00474 total_stats.set_bounds_from_db(*db);
00475
00476 Xapian::MSet mset;
00477 match.get_mset(first, maxitems, check_at_least, mset, total_stats, 0, 0, 0);
00478
00479 message.resize(0);
00480 vector<Xapian::MatchSpy *>::const_iterator i;
00481 for (i = matchspies.spies.begin(); i != matchspies.spies.end(); ++i) {
00482 string spy_results = (*i)->serialise_results();
00483 message += encode_length(spy_results.size());
00484 message += spy_results;
00485 }
00486 message += serialise_mset(mset);
00487 send_message(REPLY_RESULTS, message);
00488 }
00489
00490 void
00491 RemoteServer::msg_document(const string &message)
00492 {
00493 const char *p = message.data();
00494 const char *p_end = p + message.size();
00495 Xapian::docid did = decode_length(&p, p_end, false);
00496
00497 Xapian::Document doc = db->get_document(did);
00498
00499 send_message(REPLY_DOCDATA, doc.get_data());
00500
00501 Xapian::ValueIterator i;
00502 for (i = doc.values_begin(); i != doc.values_end(); ++i) {
00503 string item = encode_length(i.get_valueno());
00504 item += *i;
00505 send_message(REPLY_VALUE, item);
00506 }
00507 send_message(REPLY_DONE, string());
00508 }
00509
00510 void
00511 RemoteServer::msg_keepalive(const string &)
00512 {
00513
00514 db->keep_alive();
00515 send_message(REPLY_DONE, string());
00516 }
00517
00518 void
00519 RemoteServer::msg_termexists(const string &term)
00520 {
00521 send_message((db->term_exists(term) ? REPLY_TERMEXISTS : REPLY_TERMDOESNTEXIST), string());
00522 }
00523
00524 void
00525 RemoteServer::msg_collfreq(const string &term)
00526 {
00527 send_message(REPLY_COLLFREQ, encode_length(db->get_collection_freq(term)));
00528 }
00529
00530 void
00531 RemoteServer::msg_termfreq(const string &term)
00532 {
00533 send_message(REPLY_TERMFREQ, encode_length(db->get_termfreq(term)));
00534 }
00535
00536 void
00537 RemoteServer::msg_valuestats(const string & message)
00538 {
00539 const char *p = message.data();
00540 const char *p_end = p + message.size();
00541 while (p != p_end) {
00542 Xapian::valueno slot = decode_length(&p, p_end, false);
00543 string message_out;
00544 message_out += encode_length(db->get_value_freq(slot));
00545 string bound = db->get_value_lower_bound(slot);
00546 message_out += encode_length(bound.size());
00547 message_out += bound;
00548 bound = db->get_value_upper_bound(slot);
00549 message_out += encode_length(bound.size());
00550 message_out += bound;
00551
00552 send_message(REPLY_VALUESTATS, message_out);
00553 }
00554 }
00555
00556 void
00557 RemoteServer::msg_doclength(const string &message)
00558 {
00559 const char *p = message.data();
00560 const char *p_end = p + message.size();
00561 Xapian::docid did = decode_length(&p, p_end, false);
00562 send_message(REPLY_DOCLENGTH, encode_length(db->get_doclength(did)));
00563 }
00564
00565 void
00566 RemoteServer::msg_commit(const string &)
00567 {
00568 if (!wdb)
00569 throw Xapian::InvalidOperationError("Server is read-only");
00570
00571 wdb->commit();
00572
00573 send_message(REPLY_DONE, string());
00574 }
00575
00576 void
00577 RemoteServer::msg_cancel(const string &)
00578 {
00579 if (!wdb)
00580 throw Xapian::InvalidOperationError("Server is read-only");
00581
00582
00583
00584 wdb->begin_transaction(false);
00585 wdb->cancel_transaction();
00586 }
00587
00588 void
00589 RemoteServer::msg_adddocument(const string & message)
00590 {
00591 if (!wdb)
00592 throw Xapian::InvalidOperationError("Server is read-only");
00593
00594 Xapian::docid did = wdb->add_document(unserialise_document(message));
00595
00596 send_message(REPLY_ADDDOCUMENT, encode_length(did));
00597 }
00598
00599 void
00600 RemoteServer::msg_deletedocument(const string & message)
00601 {
00602 if (!wdb)
00603 throw Xapian::InvalidOperationError("Server is read-only");
00604
00605 const char *p = message.data();
00606 const char *p_end = p + message.size();
00607 Xapian::docid did = decode_length(&p, p_end, false);
00608
00609 wdb->delete_document(did);
00610
00611 send_message(REPLY_DONE, string());
00612 }
00613
00614 void
00615 RemoteServer::msg_deletedocumentterm(const string & message)
00616 {
00617 if (!wdb)
00618 throw Xapian::InvalidOperationError("Server is read-only");
00619
00620 wdb->delete_document(message);
00621 }
00622
00623 void
00624 RemoteServer::msg_replacedocument(const string & message)
00625 {
00626 if (!wdb)
00627 throw Xapian::InvalidOperationError("Server is read-only");
00628
00629 const char *p = message.data();
00630 const char *p_end = p + message.size();
00631 Xapian::docid did = decode_length(&p, p_end, false);
00632
00633 wdb->replace_document(did, unserialise_document(string(p, p_end)));
00634 }
00635
00636 void
00637 RemoteServer::msg_replacedocumentterm(const string & message)
00638 {
00639 if (!wdb)
00640 throw Xapian::InvalidOperationError("Server is read-only");
00641
00642 const char *p = message.data();
00643 const char *p_end = p + message.size();
00644 size_t len = decode_length(&p, p_end, true);
00645 string unique_term(p, len);
00646 p += len;
00647
00648 Xapian::docid did = wdb->replace_document(unique_term, unserialise_document(string(p, p_end)));
00649
00650 send_message(REPLY_ADDDOCUMENT, encode_length(did));
00651 }
00652
00653 void
00654 RemoteServer::msg_getmetadata(const string & message)
00655 {
00656 send_message(REPLY_METADATA, db->get_metadata(message));
00657 }
00658
00659 void
00660 RemoteServer::msg_openmetadatakeylist(const string & message)
00661 {
00662 const Xapian::TermIterator end = db->metadata_keys_end(message);
00663 Xapian::TermIterator t = db->metadata_keys_begin(message);
00664 for (; t != end; ++t) {
00665 send_message(REPLY_METADATAKEYLIST, *t);
00666 }
00667
00668 send_message(REPLY_DONE, string());
00669 }
00670
00671 void
00672 RemoteServer::msg_setmetadata(const string & message)
00673 {
00674 if (!wdb)
00675 throw Xapian::InvalidOperationError("Server is read-only");
00676 const char *p = message.data();
00677 const char *p_end = p + message.size();
00678 size_t keylen = decode_length(&p, p_end, false);
00679 string key(p, keylen);
00680 p += keylen;
00681 string val(p, p_end - p);
00682 wdb->set_metadata(key, val);
00683 }
00684
00685 void
00686 RemoteServer::msg_addspelling(const string & message)
00687 {
00688 if (!wdb)
00689 throw Xapian::InvalidOperationError("Server is read-only");
00690 const char *p = message.data();
00691 const char *p_end = p + message.size();
00692 Xapian::termcount freqinc = decode_length(&p, p_end, false);
00693 wdb->add_spelling(string(p, p_end - p), freqinc);
00694 }
00695
00696 void
00697 RemoteServer::msg_removespelling(const string & message)
00698 {
00699 if (!wdb)
00700 throw Xapian::InvalidOperationError("Server is read-only");
00701 const char *p = message.data();
00702 const char *p_end = p + message.size();
00703 Xapian::termcount freqdec = decode_length(&p, p_end, false);
00704 wdb->remove_spelling(string(p, p_end - p), freqdec);
00705 }