00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023
00024 #include <xapian/compactor.h>
00025
00026 #include <algorithm>
00027 #include <queue>
00028
00029 #include <cstdio>
00030
00031 #include "safeerrno.h"
00032 #include <sys/types.h>
00033 #include "safesysstat.h"
00034
00035 #include "flint_table.h"
00036 #include "flint_compact.h"
00037 #include "flint_cursor.h"
00038 #include "flint_utils.h"
00039 #include "internaltypes.h"
00040 #include "utils.h"
00041
00042 #include "../byte_length_strings.h"
00043 #include "../prefix_compressed_strings.h"
00044 #include <xapian.h>
00045
00046 using namespace std;
00047
00048
00049
00050 namespace FlintCompact {
00051
// Return true if KEY is the postlist table's metainfo key, which consists
// of exactly one zero byte.
static inline bool
is_metainfo_key(const std::string & key)
{
    return key == std::string(1, '\0');
}
00057
// Return true if KEY is a user metadata key: it starts with the two bytes
// '\0' '\xc0' and has at least those two bytes.
static inline bool
is_user_metadata_key(const std::string & key)
{
    static const char prefix[2] = { '\0', '\xc0' };
    if (key.size() < 2) return false;
    return key.compare(0, 2, prefix, 2) == 0;
}
00063
00064 class PostlistCursor : private FlintCursor {
00065 Xapian::docid offset;
00066
00067 public:
00068 string key, tag;
00069 Xapian::docid firstdid;
00070 Xapian::termcount tf, cf;
00071
00072 PostlistCursor(FlintTable *in, Xapian::docid offset_)
00073 : FlintCursor(in), offset(offset_), firstdid(0)
00074 {
00075 find_entry(string());
00076 next();
00077 }
00078
00079 ~PostlistCursor()
00080 {
00081 delete FlintCursor::get_table();
00082 }
00083
00084 bool next() {
00085 if (!FlintCursor::next()) return false;
00086
00087
00088 read_tag();
00089 key = current_key;
00090 tag = current_tag;
00091 tf = cf = 0;
00092 if (is_metainfo_key(key)) return true;
00093 if (is_user_metadata_key(key)) return true;
00094
00095
00096
00097 const char * d = key.data();
00098 const char * e = d + key.size();
00099 string tname;
00100 if (!F_unpack_string_preserving_sort(&d, e, tname))
00101 throw Xapian::DatabaseCorruptError("Bad postlist key");
00102 if (d == e) {
00103
00104 d = tag.data();
00105 e = d + tag.size();
00106 if (!F_unpack_uint(&d, e, &tf) ||
00107 !F_unpack_uint(&d, e, &cf) ||
00108 !F_unpack_uint(&d, e, &firstdid)) {
00109 throw Xapian::DatabaseCorruptError("Bad postlist tag");
00110 }
00111 ++firstdid;
00112 tag.erase(0, d - tag.data());
00113 } else {
00114
00115 size_t tmp = d - key.data();
00116 if (!F_unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
00117 throw Xapian::DatabaseCorruptError("Bad postlist key");
00118 key.erase(tmp);
00119 }
00120 firstdid += offset;
00121 return true;
00122 }
00123 };
00124
00125 class PostlistCursorGt {
00126 public:
00129 bool operator()(const PostlistCursor *a, const PostlistCursor *b) {
00130 if (a->key > b->key) return true;
00131 if (a->key != b->key) return false;
00132 return (a->firstdid > b->firstdid);
00133 }
00134 };
00135
00136 static void
00137 merge_postlists(Xapian::Compactor & compactor,
00138 FlintTable * out, vector<Xapian::docid>::const_iterator offset,
00139 vector<string>::const_iterator b,
00140 vector<string>::const_iterator e,
00141 Xapian::docid last_docid)
00142 {
00143 totlen_t tot_totlen = 0;
00144 priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
00145 for ( ; b != e; ++b, ++offset) {
00146 FlintTable *in = new FlintTable("postlist", *b, true);
00147 in->open();
00148 if (in->empty()) {
00149
00150 delete in;
00151 continue;
00152 }
00153
00154
00155
00156 PostlistCursor * cur = new PostlistCursor(in, *offset);
00157
00158
00159
00160
00161 if (is_metainfo_key(cur->key)) {
00162 const char * data = cur->tag.data();
00163 const char * end = data + cur->tag.size();
00164 Xapian::docid dummy_did = 0;
00165 if (!F_unpack_uint(&data, end, &dummy_did)) {
00166 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
00167 }
00168 totlen_t totlen = 0;
00169 if (!F_unpack_uint_last(&data, end, &totlen)) {
00170 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
00171 }
00172 tot_totlen += totlen;
00173 if (tot_totlen < totlen) {
00174 throw "totlen wrapped!";
00175 }
00176 }
00177 if (cur->next()) {
00178 pq.push(cur);
00179 } else {
00180 delete cur;
00181 }
00182 }
00183
00184 {
00185 string tag = F_pack_uint(last_docid);
00186 tag += F_pack_uint_last(tot_totlen);
00187 out->add(string(1, '\0'), tag);
00188 }
00189
00190 string last_key;
00191 {
00192
00193 vector<string> tags;
00194 while (!pq.empty()) {
00195 PostlistCursor * cur = pq.top();
00196 const string& key = cur->key;
00197 if (!is_user_metadata_key(key)) break;
00198
00199 if (key != last_key) {
00200 if (tags.size() > 1) {
00201 Assert(!last_key.empty());
00202 out->add(last_key,
00203 compactor.resolve_duplicate_metadata(last_key,
00204 tags.size(),
00205 &tags[0]));
00206 } else if (tags.size() == 1) {
00207 Assert(!last_key.empty());
00208 out->add(last_key, tags[0]);
00209 }
00210 tags.resize(0);
00211 last_key = key;
00212 }
00213 tags.push_back(cur->tag);
00214
00215 pq.pop();
00216 if (cur->next()) {
00217 pq.push(cur);
00218 } else {
00219 delete cur;
00220 }
00221 }
00222 if (tags.size() > 1) {
00223 Assert(!last_key.empty());
00224 out->add(last_key,
00225 compactor.resolve_duplicate_metadata(last_key,
00226 tags.size(),
00227 &tags[0]));
00228 } else if (tags.size() == 1) {
00229 Assert(!last_key.empty());
00230 out->add(last_key, tags[0]);
00231 }
00232 }
00233
00234 Xapian::termcount tf = 0, cf = 0;
00235 vector<pair<Xapian::docid, string> > tags;
00236 while (true) {
00237 PostlistCursor * cur = NULL;
00238 if (!pq.empty()) {
00239 cur = pq.top();
00240 pq.pop();
00241 }
00242 Assert(cur == NULL || !is_user_metadata_key(cur->key));
00243 if (cur == NULL || cur->key != last_key) {
00244 if (!tags.empty()) {
00245 string first_tag = F_pack_uint(tf);
00246 first_tag += F_pack_uint(cf);
00247 first_tag += F_pack_uint(tags[0].first - 1);
00248 string tag = tags[0].second;
00249 tag[0] = (tags.size() == 1) ? '1' : '0';
00250 first_tag += tag;
00251 out->add(last_key, first_tag);
00252 vector<pair<Xapian::docid, string> >::const_iterator i;
00253 i = tags.begin();
00254 while (++i != tags.end()) {
00255 string new_key = last_key;
00256 new_key += F_pack_uint_preserving_sort(i->first);
00257 tag = i->second;
00258 tag[0] = (i + 1 == tags.end()) ? '1' : '0';
00259 out->add(new_key, tag);
00260 }
00261 }
00262 tags.clear();
00263 if (cur == NULL) break;
00264 tf = cf = 0;
00265 last_key = cur->key;
00266 }
00267 tf += cur->tf;
00268 cf += cur->cf;
00269 tags.push_back(make_pair(cur->firstdid, cur->tag));
00270 if (cur->next()) {
00271 pq.push(cur);
00272 } else {
00273 delete cur;
00274 }
00275 }
00276 }
00277
00278 struct MergeCursor : public FlintCursor {
00279 MergeCursor(FlintTable *in) : FlintCursor(in) {
00280 find_entry(string());
00281 next();
00282 }
00283
00284 ~MergeCursor() {
00285 delete FlintCursor::get_table();
00286 }
00287 };
00288
00289 struct CursorGt {
00291 bool operator()(const FlintCursor *a, const FlintCursor *b) {
00292 if (b->after_end()) return false;
00293 if (a->after_end()) return true;
00294 return (a->current_key > b->current_key);
00295 }
00296 };
00297
00298 static void
00299 merge_spellings(FlintTable * out,
00300 vector<string>::const_iterator b,
00301 vector<string>::const_iterator e)
00302 {
00303 priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
00304 for ( ; b != e; ++b) {
00305 FlintTable *in = new FlintTable("spelling", *b, true, DONT_COMPRESS, true);
00306 in->open();
00307 if (!in->empty()) {
00308
00309
00310 pq.push(new MergeCursor(in));
00311 } else {
00312 delete in;
00313 }
00314 }
00315
00316 while (!pq.empty()) {
00317 MergeCursor * cur = pq.top();
00318 pq.pop();
00319
00320 string key = cur->current_key;
00321 if (pq.empty() || pq.top()->current_key > key) {
00322
00323
00324 bool compressed = cur->read_tag(true);
00325 out->add(key, cur->current_tag, compressed);
00326 if (cur->next()) {
00327 pq.push(cur);
00328 } else {
00329 delete cur;
00330 }
00331 continue;
00332 }
00333
00334
00335 string tag;
00336 if (key[0] != 'W') {
00337
00338
00339 priority_queue<PrefixCompressedStringItor *,
00340 vector<PrefixCompressedStringItor *>,
00341 PrefixCompressedStringItorGt> pqtag;
00342
00343
00344
00345 vector<MergeCursor *> vec;
00346 vec.reserve(pq.size());
00347
00348 while (true) {
00349 cur->read_tag();
00350 pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
00351 vec.push_back(cur);
00352 if (pq.empty() || pq.top()->current_key != key) break;
00353 cur = pq.top();
00354 pq.pop();
00355 }
00356
00357 PrefixCompressedStringWriter wr(tag);
00358 string lastword;
00359 while (!pqtag.empty()) {
00360 PrefixCompressedStringItor * it = pqtag.top();
00361 string word = **it;
00362 if (word != lastword) {
00363 lastword = word;
00364 wr.append(lastword);
00365 }
00366 ++*it;
00367 pqtag.pop();
00368 if (!it->at_end()) {
00369 pqtag.push(it);
00370 } else {
00371 delete it;
00372 }
00373 }
00374
00375 vector<MergeCursor *>::const_iterator i;
00376 for (i = vec.begin(); i != vec.end(); ++i) {
00377 cur = *i;
00378 if (cur->next()) {
00379 pq.push(cur);
00380 } else {
00381 delete cur;
00382 }
00383 }
00384 } else {
00385
00386 Xapian::termcount tot_freq = 0;
00387 while (true) {
00388 cur->read_tag();
00389 Xapian::termcount freq;
00390 const char * p = cur->current_tag.data();
00391 const char * end = p + cur->current_tag.size();
00392 if (!F_unpack_uint_last(&p, end, &freq) || freq == 0) {
00393 throw Xapian::DatabaseCorruptError("Bad spelling word freq");
00394 }
00395 tot_freq += freq;
00396 if (cur->next()) {
00397 pq.push(cur);
00398 } else {
00399 delete cur;
00400 }
00401 if (pq.empty() || pq.top()->current_key != key) break;
00402 cur = pq.top();
00403 pq.pop();
00404 }
00405 tag = F_pack_uint_last(tot_freq);
00406 }
00407 out->add(key, tag);
00408 }
00409 }
00410
00411 static void
00412 merge_synonyms(FlintTable * out,
00413 vector<string>::const_iterator b,
00414 vector<string>::const_iterator e)
00415 {
00416 priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
00417 for ( ; b != e; ++b) {
00418 FlintTable *in = new FlintTable("synonym", *b, true, DONT_COMPRESS, true);
00419 in->open();
00420 if (!in->empty()) {
00421
00422
00423 pq.push(new MergeCursor(in));
00424 } else {
00425 delete in;
00426 }
00427 }
00428
00429 while (!pq.empty()) {
00430 MergeCursor * cur = pq.top();
00431 pq.pop();
00432
00433 string key = cur->current_key;
00434 if (pq.empty() || pq.top()->current_key > key) {
00435
00436
00437 bool compressed = cur->read_tag(true);
00438 out->add(key, cur->current_tag, compressed);
00439 if (cur->next()) {
00440 pq.push(cur);
00441 } else {
00442 delete cur;
00443 }
00444 continue;
00445 }
00446
00447
00448 string tag;
00449
00450
00451
00452 priority_queue<ByteLengthPrefixedStringItor *,
00453 vector<ByteLengthPrefixedStringItor *>,
00454 ByteLengthPrefixedStringItorGt> pqtag;
00455 vector<MergeCursor *> vec;
00456
00457 while (true) {
00458 cur->read_tag();
00459 pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
00460 vec.push_back(cur);
00461 if (pq.empty() || pq.top()->current_key != key) break;
00462 cur = pq.top();
00463 pq.pop();
00464 }
00465
00466 string lastword;
00467 while (!pqtag.empty()) {
00468 ByteLengthPrefixedStringItor * it = pqtag.top();
00469 if (**it != lastword) {
00470 lastword = **it;
00471 tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
00472 tag += lastword;
00473 }
00474 ++*it;
00475 pqtag.pop();
00476 if (!it->at_end()) {
00477 pqtag.push(it);
00478 } else {
00479 delete it;
00480 }
00481 }
00482
00483 vector<MergeCursor *>::const_iterator i;
00484 for (i = vec.begin(); i != vec.end(); ++i) {
00485 cur = *i;
00486 if (cur->next()) {
00487 pq.push(cur);
00488 } else {
00489 delete cur;
00490 }
00491 }
00492
00493 out->add(key, tag);
00494 }
00495 }
00496
00497 static void
00498 multimerge_postlists(Xapian::Compactor & compactor,
00499 FlintTable * out, const char * tmpdir,
00500 Xapian::docid last_docid,
00501 vector<string> tmp, vector<Xapian::docid> off)
00502 {
00503 unsigned int c = 0;
00504 while (tmp.size() > 3) {
00505 vector<string> tmpout;
00506 tmpout.reserve(tmp.size() / 2);
00507 vector<Xapian::docid> newoff;
00508 newoff.resize(tmp.size() / 2);
00509 for (unsigned int i = 0, j; i < tmp.size(); i = j) {
00510 j = i + 2;
00511 if (j == tmp.size() - 1) ++j;
00512
00513 string dest = tmpdir;
00514 char buf[64];
00515 sprintf(buf, "/tmp%u_%u.", c, i / 2);
00516 dest += buf;
00517
00518
00519
00520 FlintTable tmptab("postlist", dest, false);
00521
00522 tmptab.create_and_open(65536);
00523
00524 merge_postlists(compactor, &tmptab, off.begin() + i,
00525 tmp.begin() + i, tmp.begin() + j, 0);
00526 if (c > 0) {
00527 for (unsigned int k = i; k < j; ++k) {
00528 unlink((tmp[k] + "DB").c_str());
00529 unlink((tmp[k] + "baseA").c_str());
00530 unlink((tmp[k] + "baseB").c_str());
00531 }
00532 }
00533 tmpout.push_back(dest);
00534 tmptab.flush_db();
00535 tmptab.commit(1);
00536 }
00537 swap(tmp, tmpout);
00538 swap(off, newoff);
00539 ++c;
00540 }
00541 merge_postlists(compactor,
00542 out, off.begin(), tmp.begin(), tmp.end(), last_docid);
00543 if (c > 0) {
00544 for (size_t k = 0; k < tmp.size(); ++k) {
00545 unlink((tmp[k] + "DB").c_str());
00546 unlink((tmp[k] + "baseA").c_str());
00547 unlink((tmp[k] + "baseB").c_str());
00548 }
00549 }
00550 }
00551
00552 static void
00553 merge_docid_keyed(const char * tablename,
00554 FlintTable *out, const vector<string> & inputs,
00555 const vector<Xapian::docid> & offset, bool lazy)
00556 {
00557 for (size_t i = 0; i < inputs.size(); ++i) {
00558 Xapian::docid off = offset[i];
00559
00560 FlintTable in(tablename, inputs[i], true, DONT_COMPRESS, lazy);
00561 in.open();
00562 if (in.empty()) continue;
00563
00564 FlintCursor cur(&in);
00565 cur.find_entry(string());
00566
00567 string key;
00568 while (cur.next()) {
00569
00570 if (off) {
00571 Xapian::docid did;
00572 const char * d = cur.current_key.data();
00573 const char * e = d + cur.current_key.size();
00574 if (!F_unpack_uint_preserving_sort(&d, e, &did)) {
00575 string msg = "Bad key in ";
00576 msg += inputs[i];
00577 throw Xapian::DatabaseCorruptError(msg);
00578 }
00579 did += off;
00580 key = F_pack_uint_preserving_sort(did);
00581 if (d != e) {
00582
00583 key.append(d, e - d);
00584 }
00585 } else {
00586 key = cur.current_key;
00587 }
00588 bool compressed = cur.read_tag(true);
00589 out->add(key, cur.current_tag, compressed);
00590 }
00591 }
00592 }
00593
00594 }
00595
00596 using namespace FlintCompact;
00597
00598 void
00599 compact_flint(Xapian::Compactor & compactor,
00600 const char * destdir, const vector<string> & sources,
00601 const vector<Xapian::docid> & offset, size_t block_size,
00602 Xapian::Compactor::compaction_level compaction, bool multipass,
00603 Xapian::docid last_docid) {
00604 enum table_type {
00605 POSTLIST, RECORD, TERMLIST, POSITION, VALUE, SPELLING, SYNONYM
00606 };
00607 struct table_list {
00608
00609 const char * name;
00610
00611 table_type type;
00612
00613 int compress_strategy;
00614
00615 bool lazy;
00616 };
00617
00618 static const table_list tables[] = {
00619
00620 { "postlist", POSTLIST, DONT_COMPRESS, false },
00621 { "record", RECORD, Z_DEFAULT_STRATEGY, false },
00622 { "termlist", TERMLIST, Z_DEFAULT_STRATEGY, false },
00623 { "position", POSITION, DONT_COMPRESS, true },
00624 { "value", VALUE, DONT_COMPRESS, true },
00625 { "spelling", SPELLING, Z_DEFAULT_STRATEGY, true },
00626 { "synonym", SYNONYM, Z_DEFAULT_STRATEGY, true }
00627 };
00628 const table_list * tables_end = tables +
00629 (sizeof(tables) / sizeof(tables[0]));
00630
00631 for (const table_list * t = tables; t < tables_end; ++t) {
00632
00633
00634
00635
00636
00637 compactor.set_status(t->name, string());
00638
00639 string dest = destdir;
00640 dest += '/';
00641 dest += t->name;
00642 dest += '.';
00643
00644 bool output_will_exist = !t->lazy;
00645
00646
00647
00648 bool bad_stat = false;
00649
00650 off_t in_size = 0;
00651
00652 vector<string> inputs;
00653 inputs.reserve(sources.size());
00654 size_t inputs_present = 0;
00655 for (vector<string>::const_iterator src = sources.begin();
00656 src != sources.end(); ++src) {
00657 string s(*src);
00658 s += t->name;
00659 s += '.';
00660
00661 struct stat sb;
00662 if (stat(s + "DB", &sb) == 0) {
00663 in_size += sb.st_size / 1024;
00664 output_will_exist = true;
00665 ++inputs_present;
00666 } else if (errno != ENOENT) {
00667
00668 bad_stat = true;
00669 output_will_exist = true;
00670 ++inputs_present;
00671 }
00672 inputs.push_back(s);
00673 }
00674
00675 if (!output_will_exist) {
00676 compactor.set_status(t->name, "doesn't exist");
00677 continue;
00678 }
00679
00680 FlintTable out(t->name, dest, false, t->compress_strategy, t->lazy);
00681 if (!t->lazy) {
00682 out.create_and_open(block_size);
00683 } else {
00684 out.erase();
00685 out.set_block_size(block_size);
00686 }
00687
00688 out.set_full_compaction(compaction != compactor.STANDARD);
00689 if (compaction == compactor.FULLER) out.set_max_item_size(1);
00690
00691 switch (t->type) {
00692 case POSTLIST:
00693 if (multipass && inputs.size() > 3) {
00694 multimerge_postlists(compactor, &out, destdir, last_docid,
00695 inputs, offset);
00696 } else {
00697 merge_postlists(compactor, &out, offset.begin(),
00698 inputs.begin(), inputs.end(),
00699 last_docid);
00700 }
00701 break;
00702 case SPELLING:
00703 merge_spellings(&out, inputs.begin(), inputs.end());
00704 break;
00705 case SYNONYM:
00706 merge_synonyms(&out, inputs.begin(), inputs.end());
00707 break;
00708 default:
00709
00710 merge_docid_keyed(t->name, &out, inputs, offset, t->lazy);
00711 break;
00712 }
00713
00714
00715 out.flush_db();
00716 out.commit(1);
00717
00718 off_t out_size = 0;
00719 if (!bad_stat) {
00720 struct stat sb;
00721 if (stat(dest + "DB", &sb) == 0) {
00722 out_size = sb.st_size / 1024;
00723 } else {
00724 bad_stat = (errno != ENOENT);
00725 }
00726 }
00727 if (bad_stat) {
00728 compactor.set_status(t->name, "Done (couldn't stat all the DB files)");
00729 } else {
00730 string status;
00731 if (out_size == in_size) {
00732 status = "Size unchanged (";
00733 } else {
00734 off_t delta;
00735 if (out_size < in_size) {
00736 delta = in_size - out_size;
00737 status = "Reduced by ";
00738 } else {
00739 delta = out_size - in_size;
00740 status = "INCREASED by ";
00741 }
00742 status += str(100 * delta / in_size);
00743 status += "% ";
00744 status += str(delta);
00745 status += "K (";
00746 status += str(in_size);
00747 status += "K -> ";
00748 }
00749 status += str(out_size);
00750 status += "K)";
00751 compactor.set_status(t->name, status);
00752 }
00753 }
00754 }