00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023
00024 #include "brass_values.h"
00025
00026 #include "brass_cursor.h"
00027 #include "brass_postlist.h"
00028 #include "brass_termlist.h"
00029 #include "debuglog.h"
00030 #include "document.h"
00031 #include "pack.h"
00032
00033 #include "xapian/error.h"
00034 #include "xapian/valueiterator.h"
00035
00036 #include <algorithm>
00037 #include "autoptr.h"
00038
00039 using namespace Brass;
00040 using namespace std;
00041
00042
00043
00044
00045
00046
00048 inline string
00049 make_slot_key(Xapian::docid did)
00050 {
00051 LOGCALL_STATIC(DB, string, "make_slot_key", did);
00052
00053
00054
00055 string key;
00056 pack_uint_preserving_sort(key, did);
00057 key += '\0';
00058 RETURN(key);
00059 }
00060
00062 inline string
00063 make_valuestats_key(Xapian::valueno slot)
00064 {
00065 LOGCALL_STATIC(DB, string, "make_valuestats_key", slot);
00066 string key("\0\xd0", 2);
00067 pack_uint_last(key, slot);
00068 RETURN(key);
00069 }
00070
00071 void
00072 ValueChunkReader::assign(const char * p_, size_t len, Xapian::docid did_)
00073 {
00074 p = p_;
00075 end = p_ + len;
00076 did = did_;
00077 if (!unpack_string(&p, end, value))
00078 throw Xapian::DatabaseCorruptError("Failed to unpack first value");
00079 }
00080
00081 void
00082 ValueChunkReader::next()
00083 {
00084 if (p == end) {
00085 p = NULL;
00086 return;
00087 }
00088
00089 Xapian::docid delta;
00090 if (!unpack_uint(&p, end, &delta))
00091 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
00092 did += delta + 1;
00093 if (!unpack_string(&p, end, value))
00094 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
00095 }
00096
00097 void
00098 ValueChunkReader::skip_to(Xapian::docid target)
00099 {
00100 if (p == NULL || target <= did)
00101 return;
00102
00103 size_t value_len;
00104 while (p != end) {
00105
00106 Xapian::docid delta;
00107 if (rare(!unpack_uint(&p, end, &delta)))
00108 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value docid");
00109 did += delta + 1;
00110
00111
00112 if (rare(!unpack_uint(&p, end, &value_len))) {
00113 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value length");
00114 }
00115
00116
00117 if (rare(value_len > size_t(end - p))) {
00118 throw Xapian::DatabaseCorruptError("Failed to unpack streamed value");
00119 }
00120
00121
00122 if (did >= target) {
00123 value.assign(p, value_len);
00124 p += value_len;
00125 return;
00126 }
00127 p += value_len;
00128 }
00129 p = NULL;
00130 }
00131
00132 void
00133 BrassValueManager::add_value(Xapian::docid did, Xapian::valueno slot,
00134 const string & val)
00135 {
00136 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
00137 i = changes.find(slot);
00138 if (i == changes.end()) {
00139 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
00140 }
00141 i->second[did] = val;
00142 }
00143
00144 void
00145 BrassValueManager::remove_value(Xapian::docid did, Xapian::valueno slot)
00146 {
00147 map<Xapian::valueno, map<Xapian::docid, string> >::iterator i;
00148 i = changes.find(slot);
00149 if (i == changes.end()) {
00150 i = changes.insert(make_pair(slot, map<Xapian::docid, string>())).first;
00151 }
00152 i->second[did] = string();
00153 }
00154
00155 Xapian::docid
00156 BrassValueManager::get_chunk_containing_did(Xapian::valueno slot,
00157 Xapian::docid did,
00158 string &chunk) const
00159 {
00160 LOGCALL(DB, Xapian::docid, "BrassValueManager::get_chunk_containing_did", slot | did | chunk);
00161 AutoPtr<BrassCursor> cursor(postlist_table->cursor_get());
00162 if (!cursor.get()) return 0;
00163
00164 bool exact = cursor->find_entry(make_valuechunk_key(slot, did));
00165 if (!exact) {
00166
00167
00168 const char * p = cursor->current_key.data();
00169 const char * end = p + cursor->current_key.size();
00170
00171
00172 if (end - p < 2 || *p++ != '\0' || *p++ != '\xd8') return 0;
00173
00174
00175 Xapian::valueno v;
00176 if (!unpack_uint(&p, end, &v)) {
00177 throw Xapian::DatabaseCorruptError("Bad value key");
00178 }
00179 if (v != slot) return 0;
00180
00181
00182 if (!unpack_uint_preserving_sort(&p, end, &did) || p != end) {
00183 throw Xapian::DatabaseCorruptError("Bad value key");
00184 }
00185 }
00186
00187 cursor->read_tag();
00188 swap(chunk, cursor->current_tag);
00189
00190 return did;
00191 }
00192
00193 static const size_t CHUNK_SIZE_THRESHOLD = 2000;
00194
00195 static const Xapian::docid MAX_DOCID = static_cast<Xapian::docid>(-1);
00196
00197 namespace Brass {
00198
00199 class ValueUpdater {
00200 BrassPostListTable * table;
00201
00202 Xapian::valueno slot;
00203
00204 string ctag;
00205
00206 ValueChunkReader reader;
00207
00208 string tag;
00209
00210 Xapian::docid prev_did;
00211
00212 Xapian::docid first_did;
00213
00214 Xapian::docid new_first_did;
00215
00216 Xapian::docid last_allowed_did;
00217
00218 void append_to_stream(Xapian::docid did, const string & value) {
00219 Assert(did);
00220 if (tag.empty()) {
00221 new_first_did = did;
00222 } else {
00223 AssertRel(did,>,prev_did);
00224 pack_uint(tag, did - prev_did - 1);
00225 }
00226 prev_did = did;
00227 pack_string(tag, value);
00228 if (tag.size() >= CHUNK_SIZE_THRESHOLD) write_tag();
00229 }
00230
00231 void write_tag() {
00232
00233 if (first_did && new_first_did != first_did) {
00234 table->del(make_valuechunk_key(slot, first_did));
00235 }
00236 if (!tag.empty()) {
00237 table->add(make_valuechunk_key(slot, new_first_did), tag);
00238 }
00239 first_did = 0;
00240 tag.resize(0);
00241 }
00242
00243 public:
00244 ValueUpdater(BrassPostListTable * table_, Xapian::valueno slot_)
00245 : table(table_), slot(slot_), first_did(0), last_allowed_did(0) { }
00246
00247 ~ValueUpdater() {
00248 while (!reader.at_end()) {
00249
00250 append_to_stream(reader.get_docid(), reader.get_value());
00251 reader.next();
00252 }
00253 write_tag();
00254 }
00255
00256 void update(Xapian::docid did, const string & value) {
00257 if (last_allowed_did && did > last_allowed_did) {
00258
00259
00260
00261
00262
00263 while (!reader.at_end()) {
00264
00265 AssertRel(reader.get_docid(),<=,last_allowed_did);
00266 append_to_stream(reader.get_docid(), reader.get_value());
00267 reader.next();
00268 }
00269 write_tag();
00270 last_allowed_did = 0;
00271 }
00272 if (last_allowed_did == 0) {
00273 last_allowed_did = MAX_DOCID;
00274 Assert(tag.empty());
00275 new_first_did = 0;
00276 AutoPtr<BrassCursor> cursor(table->cursor_get());
00277 if (cursor->find_entry(make_valuechunk_key(slot, did))) {
00278
00279
00280 first_did = did;
00281 } else {
00282 Assert(!cursor->after_end());
00283
00284
00285
00286 first_did = docid_from_key(slot, cursor->current_key);
00287 }
00288
00289
00290
00291
00292
00293 if (first_did) {
00294
00295 cursor->read_tag();
00296
00297 ctag = cursor->current_tag;
00298 reader.assign(ctag.data(), ctag.size(), first_did);
00299 }
00300 if (cursor->next()) {
00301 const string & key = cursor->current_key;
00302 Xapian::docid next_first_did = docid_from_key(slot, key);
00303 if (next_first_did) last_allowed_did = next_first_did - 1;
00304 Assert(last_allowed_did);
00305 AssertRel(last_allowed_did,>=,first_did);
00306 }
00307 }
00308
00309
00310
00311
00312 while (!reader.at_end() && reader.get_docid() < did) {
00313 append_to_stream(reader.get_docid(), reader.get_value());
00314 reader.next();
00315 }
00316 if (!reader.at_end() && reader.get_docid() == did) reader.next();
00317 if (!value.empty()) {
00318
00319 append_to_stream(did, value);
00320 }
00321 }
00322 };
00323
00324 }
00325
00326 void
00327 BrassValueManager::merge_changes()
00328 {
00329 if (termlist_table->is_open()) {
00330 map<Xapian::docid, string>::const_iterator i;
00331 for (i = slots.begin(); i != slots.end(); ++i) {
00332 const string & enc = i->second;
00333 string key = make_slot_key(i->first);
00334 if (!enc.empty()) {
00335 termlist_table->add(key, i->second);
00336 } else {
00337 termlist_table->del(key);
00338 }
00339 }
00340 slots.clear();
00341 }
00342
00343 {
00344 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
00345 for (i = changes.begin(); i != changes.end(); ++i) {
00346 Xapian::valueno slot = i->first;
00347 Brass::ValueUpdater updater(postlist_table, slot);
00348 const map<Xapian::docid, string> & slot_changes = i->second;
00349 map<Xapian::docid, string>::const_iterator j;
00350 for (j = slot_changes.begin(); j != slot_changes.end(); ++j) {
00351 updater.update(j->first, j->second);
00352 }
00353 }
00354 changes.clear();
00355 }
00356 }
00357
00358 void
00359 BrassValueManager::add_document(Xapian::docid did, const Xapian::Document &doc,
00360 map<Xapian::valueno, ValueStats> & value_stats)
00361 {
00362
00363
00364 string slots_used;
00365 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
00366 Xapian::ValueIterator it = doc.values_begin();
00367 while (it != doc.values_end()) {
00368 Xapian::valueno slot = it.get_valueno();
00369 string value = *it;
00370
00371
00372 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
00373 i = value_stats.insert(make_pair(slot, ValueStats()));
00374 ValueStats & stats = i.first->second;
00375 if (i.second) {
00376
00377 get_value_stats(slot, stats);
00378 }
00379
00380
00381 if ((stats.freq)++ == 0) {
00382
00383
00384 stats.lower_bound = value;
00385 stats.upper_bound = value;
00386 } else {
00387
00388 if (value < stats.lower_bound) {
00389 stats.lower_bound = value;
00390 } else if (value > stats.upper_bound) {
00391 stats.upper_bound = value;
00392 }
00393 }
00394
00395 add_value(did, slot, value);
00396 if (termlist_table->is_open()) {
00397 pack_uint(slots_used, slot - prev_slot - 1);
00398 prev_slot = slot;
00399 }
00400 ++it;
00401 }
00402 if (slots_used.empty() && slots.find(did) == slots.end()) {
00403
00404 } else {
00405 swap(slots[did], slots_used);
00406 }
00407 }
00408
00409 void
00410 BrassValueManager::delete_document(Xapian::docid did,
00411 map<Xapian::valueno, ValueStats> & value_stats)
00412 {
00413 Assert(termlist_table->is_open());
00414 map<Xapian::docid, string>::iterator it = slots.find(did);
00415 string s;
00416 if (it != slots.end()) {
00417 swap(s, it->second);
00418 } else {
00419
00420 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
00421 slots.insert(make_pair(did, string()));
00422 }
00423 const char * p = s.data();
00424 const char * end = p + s.size();
00425 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
00426 while (p != end) {
00427 Xapian::valueno slot;
00428 if (!unpack_uint(&p, end, &slot)) {
00429 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
00430 }
00431 slot += prev_slot + 1;
00432 prev_slot = slot;
00433
00434 std::pair<map<Xapian::valueno, ValueStats>::iterator, bool> i;
00435 i = value_stats.insert(make_pair(slot, ValueStats()));
00436 ValueStats & stats = i.first->second;
00437 if (i.second) {
00438
00439 get_value_stats(slot, stats);
00440 }
00441
00442
00443 AssertRelParanoid(stats.freq, >, 0);
00444 if (--(stats.freq) == 0) {
00445 stats.lower_bound.resize(0);
00446 stats.upper_bound.resize(0);
00447 }
00448
00449 remove_value(did, slot);
00450 }
00451 }
00452
00453 void
00454 BrassValueManager::replace_document(Xapian::docid did,
00455 const Xapian::Document &doc,
00456 map<Xapian::valueno, ValueStats> & value_stats)
00457 {
00458
00459
00460
00461 doc.internal->need_values();
00462 delete_document(did, value_stats);
00463 add_document(did, doc, value_stats);
00464 }
00465
00466 string
00467 BrassValueManager::get_value(Xapian::docid did, Xapian::valueno slot) const
00468 {
00469 map<Xapian::valueno, map<Xapian::docid, string> >::const_iterator i;
00470 i = changes.find(slot);
00471 if (i != changes.end()) {
00472 map<Xapian::docid, string>::const_iterator j;
00473 j = i->second.find(did);
00474 if (j != i->second.end()) return j->second;
00475 }
00476
00477
00478 string chunk;
00479 Xapian::docid first_did;
00480 first_did = get_chunk_containing_did(slot, did, chunk);
00481 if (first_did == 0) return string();
00482
00483 ValueChunkReader reader(chunk.data(), chunk.size(), first_did);
00484 reader.skip_to(did);
00485 if (reader.at_end() || reader.get_docid() != did) return string();
00486 return reader.get_value();
00487 }
00488
00489 void
00490 BrassValueManager::get_all_values(map<Xapian::valueno, string> & values,
00491 Xapian::docid did) const
00492 {
00493 Assert(values.empty());
00494 if (!termlist_table->is_open())
00495 throw Xapian::FeatureUnavailableError("Database has no termlist");
00496 map<Xapian::docid, string>::const_iterator i = slots.find(did);
00497 string s;
00498 if (i != slots.end()) {
00499 s = i->second;
00500 } else {
00501
00502 if (!termlist_table->get_exact_entry(make_slot_key(did), s)) return;
00503 }
00504 const char * p = s.data();
00505 const char * end = p + s.size();
00506 Xapian::valueno prev_slot = static_cast<Xapian::valueno>(-1);
00507 while (p != end) {
00508 Xapian::valueno slot;
00509 if (!unpack_uint(&p, end, &slot)) {
00510 throw Xapian::DatabaseCorruptError("Value slot encoding corrupt");
00511 }
00512 slot += prev_slot + 1;
00513 prev_slot = slot;
00514 values.insert(make_pair(slot, get_value(did, slot)));
00515 }
00516 }
00517
00518 void
00519 BrassValueManager::get_value_stats(Xapian::valueno slot) const
00520 {
00521 LOGCALL_VOID(DB, "BrassValueManager::get_value_stats", slot);
00522
00523 mru_slot = Xapian::BAD_VALUENO;
00524 get_value_stats(slot, mru_valstats);
00525 mru_slot = slot;
00526 }
00527
00528 void
00529 BrassValueManager::get_value_stats(Xapian::valueno slot, ValueStats & stats) const
00530 {
00531 LOGCALL_VOID(DB, "BrassValueManager::get_value_stats", slot | Literal("[stats]"));
00532
00533 mru_slot = Xapian::BAD_VALUENO;
00534
00535 string tag;
00536 if (postlist_table->get_exact_entry(make_valuestats_key(slot), tag)) {
00537 const char * pos = tag.data();
00538 const char * end = pos + tag.size();
00539
00540 if (!unpack_uint(&pos, end, &(stats.freq))) {
00541 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
00542 throw Xapian::RangeError("Frequency statistic in value table is too large");
00543 }
00544 if (!unpack_string(&pos, end, stats.lower_bound)) {
00545 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
00546 throw Xapian::RangeError("Lower bound in value table is too large");
00547 }
00548 size_t len = end - pos;
00549 if (len == 0) {
00550 stats.upper_bound = stats.lower_bound;
00551 } else {
00552 stats.upper_bound.assign(pos, len);
00553 }
00554 } else {
00555 stats.clear();
00556 }
00557
00558 mru_slot = slot;
00559 }
00560
00561 void
00562 BrassValueManager::set_value_stats(map<Xapian::valueno, ValueStats> & value_stats)
00563 {
00564 LOGCALL_VOID(DB, "BrassValueManager::set_value_stats", value_stats);
00565 map<Xapian::valueno, ValueStats>::const_iterator i;
00566 for (i = value_stats.begin(); i != value_stats.end(); ++i) {
00567 string key = make_valuestats_key(i->first);
00568 const ValueStats & stats = i->second;
00569 if (stats.freq != 0) {
00570 string new_value;
00571 pack_uint(new_value, stats.freq);
00572 pack_string(new_value, stats.lower_bound);
00573
00574
00575
00576 if (stats.lower_bound != stats.upper_bound)
00577 new_value += stats.upper_bound;
00578 postlist_table->add(key, new_value);
00579 } else {
00580 postlist_table->del(key);
00581 }
00582 }
00583 value_stats.clear();
00584 mru_slot = Xapian::BAD_VALUENO;
00585 }