00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023
00024 #include "chert_values.h"
00025
00026 #include "chert_cursor.h"
00027 #include "chert_postlist.h"
00028 #include "chert_termlist.h"
00029 #include "debuglog.h"
00030 #include "document.h"
00031 #include "pack.h"
00032
00033 #include "xapian/error.h"
00034 #include "xapian/valueiterator.h"
00035
00036 #include <algorithm>
00037 #include "autoptr.h"
00038
00039 using namespace std;
00040
00041
00042
00043
00044
00045
00047 inline string
00048 make_slot_key(Xapian::docid did)
00049 {
00050 LOGCALL_STATIC(DB, string, "make_slot_key", did);
00051
00052
00053
00054 string key;
00055 pack_uint_preserving_sort(key, did);
00056 key += '\0';
00057 RETURN(key);
00058 }
00059
00061 inline string
00062 make_valuestats_key(Xapian::valueno slot)
00063 {
00064 LOGCALL_STATIC(DB, string, "make_valuestats_key", slot);
00065 string key("\0\xd0", 2);
00066 pack_uint_last(key, slot);
00067 RETURN(key);
00068 }
00069
00070 void
00071 ValueChunkReader::assign(const char * p_, size_t len, Xapian::docid did_)
00072 {
00073 p = p_;
00074 end = p_ + len;
00075 did = did_;
00076 if (!unpack_string(&p, end, value))
00077 throw Xapian::DatabaseCorruptError("Failed to unpack first value");
00078 }
00079
00080 void
00081 ValueChunkReader::next()
00082 {
00083 if (p == end) {
00084 p = NULL;
00085 return;
00086 }
00087
00088 Xapian::docid delta;
00089 if (!unpack_uint(&p, end, &delta))
00090 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
00091 did += delta + 1;
00092 if (!unpack_string(&p, end, value))
00093 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
00094 }
00095
00096 void
00097 ValueChunkReader::skip_to(Xapian::docid target)
00098 {
00099 if (p == NULL || target <= did)
00100 return;
00101
00102 size_t value_len;
00103 while (p != end) {
00104
00105 Xapian::docid delta;
00106 if (rare(!unpack_uint(&p, end, &delta)))
00107 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
00108 did += delta + 1;
00109
00110
00111 if (rare(!unpack_uint(&p, end, &value_len))) {
00112 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value length");
00113 }
00114
00115
00116 if (rare(value_len > size_t(end - p))) {
00117 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
00118 }
00119
00120
00121 if (did >= target) {
00122 value.assign(p, value_len);
00123 p += value_len;
00124 return;
00125 }
00126 p += value_len;
00127 }
00128 p = NULL;
00129 }
00130
00131 void
00132 ChertValueManager::add_value(Xapian::docid did, Xapian::valueno slot,
00133 const string & val)
00134 {
00135 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
00136 i = changes.find(slot);
00137 if (i == changes.end()) {
00138 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
00139 }
00140 i->second[did] = val;
00141 }
00142
00143 void
00144 ChertValueManager::remove_value(Xapian::docid did, Xapian::valueno slot)
00145 {
00146 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
00147 i = changes.find(slot);
00148 if (i == changes.end()) {
00149 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
00150 }
00151 i->second[did] = string();
00152 }
00153
00154 Xapian::docid
00155 ChertValueManager::get_chunk_containing_did(Xapian::valueno slot,
00156 Xapian::docid did,
00157 string &chunk) const
00158 {
00159 LOGCALL(DB, Xapian::docid, "ChertValueManager::get_chunk_containing_did", slot | did | chunk);
00160 AutoPtr<ChertCursor> cursor(postlist_table->cursor_get());
00161 if (!cursor.get()) return 0;
00162
00163 bool exact = cursor->find_entry(make_valuechunk_key(slot, did));
00164 if (!exact) {
00165
00166
00167 const char * p = cursor->current_key.data();
00168 const char * end = p + cursor->current_key.size();
00169
00170
00171 if (end - p < 2 || *p++ != '\0' || *p++ != '\xd8') return 0;
00172
00173
00174 Xapian::valueno v;
00175 if (!unpack_uint(&p, end, &v)) {
00176 throw Xapian::DatabaseCorruptError("Bad value key");
00177 }
00178 if (v != slot) return 0;
00179
00180
00181 if (!unpack_uint_preserving_sort(&p, end, &did) || p != end) {
00182 throw Xapian::DatabaseCorruptError("Bad value key");
00183 }
00184 }
00185
00186 cursor->read_tag();
00187 swap(chunk, cursor->current_tag);
00188
00189 return did;
00190 }
00191
00192 static const size_t CHUNK_SIZE_THRESHOLD = 2000;
00193
00194 static const Xapian::docid MAX_DOCID = static_cast<Xapian::docid>(-1);
00195
00196 class ValueUpdater {
00197 ChertPostListTable * table;
00198
00199 Xapian::valueno slot;
00200
00201 string ctag;
00202
00203 ValueChunkReader reader;
00204
00205 string tag;
00206
00207 Xapian::docid prev_did;
00208
00209 Xapian::docid first_did;
00210
00211 Xapian::docid new_first_did;
00212
00213 Xapian::docid last_allowed_did;
00214
00215 void append_to_stream(Xapian::docid did, const string & value) {
00216 Assert(did);
00217 if (tag.empty()) {
00218 new_first_did = did;
00219 } else {
00220 AssertRel(did,>,prev_did);
00221 pack_uint(tag, did - prev_did - 1);
00222 }
00223 prev_did = did;
00224 pack_string(tag, value);
00225 if (tag.size() >= CHUNK_SIZE_THRESHOLD) write_tag();
00226 }
00227
00228 void write_tag() {
00229
00230 if (first_did && new_first_did != first_did) {
00231 table->del(make_valuechunk_key(slot, first_did));
00232 }
00233 if (!tag.empty()) {
00234 table->add(make_valuechunk_key(slot, new_first_did), tag);
00235 }
00236 first_did = 0;
00237 tag.resize(0);
00238 }
00239
00240 public:
00241 ValueUpdater(ChertPostListTable * table_, Xapian::valueno slot_)
00242 : table(table_), slot(slot_), first_did(0), last_allowed_did(0) { }
00243
00244 ~ValueUpdater() {
00245 while (!reader.at_end()) {
00246
00247 append_to_stream(reader.get_docid(), reader.get_value());
00248 reader.next();
00249 }
00250 write_tag();
00251 }
00252
00253 void update(Xapian::docid did, const string & value) {
00254 if (last_allowed_did && did > last_allowed_did) {
00255
00256
00257
00258
00259
00260 while (!reader.at_end()) {
00261
00262 AssertRel(reader.get_docid(),<=,last_allowed_did);
00263 append_to_stream(reader.get_docid(), reader.get_value());
00264 reader.next();
00265 }
00266 write_tag();
00267 last_allowed_did = 0;
00268 }
00269 if (last_allowed_did == 0) {
00270 last_allowed_did = MAX_DOCID;
00271 Assert(tag.empty());
00272 new_first_did = 0;
00273 AutoPtr<ChertCursor> cursor(table->cursor_get());
00274 if (cursor->find_entry(make_valuechunk_key(slot, did))) {
00275
00276
00277 first_did = did;
00278 } else {
00279 Assert(!cursor->after_end());
00280
00281
00282
00283 first_did = docid_from_key(slot, cursor->current_key);
00284 }
00285
00286
00287
00288
00289
00290 if (first_did) {
00291
00292 cursor->read_tag();
00293
00294 ctag = cursor->current_tag;
00295 reader.assign(ctag.data(), ctag.size(), first_did);
00296 }
00297 if (cursor->next()) {
00298 const string & key = cursor->current_key;
00299 Xapian::docid next_first_did = docid_from_key(slot, key);
00300 if (next_first_did) last_allowed_did = next_first_did - 1;
00301 Assert(last_allowed_did);
00302 AssertRel(last_allowed_did,>=,first_did);
00303 }
00304 }
00305
00306
00307
00308
00309 while (!reader.at_end() && reader.get_docid() < did) {
00310 append_to_stream(reader.get_docid(), reader.get_value());
00311 reader.next();
00312 }
00313 if (!reader.at_end() && reader.get_docid() == did) reader.next();
00314 if (!value.empty()) {
00315
00316 append_to_stream(did, value);
00317 }
00318 }
00319 };
00320
00321 void
00322 ChertValueManager::merge_changes()
00323 {
00324 if (termlist_table->is_open()) {
00325 map<Xapian::docid, string>::const_iterator i;
00326 for (i = slots.begin(); i != slots.end(); ++i) {
00327 const string & enc = i->second;
00328 string key = make_slot_key(i->first);
00329 if (!enc.empty()) {
00330 termlist_table->add(key, i->second);
00331 } else {
00332 termlist_table->del(key);
00333 }
00334 }
00335 slots.clear();
00336 }
00337
00338 {
00339 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
00340 for (i = changes.begin(); i != changes.end(); ++i) {
00341 Xapian::valueno slot = i->first;
00342 ValueUpdater updater(postlist_table, slot);
00343 const map<Xapian::docid, string> & slot_changes = i->second;
00344 map<Xapian::docid, string>::const_iterator j;
00345 for (j = slot_changes.begin(); j != slot_changes.end(); ++j) {
00346 updater.update(j->first, j->second);
00347 }
00348 }
00349 changes.clear();
00350 }
00351 }
00352
00353 void
00354 ChertValueManager::add_document(Xapian::docid did, const Xapian::Document &doc,
00355 map<Xapian::valueno, ValueStats> & value_stats)
00356 {
00357
00358
00359 string slots_used;
00360 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
00361 Xapian::ValueIterator it = doc.values_begin();
00362 while (it != doc.values_end()) {
00363 Xapian::valueno slot = it.get_valueno();
00364 string value = *it;
00365
00366
00367 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
00368 i = value_stats.insert(make_pair(slot, ValueStats()));
00369 ValueStats & stats = i.first->second;
00370 if (i.second) {
00371
00372 get_value_stats(slot, stats);
00373 }
00374
00375
00376 if ((stats.freq)++ == 0) {
00377
00378
00379 stats.lower_bound = value;
00380 stats.upper_bound = value;
00381 } else {
00382
00383 if (value < stats.lower_bound) {
00384 stats.lower_bound = value;
00385 } else if (value > stats.upper_bound) {
00386 stats.upper_bound = value;
00387 }
00388 }
00389
00390 add_value(did, slot, value);
00391 if (termlist_table->is_open()) {
00392 pack_uint(slots_used, slot - prev_slot - 1);
00393 prev_slot = slot;
00394 }
00395 ++it;
00396 }
00397 if (slots_used.empty() && slots.find(did) == slots.end()) {
00398
00399 } else {
00400 swap(slots[did], slots_used);
00401 }
00402 }
00403
00404 void
00405 ChertValueManager::delete_document(Xapian::docid did,
00406 map<Xapian::valueno, ValueStats> & value_stats)
00407 {
00408 Assert(termlist_table->is_open());
00409 map<Xapian::docid, string>::iterator it = slots.find(did);
00410 string s;
00411 if (it != slots.end()) {
00412 swap(s, it->second);
00413 } else {
00414
00415 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
00416 slots.insert(make_pair(did, string()));
00417 }
00418 const char * p = s.data();
00419 const char * end = p + s.size();
00420 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
00421 while (p != end) {
00422 Xapian::valueno slot;
00423 if (!unpack_uint(&p, end, &slot)) {
00424 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
00425 }
00426 slot += prev_slot + 1;
00427 prev_slot = slot;
00428
00429 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
00430 i = value_stats.insert(make_pair(slot, ValueStats()));
00431 ValueStats & stats = i.first->second;
00432 if (i.second) {
00433
00434 get_value_stats(slot, stats);
00435 }
00436
00437
00438 AssertRelParanoid(stats.freq, >, 0);
00439 if (--(stats.freq) == 0) {
00440 stats.lower_bound.resize(0);
00441 stats.upper_bound.resize(0);
00442 }
00443
00444 remove_value(did, slot);
00445 }
00446 }
00447
00448 void
00449 ChertValueManager::replace_document(Xapian::docid did,
00450 const Xapian::Document &doc,
00451 map<Xapian::valueno, ValueStats> & value_stats)
00452 {
00453
00454
00455
00456 doc.internal->need_values();
00457 delete_document(did, value_stats);
00458 add_document(did, doc, value_stats);
00459 }
00460
00461 string
00462 ChertValueManager::get_value(Xapian::docid did, Xapian::valueno slot) const
00463 {
00464 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
00465 i = changes.find(slot);
00466 if (i != changes.end()) {
00467 map<Xapian::docid, string>::const_iterator j;
00468 j = i->second.find(did);
00469 if (j != i->second.end()) return j->second;
00470 }
00471
00472
00473 string chunk;
00474 Xapian::docid first_did;
00475 first_did = get_chunk_containing_did(slot, did, chunk);
00476 if (first_did == 0) return string();
00477
00478 ValueChunkReader reader(chunk.data(), chunk.size(), first_did);
00479 reader.skip_to(did);
00480 if (reader.at_end() || reader.get_docid() != did) return string();
00481 return reader.get_value();
00482 }
00483
00484 void
00485 ChertValueManager::get_all_values(map<Xapian::valueno, string> & values,
00486 Xapian::docid did) const
00487 {
00488 Assert(values.empty());
00489 if (!termlist_table->is_open()) {
00490
00491
00492 if (!postlist_table->is_open())
00493 ChertTable::throw_database_closed();
00494 throw Xapian::FeatureUnavailableError("Database has no termlist");
00495 }
00496 map<Xapian::docid, string>::const_iterator i = slots.find(did);
00497 string s;
00498 if (i != slots.end()) {
00499 s = i->second;
00500 } else {
00501
00502 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
00503 }
00504 const char * p = s.data();
00505 const char * end = p + s.size();
00506 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
00507 while (p != end) {
00508 Xapian::valueno slot;
00509 if (!unpack_uint(&p, end, &slot)) {
00510 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
00511 }
00512 slot += prev_slot + 1;
00513 prev_slot = slot;
00514 values.insert(make_pair(slot, get_value(did, slot)));
00515 }
00516 }
00517
00518 void
00519 ChertValueManager::get_value_stats(Xapian::valueno slot) const
00520 {
00521 LOGCALL_VOID(DB, "ChertValueManager::get_value_stats", slot);
00522
00523 mru_slot = Xapian::BAD_VALUENO;
00524 get_value_stats(slot, mru_valstats);
00525 mru_slot = slot;
00526 }
00527
00528 void
00529 ChertValueManager::get_value_stats(Xapian::valueno slot, ValueStats & stats) const
00530 {
00531 LOGCALL_VOID(DB, "ChertValueManager::get_value_stats", slot | Literal("[stats]"));
00532
00533 mru_slot = Xapian::BAD_VALUENO;
00534
00535 string tag;
00536 if (postlist_table->get_exact_entry(make_valuestats_key(slot), tag)) {
00537 const char * pos = tag.data();
00538 const char * end = pos + tag.size();
00539
00540 if (!unpack_uint(&pos, end, &(stats.freq))) {
00541 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
00542 throw Xapian::RangeError("Frequency statistic in value table is too large");
00543 }
00544 if (!unpack_string(&pos, end, stats.lower_bound)) {
00545 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
00546 throw Xapian::RangeError("Lower bound in value table is too large");
00547 }
00548 size_t len = end - pos;
00549 if (len == 0) {
00550 stats.upper_bound = stats.lower_bound;
00551 } else {
00552 stats.upper_bound.assign(pos, len);
00553 }
00554 } else {
00555 stats.clear();
00556 }
00557
00558 mru_slot = slot;
00559 }
00560
00561 void
00562 ChertValueManager::set_value_stats(map<Xapian::valueno, ValueStats> & value_stats)
00563 {
00564 LOGCALL_VOID(DB, "ChertValueManager::set_value_stats", value_stats);
00565 map<Xapian::valueno, ValueStats>::const_iterator i;
00566 for (i = value_stats.begin(); i != value_stats.end(); ++i) {
00567 string key = make_valuestats_key(i->first);
00568 const ValueStats & stats = i->second;
00569 if (stats.freq != 0) {
00570 string new_value;
00571 pack_uint(new_value, stats.freq);
00572 pack_string(new_value, stats.lower_bound);
00573
00574
00575
00576 if (stats.lower_bound != stats.upper_bound)
00577 new_value += stats.upper_bound;
00578 postlist_table->add(key, new_value);
00579 } else {
00580 postlist_table->del(key);
00581 }
00582 }
00583 value_stats.clear();
00584 mru_slot = Xapian::BAD_VALUENO;
00585 }