xapian-core  2.0.0
glass_compact.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2004-2024 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License as
8  * published by the Free Software Foundation; either version 2 of the
9  * License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, see
18  * <https://www.gnu.org/licenses/>.
19  */
20 
21 #include <config.h>
22 
23 #include "xapian/compactor.h"
24 #include "xapian/constants.h"
25 #include "xapian/error.h"
26 #include "xapian/types.h"
27 
28 #include <algorithm>
29 #include <iterator>
30 #include <memory>
31 #include <queue>
32 
33 #include <cerrno>
34 
35 #include "backends/flint_lock.h"
36 #include "glass_database.h"
37 #include "glass_defs.h"
38 #include "glass_table.h"
39 #include "glass_cursor.h"
40 #include "glass_version.h"
41 #include "filetests.h"
42 #include "internaltypes.h"
43 #include "pack.h"
44 #include "backends/valuestats.h"
45 
46 #include "../byte_length_strings.h"
47 #include "../prefix_compressed_strings.h"
48 
49 using namespace std;
50 
51 // Put all the helpers in a namespace to avoid symbols colliding with those of
52 // the same name in other flint-derived backends.
53 namespace GlassCompact {
54 
/// Return true if @a key starts with the two bytes "\0\xc0", which is the
/// prefix this file treats as marking a user metadata entry.
static inline bool
is_user_metadata_key(const string & key)
{
    // compare() yields non-zero when key is shorter than two bytes, so no
    // separate length check is needed.
    return key.compare(0, 2, "\0\xc0", 2) == 0;
}
60 
/// Return true if @a key starts with the two bytes "\0\xd0", which is the
/// prefix this file treats as marking a value statistics entry.
static inline bool
is_valuestats_key(const string & key)
{
    // compare() yields non-zero when key is shorter than two bytes, so no
    // separate length check is needed.
    return key.compare(0, 2, "\0\xd0", 2) == 0;
}
66 
/// Return true if @a key starts with the two bytes "\0\xd8", which is the
/// prefix this file treats as marking a value stream chunk entry.
static inline bool
is_valuechunk_key(const string & key)
{
    // compare() yields non-zero when key is shorter than two bytes, so no
    // separate length check is needed.
    return key.compare(0, 2, "\0\xd8", 2) == 0;
}
72 
/// Return true if @a key starts with the two bytes "\0\xe0", which is the
/// prefix this file treats as marking a document length chunk entry.
static inline bool
is_doclenchunk_key(const string & key)
{
    // compare() yields non-zero when key is shorter than two bytes, so no
    // separate length check is needed.
    return key.compare(0, 2, "\0\xe0", 2) == 0;
}
78 
// Cursor over a single input postlist table, used while merging.  After each
// successful next(), `key` and `tag` hold an adjusted copy of the current
// entry: postlist chunks are put into the non-initial chunk form and docids
// are shifted by the offset passed to the constructor.
// NOTE(review): some member declarations and the constructor's signature
// line are not visible in this listing.
79 class PostlistCursor : private GlassCursor {
81 
82  public:
83  string key, tag;
86 
88  : GlassCursor(in), offset(offset_), firstdid(0)
89  {
 // Move to the start of the table and then onto its first entry.
90  rewind();
91  next();
92  }
93 
 // Advance to the next entry and refresh key/tag/tf/cf/firstdid from it.
 // Returns false once the underlying cursor has no more entries; throws
 // Xapian::DatabaseCorruptError if a key or tag header can't be unpacked.
94  bool next() {
95  if (!GlassCursor::next()) return false;
96  // We put all chunks into the non-initial chunk form here, then fix up
97  // the first chunk for each term in the merged database as we merge.
98  read_tag();
99  key = current_key;
100  tag = current_tag;
 // tf and cf are only read from the tag of an initial chunk (below), so
 // they stay zero for every other kind of entry.
101  tf = cf = 0;
 // User metadata and valuestats entries are passed through unchanged.
102  if (is_user_metadata_key(key)) return true;
103  if (is_valuestats_key(key)) return true;
104  if (is_valuechunk_key(key)) {
 // Value chunk: decode slot and docid from the key, shift the docid by
 // offset, and rebuild the key from scratch.
105  const char * p = key.data();
106  const char * end = p + key.length();
107  p += 2;
108  Xapian::valueno slot;
109  if (!unpack_uint(&p, end, &slot))
110  throw Xapian::DatabaseCorruptError("bad value key");
111  Xapian::docid did;
112  if (!unpack_uint_preserving_sort(&p, end, &did))
113  throw Xapian::DatabaseCorruptError("bad value key");
 // NOTE(review): unlike the firstdid adjustment at the end of this
 // function, this addition is not wrapped in UNSIGNED_OVERFLOW_OK().
114  did += offset;
115 
116  key.assign("\0\xd8", 2);
117  pack_uint(key, slot);
118  pack_uint_preserving_sort(key, did);
119  return true;
120  }
121 
122  // Adjust key if this is *NOT* an initial chunk.
123  // key is: pack_string_preserving_sort(key, tname)
124  // plus optionally: pack_uint_preserving_sort(key, did)
125  const char * d = key.data();
126  const char * e = d + key.size();
127  if (is_doclenchunk_key(key)) {
 // Skip the two byte doclen prefix.
128  d += 2;
129  } else {
 // Skip over the term name; its value isn't needed here.
130  string tname;
131  if (!unpack_string_preserving_sort(&d, e, tname))
132  throw Xapian::DatabaseCorruptError("Bad postlist key");
133  }
134 
135  if (d == e) {
136  // This is an initial chunk for a term, so adjust tag header.
137  d = tag.data();
138  e = d + tag.size();
139  if (!unpack_uint(&d, e, &tf) ||
140  !unpack_uint(&d, e, &cf) ||
141  !unpack_uint(&d, e, &firstdid)) {
142  throw Xapian::DatabaseCorruptError("Bad postlist key");
143  }
 // The header holds the first docid minus one (merge_postlists() packs
 // it back the same way), so add one to get the real first docid.
144  ++firstdid;
 // Strip the tf/cf/firstdid header so the tag is in non-initial form.
145  tag.erase(0, d - tag.data());
146  } else {
147  // Not an initial chunk, so adjust key.
148  size_t tmp = d - key.data();
149  if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
150  throw Xapian::DatabaseCorruptError("Bad postlist key");
 // Drop the docid from the key so all chunks for one term (or for the
 // doclen list) share the same key.
151  if (is_doclenchunk_key(key)) {
152  key.erase(tmp);
153  } else {
 // For a term key one extra byte before the docid is removed too.
 // NOTE(review): presumably a separator written by
 // pack_string_preserving_sort() when more key data follows - confirm.
154  key.erase(tmp - 1);
155  }
156  }
157  UNSIGNED_OVERFLOW_OK(firstdid += offset);
158  return true;
159  }
160 };
161 
// Ordering functor used for the priority_queue of PostlistCursor pointers in
// merge_postlists(): compares by key first and by firstdid second.  Because
// it is a "greater than" test, the queue's top() is the smallest entry.
// NOTE(review): the line opening this class is not visible in this listing.
163  public:
166  bool operator()(const PostlistCursor *a, const PostlistCursor *b) const {
167  if (a->key > b->key) return true;
168  if (a->key != b->key) return false;
 // Keys are equal, so break the tie on the first docid of the chunk.
169  return (a->firstdid > b->firstdid);
170  }
171 };
172 
// Build the serialised tag for a value statistics entry from a frequency and
// lower/upper bounds: pack_uint(freq), then pack_string(lbound), then the raw
// bytes of ubound (omitted when it equals lbound).
// NOTE(review): the line carrying the function name and its first parameter
// is not visible in this listing.
173 static string
175  const string & lbound, const string & ubound)
176 {
177  string value;
178  pack_uint(value, freq);
179  pack_string(value, lbound);
180  // We don't store or count empty values, so neither of the bounds
181  // can be empty. So we can safely store an empty upper bound when
182  // the bounds are equal.
183  if (lbound != ubound) value += ubound;
184  return value;
185 }
186 
// Merge the postlist tables in the range [b, e) into `out`.  `offset` is
// advanced in step with `b` and supplies the docid offset for each input.
// Entries are consumed in key order from a priority queue of cursors, in four
// phases: user metadata, value statistics, value stream chunks, and finally
// the remaining postlist chunks.  `compactor` may be null; when set it is
// asked to resolve duplicate user metadata keys.
// NOTE(review): the line carrying the function name and its first parameter
// is not visible in this listing.
// NOTE(review): cursors are owned through raw pointers and are only deleted
// once exhausted, so they would not be freed if an exception propagates.
187 static void
189  GlassTable * out, vector<Xapian::docid>::const_iterator offset,
190  vector<const GlassTable*>::const_iterator b,
191  vector<const GlassTable*>::const_iterator e)
192 {
193  priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
194  for ( ; b != e; ++b, ++offset) {
195  const GlassTable *in = *b;
196  if (in->empty()) {
197  // Skip empty tables.
198  continue;
199  }
200 
201  pq.push(new PostlistCursor(in, *offset));
202  }
203 
 // The key most recently seen; shared by all the phases below.
204  string last_key;
205  {
206  // Merge user metadata.
 // Tags collected so far for last_key (one per input that has the key).
207  vector<string> tags;
208  while (!pq.empty()) {
209  PostlistCursor * cur = pq.top();
210  const string& key = cur->key;
211  if (!is_user_metadata_key(key)) break;
212 
213  if (key != last_key) {
 // Moved on to a new key, so write out the previous one.
214  if (!tags.empty()) {
215  if (tags.size() > 1 && compactor) {
216  Assert(!last_key.empty());
217  // FIXME: It would be better to merge all duplicates
218  // for a key in one call, but currently we don't in
219  // multipass mode.
220  const string & resolved_tag =
221  compactor->resolve_duplicate_metadata(last_key,
222  tags.size(),
223  &tags[0]);
 // An empty resolved tag means no entry is written.
224  if (!resolved_tag.empty())
225  out->add(last_key, resolved_tag);
226  } else {
 // Single tag, or no compactor to consult: keep the first.
227  Assert(!last_key.empty());
228  out->add(last_key, tags[0]);
229  }
230  tags.resize(0);
231  }
232  last_key = key;
233  }
234  tags.push_back(cur->tag);
235 
236  pq.pop();
237  if (cur->next()) {
238  pq.push(cur);
239  } else {
240  delete cur;
241  }
242  }
 // Write out whatever was collected for the final metadata key.
243  if (!tags.empty()) {
244  if (tags.size() > 1 && compactor) {
245  Assert(!last_key.empty());
246  const string & resolved_tag =
247  compactor->resolve_duplicate_metadata(last_key,
248  tags.size(),
249  &tags[0]);
250  if (!resolved_tag.empty())
251  out->add(last_key, resolved_tag);
252  } else {
253  Assert(!last_key.empty());
254  out->add(last_key, tags[0]);
255  }
256  }
257  }
258 
259  {
260  // Merge valuestats.
 // Running totals for the slot identified by last_key.
261  Xapian::doccount freq = 0;
262  string lbound, ubound;
263 
264  while (!pq.empty()) {
265  PostlistCursor * cur = pq.top();
266  const string& key = cur->key;
267  if (!is_valuestats_key(key)) break;
268  if (key != last_key) {
269  // For the first valuestats key, last_key will be the previous
270  // key we wrote, which we don't want to overwrite. This is the
271  // only time that freq will be 0, so check that.
272  if (freq) {
273  out->add(last_key, encode_valuestats(freq, lbound, ubound));
274  freq = 0;
275  }
276  last_key = key;
277  }
278 
279  const string & tag = cur->tag;
280 
281  const char * pos = tag.data();
282  const char * end = pos + tag.size();
283 
 // NOTE(review): the declaration of `f` is not visible in this listing.
285  string l, u;
 // Decode this input's stats: frequency, lower bound, then the upper
 // bound as the remaining bytes.
 // NOTE(review): both error paths dereference pos after a failed unpack;
 // confirm what the unpack functions leave in pos on failure.
286  if (!unpack_uint(&pos, end, &f)) {
287  if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
288  throw Xapian::RangeError("Frequency statistic in value table is too large");
289  }
290  if (!unpack_string(&pos, end, l)) {
291  if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
292  throw Xapian::RangeError("Lower bound in value table is too large");
293  }
294  size_t len = end - pos;
295  if (len == 0) {
 // An omitted upper bound means it equals the lower bound (see
 // encode_valuestats()).
296  u = l;
297  } else {
298  u.assign(pos, len);
299  }
300  if (freq == 0) {
 // First stats seen for this slot.
301  freq = f;
302  lbound = l;
303  ubound = u;
304  } else {
 // Combine: sum the frequencies and widen the bounds.
305  freq += f;
306  if (l < lbound) lbound = l;
307  if (u > ubound) ubound = u;
308  }
309 
310  pq.pop();
311  if (cur->next()) {
312  pq.push(cur);
313  } else {
314  delete cur;
315  }
316  }
317 
 // Write out the stats for the final slot.
318  if (freq) {
319  out->add(last_key, encode_valuestats(freq, lbound, ubound));
320  }
321  }
322 
323  // Merge valuestream chunks.
 // PostlistCursor::next() has already rewritten these keys with the docid
 // offset applied, so they are copied as they stand.
324  while (!pq.empty()) {
325  PostlistCursor * cur = pq.top();
326  const string & key = cur->key;
327  if (!is_valuechunk_key(key)) break;
329  out->add(key, cur->tag);
330  pq.pop();
331  if (cur->next()) {
332  pq.push(cur);
333  } else {
334  delete cur;
335  }
336  }
337 
 // Merge the remaining postlist chunks.  Chunks sharing a key are gathered
 // in `tags` as (first docid, chunk data) pairs together with the summed
 // tf/cf, and written out when the key changes or the inputs run out.
338  Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
339  vector<pair<Xapian::docid, string>> tags;
340  while (true) {
 // cur == NULL signals that all inputs are exhausted.
341  PostlistCursor * cur = NULL;
342  if (!pq.empty()) {
343  cur = pq.top();
344  pq.pop();
345  }
346  Assert(cur == NULL || !is_user_metadata_key(cur->key));
347  if (cur == NULL || cur->key != last_key) {
348  if (!tags.empty()) {
 // Rebuild the initial chunk: header of tf, cf and (first docid - 1)
 // followed by the chunk data.
349  string first_tag;
350  pack_uint(first_tag, tf);
351  pack_uint(first_tag, cf);
352  pack_uint(first_tag, tags[0].first - 1);
353  string tag = tags[0].second;
 // The first byte of the chunk data is set to '1' on the last chunk
 // for this key and to '0' on every other chunk.
354  tag[0] = (tags.size() == 1) ? '1' : '0';
355  first_tag += tag;
356  out->add(last_key, first_tag);
357 
 // Recover the term name for building the non-initial chunk keys;
 // it is left empty for the doclen list.
358  string term;
359  if (!is_doclenchunk_key(last_key)) {
360  const char * p = last_key.data();
361  const char * end = p + last_key.size();
362  if (!unpack_string_preserving_sort(&p, end, term) || p != end)
363  throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
364  }
365 
 // Write the remaining chunks, each keyed by term and first docid.
366  auto i = tags.begin();
367  while (++i != tags.end()) {
368  tag = i->second;
369  tag[0] = (i + 1 == tags.end()) ? '1' : '0';
370  out->add(pack_glass_postlist_key(term, i->first), tag);
371  }
372  }
373  tags.clear();
374  if (cur == NULL) break;
 // Start accumulating for the new key.
375  tf = cf = 0;
376  last_key = cur->key;
377  }
378  tf += cur->tf;
379  cf += cur->cf;
380  tags.push_back(make_pair(cur->firstdid, cur->tag));
381  if (cur->next()) {
382  pq.push(cur);
383  } else {
384  delete cur;
385  }
386  }
387 }
388 
389 struct MergeCursor : public GlassCursor {
390  explicit MergeCursor(const GlassTable *in) : GlassCursor(in) {
391  rewind();
392  next();
393  }
394 };
395 
396 struct CursorGt {
398  bool operator()(const GlassCursor *a, const GlassCursor *b) const {
399  if (b->after_end()) return false;
400  if (a->after_end()) return true;
401  return (a->current_key > b->current_key);
402  }
403 };
404 
// Merge the spelling tables in the range [b, e) into `out`.  Entries whose
// key occurs in only one input are copied as they are.  For a key present in
// several inputs the tags are combined: keys starting with 'W' have their
// frequencies summed, all other keys get the union of their word lists.
// NOTE(review): the line carrying the function name and its first parameter
// is not visible in this listing.
405 static void
407  vector<const GlassTable*>::const_iterator b,
408  vector<const GlassTable*>::const_iterator e)
409 {
410  priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
411  for ( ; b != e; ++b) {
412  const GlassTable *in = *b;
413  if (!in->empty()) {
414  pq.push(new MergeCursor(in));
415  }
416  }
417 
418  while (!pq.empty()) {
419  MergeCursor * cur = pq.top();
420  pq.pop();
421 
422  string key = cur->current_key;
 // If the next smallest key is greater, no other input has this key.
423  if (pq.empty() || pq.top()->current_key > key) {
424  // No need to merge the tags, just copy the (possibly compressed)
425  // tag value.
426  bool compressed = cur->read_tag(true);
427  out->add(key, cur->current_tag, compressed);
428  if (cur->next()) {
429  pq.push(cur);
430  } else {
431  delete cur;
432  }
433  continue;
434  }
435 
436  // Merge tag values with the same key:
437  string tag;
438  if (key[0] != 'W') {
439  // We just want the union of words, so copy over the first instance
440  // and skip any identical ones.
 // NOTE(review): the end of this declaration (which names the queue
 // `pqtag`) is not visible in this listing.
441  priority_queue<PrefixCompressedStringItor *,
442  vector<PrefixCompressedStringItor *>,
444  // Stick all the MergeCursor pointers in a vector because their
445  // current_tag members must remain valid while we're merging their
446  // tags, but we need to call next() on them all afterwards.
447  vector<MergeCursor *> vec;
448  vec.reserve(pq.size());
449 
 // Gather every cursor currently positioned on this key.
450  while (true) {
451  cur->read_tag();
452  pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
453  vec.push_back(cur);
454  if (pq.empty() || pq.top()->current_key != key) break;
455  cur = pq.top();
456  pq.pop();
457  }
458 
 // NOTE(review): the declaration of the writer `wr` is not visible in
 // this listing; presumably it appends to `tag` - confirm.
460  string lastword;
 // Take words in pqtag order, appending each distinct word once.
461  while (!pqtag.empty()) {
462  PrefixCompressedStringItor * it = pqtag.top();
463  pqtag.pop();
464  string word = **it;
465  if (word != lastword) {
466  lastword = word;
467  wr.append(lastword);
468  }
469  ++*it;
470  if (!it->at_end()) {
471  pqtag.push(it);
472  } else {
473  delete it;
474  }
475  }
476 
 // Now the tags are no longer needed, advance all the gathered cursors.
477  vector<MergeCursor *>::const_iterator i;
478  for (i = vec.begin(); i != vec.end(); ++i) {
479  cur = *i;
480  if (cur->next()) {
481  pq.push(cur);
482  } else {
483  delete cur;
484  }
485  }
486  } else {
487  // We want to sum the frequencies from tags for the same key.
488  Xapian::termcount tot_freq = 0;
489  while (true) {
490  cur->read_tag();
491  Xapian::termcount freq;
492  const char * p = cur->current_tag.data();
493  const char * end = p + cur->current_tag.size();
 // A frequency of zero is treated as corruption.
494  if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
495  throw Xapian::DatabaseCorruptError("Bad spelling word freq");
496  }
497  tot_freq += freq;
498  if (cur->next()) {
499  pq.push(cur);
500  } else {
501  delete cur;
502  }
503  if (pq.empty() || pq.top()->current_key != key) break;
504  cur = pq.top();
505  pq.pop();
506  }
507  tag.resize(0);
508  pack_uint_last(tag, tot_freq);
509  }
510  out->add(key, tag);
511  }
512 }
513 
// Merge the synonym tables in the range [b, e) into `out`.  Entries whose key
// occurs in only one input are copied as they are; for a key present in
// several inputs the output tag is the union of the words in their tags.
// NOTE(review): the line carrying the function name and its first parameter
// is not visible in this listing.
514 static void
516  vector<const GlassTable*>::const_iterator b,
517  vector<const GlassTable*>::const_iterator e)
518 {
519  priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
520  for ( ; b != e; ++b) {
521  const GlassTable *in = *b;
522  if (!in->empty()) {
523  pq.push(new MergeCursor(in));
524  }
525  }
526 
527  while (!pq.empty()) {
528  MergeCursor * cur = pq.top();
529  pq.pop();
530 
531  string key = cur->current_key;
 // If the next smallest key is greater, no other input has this key.
532  if (pq.empty() || pq.top()->current_key > key) {
533  // No need to merge the tags, just copy the (possibly compressed)
534  // tag value.
535  bool compressed = cur->read_tag(true);
536  out->add(key, cur->current_tag, compressed);
537  if (cur->next()) {
538  pq.push(cur);
539  } else {
540  delete cur;
541  }
542  continue;
543  }
544 
545  // Merge tag values with the same key:
546  string tag;
547 
548  // We just want the union of words, so copy over the first instance
549  // and skip any identical ones.
 // NOTE(review): the end of this declaration (which names the queue
 // `pqtag`) is not visible in this listing.
550  priority_queue<ByteLengthPrefixedStringItor *,
551  vector<ByteLengthPrefixedStringItor *>,
 // Cursors positioned on this key; they are advanced only after the merge
 // because the iterators are constructed from their current_tag members.
553  vector<MergeCursor *> vec;
554 
555  while (true) {
556  cur->read_tag();
557  pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
558  vec.push_back(cur);
559  if (pq.empty() || pq.top()->current_key != key) break;
560  cur = pq.top();
561  pq.pop();
562  }
563 
 // NOTE(review): lastword is a string_view taken from an iterator which is
 // then advanced and possibly deleted; presumably it refers into a
 // cursor's current_tag, which stays alive via `vec` - confirm.
564  string_view lastword;
565  while (!pqtag.empty()) {
566  ByteLengthPrefixedStringItor * it = pqtag.top();
567  pqtag.pop();
568  string_view word = **it;
569  if (word != lastword) {
 // Each word is written as one byte (its length XORed with
 // MAGIC_XOR_VALUE) followed by the word itself.
570  tag += uint8_t(word.size() ^ MAGIC_XOR_VALUE);
571  tag += word;
572  lastword = word;
573  }
574  ++*it;
575  if (!it->at_end()) {
576  pqtag.push(it);
577  } else {
578  delete it;
579  }
580  }
581 
 // Now the tags are no longer needed, advance all the gathered cursors.
582  vector<MergeCursor *>::const_iterator i;
583  for (i = vec.begin(); i != vec.end(); ++i) {
584  cur = *i;
585  if (cur->next()) {
586  pq.push(cur);
587  } else {
588  delete cur;
589  }
590  }
591 
592  out->add(key, tag);
593  }
594 }
595 
// Merge postlist tables in several passes.  While more than three tables
// remain, they are merged in groups of two (a trailing single table joins the
// last group, making it three) into temporary tables created under `tmpdir`;
// the results become the inputs for the next pass.  The final three or fewer
// tables are merged straight into `out`.  `tmp` and `off` are taken by value
// because they are replaced on each pass.
// NOTE(review): the line carrying the function name and its first parameter
// is not visible in this listing.
596 static void
598  GlassTable * out, const char * tmpdir,
599  vector<const GlassTable *> tmp,
600  vector<Xapian::docid> off)
601 {
 // Pass counter: used in temporary file names, and non-zero means `tmp`
 // holds temporary tables which this function must remove.
602  unsigned int c = 0;
603  while (tmp.size() > 3) {
604  vector<const GlassTable *> tmpout;
605  tmpout.reserve(tmp.size() / 2);
 // The offsets for the next pass are all zero, since merge_postlists()
 // is given this pass's offsets when producing the temporary tables.
606  vector<Xapian::docid> newoff;
607  newoff.resize(tmp.size() / 2);
608  for (unsigned int i = 0, j; i < tmp.size(); i = j) {
609  j = i + 2;
 // Don't leave a single table on its own at the end.
610  if (j == tmp.size() - 1) ++j;
611 
 // Temporary table path stem: <tmpdir>/tmp<pass>_<group>.
612  string dest = tmpdir;
613  dest += "/tmp";
614  dest += str(c);
615  dest += '_';
616  dest += str(i / 2);
617  dest += '.';
618 
619  GlassTable * tmptab = new GlassTable("postlist", dest, false);
620 
621  // Use maximum blocksize for temporary tables. And don't compress
622  // entries in temporary tables, even if the final table would do
623  // so. Any already compressed entries will get copied in
624  // compressed form.
625  RootInfo root_info;
626  root_info.init(65536, 0);
627  const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
628  tmptab->create_and_open(flags, root_info);
629 
630  merge_postlists(compactor, tmptab, off.begin() + i,
631  tmp.begin() + i, tmp.begin() + j);
 // After the first pass the inputs are temporary tables, so remove
 // each one once it has been merged.
632  if (c > 0) {
633  for (unsigned int k = i; k < j; ++k) {
634  unlink(tmp[k]->get_path().c_str());
635  delete tmp[k];
636  tmp[k] = NULL;
637  }
638  }
639  tmpout.push_back(tmptab);
640  tmptab->flush_db();
641  tmptab->commit(1, &root_info);
642  AssertRel(root_info.get_blocksize(),==,65536);
643  }
644  swap(tmp, tmpout);
645  swap(off, newoff);
646  ++c;
647  }
 // Final merge into the real output table.
648  merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
649  if (c > 0) {
 // Remove the temporary tables left over from the last pass.
650  for (size_t k = 0; k < tmp.size(); ++k) {
651  unlink(tmp[k]->get_path().c_str());
652  delete tmp[k];
653  tmp[k] = NULL;
654  }
655  }
656 }
657 
// Cursor over a single input position table, used while merging.  After each
// successful next(), `key` holds the rebuilt key for the current entry with
// its docid shifted by the offset passed to the constructor, and get_tag()
// returns the entry's tag.
// NOTE(review): some member declarations, the constructor's signature line
// and one statement in next() are not visible in this listing.
658 class PositionCursor : private GlassCursor {
660 
661  public:
662  string key;
664 
666  : GlassCursor(in), offset(offset_), firstdid(0) {
 // Move to the start of the table and then onto its first entry.
667  rewind();
668  next();
669  }
670 
 // Advance to the next entry and rebuild `key` for it.  Returns false once
 // the underlying cursor has no more entries; throws
 // Xapian::DatabaseCorruptError if the key isn't exactly a term followed by
 // a docid.
671  bool next() {
672  if (!GlassCursor::next()) return false;
673  read_tag();
674  const char * d = current_key.data();
675  const char * e = d + current_key.size();
676  string term;
677  Xapian::docid did;
678  if (!unpack_string_preserving_sort(&d, e, term) ||
679  !unpack_uint_preserving_sort(&d, e, &did) ||
680  d != e) {
681  throw Xapian::DatabaseCorruptError("Bad position key");
682  }
683 
684  key.resize(0);
 // NOTE(review): a statement is missing from this listing here; presumably
 // it packs `term` into `key` before the docid is appended - confirm.
686  pack_uint_preserving_sort(key, did + offset);
687  return true;
688  }
689 
 // The tag of the current entry, as read by the last successful next().
690  const string & get_tag() const {
691  return current_tag;
692  }
693 };
694 
// Ordering functor used for the priority_queue of PositionCursor pointers in
// merge_positions().  Because it is a "greater than" test on the keys, the
// queue's top() is the cursor with the smallest key.
// NOTE(review): the line opening this class is not visible in this listing.
696  public:
699  bool operator()(const PositionCursor *a, const PositionCursor *b) const {
700  return a->key > b->key;
701  }
702 };
703 
704 static void
705 merge_positions(GlassTable *out, const vector<const GlassTable*> & inputs,
706  const vector<Xapian::docid> & offset)
707 {
708  priority_queue<PositionCursor *, vector<PositionCursor *>, PositionCursorGt> pq;
709  for (size_t i = 0; i < inputs.size(); ++i) {
710  const GlassTable *in = inputs[i];
711  if (in->empty()) {
712  // Skip empty tables.
713  continue;
714  }
715 
716  pq.push(new PositionCursor(in, offset[i]));
717  }
718 
719  while (!pq.empty()) {
720  PositionCursor * cur = pq.top();
721  pq.pop();
722  out->add(cur->key, cur->get_tag());
723  if (cur->next()) {
724  pq.push(cur);
725  } else {
726  delete cur;
727  }
728  }
729 }
730 
731 static void
732 merge_docid_keyed(GlassTable *out, const vector<const GlassTable*> & inputs,
733  const vector<Xapian::docid> & offset)
734 {
735  for (size_t i = 0; i < inputs.size(); ++i) {
736  Xapian::docid off = offset[i];
737 
738  const GlassTable * in = inputs[i];
739  if (in->empty()) continue;
740 
741  GlassCursor cur(in);
742  cur.rewind();
743 
744  string key;
745  while (cur.next()) {
746  // Adjust the key if this isn't the first database.
747  if (off) {
748  Xapian::docid did;
749  const char * d = cur.current_key.data();
750  const char * e = d + cur.current_key.size();
751  if (!unpack_uint_preserving_sort(&d, e, &did)) {
752  string msg = "Bad key in ";
753  msg += inputs[i]->get_path();
754  throw Xapian::DatabaseCorruptError(msg);
755  }
756  UNSIGNED_OVERFLOW_OK(did += off);
757  key.resize(0);
758  pack_uint_preserving_sort(key, did);
759  if (d != e) {
760  // Copy over anything extra in the key (e.g. the zero byte
761  // at the end of "used value slots" in the termlist table).
762  key.append(d, e - d);
763  }
764  } else {
765  key = cur.current_key;
766  }
767  bool compressed = cur.read_tag(true);
768  out->add(key, cur.current_tag, compressed);
769  }
770  }
771 }
772 
773 }
774 
775 using namespace GlassCompact;
776 
// Compact and/or merge the glass databases in `sources` into a new database.
// The output is either a directory of table files (`destdir`) or, when
// Xapian::DBCOMPACT_SINGLE_FILE is set in `flags`, a single file (opened from
// `destdir` if that is non-null, otherwise the already open descriptor `fd`).
// `offset` gives the docid offset to apply to each source, `block_size` the
// requested output block size, and `last_docid` is recorded in the output's
// version file.  `compactor` may be null; when set it receives status updates
// and is consulted about duplicate user metadata.
// NOTE(review): the line carrying the function name, one parameter line and
// the statement which reports uncommitted changes are not visible in this
// listing.
777 void
779  const char * destdir,
780  int fd,
781  const vector<const Xapian::Database::Internal*>& sources,
782  const vector<Xapian::docid> & offset,
783  unsigned block_size,
785  unsigned flags,
786  Xapian::docid last_docid)
787 {
788  struct table_list {
789  // The "base name" of the table.
790  char name[9];
791  // The type.
792  Glass::table_type type;
793  // Create tables after position lazily.
794  bool lazy;
795  };
796 
 // The tables are processed in this order.
797  static const table_list tables[] = {
798  // name type lazy
799  { "postlist", Glass::POSTLIST, false },
800  { "docdata", Glass::DOCDATA, true },
801  { "termlist", Glass::TERMLIST, false },
802  { "position", Glass::POSITION, true },
803  { "spelling", Glass::SPELLING, true },
804  { "synonym", Glass::SYNONYM, true }
805  };
806  const table_list* tables_end = std::end(tables);
807 
 // Flags used when opening the output tables and writing the version file.
808  const int FLAGS = Xapian::DB_DANGEROUS;
809 
810  bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
811  bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
812  if (single_file) {
813  // FIXME: Support this combination - we need to put temporary files
814  // somewhere.
815  multipass = false;
816  }
817 
 // Refuse to compact from a source with uncommitted changes.
818  for (size_t i = 0; i != sources.size(); ++i) {
819  auto db = static_cast<const GlassDatabase*>(sources[i]);
820  if (db->has_uncommitted_changes()) {
821  const char * m =
822  "Can't compact from a WritableDatabase with uncommitted "
823  "changes - either call commit() first, or create a new "
824  "Database object from the filename on disk";
826  }
827  }
828 
 // Fall back to the default block size if the requested one is out of range
 // or isn't a power of two.
829  if (block_size < GLASS_MIN_BLOCKSIZE ||
830  block_size > GLASS_MAX_BLOCKSIZE ||
831  (block_size & (block_size - 1)) != 0) {
832  block_size = GLASS_DEFAULT_BLOCKSIZE;
833  }
834 
 // The lock is only taken for directory output.
835  FlintLock lock(destdir ? destdir : "");
836  if (!single_file) {
837  string explanation;
838  FlintLock::reason why = lock.lock(true, false, explanation);
839  if (why != FlintLock::SUCCESS) {
840  lock.throw_databaselockerror(why, destdir, explanation);
841  }
842  }
843 
844  unique_ptr<GlassVersion> version_file_out;
845  if (single_file) {
846  if (destdir) {
 // Create (or truncate) the output file; otherwise the caller's fd is
 // used as it is.
847  fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
848  if (fd < 0) {
849  throw Xapian::DatabaseCreateError("open() failed", errno);
850  }
851  }
852  version_file_out.reset(new GlassVersion(fd));
853  } else {
854  fd = -1;
855  version_file_out.reset(new GlassVersion(destdir));
856  }
857 
 // Initialise the output version file and fold in each source's stats.
858  version_file_out->create(block_size);
859  for (size_t i = 0; i != sources.size(); ++i) {
860  auto db = static_cast<const GlassDatabase*>(sources[i]);
861  version_file_out->merge_stats(db->version_file);
862  }
863 
 // For single file output the serialised free list is carried from one
 // table to the next (see the end of the loop below).
864  string fl_serialised;
865  if (single_file) {
866  GlassFreeList fl;
867  fl.set_first_unused_block(1); // FIXME: Assumption?
868  fl.pack(fl_serialised);
869  }
870 
 // Output tables created so far; they are synced and deleted at the end.
871  vector<GlassTable *> tabs;
872  tabs.reserve(tables_end - tables);
 // Size of the single output file after the previous table, used to work
 // out how much each table added.
873  file_size_type prev_size = block_size;
874  for (const table_list * t = tables; t < tables_end; ++t) {
875  // The postlist table requires an N-way merge, adjusting the
876  // headers of various blocks. The spelling and synonym tables also
877  // need special handling. The other tables have keys sorted in
878  // docid order, so we can merge them by simply copying all the keys
879  // from each source table in turn.
880  if (compactor)
881  compactor->set_status(t->name, string());
882 
 // Path stem for this table's files (unused for single file output).
883  string dest;
884  if (!single_file) {
885  dest = destdir;
886  dest += '/';
887  dest += t->name;
888  dest += '.';
889  }
890 
 // Non-lazy tables are always created; lazy ones only if some input has
 // the table.
891  bool output_will_exist = !t->lazy;
892 
893  // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
894  // on certain systems).
895  bool bad_stat = false;
896 
897  // We can't currently report input sizes if there's a single file DB
898  // amongst the inputs.
899  bool single_file_in = false;
900 
 // Total size of the input tables, in units of 1024 bytes.
901  file_size_type in_size = 0;
902 
903  vector<const GlassTable*> inputs;
904  inputs.reserve(sources.size());
 // How many of the sources actually have this table.
905  size_t inputs_present = 0;
906  for (auto src : sources) {
907  auto db = static_cast<const GlassDatabase*>(src);
908  const GlassTable * table;
909  switch (t->type) {
910  case Glass::POSTLIST:
911  table = &(db->postlist_table);
912  break;
913  case Glass::DOCDATA:
914  table = &(db->docdata_table);
915  break;
916  case Glass::TERMLIST:
917  table = &(db->termlist_table);
918  break;
919  case Glass::POSITION:
920  table = &(db->position_table);
921  break;
922  case Glass::SPELLING:
923  table = &(db->spelling_table);
924  break;
925  case Glass::SYNONYM:
926  table = &(db->synonym_table);
927  break;
928  default:
929  Assert(false);
930  return;
931  }
932 
933  if (db->single_file()) {
934  if (t->lazy && table->empty()) {
935  // Essentially doesn't exist.
936  } else {
937  // FIXME: Find actual size somehow?
938  // in_size += table->size() / 1024;
939  single_file_in = true;
940  output_will_exist = true;
941  ++inputs_present;
942  }
943  } else {
 // NOTE(review): the errno tests below rely on file_size() setting
 // errno to 0 on success - confirm against its definition.
944  auto db_size = file_size(table->get_path());
945  if (errno == 0) {
946  in_size += db_size / 1024;
947  output_will_exist = true;
948  ++inputs_present;
949  } else if (errno != ENOENT) {
950  // We get ENOENT for an optional table.
951  bad_stat = true;
952  output_will_exist = true;
953  ++inputs_present;
954  }
955  }
956  inputs.push_back(table);
957  }
958 
959  // If any inputs lack a termlist table, suppress it in the output.
960  if (t->type == Glass::TERMLIST && inputs_present != sources.size()) {
961  if (inputs_present != 0) {
962  if (compactor) {
963  string m = str(inputs_present);
964  m += " of ";
965  m += str(sources.size());
966  m += " inputs present, so suppressing output";
967  compactor->set_status(t->name, m);
968  }
969  continue;
970  }
 // No input has a termlist at all, so report "doesn't exist" below.
971  output_will_exist = false;
972  }
973 
974  if (!output_will_exist) {
975  if (compactor)
976  compactor->set_status(t->name, "doesn't exist");
977  continue;
978  }
979 
 // Create and open the output table.
980  GlassTable * out;
981  if (single_file) {
982  out = new GlassTable(t->name, fd, version_file_out->get_offset(),
983  false, false);
984  } else {
985  out = new GlassTable(t->name, dest, false, t->lazy);
986  }
987  tabs.push_back(out);
988  RootInfo * root_info = version_file_out->root_to_set(t->type);
989  if (single_file) {
990  root_info->set_free_list(fl_serialised);
991  out->open(FLAGS, version_file_out->get_root(t->type), version_file_out->get_revision());
992  } else {
993  out->create_and_open(FLAGS, *root_info);
994  }
995 
 // NOTE(review): STANDARD is named through `compactor` here although
 // compactor is tested for null elsewhere in this function - confirm this
 // is safe when compactor is null.
996  out->set_full_compaction(compaction != compactor->STANDARD);
997 
 // Do the merge appropriate to this table type.
998  switch (t->type) {
999  case Glass::POSTLIST: {
1000  if (multipass && inputs.size() > 3) {
1001  multimerge_postlists(compactor, out, destdir,
1002  inputs, offset);
1003  } else {
1004  merge_postlists(compactor, out, offset.begin(),
1005  inputs.begin(), inputs.end());
1006  }
1007  break;
1008  }
1009  case Glass::SPELLING:
1010  merge_spellings(out, inputs.begin(), inputs.end());
1011  break;
1012  case Glass::SYNONYM:
1013  merge_synonyms(out, inputs.begin(), inputs.end());
1014  break;
1015  case Glass::POSITION:
1016  merge_positions(out, inputs, offset);
1017  break;
1018  default:
1019  // DocData, Termlist
1020  merge_docid_keyed(out, inputs, offset);
1021  break;
1022  }
1023 
1024  if (out->is_modified()) {
1025  // Commit as revision 1.
1026  out->flush_db();
1027  out->commit(1, root_info);
1028  out->sync();
1029  }
 // Carry the updated free list over to the next table in the file.
1030  if (single_file) fl_serialised = root_info->get_free_list();
1031 
 // Work out the output size (in units of 1024 bytes) for the status
 // report, unless the input sizes are unknown anyway.
1032  file_size_type out_size = 0;
1033  if (!bad_stat && !single_file_in) {
1034  file_size_type db_size;
1035  if (single_file) {
1036  db_size = file_size(fd);
1037  } else {
1038  db_size = file_size(dest + GLASS_TABLE_EXTENSION);
1039  }
1040  if (errno == 0) {
1041  if (single_file) {
 // All the tables share one file, so this table's size is the
 // growth of the file since the previous table (treating the file
 // as being at least block_size).
1042  auto old_prev_size = max(prev_size,
1043  file_size_type(block_size));
1044  prev_size = db_size;
1045  db_size = max(db_size, file_size_type(block_size));
1046  db_size -= old_prev_size;
1047  }
1048  out_size = db_size / 1024;
1049  } else {
1050  bad_stat = (errno != ENOENT);
1051  }
1052  }
 // Report the outcome for this table.
1053  if (bad_stat) {
1054  if (compactor)
1055  compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
1056  } else if (single_file_in) {
1057  if (compactor)
1058  compactor->set_status(t->name, "Done (table sizes unknown for single file DB input)");
1059  } else {
1060  string status;
1061  if (out_size == in_size) {
1062  status = "Size unchanged (";
1063  } else {
1064  off_t delta;
1065  if (out_size < in_size) {
1066  delta = in_size - out_size;
1067  status = "Reduced by ";
1068  } else {
1069  delta = out_size - in_size;
1070  status = "INCREASED by ";
1071  }
 // Only give a percentage when there's a non-zero input size to
 // divide by.
1072  if (in_size) {
1073  status += str(100 * delta / in_size);
1074  status += "% ";
1075  }
1076  status += str(delta);
1077  status += "K (";
1078  status += str(in_size);
1079  status += "K -> ";
1080  }
1081  status += str(out_size);
1082  status += "K)";
1083  if (compactor)
1084  compactor->set_status(t->name, status);
1085  }
1086  }
1087 
1088  // If compacting to a single file output and all the tables are empty, pad
1089  // the output so that it isn't mistaken for a stub database when we try to
1090  // open it. For this it needs to be a multiple of 2KB in size.
1091  if (single_file && prev_size < block_size) {
1092 #ifdef HAVE_FTRUNCATE
1093  if (ftruncate(fd, block_size) < 0) {
1094  throw Xapian::DatabaseError("Failed to set size of output database", errno);
1095  }
1096 #else
 // Without ftruncate(), extend the file by writing a single zero byte at
 // the last position.
1097  const off_t off = block_size - 1;
1098  if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
1099  throw Xapian::DatabaseError("Failed to set size of output database", errno);
1100  }
1101 #endif
1102  }
1103 
1104  if (single_file) {
 // Seek to the version file's offset within the single file before it is
 // written.
1105  if (lseek(fd, version_file_out->get_offset(), SEEK_SET) < 0) {
1106  throw Xapian::DatabaseError("lseek() failed", errno);
1107  }
1108  }
1109  version_file_out->set_last_docid(last_docid);
 // Write the version file, sync all the tables, and only then sync the
 // version file.
