backends/remote/remote-database.cc

Go to the documentation of this file.
00001 
00004 /* Copyright (C) 2006,2007 Olly Betts
00005  * Copyright (C) 2007 Lemur Consulting Ltd
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License as
00009  * published by the Free Software Foundation; either version 2 of the
00010  * License, or (at your option) any later version.
00011  *
00012  * This program is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with this program; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
00020  */
00021 
00022 #include <config.h>
00023 
00024 #include "safeerrno.h"
00025 #include <signal.h>
00026 
00027 #include <xapian/error.h>
00028 
00029 #include "autoptr.h"
00030 #include "emptypostlist.h"
00031 #include "inmemory_positionlist.h"
00032 #include "net_postlist.h"
00033 #include "net_termlist.h"
00034 #include "net_document.h"
00035 #include "omassert.h"
00036 #include "serialise.h"
00037 #include "serialise-double.h"
00038 #include "stats.h"
00039 #include "stringutils.h" // For STRINGIZE().
00040 #include "utils.h"
00041 
00042 #include <string>
00043 #include <vector>
00044 
00045 using namespace std;
00046 
00047 RemoteDatabase::RemoteDatabase(int fd, Xapian::timeout timeout_,
00048                                const string & context_, bool writable)
00049         : link(fd, fd, context_),
00050           context(context_),
00051           cached_stats_valid(),
00052           timeout(timeout_)
00053 {
00054 #ifndef __WIN32__
00055     // It's simplest to just ignore SIGPIPE.  We'll still know if the
00056     // connection dies because we'll get EPIPE back from write().
00057     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
00058         throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
00059     }
00060 #endif
00061 
00062     if (!writable) {
00063         // Transactions only make sense when writing, so flag them as
00064         // "unimplemented" so that our destructor doesn't call dtor_called()
00065         // since that might try to call flush() which will cause a message to
00066         // be sent to the remote server and probably an InvalidOperationError
00067         // exception message to be returned.
00068         transaction_state = TRANSACTION_UNIMPLEMENTED;
00069     }
00070 
00071     string message;
00072     char type = get_message(message);
00073 
00074     if (reply_type(type) != REPLY_GREETING || message.size() < 3) {
00075         if (type == 'O' && message.size() == size_t('M') && message[0] == ' ') {
00076             // The server reply used to start "OM ", which will now be
00077             // interpreted as a type 'O' message of length size_t('M')
00078             // with first character ' '.
00079             throw Xapian::NetworkError("Server protocol version too old", context);
00080         }
00081         throw Xapian::NetworkError("Handshake failed - is this a Xapian server?", context);
00082     }
00083 
00084     const char *p = message.c_str();
00085     const char *p_end = p + message.size();
00086 
00087     // The protocol major versions must match.  The protocol minor version of
00088     // the server must be >= that of the client.
00089     int protocol_major = static_cast<unsigned char>(*p++);
00090     int protocol_minor = static_cast<unsigned char>(*p++);
00091     if (protocol_major != XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION ||
00092         protocol_minor < XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION) {
00093         string errmsg("Unknown protocol version ");
00094         errmsg += om_tostring(protocol_major);
00095         errmsg += '.';
00096         errmsg += om_tostring(protocol_minor);
00097         errmsg += " ("STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION)"."STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION)" supported)";
00098         throw Xapian::NetworkError(errmsg, context);
00099     }
00100 
00101     doccount = decode_length(&p, p_end, false);
00102     lastdocid = decode_length(&p, p_end, false);
00103     if (p == p_end) {
00104         throw Xapian::NetworkError("Bad greeting message received (bool)", context);
00105     }
00106     has_positional_info = (*p++ == '1');
00107     avlength = unserialise_double(&p, p_end);
00108     if (p != p_end || avlength < 0) {
00109         throw Xapian::NetworkError("Bad greeting message received (double)", context);
00110     }
00111 }
00112 
00113 RemoteDatabase *
00114 RemoteDatabase::as_remotedatabase()
00115 {
00116     return this;
00117 }
00118 
00119 void
00120 RemoteDatabase::keep_alive()
00121 {
00122     send_message(MSG_KEEPALIVE, "");
00123     string message;
00124     get_message(message, REPLY_DONE);
00125 }
00126 
00127 TermList *
00128 RemoteDatabase::open_term_list(Xapian::docid did) const
00129 {
00130     if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00131 
00132     // Ensure that avlength and doccount are up-to-date.
00133     if (!cached_stats_valid) update_stats();
00134 
00135     send_message(MSG_TERMLIST, encode_length(did));
00136 
00137     string message;
00138     get_message(message, REPLY_DOCLENGTH);
00139     const char * p = message.c_str();
00140     const char * p_end = p + message.size();
00141     Xapian::doclength doclen = unserialise_double(&p, p_end);
00142     if (p != p_end || doclen < 0) {
00143         throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00144     }
00145 
00146     AutoPtr<NetworkTermList> tlist;
00147     tlist = new NetworkTermList(doclen, doccount,
00148                                 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00149                                 did);
00150     vector<NetworkTermListItem> & items = tlist->items;
00151 
00152     char type;
00153     while ((type = get_message(message)) == REPLY_TERMLIST) {
00154         NetworkTermListItem item;
00155         p = message.data();
00156         p_end = p + message.size();
00157         item.wdf = decode_length(&p, p_end, false);
00158         item.termfreq = decode_length(&p, p_end, false);
00159         item.tname.assign(p, p_end);
00160         items.push_back(item);
00161     }
00162     if (type != REPLY_DONE) {
00163         throw Xapian::NetworkError("Bad message received", context);
00164     }
00165 
00166     tlist->current_position = tlist->items.begin();
00167     return tlist.release();
00168 }
00169 
00170 TermList *
00171 RemoteDatabase::open_allterms(const string & prefix) const {
00172     // Ensure that avlength and doccount are up-to-date.
00173     if (!cached_stats_valid) update_stats();
00174 
00175     send_message(MSG_ALLTERMS, prefix);
00176 
00177     AutoPtr<NetworkTermList> tlist;
00178     tlist = new NetworkTermList(0.0, doccount,
00179                                 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
00180                                 0);
00181     vector<NetworkTermListItem> & items = tlist->items;
00182 
00183     string message;
00184     char type;
00185     while ((type = get_message(message)) == REPLY_ALLTERMS) {
00186         NetworkTermListItem item;
00187         const char * p = message.data();
00188         const char * p_end = p + message.size();
00189         item.termfreq = decode_length(&p, p_end, false);
00190         item.tname.assign(p, p_end);
00191         items.push_back(item);
00192     }
00193     if (type != REPLY_DONE) {
00194         throw Xapian::NetworkError("Bad message received", context);
00195     }
00196 
00197     tlist->current_position = tlist->items.begin();
00198     return tlist.release();
00199 }
00200 
00201 LeafPostList *
00202 RemoteDatabase::open_post_list(const string &term) const
00203 {
00204     return new NetworkPostList(Xapian::Internal::RefCntPtr<const RemoteDatabase>(this), term);
00205 }
00206 
00207 Xapian::doccount
00208 RemoteDatabase::read_post_list(const string &term, NetworkPostList & pl) const
00209 {
00210     send_message(MSG_POSTLIST, term);
00211 
00212     string message;
00213     char type;
00214     get_message(message, REPLY_POSTLISTSTART);
00215 
00216     const char * p = message.data();
00217     const char * p_end = p + message.size();
00218     Xapian::doccount termfreq = decode_length(&p, p_end, false);
00219 
00220     while ((type = get_message(message)) == REPLY_POSTLISTITEM) {
00221         pl.append_posting(message);
00222     }
00223     if (type != REPLY_DONE) {
00224         throw Xapian::NetworkError("Bad message received", context);
00225     }
00226 
00227     return termfreq;
00228 }
00229 
00230 PositionList *
00231 RemoteDatabase::open_position_list(Xapian::docid did, const string &term) const
00232 {
00233     send_message(MSG_POSITIONLIST, encode_length(did) + term);
00234 
00235     vector<Xapian::termpos> positions;
00236 
00237     string message;
00238     char type;
00239     Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
00240     while ((type = get_message(message)) == REPLY_POSITIONLIST) {
00241         const char * p = message.data();
00242         const char * p_end = p + message.size();
00243         lastpos += decode_length(&p, p_end, false) + 1;
00244         positions.push_back(lastpos);
00245     }
00246     if (type != REPLY_DONE) {
00247         throw Xapian::NetworkError("Bad message received", context);
00248     }
00249 
00250     return new InMemoryPositionList(positions);
00251 }
00252 
00253 bool
00254 RemoteDatabase::has_positions() const
00255 {
00256     if (!cached_stats_valid) update_stats();
00257     return has_positional_info;
00258 }
00259 
00260 void
00261 RemoteDatabase::reopen()
00262 {
00263     update_stats(MSG_REOPEN);
00264 }
00265 
00266 // Currently lazy is used when fetching documents from the MSet, and in three
00267 // cases in multimatch.cc.  One of the latter is when using a MatchDecider,
00268 // which we don't support with the remote backend currently.  The others are
00269 // for the sort key and collapse key which in the remote cases are fetched
00270 // during the remote match and passed across with the MSet.  So we can safely
00271 // ignore "lazy" here for now without any performance penalty during the match
00272 // process.
00273 Xapian::Document::Internal *
00274 RemoteDatabase::open_document(Xapian::docid did, bool /*lazy*/) const
00275 {
00276     if (did == 0) throw Xapian::InvalidArgumentError("Docid 0 invalid");
00277 
00278     send_message(MSG_DOCUMENT, encode_length(did));
00279     string doc_data;
00280     map<Xapian::valueno, string> values;
00281     get_message(doc_data, REPLY_DOCDATA);
00282 
00283     reply_type type;
00284     string message;
00285     while ((type = get_message(message)) == REPLY_VALUE) {
00286         const char * p = message.data();
00287         const char * p_end = p + message.size();
00288         Xapian::valueno valueno = decode_length(&p, p_end, false);
00289         values.insert(make_pair(valueno, string(p, p_end)));
00290     }
00291     if (type != REPLY_DONE) {
00292         throw Xapian::NetworkError("Bad message received", context);
00293     }
00294 
00295     return new NetworkDocument(this, did, doc_data, values);
00296 }
00297 
00298 void
00299 RemoteDatabase::update_stats(message_type msg_code) const
00300 {
00301     send_message(msg_code, "");
00302     string message;
00303     get_message(message, REPLY_UPDATE);
00304     const char * p = message.c_str();
00305     const char * p_end = p + message.size();
00306     doccount = decode_length(&p, p_end, false);
00307     lastdocid = decode_length(&p, p_end, false);
00308     if (p == p_end) {
00309         throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00310     }
00311     has_positional_info = (*p++ == '1');
00312     avlength = unserialise_double(&p, p_end);
00313     if (p != p_end || avlength < 0) {
00314         throw Xapian::NetworkError("Bad REPLY_UPDATE message received", context);
00315     }
00316     cached_stats_valid = true;
00317 }
00318 
00319 Xapian::doccount
00320 RemoteDatabase::get_doccount() const
00321 {
00322     if (!cached_stats_valid) update_stats();
00323     return doccount;
00324 }
00325 
00326 Xapian::docid
00327 RemoteDatabase::get_lastdocid() const
00328 {
00329     if (!cached_stats_valid) update_stats();
00330     return lastdocid;
00331 }
00332 
00333 Xapian::doclength
00334 RemoteDatabase::get_avlength() const
00335 {
00336     if (!cached_stats_valid) update_stats();
00337     return avlength;
00338 }
00339 
00340 bool
00341 RemoteDatabase::term_exists(const string & tname) const
00342 {
00343     Assert(!tname.empty());
00344     send_message(MSG_TERMEXISTS, tname);
00345     string message;
00346     reply_type type = get_message(message);
00347     if (type != REPLY_TERMEXISTS && type != REPLY_TERMDOESNTEXIST) {
00348         throw Xapian::NetworkError("Bad message received", context);
00349     }
00350     return (type == REPLY_TERMEXISTS);
00351 }
00352 
00353 Xapian::doccount
00354 RemoteDatabase::get_termfreq(const string & tname) const
00355 {
00356     Assert(!tname.empty());
00357     send_message(MSG_TERMFREQ, tname);
00358     string message;
00359     get_message(message, REPLY_TERMFREQ);
00360     const char * p = message.data();
00361     const char * p_end = p + message.size();
00362     return decode_length(&p, p_end, false);
00363 }
00364 
00365 Xapian::termcount
00366 RemoteDatabase::get_collection_freq(const string & tname) const
00367 {
00368     Assert(!tname.empty());
00369     send_message(MSG_COLLFREQ, tname);
00370     string message;
00371     get_message(message, REPLY_COLLFREQ);
00372     const char * p = message.data();
00373     const char * p_end = p + message.size();
00374     return decode_length(&p, p_end, false);
00375 }
00376 
00377 Xapian::doclength
00378 RemoteDatabase::get_doclength(Xapian::docid did) const
00379 {
00380     Assert(did != 0);
00381     send_message(MSG_DOCLENGTH, encode_length(did));
00382     string message;
00383     get_message(message, REPLY_DOCLENGTH);
00384     const char * p = message.c_str();
00385     const char * p_end = p + message.size();
00386     Xapian::doclength doclen = unserialise_double(&p, p_end);
00387     if (p != p_end || doclen < 0) {
00388         throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
00389     }
00390     return doclen;
00391 }
00392 
00393 reply_type
00394 RemoteDatabase::get_message(string &result, reply_type required_type) const
00395 {
00396     OmTime end_time;
00397     if (timeout) end_time = OmTime::now() + timeout;
00398 
00399     reply_type type = static_cast<reply_type>(link.get_message(result, end_time));
00400     if (type == REPLY_EXCEPTION) {
00401         unserialise_error(result, "REMOTE:", context);
00402     }
00403     if (required_type != REPLY_MAX && type != required_type) {
00404         string errmsg("Expecting reply type ");
00405         errmsg += om_tostring(int(required_type));
00406         errmsg += ", got ";
00407         errmsg += om_tostring(int(type));
00408         throw Xapian::NetworkError(errmsg);
00409     }
00410 
00411     return type;
00412 }
00413 
00414 void
00415 RemoteDatabase::send_message(message_type type, const string &message) const
00416 {
00417     OmTime end_time;
00418     if (timeout) end_time = OmTime::now() + timeout;
00419 
00420     link.send_message(static_cast<unsigned char>(type), message, end_time);
00421 }
00422 
00423 void
00424 RemoteDatabase::do_close()
00425 {
00426     // In the constructor, we set transaction_state to
00427     // TRANSACTION_UNIMPLEMENTED if we aren't writable so that we can check
00428     // it here.
00429     bool writable = (transaction_state != TRANSACTION_UNIMPLEMENTED);
00430 
00431     // Only call dtor_called() if we're writable.
00432     if (writable) dtor_called();
00433 
00434     // If we're writable, wait for a confirmation of the close, so we know that
00435     // changes have been written and flushed, and the database write lock
00436     // released.  For the non-writable case, there's no need to wait, so don't
00437     // slow down searching by waiting here.
00438     link.do_close(writable);
00439 }
00440 
00441 void
00442 RemoteDatabase::set_query(const Xapian::Query::Internal *query,
00443                          Xapian::termcount qlen,
00444                          Xapian::valueno collapse_key,
00445                          Xapian::Enquire::docid_order order,
00446                          Xapian::valueno sort_key,
00447                          Xapian::Enquire::Internal::sort_setting sort_by,
00448                          bool sort_value_forward,
00449                          int percent_cutoff, Xapian::weight weight_cutoff,
00450                          const Xapian::Weight *wtscheme,
00451                          const Xapian::RSet &omrset)
00452 {
00453     string tmp = query->serialise();
00454     string message = encode_length(tmp.size());
00455     message += tmp;
00456 
00457     // Serialise assorted Enquire settings.
00458     message += encode_length(qlen);
00459     message += encode_length(collapse_key);
00460     message += char('0' + order);
00461     message += encode_length(sort_key);
00462     message += char('0' + sort_by);
00463     message += char('0' + sort_value_forward);
00464     message += char(percent_cutoff);
00465     message += serialise_double(weight_cutoff);
00466 
00467     tmp = wtscheme->name();
00468     message += encode_length(tmp.size());
00469     message += tmp;
00470 
00471     tmp = wtscheme->serialise();
00472     message += encode_length(tmp.size());
00473     message += tmp;
00474 
00475     message += serialise_rset(omrset);
00476 
00477     send_message(MSG_QUERY, message);
00478 }
00479 
00480 bool
00481 RemoteDatabase::get_remote_stats(bool nowait, Stats &out)
00482 {
00483     if (nowait && !link.ready_to_read()) return false;
00484 
00485     string message;
00486     get_message(message, REPLY_STATS);
00487     out = unserialise_stats(message);
00488 
00489     return true;
00490 }
00491 
00492 void
00493 RemoteDatabase::send_global_stats(Xapian::doccount first,
00494                                 Xapian::doccount maxitems,
00495                                 Xapian::doccount check_at_least,
00496                                 const Stats &stats)
00497 {
00498     string message = encode_length(first);
00499     message += encode_length(maxitems);
00500     message += encode_length(check_at_least);
00501     message += serialise_stats(stats);
00502     send_message(MSG_GETMSET, message);
00503 }
00504 
00505 void
00506 RemoteDatabase::get_mset(Xapian::MSet &mset)
00507 {
00508     string message;
00509     get_message(message, REPLY_RESULTS);
00510     mset = unserialise_mset(message);
00511 }
00512 
00513 void
00514 RemoteDatabase::flush()
00515 {
00516     send_message(MSG_FLUSH, "");
00517 
00518     // We need to wait for a response to ensure documents have been committed.
00519     string message;
00520     get_message(message, REPLY_DONE);
00521 }
00522 
00523 void
00524 RemoteDatabase::cancel()
00525 {
00526     cached_stats_valid = false;
00527 
00528     send_message(MSG_CANCEL, "");
00529 }
00530 
00531 Xapian::docid
00532 RemoteDatabase::add_document(const Xapian::Document & doc)
00533 {
00534     cached_stats_valid = false;
00535 
00536     send_message(MSG_ADDDOCUMENT, serialise_document(doc));
00537 
00538     string message;
00539     get_message(message, REPLY_ADDDOCUMENT);
00540 
00541     const char * p = message.data();
00542     const char * p_end = p + message.size();
00543     return decode_length(&p, p_end, false);
00544 }
00545 
00546 void
00547 RemoteDatabase::delete_document(Xapian::docid did)
00548 {
00549     cached_stats_valid = false;
00550 
00551 //    send_message(MSG_DELETEDOCUMENT_PRE_30_2, encode_length(did));
00552     send_message(MSG_DELETEDOCUMENT, encode_length(did));
00553     string dummy;
00554     get_message(dummy, REPLY_DONE);
00555 }
00556 
00557 void
00558 RemoteDatabase::delete_document(const std::string & unique_term)
00559 {
00560     cached_stats_valid = false;
00561 
00562     send_message(MSG_DELETEDOCUMENTTERM, unique_term);
00563 }
00564 
00565 void
00566 RemoteDatabase::replace_document(Xapian::docid did,
00567                                  const Xapian::Document & doc)
00568 {
00569     cached_stats_valid = false;
00570 
00571     string message = encode_length(did);
00572     message += serialise_document(doc);
00573 
00574     send_message(MSG_REPLACEDOCUMENT, message);
00575 }
00576 
00577 Xapian::docid
00578 RemoteDatabase::replace_document(const std::string & unique_term,
00579                                  const Xapian::Document & doc)
00580 {
00581     cached_stats_valid = false;
00582 
00583     string message = encode_length(unique_term.size());
00584     message += unique_term;
00585     message += serialise_document(doc);
00586 
00587     send_message(MSG_REPLACEDOCUMENTTERM, message);
00588 
00589     get_message(message, REPLY_ADDDOCUMENT);
00590 
00591     const char * p = message.data();
00592     const char * p_end = p + message.size();
00593     return decode_length(&p, p_end, false);
00594 }

Documentation for Xapian (version 1.0.20).
Generated on 28 Apr 2010 by Doxygen 1.5.2.