backends/flint/flint_database.cc

Go to the documentation of this file.
00001 /* flint_database.cc: flint database
00002  *
00003  * Copyright 1999,2000,2001 BrightStation PLC
00004  * Copyright 2001 Hein Ragas
00005  * Copyright 2002 Ananova Ltd
00006  * Copyright 2002,2003,2004,2005,2006,2007,2010 Olly Betts
00007  * Copyright 2006 Lemur Consulting Ltd
00008  * Copyright 2009 Richard Boulton
00009  * Copyright 2009 Kan-Ru Chen
00010  *
00011  * This program is free software; you can redistribute it and/or
00012  * modify it under the terms of the GNU General Public License as
00013  * published by the Free Software Foundation; either version 2 of the
00014  * License, or (at your option) any later version.
00015  *
00016  * This program is distributed in the hope that it will be useful,
00017  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  * GNU General Public License for more details.
00020  *
00021  * You should have received a copy of the GNU General Public License
00022  * along with this program; if not, write to the Free Software
00023  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
00024  * USA
00025  */
00026 
00027 #include <config.h>
00028 
00029 #include "flint_database.h"
00030 
00031 #include <xapian/error.h>
00032 #include <xapian/valueiterator.h>
00033 
00034 #include "autoptr.h"
00035 #include "contiguousalldocspostlist.h"
00036 #include "flint_alldocspostlist.h"
00037 #include "flint_alltermslist.h"
00038 #include "flint_document.h"
00039 #include "flint_lock.h"
00040 #include "flint_metadata.h"
00041 #include "flint_modifiedpostlist.h"
00042 #include "flint_positionlist.h"
00043 #include "flint_postlist.h"
00044 #include "flint_record.h"
00045 #include "flint_spellingwordslist.h"
00046 #include "flint_termlist.h"
00047 #include "flint_utils.h"
00048 #include "flint_values.h"
00049 #include "omdebug.h"
00050 #include "safeerrno.h"
00051 #include "safesysstat.h"
00052 #include "stringutils.h"
00053 #include "utils.h"
00054 
00055 #include <sys/types.h>
00056 
00057 #include <list>
00058 #include <string>
00059 
00060 using namespace std;
00061 using namespace Xapian;
00062 
00063 // The maximum safe term length is determined by the postlist.  There we
00064 // store the term followed by "\x00\x00" then a length byte, then up to
00065 // 4 bytes of docid.  The Btree manager's key length limit is 252 bytes
00066 // so the maximum safe term length is 252 - 2 - 1 - 4 = 245 bytes.  If
00067 // the term contains zero bytes, the limit is lower (by one for each zero byte
00068 // in the term).
00069 #define MAX_SAFE_TERM_LENGTH 245
00070 
00071 // Magic key in the postlist table (which corresponds to an invalid docid) is
00072 // used to store the next free docid and total length of all documents.
00073 static const string METAINFO_KEY("", 1);
00074 
00075 /* This finds the tables, opens them at consistent revisions, manages
00076  * determining the current and next revision numbers, and stores handles
00077  * to the tables.
00078  */
00079 FlintDatabase::FlintDatabase(const string &flint_dir, int action,
00080                              unsigned int block_size)
00081         : db_dir(flint_dir),
00082           readonly(action == XAPIAN_DB_READONLY),
00083           version_file(db_dir),
00084           postlist_table(db_dir, readonly),
00085           position_table(db_dir, readonly),
00086           termlist_table(db_dir, readonly),
00087           value_table(db_dir, readonly),
00088           synonym_table(db_dir, readonly),
00089           spelling_table(db_dir, readonly),
00090           record_table(db_dir, readonly),
00091           lock(db_dir + "/flintlock"),
00092           total_length(0),
00093           lastdocid(0)
00094 {
00095     DEBUGCALL(DB, void, "FlintDatabase", flint_dir << ", " << action <<
00096               ", " << block_size);
00097 
00098     if (action == XAPIAN_DB_READONLY) {
00099         open_tables_consistent();
00100         return;
00101     }
00102 
00103     if (action != Xapian::DB_OPEN && !database_exists()) {
00104         // FIXME: if we allow Xapian::DB_OVERWRITE, check it here
00105 
00106         // Create the directory for the database, if it doesn't exist
00107         // already.
00108         bool fail = false;
00109         struct stat statbuf;
00110         if (stat(db_dir, &statbuf) == 0) {
00111             if (!S_ISDIR(statbuf.st_mode)) fail = true;
00112         } else if (errno != ENOENT || mkdir(db_dir, 0755) == -1) {
00113             fail = true;
00114         }
00115         if (fail) {
00116             throw Xapian::DatabaseCreateError("Cannot create directory `" +
00117                                               db_dir + "'", errno);
00118         }
00119         get_database_write_lock();
00120 
00121         create_and_open_tables(block_size);
00122         return;
00123     }
00124 
00125     if (action == Xapian::DB_CREATE) {
00126         throw Xapian::DatabaseCreateError("Can't create new database at `" +
00127                                           db_dir + "': a database already exists and I was told "
00128                                           "not to overwrite it");
00129     }
00130 
00131     get_database_write_lock();
00132     // if we're overwriting, pretend the db doesn't exist
00133     // FIXME: if we allow Xapian::DB_OVERWRITE, check it here
00134     if (action == Xapian::DB_CREATE_OR_OVERWRITE) {
00135         create_and_open_tables(block_size);
00136         return;
00137     }
00138 
00139     // Get latest consistent version
00140     open_tables_consistent();
00141 
00142     // Check that there are no more recent versions of tables.  If there
00143     // are, perform recovery by writing a new revision number to all
00144     // tables.
00145     if (record_table.get_open_revision_number() !=
00146         postlist_table.get_latest_revision_number()) {
00147         flint_revision_number_t new_revision = get_next_revision_number();
00148 
00149         set_revision_number(new_revision);
00150     }
00151 }
00152 
00153 FlintDatabase::~FlintDatabase()
00154 {
00155     DEBUGCALL(DB, void, "~FlintDatabase", "");
00156 }
00157 
00158 void
00159 FlintDatabase::read_metainfo()
00160 {
00161     DEBUGCALL(DB, void, "FlintDatabase::read_metainfo", "");
00162 
00163     string tag;
00164     if (!postlist_table.get_exact_entry(METAINFO_KEY, tag)) {
00165         lastdocid = 0;
00166         total_length = 0;
00167         return;
00168     }
00169 
00170     const char * data = tag.data();
00171     const char * end = data + tag.size();
00172     if (!unpack_uint(&data, end, &lastdocid) ||
00173         !unpack_uint_last(&data, end, &total_length)) {
00174         throw Xapian::DatabaseCorruptError("Meta information is corrupt.");
00175     }
00176 }
00177 
00178 bool
00179 FlintDatabase::database_exists() {
00180     DEBUGCALL(DB, bool, "FlintDatabase::database_exists", "");
00181     RETURN(record_table.exists() &&
00182            postlist_table.exists() &&
00183            termlist_table.exists());
00184 }
00185 
00186 void
00187 FlintDatabase::create_and_open_tables(unsigned int block_size)
00188 {
00189     DEBUGCALL(DB, void, "FlintDatabase::create_and_open_tables", "");
00190     // The caller is expected to create the database directory if it doesn't
00191     // already exist.
00192 
00193     // Create postlist_table first, and record_table last.  Existence of
00194     // record_table is considered to imply existence of the database.
00195     version_file.create();
00196     postlist_table.create_and_open(block_size);
00197     // The position table is created lazily, but erase it in case we're
00198     // overwriting an existing database and it already exists.
00199     position_table.erase();
00200     position_table.set_block_size(block_size);
00201 
00202     termlist_table.create_and_open(block_size);
00203     // The value table is created lazily, but erase it in case we're
00204     // overwriting an existing database and it already exists.
00205     value_table.erase();
00206     value_table.set_block_size(block_size);
00207 
00208     synonym_table.create_and_open(block_size);
00209     spelling_table.create_and_open(block_size);
00210     record_table.create_and_open(block_size);
00211 
00212     Assert(database_exists());
00213 
00214     // Check consistency
00215     flint_revision_number_t revision = record_table.get_open_revision_number();
00216     if (revision != termlist_table.get_open_revision_number() ||
00217         revision != postlist_table.get_open_revision_number()) {
00218         throw Xapian::DatabaseCreateError("Newly created tables are not in consistent state");
00219     }
00220 
00221     total_length = 0;
00222     lastdocid = 0;
00223 }
00224 
00225 void
00226 FlintDatabase::open_tables_consistent()
00227 {
00228     DEBUGCALL(DB, void, "FlintDatabase::open_tables_consistent", "");
00229     // Open record_table first, since it's the last to be written to,
00230     // and hence if a revision is available in it, it should be available
00231     // in all the other tables (unless they've moved on already).
00232     //
00233     // If we find that a table can't open the desired revision, we
00234     // go back and open record_table again, until record_table has
00235     // the same revision as the last time we opened it.
00236 
00237     flint_revision_number_t cur_rev = record_table.get_open_revision_number();
00238 
00239     // Check the version file unless we're reopening.
00240     if (cur_rev == 0) version_file.read_and_check(readonly);
00241 
00242     record_table.open();
00243     flint_revision_number_t revision = record_table.get_open_revision_number();
00244 
00245     if (cur_rev && cur_rev == revision) {
00246         // We're reopening a database and the revision hasn't changed so we
00247         // don't need to do anything.
00248         return;
00249     }
00250 
00251     // In case the position, value, synonym, and/or spelling tables don't
00252     // exist yet.
00253     unsigned int block_size = record_table.get_block_size();
00254     position_table.set_block_size(block_size);
00255     value_table.set_block_size(block_size);
00256     synonym_table.set_block_size(block_size);
00257     spelling_table.set_block_size(block_size);
00258 
00259     bool fully_opened = false;
00260     int tries = 100;
00261     int tries_left = tries;
00262     while (!fully_opened && (tries_left--) > 0) {
00263         if (spelling_table.open(revision) &&
00264             synonym_table.open(revision) &&
00265             value_table.open(revision) &&
00266             termlist_table.open(revision) &&
00267             position_table.open(revision) &&
00268             postlist_table.open(revision)) {
00269             // Everything now open at the same revision.
00270             fully_opened = true;
00271         } else {
00272             // Couldn't open consistent revision: two cases possible:
00273             // i)   An update has completed and a second one has begun since
00274             //      record was opened.  This leaves a consistent revision
00275             //      available, but not the one we were trying to open.
00276             // ii)  Tables have become corrupt / have no consistent revision
00277             //      available.  In this case, updates must have ceased.
00278             //
00279             // So, we reopen the record table, and check its revision number,
00280             // if it's changed we try the opening again, otherwise we give up.
00281             //
00282             record_table.open();
00283             flint_revision_number_t newrevision =
00284                     record_table.get_open_revision_number();
00285             if (revision == newrevision) {
00286                 // Revision number hasn't changed - therefore a second index
00287                 // sweep hasn't begun and the system must have failed.  Database
00288                 // is inconsistent.
00289                 throw Xapian::DatabaseCorruptError("Cannot open tables at consistent revisions");
00290             }
00291             revision = newrevision;
00292         }
00293     }
00294 
00295     if (!fully_opened) {
00296         throw Xapian::DatabaseModifiedError("Cannot open tables at stable revision - changing too fast");
00297     }
00298 
00299     read_metainfo();
00300 }
00301 
00302 void
00303 FlintDatabase::open_tables(flint_revision_number_t revision)
00304 {
00305     DEBUGCALL(DB, void, "FlintDatabase::open_tables", revision);
00306     version_file.read_and_check(readonly);
00307     record_table.open(revision);
00308 
00309     // In case the position, value, synonym, and/or spelling tables don't
00310     // exist yet.
00311     unsigned int block_size = record_table.get_block_size();
00312     position_table.set_block_size(block_size);
00313     value_table.set_block_size(block_size);
00314     synonym_table.set_block_size(block_size);
00315     spelling_table.set_block_size(block_size);
00316 
00317     spelling_table.open(revision);
00318     synonym_table.open(revision);
00319     value_table.open(revision);
00320     termlist_table.open(revision);
00321     position_table.open(revision);
00322     postlist_table.open(revision);
00323 }
00324 
00325 flint_revision_number_t
00326 FlintDatabase::get_revision_number() const
00327 {
00328     DEBUGCALL(DB, flint_revision_number_t, "FlintDatabase::get_revision_number", "");
00329     // We could use any table here, theoretically.
00330     RETURN(postlist_table.get_open_revision_number());
00331 }
00332 
00333 flint_revision_number_t
00334 FlintDatabase::get_next_revision_number() const
00335 {
00336     DEBUGCALL(DB, flint_revision_number_t, "FlintDatabase::get_next_revision_number", "");
00337     /* We _must_ use postlist_table here, since it is always the first
00338      * to be written, and hence will have the greatest available revision
00339      * number.
00340      */
00341     flint_revision_number_t new_revision =
00342             postlist_table.get_latest_revision_number();
00343     ++new_revision;
00344     RETURN(new_revision);
00345 }
00346 
00347 void
00348 FlintDatabase::set_revision_number(flint_revision_number_t new_revision)
00349 {
00350     DEBUGCALL(DB, void, "FlintDatabase::set_revision_number", new_revision);
00351     postlist_table.commit(new_revision);
00352     position_table.commit(new_revision);
00353     termlist_table.commit(new_revision);
00354     value_table.commit(new_revision);
00355     synonym_table.commit(new_revision);
00356     spelling_table.commit(new_revision);
00357     record_table.commit(new_revision);
00358 }
00359 
00360 void
00361 FlintDatabase::reopen()
00362 {
00363     DEBUGCALL(DB, void, "FlintDatabase::reopen", "");
00364     if (readonly) {
00365         open_tables_consistent();
00366     }
00367 }
00368 
00369 void
00370 FlintDatabase::get_database_write_lock()
00371 {
00372     DEBUGCALL(DB, void, "FlintDatabase::get_database_write_lock", "");
00373     FlintLock::reason why = lock.lock(true);
00374     if (why != FlintLock::SUCCESS) {
00375         if (why == FlintLock::UNKNOWN && !database_exists()) {
00376             string msg("No flint database found at path `");
00377             msg += db_dir;
00378             msg += '\'';
00379             throw Xapian::DatabaseOpeningError(msg);
00380         }
00381         string msg("Unable to acquire database write lock on ");
00382         msg += db_dir;
00383         if (why == FlintLock::INUSE) {
00384             msg += ": already locked";
00385         } else if (why == FlintLock::UNSUPPORTED) {
00386             msg += ": locking probably not supported by this FS";
00387         }
00388         throw Xapian::DatabaseLockError(msg);
00389     }
00390 }
00391 
00392 void
00393 FlintDatabase::modifications_failed(flint_revision_number_t old_revision,
00394                                     flint_revision_number_t new_revision,
00395                                     const string & msg)
00396 {
00397     // Modifications failed.  Wipe all the modifications from memory.
00398     try {
00399         // Discard any buffered changes and reinitialised cached values
00400         // from the table.
00401         cancel();
00402 
00403         // Reopen tables with old revision number.
00404         open_tables(old_revision);
00405 
00406         // Increase revision numbers to new revision number plus one,
00407         // writing increased numbers to all tables.
00408         ++new_revision;
00409         set_revision_number(new_revision);
00410     } catch (const Xapian::Error &e) {
00411         // Permanently close the table, since we can't get it into a
00412         // consistent state, to avoid risk of database corruption.
00413         postlist_table.close(true);
00414         position_table.close(true);
00415         termlist_table.close(true);
00416         value_table.close(true);
00417         synonym_table.close(true);
00418         spelling_table.close(true);
00419         record_table.close(true);
00420         lock.release();
00421         throw Xapian::DatabaseError("Modifications failed (" + msg +
00422                                     "), and cannot set consistent table "
00423                                     "revision numbers: " + e.get_msg());
00424     }
00425 }
00426 
00427 void
00428 FlintDatabase::apply()
00429 {
00430     DEBUGCALL(DB, void, "FlintDatabase::apply", "");
00431     if (!postlist_table.is_modified() &&
00432         !position_table.is_modified() &&
00433         !termlist_table.is_modified() &&
00434         !value_table.is_modified() &&
00435         !synonym_table.is_modified() &&
00436         !spelling_table.is_modified() &&
00437         !record_table.is_modified()) {
00438         return;
00439     }
00440 
00441     flint_revision_number_t old_revision = get_revision_number();
00442     flint_revision_number_t new_revision = get_next_revision_number();
00443 
00444     try {
00445         set_revision_number(new_revision);
00446     } catch (const Xapian::Error &e) {
00447         modifications_failed(old_revision, new_revision, e.get_description());
00448         throw;
00449     } catch (...) {
00450         modifications_failed(old_revision, new_revision, "Unknown error");
00451         throw;
00452     }
00453 }
00454 
00455 void
00456 FlintDatabase::cancel()
00457 {
00458     DEBUGCALL(DB, void, "FlintDatabase::cancel", "");
00459     postlist_table.cancel();
00460     position_table.cancel();
00461     termlist_table.cancel();
00462     value_table.cancel();
00463     synonym_table.cancel();
00464     spelling_table.cancel();
00465     record_table.cancel();
00466 }
00467 
00468 Xapian::doccount
00469 FlintDatabase::get_doccount() const
00470 {
00471     DEBUGCALL(DB, Xapian::doccount, "FlintDatabase::get_doccount", "");
00472     RETURN(record_table.get_doccount());
00473 }
00474 
00475 Xapian::docid
00476 FlintDatabase::get_lastdocid() const
00477 {
00478     DEBUGCALL(DB, Xapian::docid, "FlintDatabase::get_lastdocid", "");
00479     RETURN(lastdocid);
00480 }
00481 
00482 Xapian::doclength
00483 FlintDatabase::get_avlength() const
00484 {
00485     DEBUGCALL(DB, Xapian::doclength, "FlintDatabase::get_avlength", "");
00486     Xapian::doccount doccount = record_table.get_doccount();
00487     if (doccount == 0) {
00488         // Avoid dividing by zero when there are no documents.
00489         RETURN(0);
00490     }
00491     RETURN(double(total_length) / doccount);
00492 }
00493 
00494 Xapian::doclength
00495 FlintDatabase::get_doclength(Xapian::docid did) const
00496 {
00497     DEBUGCALL(DB, Xapian::doclength, "FlintDatabase::get_doclength", did);
00498     Assert(did != 0);
00499     RETURN(termlist_table.get_doclength(did));
00500 }
00501 
00502 Xapian::doccount
00503 FlintDatabase::get_termfreq(const string & term) const
00504 {
00505     DEBUGCALL(DB, Xapian::doccount, "FlintDatabase::get_termfreq", term);
00506     Assert(!term.empty());
00507     RETURN(postlist_table.get_termfreq(term));
00508 }
00509 
00510 Xapian::termcount
00511 FlintDatabase::get_collection_freq(const string & term) const
00512 {
00513     DEBUGCALL(DB, Xapian::termcount, "FlintDatabase::get_collection_freq", term);
00514     Assert(!term.empty());
00515     RETURN(postlist_table.get_collection_freq(term));
00516 }
00517 
00518 bool
00519 FlintDatabase::term_exists(const string & term) const
00520 {
00521     DEBUGCALL(DB, bool, "FlintDatabase::term_exists", term);
00522     Assert(!term.empty());
00523     return postlist_table.term_exists(term);
00524 }
00525 
00526 bool
00527 FlintDatabase::has_positions() const
00528 {
00529     return !position_table.empty();
00530 }
00531 
00532 LeafPostList *
00533 FlintDatabase::open_post_list(const string& term) const
00534 {
00535     DEBUGCALL(DB, LeafPostList *, "FlintDatabase::open_post_list", term);
00536     Xapian::Internal::RefCntPtr<const FlintDatabase> ptrtothis(this);
00537 
00538     if (term.empty()) {
00539         Xapian::doccount doccount = get_doccount();
00540         if (lastdocid == doccount) {
00541             RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
00542         }
00543         RETURN(new FlintAllDocsPostList(ptrtothis, doccount));
00544     }
00545 
00546     RETURN(new FlintPostList(ptrtothis, term));
00547 }
00548 
00549 TermList *
00550 FlintDatabase::open_term_list(Xapian::docid did) const
00551 {
00552     DEBUGCALL(DB, TermList *, "FlintDatabase::open_term_list", did);
00553     Assert(did != 0);
00554 
00555     Xapian::Internal::RefCntPtr<const FlintDatabase> ptrtothis(this);
00556     RETURN(new FlintTermList(ptrtothis, did));
00557 }
00558 
00559 Xapian::Document::Internal *
00560 FlintDatabase::open_document(Xapian::docid did, bool lazy) const
00561 {
00562     DEBUGCALL(DB, Xapian::Document::Internal *, "FlintDatabase::open_document",
00563               did << ", " << lazy);
00564     Assert(did != 0);
00565 
00566     Xapian::Internal::RefCntPtr<const FlintDatabase> ptrtothis(this);
00567     RETURN(new FlintDocument(ptrtothis,
00568                               &value_table,
00569                               &record_table,
00570                               did, lazy));
00571 }
00572 
00573 PositionList *
00574 FlintDatabase::open_position_list(Xapian::docid did, const string & term) const
00575 {
00576     Assert(did != 0);
00577 
00578     AutoPtr<FlintPositionList> poslist(new FlintPositionList());
00579     if (!poslist->read_data(&position_table, did, term)) {
00580         // Check that term / document combination exists.
00581         // If the doc doesn't exist, this will throw Xapian::DocNotFoundError:
00582         AutoPtr<TermList> tl(open_term_list(did));
00583         tl->skip_to(term);
00584         if (tl->at_end() || tl->get_termname() != term)
00585             throw Xapian::RangeError("Can't open position list: requested term is not present in document.");
00586         // FIXME: For 1.2.0, change this to just return an empty termlist.
00587         // If the user really needs to know, they can check themselves.
00588     }
00589 
00590     return poslist.release();
00591 }
00592 
00593 TermList *
00594 FlintDatabase::open_allterms(const string & prefix) const
00595 {
00596     DEBUGCALL(DB, TermList *, "FlintDatabase::open_allterms", "");
00597     RETURN(new FlintAllTermsList(Xapian::Internal::RefCntPtr<const FlintDatabase>(this),
00598                                  prefix));
00599 }
00600 
00601 TermList *
00602 FlintDatabase::open_spelling_termlist(const string & word) const
00603 {
00604     return spelling_table.open_termlist(word);
00605 }
00606 
00607 TermList *
00608 FlintDatabase::open_spelling_wordlist() const
00609 {
00610     FlintCursor * cursor = spelling_table.cursor_get();
00611     if (!cursor) return NULL;
00612     return new FlintSpellingWordsList(Xapian::Internal::RefCntPtr<const FlintDatabase>(this),
00613                                       cursor);
00614 }
00615 
00616 Xapian::doccount
00617 FlintDatabase::get_spelling_frequency(const string & word) const
00618 {
00619     return spelling_table.get_word_frequency(word);
00620 }
00621 
00622 TermList *
00623 FlintDatabase::open_synonym_termlist(const string & term) const
00624 {
00625     return synonym_table.open_termlist(term);
00626 }
00627 
00628 TermList *
00629 FlintDatabase::open_synonym_keylist(const string & prefix) const
00630 {
00631     FlintCursor * cursor = synonym_table.cursor_get();
00632     if (!cursor) return NULL;
00633     return new FlintSynonymTermList(Xapian::Internal::RefCntPtr<const FlintDatabase>(this),
00634                                     cursor, prefix);
00635 }
00636 
00637 TermList *
00638 FlintDatabase::open_metadata_keylist(const std::string &prefix) const
00639 {
00640     DEBUGCALL(DB, string, "FlintDatabase::open_metadata_keylist", "");
00641     FlintCursor * cursor = postlist_table.cursor_get();
00642     if (!cursor) return NULL;
00643     return new FlintMetadataTermList(Xapian::Internal::RefCntPtr<const FlintDatabase>(this),
00644                                      cursor, prefix);
00645 }
00646 
00647 string
00648 FlintDatabase::get_metadata(const string & key) const
00649 {
00650     DEBUGCALL(DB, string, "FlintDatabase::get_metadata", key);
00651     string btree_key("\x00\xc0", 2);
00652     btree_key += key;
00653     string tag;
00654     (void)postlist_table.get_exact_entry(btree_key, tag);
00655     RETURN(tag);
00656 }
00657 
00659 
00660 FlintWritableDatabase::FlintWritableDatabase(const string &dir, int action,
00661                                                int block_size)
00662         : FlintDatabase(dir, action, block_size),
00663           freq_deltas(),
00664           doclens(),
00665           mod_plists(),
00666           change_count(0),
00667           flush_threshold(0),
00668           modify_shortcut_document(NULL),
00669           modify_shortcut_docid(0)
00670 {
00671     DEBUGCALL(DB, void, "FlintWritableDatabase", dir << ", " << action << ", "
00672               << block_size);
00673 
00674     const char *p = getenv("XAPIAN_FLUSH_THRESHOLD");
00675     if (p)
00676         flush_threshold = atoi(p);
00677     if (flush_threshold == 0)
00678         flush_threshold = 10000;
00679 }
00680 
00681 FlintWritableDatabase::~FlintWritableDatabase()
00682 {
00683     DEBUGCALL(DB, void, "~FlintWritableDatabase", "");
00684     dtor_called();
00685 }
00686 
00687 void
00688 FlintWritableDatabase::flush()
00689 {
00690     if (transaction_active())
00691         throw Xapian::InvalidOperationError("Can't flush during a transaction");
00692     if (change_count) flush_postlist_changes();
00693     apply();
00694 }
00695 
00696 void
00697 FlintWritableDatabase::flush_postlist_changes() const
00698 {
00699     postlist_table.merge_changes(mod_plists, doclens, freq_deltas);
00700 
00701     // Update the total document length and last used docid.
00702     string tag = pack_uint(lastdocid);
00703     tag += pack_uint_last(total_length);
00704     postlist_table.add(METAINFO_KEY, tag);
00705 
00706     freq_deltas.clear();
00707     doclens.clear();
00708     mod_plists.clear();
00709     change_count = 0;
00710 }
00711 
00712 void
00713 FlintWritableDatabase::add_freq_delta(const string & tname,
00714                                       Xapian::termcount_diff tf_delta,
00715                                       Xapian::termcount_diff cf_delta)
00716 {
00717     map<string, pair<termcount_diff, termcount_diff> >::iterator i;
00718     i = freq_deltas.find(tname);
00719     if (i == freq_deltas.end()) {
00720         freq_deltas.insert(make_pair(tname, make_pair(tf_delta, cf_delta)));
00721     } else {
00722         i->second.first += tf_delta;
00723         i->second.second += cf_delta;
00724     }
00725 }
00726 
00727 void
00728 FlintWritableDatabase::insert_mod_plist(Xapian::docid did,
00729                                         const string & tname,
00730                                         Xapian::termcount wdf)
00731 {
00732     // Find or make the appropriate entry in mod_plists.
00733     map<string, map<docid, pair<char, termcount> > >::iterator j;
00734     j = mod_plists.find(tname);
00735     if (j == mod_plists.end()) {
00736         map<docid, pair<char, termcount> > m;
00737         j = mod_plists.insert(make_pair(tname, m)).first;
00738     }
00739     j->second[did] = make_pair('A', wdf);
00740 }
00741 
00742 void
00743 FlintWritableDatabase::update_mod_plist(Xapian::docid did,
00744                                         const string & tname,
00745                                         char type,
00746                                         Xapian::termcount wdf)
00747 {
00748     // Find or make the appropriate entry in mod_plists.
00749     map<string, map<docid, pair<char, termcount> > >::iterator j;
00750     j = mod_plists.find(tname);
00751     if (j == mod_plists.end()) {
00752         map<docid, pair<char, termcount> > m;
00753         j = mod_plists.insert(make_pair(tname, m)).first;
00754     }
00755 
00756     map<docid, pair<char, termcount> >::iterator k;
00757     k = j->second.find(did);
00758     if (k == j->second.end()) {
00759         j->second.insert(make_pair(did, make_pair(type, wdf)));
00760     } else {
00761         if (type == 'A') {
00762             // Adding an entry which has already been deleted.
00763             Assert(k->second.first == 'D');
00764             type = 'M';
00765         }
00766         k->second = make_pair(type, wdf);
00767     }
00768 }
00769 
00770 Xapian::docid
00771 FlintWritableDatabase::add_document(const Xapian::Document & document)
00772 {
00773     DEBUGCALL(DB, Xapian::docid,
00774               "FlintWritableDatabase::add_document", document);
00775     // Make sure the docid counter doesn't overflow.
00776     if (lastdocid == Xapian::docid(-1))
00777         throw Xapian::DatabaseError("Run out of docids - you'll have to use copydatabase to eliminate any gaps before you can add more documents");
00778     // Use the next unused document ID.
00779     RETURN(add_document_(++lastdocid, document));
00780 }
00781 
00782 Xapian::docid
00783 FlintWritableDatabase::add_document_(Xapian::docid did,
00784                                      const Xapian::Document & document)
00785 {
00786     DEBUGCALL(DB, Xapian::docid,
00787               "FlintWritableDatabase::add_document_", did << ", " << document);
00788     Assert(did != 0);
00789     try {
00790         // Add the record using that document ID.
00791         record_table.replace_record(document.get_data(), did);
00792 
00793         // Set the values.
00794         {
00795             Xapian::ValueIterator value = document.values_begin();
00796             Xapian::ValueIterator value_end = document.values_end();
00797             string s;
00798             value_table.encode_values(s, value, value_end);
00799             value_table.set_encoded_values(did, s);
00800         }
00801 
00802         flint_doclen_t new_doclen = 0;
00803         {
00804             Xapian::TermIterator term = document.termlist_begin();
00805             Xapian::TermIterator term_end = document.termlist_end();
00806             for ( ; term != term_end; ++term) {
00807                 termcount wdf = term.get_wdf();
00808                 // Calculate the new document length
00809                 new_doclen += wdf;
00810 
00811                 string tname = *term;
00812                 if (tname.size() > MAX_SAFE_TERM_LENGTH)
00813                     throw Xapian::InvalidArgumentError("Term too long (> "STRINGIZE(MAX_SAFE_TERM_LENGTH)"): " + tname);
00814                 add_freq_delta(tname, 1, wdf);
00815                 insert_mod_plist(did, tname, wdf);
00816 
00817                 PositionIterator pos = term.positionlist_begin();
00818                 if (pos != term.positionlist_end()) {
00819                     position_table.set_positionlist(
00820                         did, tname,
00821                         pos, term.positionlist_end(), false);
00822                 }
00823             }
00824         }
00825         DEBUGLINE(DB, "Calculated doclen for new document " << did << " as " << new_doclen);
00826 
00827         // Set the termlist
00828         termlist_table.set_termlist(did, document, new_doclen);
00829 
00830         // Set the new document length
00831         Assert(doclens.find(did) == doclens.end());
00832         doclens[did] = new_doclen;
00833         total_length += new_doclen;
00834     } catch (...) {
00835         // If an error occurs while adding a document, or doing any other
00836         // transaction, the modifications so far must be cleared before
00837         // returning control to the user - otherwise partial modifications will
00838         // persist in memory, and eventually get written to disk.
00839         cancel();
00840         throw;
00841     }
00842 
00843     // FIXME: this should be done by checking memory usage, not the number of
00844     // changes.
00845     // We could also look at:
00846     // * mod_plists.size()
00847     // * doclens.size()
00848     // * freq_deltas.size()
00849     //
00850     // cout << "+++ mod_plists.size() " << mod_plists.size() <<
00851     //     ", doclens.size() " << doclens.size() <<
00852     //     ", freq_deltas.size() " << freq_deltas.size() << endl;
00853     if (++change_count >= flush_threshold) {
00854         flush_postlist_changes();
00855         if (!transaction_active()) apply();
00856     }
00857 
00858     RETURN(did);
00859 }
00860 
00861 void
00862 FlintWritableDatabase::delete_document(Xapian::docid did)
00863 {
00864     DEBUGCALL(DB, void, "FlintWritableDatabase::delete_document", did);
00865     Assert(did != 0);
00866 
00867     if (rare(modify_shortcut_docid == did)) {
00868         // The modify_shortcut document can't be used for a modification
00869         // shortcut now, because it's been deleted!
00870         modify_shortcut_document = NULL;
00871         modify_shortcut_docid = 0;
00872     }
00873 
00874     // Remove the record.  If this fails, just propagate the exception since
00875     // the state should still be consistent (most likely it's
00876     // DocNotFoundError).
00877     record_table.delete_record(did);
00878 
00879     try {
00880         // Remove the values
00881         value_table.delete_all_values(did);
00882 
00883         // OK, now add entries to remove the postings in the underlying record.
00884         Xapian::Internal::RefCntPtr<const FlintWritableDatabase> ptrtothis(this);
00885         FlintTermList termlist(ptrtothis, did);
00886 
00887         total_length -= termlist.get_doclength();
00888 
00889         termlist.next();
00890         while (!termlist.at_end()) {
00891             string tname = termlist.get_termname();
00892             position_table.delete_positionlist(did, tname);
00893             termcount wdf = termlist.get_wdf();
00894 
00895             add_freq_delta(tname, -1, -wdf);
00896             update_mod_plist(did, tname, 'D', 0u);
00897 
00898             termlist.next();
00899         }
00900 
00901         // Remove the termlist.
00902         termlist_table.delete_termlist(did);
00903 
00904         // Remove the new doclength.
00905         doclens.erase(did);
00906     } catch (...) {
00907         // If an error occurs while deleting a document, or doing any other
00908         // transaction, the modifications so far must be cleared before
00909         // returning control to the user - otherwise partial modifications will
00910         // persist in memory, and eventually get written to disk.
00911         cancel();
00912         throw;
00913     }
00914 
00915     if (++change_count >= flush_threshold) {
00916         flush_postlist_changes();
00917         if (!transaction_active()) apply();
00918     }
00919 }
00920 
00921 void
00922 FlintWritableDatabase::replace_document(Xapian::docid did,
00923                                         const Xapian::Document & document)
00924 {
00925     DEBUGCALL(DB, void, "FlintWritableDatabase::replace_document", did << ", " << document);
00926     Assert(did != 0);
00927 
00928     try {
00929         if (did > lastdocid) {
00930             lastdocid = did;
00931             // If this docid is above the highwatermark, then we can't be
00932             // replacing an existing document.
00933             (void)add_document_(did, document);
00934             return;
00935         }
00936 
00937         // Check for a document read from this database being replaced - ie, a
00938         // modification operation.
00939         bool modifying = false;
00940         if (modify_shortcut_docid &&
00941             document.internal->get_docid() == modify_shortcut_docid) {
00942             if (document.internal.get() == modify_shortcut_document) {
00943                 // We have a docid, it matches, and the pointer matches, so we
00944                 // can skip modification of any data which hasn't been modified
00945                 // in the document.
00946                 if (!document.internal->modified()) {
00947                     // If the document is unchanged, we've nothing to do.
00948                     return;
00949                 }
00950                 modifying = true;
00951                 DEBUGLINE(DB, "Detected potential document modification shortcut.");
00952             } else {
00953                 // The modify_shortcut document can't be used for a
00954                 // modification shortcut now, because it's about to be
00955                 // modified.
00956                 modify_shortcut_document = NULL;
00957                 modify_shortcut_docid = 0;
00958             }
00959         }
00960   
00961         if (!modifying || document.internal->terms_modified()) {
00962             Xapian::Internal::RefCntPtr<const FlintWritableDatabase> ptrtothis(this);
00963             FlintTermList termlist(ptrtothis, did);
00964             Xapian::TermIterator term = document.termlist_begin();
00965 
00966             // We need to know whether the document length has changed before
00967             // we iterate through the term changes, because the document length
00968             // is stored in the postings, so if it's changed we have to update
00969             // all postings.  Therefore, we have to calculate the new document
00970             // length first.
00971             flint_doclen_t new_doclen = 0;
00972             for (; term != document.termlist_end(); ++term) {
00973                 new_doclen += term.get_wdf();
00974             }
00975 
00976             term = document.termlist_begin();
00977             flint_doclen_t old_doclen = termlist.get_doclength();
00978             string old_tname, new_tname;
00979  
00980             total_length -= old_doclen;
00981 
00982             termlist.next();
00983             while (!termlist.at_end() || term != document.termlist_end()) {
00984                 int cmp;
00985                 if (termlist.at_end()) {
00986                     cmp = 1;
00987                     new_tname = *term;
00988                 } else {
00989                     old_tname = termlist.get_termname();
00990                     if (term != document.termlist_end()) {
00991                         new_tname = *term;
00992                         cmp = old_tname.compare(new_tname);
00993                     } else {
00994                         cmp = -1;
00995                     }
00996                 }
00997 
00998                 if (cmp < 0) {
00999                     // Term old_tname has been deleted.
01000                     add_freq_delta(old_tname, -1, -termlist.get_wdf());
01001                     position_table.delete_positionlist(did, old_tname);
01002                     update_mod_plist(did, old_tname, 'D', 0u);
01003                     termlist.next();
01004                 } else if (cmp > 0) {
01005                     // Term new_tname as been added.
01006                     termcount new_wdf = term.get_wdf();
01007                     if (new_tname.size() > MAX_SAFE_TERM_LENGTH)
01008                         throw Xapian::InvalidArgumentError("Term too long (> "STRINGIZE(MAX_SAFE_TERM_LENGTH)"): " + new_tname);
01009                     add_freq_delta(new_tname, 1, new_wdf);
01010                     update_mod_plist(did, new_tname, 'A', new_wdf);
01011                     PositionIterator pos = term.positionlist_begin();
01012                     if (pos != term.positionlist_end()) {
01013                         position_table.set_positionlist(
01014                             did, new_tname,
01015                             pos, term.positionlist_end(), false);
01016                     }
01017                     ++term;
01018                 } else {
01019                     // Term already exists: look for wdf and positionlist changes.
01020                     termcount old_wdf = termlist.get_wdf();
01021                     termcount new_wdf = term.get_wdf();
01022                     if (old_doclen != new_doclen || old_wdf != new_wdf) {
01023                         add_freq_delta(new_tname, 0, new_wdf - old_wdf);
01024                         update_mod_plist(did, new_tname, 'M', new_wdf);
01025                     }
01026 
01027                     PositionIterator pos = term.positionlist_begin();
01028                     if (pos != term.positionlist_end()) {
01029                         position_table.set_positionlist(did, new_tname, pos,
01030                                                         term.positionlist_end(),
01031                                                         true);
01032                     } else {
01033                         position_table.delete_positionlist(did, new_tname);
01034                     }
01035 
01036                     termlist.next();
01037                     ++term;
01038                 }
01039             }
01040             DEBUGLINE(DB, "Calculated doclen for replacement document " << did << " as " << new_doclen);
01041 
01042             // Set the termlist
01043             termlist_table.set_termlist(did, document, new_doclen);
01044 
01045             // Set the new document length
01046             doclens[did] = new_doclen;
01047             total_length += new_doclen;
01048         }
01049 
01050         if (!modifying || document.internal->data_modified()) {
01051             // Replace the record
01052             record_table.replace_record(document.get_data(), did);
01053         }
01054 
01055         if (!modifying || document.internal->values_modified()) {
01056             // FIXME: we read the values delete them and then replace in case
01057             // they come from where they're going!  Better to ask Document
01058             // nicely and shortcut in this case!
01059             Xapian::ValueIterator value = document.values_begin();
01060             Xapian::ValueIterator value_end = document.values_end();
01061             string s;
01062             value_table.encode_values(s, value, value_end);
01063 
01064             // Replace the values.
01065             value_table.delete_all_values(did);
01066             value_table.set_encoded_values(did, s);
01067         }
01068     } catch (const Xapian::DocNotFoundError &) {
01069         (void)add_document_(did, document);
01070         return;
01071     } catch (...) {
01072         // If an error occurs while replacing a document, or doing any other
01073         // transaction, the modifications so far must be cleared before
01074         // returning control to the user - otherwise partial modifications will
01075         // persist in memory, and eventually get written to disk.
01076         cancel();
01077         throw;
01078     }
01079 
01080     if (++change_count >= flush_threshold) {
01081         flush_postlist_changes();
01082         if (!transaction_active()) apply();
01083     }
01084 }
01085 
01086 Xapian::Document::Internal *
01087 FlintWritableDatabase::open_document(Xapian::docid did, bool lazy) const
01088 {
01089     DEBUGCALL(DB, Xapian::Document::Internal *, "FlintWritableDatabase::open_document",
01090               did << ", " << lazy);
01091     modify_shortcut_document = FlintDatabase::open_document(did, lazy);
01092     // Store the docid only after open_document() successfully returns, so an
01093     // attempt to open a missing document doesn't overwrite this.
01094     modify_shortcut_docid = did;
01095     RETURN(modify_shortcut_document);
01096 }
01097 
01098 Xapian::doclength
01099 FlintWritableDatabase::get_doclength(Xapian::docid did) const
01100 {
01101     DEBUGCALL(DB, Xapian::doclength, "FlintWritableDatabase::get_doclength", did);
01102     map<docid, termcount>::const_iterator i = doclens.find(did);
01103     if (i != doclens.end()) RETURN(i->second);
01104 
01105     RETURN(FlintDatabase::get_doclength(did));
01106 }
01107 
01108 Xapian::doccount
01109 FlintWritableDatabase::get_termfreq(const string & tname) const
01110 {
01111     DEBUGCALL(DB, Xapian::doccount, "FlintWritableDatabase::get_termfreq", tname);
01112     Xapian::doccount termfreq = FlintDatabase::get_termfreq(tname);
01113     map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
01114     i = freq_deltas.find(tname);
01115     if (i != freq_deltas.end()) termfreq += i->second.first;
01116     RETURN(termfreq);
01117 }
01118 
01119 Xapian::termcount
01120 FlintWritableDatabase::get_collection_freq(const string & tname) const
01121 {
01122     DEBUGCALL(DB, Xapian::termcount, "FlintWritableDatabase::get_collection_freq", tname);
01123     Xapian::termcount collfreq = FlintDatabase::get_collection_freq(tname);
01124 
01125     map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
01126     i = freq_deltas.find(tname);
01127     if (i != freq_deltas.end()) collfreq += i->second.second;
01128 
01129     RETURN(collfreq);
01130 }
01131 
01132 bool
01133 FlintWritableDatabase::term_exists(const string & tname) const
01134 {
01135     DEBUGCALL(DB, bool, "FlintWritableDatabase::term_exists", tname);
01136     RETURN(get_termfreq(tname) != 0);
01137 }
01138 
01139 LeafPostList *
01140 FlintWritableDatabase::open_post_list(const string& tname) const
01141 {
01142     DEBUGCALL(DB, LeafPostList *, "FlintWritableDatabase::open_post_list", tname);
01143     Xapian::Internal::RefCntPtr<const FlintWritableDatabase> ptrtothis(this);
01144 
01145     if (tname.empty()) {
01146         Xapian::doccount doccount = get_doccount();
01147         if (lastdocid == doccount) {
01148             RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
01149         }
01150         RETURN(new FlintAllDocsPostList(ptrtothis, doccount));
01151     }
01152 
01153     map<string, map<docid, pair<char, termcount> > >::const_iterator j;
01154     j = mod_plists.find(tname);
01155     if (j != mod_plists.end()) {
01156         // We've got buffered changes to this term's postlist, so we need to
01157         // use a FlintModifiedPostList.
01158         RETURN(new FlintModifiedPostList(ptrtothis, tname, j->second));
01159     }
01160 
01161     RETURN(new FlintPostList(ptrtothis, tname));
01162 }
01163 
01164 TermList *
01165 FlintWritableDatabase::open_allterms(const string & prefix) const
01166 {
01167     DEBUGCALL(DB, TermList *, "FlintWritableDatabase::open_allterms", "");
01168     // If there are changes, terms may have been added or removed, and so we
01169     // need to flush (but don't commit - there may be a transaction in progress.
01170     if (change_count) flush_postlist_changes();
01171     RETURN(FlintDatabase::open_allterms(prefix));
01172 }
01173 
01174 void
01175 FlintWritableDatabase::cancel()
01176 {
01177     FlintDatabase::cancel();
01178     read_metainfo();
01179     freq_deltas.clear();
01180     doclens.clear();
01181     mod_plists.clear();
01182     change_count = 0;
01183 }
01184 
01185 void
01186 FlintWritableDatabase::add_spelling(const string & word,
01187                                     Xapian::termcount freqinc) const
01188 {
01189     spelling_table.add_word(word, freqinc);
01190 }
01191 
01192 void
01193 FlintWritableDatabase::remove_spelling(const string & word,
01194                                        Xapian::termcount freqdec) const
01195 {
01196     spelling_table.remove_word(word, freqdec);
01197 }
01198 
01199 TermList *
01200 FlintWritableDatabase::open_spelling_wordlist() const
01201 {
01202     spelling_table.merge_changes();
01203     return FlintDatabase::open_spelling_wordlist();
01204 }
01205 
01206 TermList *
01207 FlintWritableDatabase::open_synonym_keylist(const string & prefix) const
01208 {
01209     synonym_table.merge_changes();
01210     return FlintDatabase::open_synonym_keylist(prefix);
01211 }
01212 
01213 void
01214 FlintWritableDatabase::add_synonym(const string & term,
01215                                    const string & synonym) const
01216 {
01217     synonym_table.add_synonym(term, synonym);
01218 }
01219 
01220 void
01221 FlintWritableDatabase::remove_synonym(const string & term,
01222                                       const string & synonym) const
01223 {
01224     synonym_table.remove_synonym(term, synonym);
01225 }
01226 
01227 void
01228 FlintWritableDatabase::clear_synonyms(const string & term) const
01229 {
01230     synonym_table.clear_synonyms(term);
01231 }
01232 
01233 void
01234 FlintWritableDatabase::set_metadata(const string & key, const string & value)
01235 {
01236     DEBUGCALL(DB, string, "FlintWritableDatabase::set_metadata",
01237               key << ", " << value);
01238     string btree_key("\x00\xc0", 2);
01239     btree_key += key;
01240     if (value.empty()) {
01241         postlist_table.del(btree_key);
01242     } else {
01243         postlist_table.add(btree_key, value);
01244     }
01245 }
01246 
01247 void
01248 FlintWritableDatabase::invalidate_doc_object(Xapian::Document::Internal * obj) const
01249 {
01250     if (obj == modify_shortcut_document) {
01251         modify_shortcut_document = NULL;
01252         modify_shortcut_docid = 0;
01253     }
01254 }

Documentation for Xapian (version 1.0.20).
Generated on 28 Apr 2010 by Doxygen 1.5.2.