1110  string tmpfile = version_file_out->write(1, FLAGS);
1111  for (unsigned j = 0; j != tabs.size(); ++j) {
1112  tabs[j]->sync();
1113  }
1114  // Commit with revision 1.
1115  version_file_out->sync(tmpfile, 1, FLAGS);
1116  for (unsigned j = 0; j != tabs.size(); ++j) {
1117  delete tabs[j];
1118  }
1119 
1120  if (!single_file) lock.release();
1121 }
#define MAGIC_XOR_VALUE
void release()
Release the lock.
Definition: flint_lock.cc:458
reason lock(bool exclusive, bool wait, std::string &explanation)
Attempt to obtain the lock.
Definition: flint_lock.cc:124
void throw_databaselockerror(FlintLock::reason why, const std::string &db_dir, const std::string &explanation) const
Throw Xapian::DatabaseLockError.
Definition: flint_lock.cc:494
bool operator()(const PositionCursor *a, const PositionCursor *b) const
Return true if and only if a's key is strictly greater than b's key.
PositionCursor(const GlassTable *in, Xapian::docid offset_)
const string & get_tag() const
bool operator()(const PostlistCursor *a, const PostlistCursor *b) const
Return true if and only if a's key is strictly greater than b's key.
PostlistCursor(const GlassTable *in, Xapian::docid offset_)
A cursor pointing to a position in a Btree table, for reading several entries in order,...
Definition: glass_cursor.h:148
string current_key
Current key pointed to by cursor.
Definition: glass_cursor.h:239
bool after_end() const
Determine whether cursor is off the end of table.
Definition: glass_cursor.h:337
bool read_tag(bool keep_compressed=false)
Read the tag from the table and store it in current_tag.
bool next()
Advance to the next key.
void rewind()
Position cursor on the dummy empty key.
Definition: glass_cursor.h:250
string current_tag
Current tag pointed to by cursor.
Definition: glass_cursor.h:244
A backend designed for efficient indexing and retrieval, using compressed posting lists and a btree s...
static void compact(Xapian::Compactor *compactor, const char *destdir, int fd, const std::vector< const Xapian::Database::Internal * > &sources, const std::vector< Xapian::docid > &offset, unsigned block_size, Xapian::Compactor::compaction_level compaction, unsigned flags, Xapian::docid last_docid)
void set_first_unused_block(uint4 base)
void pack(std::string &buf)
Class managing a Btree table in a Glass database.
Definition: glass_table.h:432
void create_and_open(int flags_, const RootInfo &root_info)
Create a new empty btree structure on disk and open it at the initial revision.
bool sync()
Definition: glass_table.h:542
void commit(glass_revision_number_t revision, RootInfo *root_info)
Commit any outstanding changes to the table.
string get_path() const
Definition: glass_table.h:738
void flush_db()
Flush any outstanding changes to the DB file of the table.
bool empty() const
Return true if there are no entries in the table.
Definition: glass_table.h:690
void open(int flags_, const RootInfo &root_info, glass_revision_number_t rev)
Open the btree.
bool is_modified() const
Determine whether the object contains uncommitted modifications.
Definition: glass_table.h:706
void set_full_compaction(bool parity)
void add(std::string_view key, std::string_view tag, bool already_compressed=false)
Add a key/tag pair to the table, replacing any existing pair with the same key.
The GlassVersion class manages the revision files.
Definition: glass_version.h:96
const std::string & get_free_list() const
Definition: glass_version.h:70
void init(unsigned blocksize_, uint4 compress_min_)
unsigned get_blocksize() const
Definition: glass_version.h:64
void set_free_list(const std::string &s)
Definition: glass_version.h:82
void append(const std::string &word)
Compact a database, or merge and compact several.
Definition: compactor.h:39
virtual void set_status(const std::string &table, const std::string &status)
Update progress.
Definition: compactor.cc:87
compaction_level
Compaction level.
Definition: compactor.h:42
@ STANDARD
Don't split items unnecessarily.
Definition: compactor.h:44
virtual std::string resolve_duplicate_metadata(const std::string &key, size_t num_tags, const std::string tags[])
Resolve multiple user metadata entries with the same key.
Definition: compactor.cc:94
DatabaseCorruptError indicates database corruption was detected.
Definition: error.h:397
DatabaseCreateError indicates a failure to create a database.
Definition: error.h:439
DatabaseError indicates some sort of database related error.
Definition: error.h:355
InvalidOperationError indicates the API was used in an invalid way.
Definition: error.h:271
RangeError indicates an attempt to access outside the bounds of a container.
Definition: error.h:959
Compact a database, or merge and compact several.
#define UNSIGNED_OVERFLOW_OK(X)
Definition: config.h:626
Constants in the Xapian namespace.
string term
PositionList * p
Xapian::termpos pos
Hierarchy of classes which Xapian can throw as exceptions.
Utility functions for testing files.
std::make_unsigned_t< off_t > file_size_type
Unsigned return type of file_size() function.
Definition: filetests.h:57
file_size_type file_size(const char *path)
Returns the size of a file.
Definition: filetests.h:76
Flint-compatible database locking.
Interface to Btree cursors.
C++ class definition for glass database.
static bool is_user_metadata_key(const string &key)
Definitions, types, etc for use inside glass.
#define GLASS_MIN_BLOCKSIZE
Minimum B-tree block size.
Definition: glass_defs.h:33
#define GLASS_DEFAULT_BLOCKSIZE
Default B-tree block size.
Definition: glass_defs.h:30
#define GLASS_TABLE_EXTENSION
Glass table extension.
Definition: glass_defs.h:27
#define GLASS_MAX_BLOCKSIZE
Maximum B-tree block size.
Definition: glass_defs.h:36
Btree implementation.
GlassVersion class.
Types used internally.
static void multimerge_postlists(Xapian::Compactor *compactor, GlassTable *out, const char *tmpdir, vector< const GlassTable * > tmp, vector< Xapian::docid > off)
static string encode_valuestats(Xapian::doccount freq, const string &lbound, const string &ubound)
static void merge_docid_keyed(GlassTable *out, const vector< const GlassTable * > &inputs, const vector< Xapian::docid > &offset)
static void merge_synonyms(GlassTable *out, vector< const GlassTable * >::const_iterator b, vector< const GlassTable * >::const_iterator e)
static bool is_doclenchunk_key(const string &key)
static void merge_spellings(GlassTable *out, vector< const GlassTable * >::const_iterator b, vector< const GlassTable * >::const_iterator e)
static bool is_valuestats_key(const string &key)
static void merge_positions(GlassTable *out, const vector< const GlassTable * > &inputs, const vector< Xapian::docid > &offset)
static void merge_postlists(Xapian::Compactor *compactor, GlassTable *out, vector< Xapian::docid >::const_iterator offset, vector< const GlassTable * >::const_iterator b, vector< const GlassTable * >::const_iterator e)
static bool is_valuechunk_key(const string &key)
table_type
Definition: glass_defs.h:53
@ DOCDATA
Definition: glass_defs.h:55
@ SYNONYM
Definition: glass_defs.h:59
@ POSITION
Definition: glass_defs.h:57
@ POSTLIST
Definition: glass_defs.h:54
@ TERMLIST
Definition: glass_defs.h:56
@ SPELLING
Definition: glass_defs.h:58
string str(int value)
Convert int to std::string.
Definition: str.cc:91
Database open(std::string_view host, unsigned int port, unsigned timeout=10000, unsigned connect_timeout=10000)
Construct a Database object for read-only access to a remote database accessed via a TCP connection.
unsigned XAPIAN_TERMCOUNT_BASE_TYPE termcount
A count of terms.
Definition: types.h:64
const int DB_NO_SYNC
Don't attempt to ensure changes have hit disk.
Definition: constants.h:65
const int DBCOMPACT_MULTIPASS
If merging more than 3 databases, merge the postlists in multiple passes.
Definition: constants.h:257
unsigned valueno
The number for a value slot in a document.
Definition: types.h:90
unsigned XAPIAN_DOCID_BASE_TYPE doccount
A count of documents.
Definition: types.h:37
unsigned XAPIAN_DOCID_BASE_TYPE docid
A unique identifier for a document.
Definition: types.h:51
const int DBCOMPACT_SINGLE_FILE
Produce a single-file database.
Definition: constants.h:263
const int DB_DANGEROUS
Update the database in-place.
Definition: constants.h:102
#define AssertRel(A, REL, B)
Definition: omassert.h:123
#define Assert(COND)
Definition: omassert.h:122
Pack types into strings and unpack them again.
bool unpack_uint_last(const char **p, const char *end, U *result)
Decode an unsigned integer as the last item in a string.
Definition: pack.h:118
bool unpack_string_preserving_sort(const char **p, const char *end, std::string &result)
Decode a "sort preserved" std::string from a string.
Definition: pack.h:551
bool unpack_string(const char **p, const char *end, std::string &result)
Decode a std::string from a string.
Definition: pack.h:468
void pack_uint_last(std::string &s, U value)
Append an encoded unsigned integer to a string as the last item.
Definition: pack.h:100
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
Definition: pack.h:346
void pack_string_preserving_sort(std::string &s, std::string_view value, bool last=false)
Append an encoded std::string to a string, preserving the sort order.
Definition: pack.h:528
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:315
void pack_string(std::string &s, std::string_view value)
Append an encoded std::string to a string.
Definition: pack.h:442
bool unpack_uint_preserving_sort(const char **p, const char *end, U *result)
Decode a "sort preserved" unsigned integer from a string.
Definition: pack.h:251
std::string pack_glass_postlist_key(std::string_view term)
Definition: pack.h:574
void pack_uint_preserving_sort(std::string &s, U value)
Append an encoded unsigned integer to a string, preserving the sort order.
Definition: pack.h:204
#define O_BINARY
Definition: safefcntl.h:80
#define O_CLOEXEC
Definition: safefcntl.h:89
bool operator()(const GlassCursor *a, const GlassCursor *b) const
Return true if and only if a's key is strictly greater than b's key.
MergeCursor(const GlassTable *in)
Definition: header.h:215
typedefs for Xapian
Statistics about values.
static bool tags