xapian-core  1.4.27
glass_compact.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License as
8  * published by the Free Software Foundation; either version 2 of the
9  * License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19  * USA
20  */
21 
22 #include <config.h>
23 
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
28 
29 #include "autoptr.h"
30 #include <algorithm>
31 #include <queue>
32 
33 #include <cerrno>
34 
35 #include "backends/flint_lock.h"
36 #include "glass_database.h"
37 #include "glass_defs.h"
38 #include "glass_table.h"
39 #include "glass_cursor.h"
40 #include "glass_version.h"
41 #include "filetests.h"
42 #include "internaltypes.h"
43 #include "pack.h"
44 #include "backends/valuestats.h"
45 
46 #include "../byte_length_strings.h"
47 #include "../prefix_compressed_strings.h"
48 
49 using namespace std;
50 
51 // Put all the helpers in a namespace to avoid symbols colliding with those of
52 // the same name in other flint-derived backends.
53 namespace GlassCompact {
54 
55 static inline bool
56 is_user_metadata_key(const string & key)
57 {
58  return key.size() > 1 && key[0] == '\0' && key[1] == '\xc0';
59 }
60 
61 static inline bool
62 is_valuestats_key(const string & key)
63 {
64  return key.size() > 1 && key[0] == '\0' && key[1] == '\xd0';
65 }
66 
67 static inline bool
68 is_valuechunk_key(const string & key)
69 {
70  return key.size() > 1 && key[0] == '\0' && key[1] == '\xd8';
71 }
72 
73 static inline bool
74 is_doclenchunk_key(const string & key)
75 {
76  return key.size() > 1 && key[0] == '\0' && key[1] == '\xe0';
77 }
78 
79 class PostlistCursor : private GlassCursor {
81 
82  public:
83  string key, tag;
86 
88  : GlassCursor(in), offset(offset_), firstdid(0)
89  {
90  find_entry(string());
91  next();
92  }
93 
94  bool next() {
95  if (!GlassCursor::next()) return false;
96  // We put all chunks into the non-initial chunk form here, then fix up
97  // the first chunk for each term in the merged database as we merge.
98  read_tag();
99  key = current_key;
100  tag = current_tag;
101  tf = cf = 0;
102  if (is_user_metadata_key(key)) return true;
103  if (is_valuestats_key(key)) return true;
104  if (is_valuechunk_key(key)) {
105  const char * p = key.data();
106  const char * end = p + key.length();
107  p += 2;
108  Xapian::valueno slot;
109  if (!unpack_uint(&p, end, &slot))
110  throw Xapian::DatabaseCorruptError("bad value key");
111  Xapian::docid did;
112  if (!unpack_uint_preserving_sort(&p, end, &did))
113  throw Xapian::DatabaseCorruptError("bad value key");
114  did += offset;
115 
116  key.assign("\0\xd8", 2);
117  pack_uint(key, slot);
118  pack_uint_preserving_sort(key, did);
119  return true;
120  }
121 
122  // Adjust key if this is *NOT* an initial chunk.
123  // key is: pack_string_preserving_sort(key, tname)
124  // plus optionally: pack_uint_preserving_sort(key, did)
125  const char * d = key.data();
126  const char * e = d + key.size();
127  if (is_doclenchunk_key(key)) {
128  d += 2;
129  } else {
130  string tname;
131  if (!unpack_string_preserving_sort(&d, e, tname))
132  throw Xapian::DatabaseCorruptError("Bad postlist key");
133  }
134 
135  if (d == e) {
136  // This is an initial chunk for a term, so adjust tag header.
137  d = tag.data();
138  e = d + tag.size();
139  if (!unpack_uint(&d, e, &tf) ||
140  !unpack_uint(&d, e, &cf) ||
141  !unpack_uint(&d, e, &firstdid)) {
142  throw Xapian::DatabaseCorruptError("Bad postlist key");
143  }
144  ++firstdid;
145  tag.erase(0, d - tag.data());
146  } else {
147  // Not an initial chunk, so adjust key.
148  size_t tmp = d - key.data();
149  if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
150  throw Xapian::DatabaseCorruptError("Bad postlist key");
151  if (is_doclenchunk_key(key)) {
152  key.erase(tmp);
153  } else {
154  key.erase(tmp - 1);
155  }
156  }
157  firstdid += offset;
158  return true;
159  }
160 };
161 
163  public:
166  bool operator()(const PostlistCursor *a, const PostlistCursor *b) const {
167  if (a->key > b->key) return true;
168  if (a->key != b->key) return false;
169  return (a->firstdid > b->firstdid);
170  }
171 };
172 
173 static string
175  const string & lbound, const string & ubound)
176 {
177  string value;
178  pack_uint(value, freq);
179  pack_string(value, lbound);
180  // We don't store or count empty values, so neither of the bounds
181  // can be empty. So we can safely store an empty upper bound when
182  // the bounds are equal.
183  if (lbound != ubound) value += ubound;
184  return value;
185 }
186 
187 static void
189  GlassTable * out, vector<Xapian::docid>::const_iterator offset,
190  vector<GlassTable*>::const_iterator b,
191  vector<GlassTable*>::const_iterator e)
192 {
193  priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
194  for ( ; b != e; ++b, ++offset) {
195  GlassTable *in = *b;
196  if (in->empty()) {
197  // Skip empty tables.
198  continue;
199  }
200 
201  pq.push(new PostlistCursor(in, *offset));
202  }
203 
204  string last_key;
205  {
206  // Merge user metadata.
207  vector<string> tags;
208  while (!pq.empty()) {
209  PostlistCursor * cur = pq.top();
210  const string& key = cur->key;
211  if (!is_user_metadata_key(key)) break;
212 
213  if (key != last_key) {
214  if (!tags.empty()) {
215  if (tags.size() > 1 && compactor) {
216  Assert(!last_key.empty());
217  // FIXME: It would be better to merge all duplicates
218  // for a key in one call, but currently we don't in
219  // multipass mode.
220  const string & resolved_tag =
221  compactor->resolve_duplicate_metadata(last_key,
222  tags.size(),
223  &tags[0]);
224  if (!resolved_tag.empty())
225  out->add(last_key, resolved_tag);
226  } else {
227  Assert(!last_key.empty());
228  out->add(last_key, tags[0]);
229  }
230  tags.resize(0);
231  }
232  last_key = key;
233  }
234  tags.push_back(cur->tag);
235 
236  pq.pop();
237  if (cur->next()) {
238  pq.push(cur);
239  } else {
240  delete cur;
241  }
242  }
243  if (!tags.empty()) {
244  if (tags.size() > 1 && compactor) {
245  Assert(!last_key.empty());
246  const string & resolved_tag =
247  compactor->resolve_duplicate_metadata(last_key,
248  tags.size(),
249  &tags[0]);
250  if (!resolved_tag.empty())
251  out->add(last_key, resolved_tag);
252  } else {
253  Assert(!last_key.empty());
254  out->add(last_key, tags[0]);
255  }
256  }
257  }
258 
259  {
260  // Merge valuestats.
261  Xapian::doccount freq = 0;
262  string lbound, ubound;
263 
264  while (!pq.empty()) {
265  PostlistCursor * cur = pq.top();
266  const string& key = cur->key;
267  if (!is_valuestats_key(key)) break;
268  if (key != last_key) {
269  // For the first valuestats key, last_key will be the previous
270  // key we wrote, which we don't want to overwrite. This is the
271  // only time that freq will be 0, so check that.
272  if (freq) {
273  out->add(last_key, encode_valuestats(freq, lbound, ubound));
274  freq = 0;
275  }
276  last_key = key;
277  }
278 
279  const string & tag = cur->tag;
280 
281  const char * pos = tag.data();
282  const char * end = pos + tag.size();
283 
285  string l, u;
286  if (!unpack_uint(&pos, end, &f)) {
287  if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
288  throw Xapian::RangeError("Frequency statistic in value table is too large");
289  }
290  if (!unpack_string(&pos, end, l)) {
291  if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
292  throw Xapian::RangeError("Lower bound in value table is too large");
293  }
294  size_t len = end - pos;
295  if (len == 0) {
296  u = l;
297  } else {
298  u.assign(pos, len);
299  }
300  if (freq == 0) {
301  freq = f;
302  lbound = l;
303  ubound = u;
304  } else {
305  freq += f;
306  if (l < lbound) lbound = l;
307  if (u > ubound) ubound = u;
308  }
309 
310  pq.pop();
311  if (cur->next()) {
312  pq.push(cur);
313  } else {
314  delete cur;
315  }
316  }
317 
318  if (freq) {
319  out->add(last_key, encode_valuestats(freq, lbound, ubound));
320  }
321  }
322 
323  // Merge valuestream chunks.
324  while (!pq.empty()) {
325  PostlistCursor * cur = pq.top();
326  const string & key = cur->key;
327  if (!is_valuechunk_key(key)) break;
329  out->add(key, cur->tag);
330  pq.pop();
331  if (cur->next()) {
332  pq.push(cur);
333  } else {
334  delete cur;
335  }
336  }
337 
338  Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
339  vector<pair<Xapian::docid, string>> tags;
340  while (true) {
341  PostlistCursor * cur = NULL;
342  if (!pq.empty()) {
343  cur = pq.top();
344  pq.pop();
345  }
346  Assert(cur == NULL || !is_user_metadata_key(cur->key));
347  if (cur == NULL || cur->key != last_key) {
348  if (!tags.empty()) {
349  string first_tag;
350  pack_uint(first_tag, tf);
351  pack_uint(first_tag, cf);
352  pack_uint(first_tag, tags[0].first - 1);
353  string tag = tags[0].second;
354  tag[0] = (tags.size() == 1) ? '1' : '0';
355  first_tag += tag;
356  out->add(last_key, first_tag);
357 
358  string term;
359  if (!is_doclenchunk_key(last_key)) {
360  const char * p = last_key.data();
361  const char * end = p + last_key.size();
362  if (!unpack_string_preserving_sort(&p, end, term) || p != end)
363  throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
364  }
365 
366  auto i = tags.begin();
367  while (++i != tags.end()) {
368  tag = i->second;
369  tag[0] = (i + 1 == tags.end()) ? '1' : '0';
370  out->add(pack_glass_postlist_key(term, i->first), tag);
371  }
372  }
373  tags.clear();
374  if (cur == NULL) break;
375  tf = cf = 0;
376  last_key = cur->key;
377  }
378  tf += cur->tf;
379  cf += cur->cf;
380  tags.push_back(make_pair(cur->firstdid, cur->tag));
381  if (cur->next()) {
382  pq.push(cur);
383  } else {
384  delete cur;
385  }
386  }
387 }
388 
389 struct MergeCursor : public GlassCursor {
390  explicit MergeCursor(GlassTable *in) : GlassCursor(in) {
391  find_entry(string());
392  next();
393  }
394 };
395 
396 struct CursorGt {
398  bool operator()(const GlassCursor *a, const GlassCursor *b) const {
399  if (b->after_end()) return false;
400  if (a->after_end()) return true;
401  return (a->current_key > b->current_key);
402  }
403 };
404 
405 static void
407  vector<GlassTable*>::const_iterator b,
408  vector<GlassTable*>::const_iterator e)
409 {
410  priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
411  for ( ; b != e; ++b) {
412  GlassTable *in = *b;
413  if (!in->empty()) {
414  pq.push(new MergeCursor(in));
415  }
416  }
417 
418  while (!pq.empty()) {
419  MergeCursor * cur = pq.top();
420  pq.pop();
421 
422  string key = cur->current_key;
423  if (pq.empty() || pq.top()->current_key > key) {
424  // No need to merge the tags, just copy the (possibly compressed)
425  // tag value.
426  bool compressed = cur->read_tag(true);
427  out->add(key, cur->current_tag, compressed);
428  if (cur->next()) {
429  pq.push(cur);
430  } else {
431  delete cur;
432  }
433  continue;
434  }
435 
436  // Merge tag values with the same key:
437  string tag;
438  if (key[0] != 'W') {
439  // We just want the union of words, so copy over the first instance
440  // and skip any identical ones.
441  priority_queue<PrefixCompressedStringItor *,
442  vector<PrefixCompressedStringItor *>,
444  // Stick all the MergeCursor pointers in a vector because their
445  // current_tag members must remain valid while we're merging their
446  // tags, but we need to call next() on them all afterwards.
447  vector<MergeCursor *> vec;
448  vec.reserve(pq.size());
449 
450  while (true) {
451  cur->read_tag();
452  pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
453  vec.push_back(cur);
454  if (pq.empty() || pq.top()->current_key != key) break;
455  cur = pq.top();
456  pq.pop();
457  }
458 
460  string lastword;
461  while (!pqtag.empty()) {
462  PrefixCompressedStringItor * it = pqtag.top();
463  pqtag.pop();
464  string word = **it;
465  if (word != lastword) {
466  lastword = word;
467  wr.append(lastword);
468  }
469  ++*it;
470  if (!it->at_end()) {
471  pqtag.push(it);
472  } else {
473  delete it;
474  }
475  }
476 
477  vector<MergeCursor *>::const_iterator i;
478  for (i = vec.begin(); i != vec.end(); ++i) {
479  cur = *i;
480  if (cur->next()) {
481  pq.push(cur);
482  } else {
483  delete cur;
484  }
485  }
486  } else {
487  // We want to sum the frequencies from tags for the same key.
488  Xapian::termcount tot_freq = 0;
489  while (true) {
490  cur->read_tag();
491  Xapian::termcount freq;
492  const char * p = cur->current_tag.data();
493  const char * end = p + cur->current_tag.size();
494  if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
495  throw Xapian::DatabaseCorruptError("Bad spelling word freq");
496  }
497  tot_freq += freq;
498  if (cur->next()) {
499  pq.push(cur);
500  } else {
501  delete cur;
502  }
503  if (pq.empty() || pq.top()->current_key != key) break;
504  cur = pq.top();
505  pq.pop();
506  }
507  tag.resize(0);
508  pack_uint_last(tag, tot_freq);
509  }
510  out->add(key, tag);
511  }
512 }
513 
514 static void
516  vector<GlassTable*>::const_iterator b,
517  vector<GlassTable*>::const_iterator e)
518 {
519  priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
520  for ( ; b != e; ++b) {
521  GlassTable *in = *b;
522  if (!in->empty()) {
523  pq.push(new MergeCursor(in));
524  }
525  }
526 
527  while (!pq.empty()) {
528  MergeCursor * cur = pq.top();
529  pq.pop();
530 
531  string key = cur->current_key;
532  if (pq.empty() || pq.top()->current_key > key) {
533  // No need to merge the tags, just copy the (possibly compressed)
534  // tag value.
535  bool compressed = cur->read_tag(true);
536  out->add(key, cur->current_tag, compressed);
537  if (cur->next()) {
538  pq.push(cur);
539  } else {
540  delete cur;
541  }
542  continue;
543  }
544 
545  // Merge tag values with the same key:
546  string tag;
547 
548  // We just want the union of words, so copy over the first instance
549  // and skip any identical ones.
550  priority_queue<ByteLengthPrefixedStringItor *,
551  vector<ByteLengthPrefixedStringItor *>,
553  vector<MergeCursor *> vec;
554 
555  while (true) {
556  cur->read_tag();
557  pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
558  vec.push_back(cur);
559  if (pq.empty() || pq.top()->current_key != key) break;
560  cur = pq.top();
561  pq.pop();
562  }
563 
564  string lastword;
565  while (!pqtag.empty()) {
566  ByteLengthPrefixedStringItor * it = pqtag.top();
567  pqtag.pop();
568  if (**it != lastword) {
569  lastword = **it;
570  tag += uint8_t(lastword.size() ^ MAGIC_XOR_VALUE);
571  tag += lastword;
572  }
573  ++*it;
574  if (!it->at_end()) {
575  pqtag.push(it);
576  } else {
577  delete it;
578  }
579  }
580 
581  vector<MergeCursor *>::const_iterator i;
582  for (i = vec.begin(); i != vec.end(); ++i) {
583  cur = *i;
584  if (cur->next()) {
585  pq.push(cur);
586  } else {
587  delete cur;
588  }
589  }
590 
591  out->add(key, tag);
592  }
593 }
594 
595 static void
597  GlassTable * out, const char * tmpdir,
598  vector<GlassTable *> tmp,
599  vector<Xapian::docid> off)
600 {
601  unsigned int c = 0;
602  while (tmp.size() > 3) {
603  vector<GlassTable *> tmpout;
604  tmpout.reserve(tmp.size() / 2);
605  vector<Xapian::docid> newoff;
606  newoff.resize(tmp.size() / 2);
607  for (unsigned int i = 0, j; i < tmp.size(); i = j) {
608  j = i + 2;
609  if (j == tmp.size() - 1) ++j;
610 
611  string dest = tmpdir;
612  dest += "/tmp";
613  dest += str(c);
614  dest += '_';
615  dest += str(i / 2);
616  dest += '.';
617 
618  GlassTable * tmptab = new GlassTable("postlist", dest, false);
619 
620  // Use maximum blocksize for temporary tables. And don't compress
621  // entries in temporary tables, even if the final table would do
622  // so. Any already compressed entries will get copied in
623  // compressed form.
624  RootInfo root_info;
625  root_info.init(65536, 0);
626  const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
627  tmptab->create_and_open(flags, root_info);
628 
629  merge_postlists(compactor, tmptab, off.begin() + i,
630  tmp.begin() + i, tmp.begin() + j);
631  if (c > 0) {
632  for (unsigned int k = i; k < j; ++k) {
633  unlink(tmp[k]->get_path().c_str());
634  delete tmp[k];
635  tmp[k] = NULL;
636  }
637  }
638  tmpout.push_back(tmptab);
639  tmptab->flush_db();
640  tmptab->commit(1, &root_info);
641  AssertRel(root_info.get_blocksize(),==,65536);
642  }
643  swap(tmp, tmpout);
644  swap(off, newoff);
645  ++c;
646  }
647  merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
648  if (c > 0) {
649  for (size_t k = 0; k < tmp.size(); ++k) {
650  unlink(tmp[k]->get_path().c_str());
651  delete tmp[k];
652  tmp[k] = NULL;
653  }
654  }
655 }
656 
657 class PositionCursor : private GlassCursor {
659 
660  public:
661  string key;
663 
665  : GlassCursor(in), offset(offset_), firstdid(0) {
666  find_entry(string());
667  next();
668  }
669 
670  bool next() {
671  if (!GlassCursor::next()) return false;
672  read_tag();
673  const char * d = current_key.data();
674  const char * e = d + current_key.size();
675  string term;
676  Xapian::docid did;
677  if (!unpack_string_preserving_sort(&d, e, term) ||
678  !unpack_uint_preserving_sort(&d, e, &did) ||
679  d != e) {
680  throw Xapian::DatabaseCorruptError("Bad position key");
681  }
682 
683  key.resize(0);
684  pack_string_preserving_sort(key, term);
685  pack_uint_preserving_sort(key, did + offset);
686  return true;
687  }
688 
689  const string & get_tag() const {
690  return current_tag;
691  }
692 };
693 
695  public:
698  bool operator()(const PositionCursor *a, const PositionCursor *b) const {
699  return a->key > b->key;
700  }
701 };
702 
703 static void
704 merge_positions(GlassTable *out, const vector<GlassTable*> & inputs,
705  const vector<Xapian::docid> & offset)
706 {
707  priority_queue<PositionCursor *, vector<PositionCursor *>, PositionCursorGt> pq;
708  for (size_t i = 0; i < inputs.size(); ++i) {
709  GlassTable *in = inputs[i];
710  if (in->empty()) {
711  // Skip empty tables.
712  continue;
713  }
714 
715  pq.push(new PositionCursor(in, offset[i]));
716  }
717 
718  while (!pq.empty()) {
719  PositionCursor * cur = pq.top();
720  pq.pop();
721  out->add(cur->key, cur->get_tag());
722  if (cur->next()) {
723  pq.push(cur);
724  } else {
725  delete cur;
726  }
727  }
728 }
729 
730 static void
731 merge_docid_keyed(GlassTable *out, const vector<GlassTable*> & inputs,
732  const vector<Xapian::docid> & offset)
733 {
734  for (size_t i = 0; i < inputs.size(); ++i) {
735  Xapian::docid off = offset[i];
736 
737  GlassTable * in = inputs[i];
738  if (in->empty()) continue;
739 
740  GlassCursor cur(in);
741  cur.find_entry(string());
742 
743  string key;
744  while (cur.next()) {
745  // Adjust the key if this isn't the first database.
746  if (off) {
747  Xapian::docid did;
748  const char * d = cur.current_key.data();
749  const char * e = d + cur.current_key.size();
750  if (!unpack_uint_preserving_sort(&d, e, &did)) {
751  string msg = "Bad key in ";
752  msg += inputs[i]->get_path();
753  throw Xapian::DatabaseCorruptError(msg);
754  }
755  did += off;
756  key.resize(0);
757  pack_uint_preserving_sort(key, did);
758  if (d != e) {
759  // Copy over anything extra in the key (e.g. the zero byte
760  // at the end of "used value slots" in the termlist table).
761  key.append(d, e - d);
762  }
763  } else {
764  key = cur.current_key;
765  }
766  bool compressed = cur.read_tag(true);
767  out->add(key, cur.current_tag, compressed);
768  }
769  }
770 }
771 
772 }
773 
774 using namespace GlassCompact;
775 
776 void
778  const char * destdir,
779  int fd,
780  const vector<Xapian::Database::Internal*> & sources,
781  const vector<Xapian::docid> & offset,
782  size_t block_size,
784  unsigned flags,
785  Xapian::docid last_docid)
786 {
787  struct table_list {
788  // The "base name" of the table.
789  char name[9];
790  // The type.
791  Glass::table_type type;
792  // Create tables after position lazily.
793  bool lazy;
794  };
795 
796  static const table_list tables[] = {
797  // name type lazy
798  { "postlist", Glass::POSTLIST, false },
799  { "docdata", Glass::DOCDATA, true },
800  { "termlist", Glass::TERMLIST, false },
801  { "position", Glass::POSITION, true },
802  { "spelling", Glass::SPELLING, true },
803  { "synonym", Glass::SYNONYM, true }
804  };
805  const table_list * tables_end = tables +
806  (sizeof(tables) / sizeof(tables[0]));
807 
808  const int FLAGS = Xapian::DB_DANGEROUS;
809 
810  bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
811  bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
812  if (single_file) {
813  // FIXME: Support this combination - we need to put temporary files
814  // somewhere.
815  multipass = false;
816  }
817 
818  for (size_t i = 0; i != sources.size(); ++i) {
819  auto db = static_cast<const GlassDatabase*>(sources[i]);
820  if (db->has_uncommitted_changes()) {
821  const char * m =
822  "Can't compact from a WritableDatabase with uncommitted "
823  "changes - either call commit() first, or create a new "
824  "Database object from the filename on disk";
826  }
827  }
828 
829  if (block_size < 2048 || block_size > 65536 ||
830  (block_size & (block_size - 1)) != 0) {
831  block_size = GLASS_DEFAULT_BLOCKSIZE;
832  }
833 
834  FlintLock lock(destdir ? destdir : "");
835  if (!single_file) {
836  string explanation;
837  FlintLock::reason why = lock.lock(true, false, explanation);
838  if (why != FlintLock::SUCCESS) {
839  lock.throw_databaselockerror(why, destdir, explanation);
840  }
841  }
842 
843  AutoPtr<GlassVersion> version_file_out;
844  if (single_file) {
845  if (destdir) {
846  fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
847  if (fd < 0) {
848  throw Xapian::DatabaseCreateError("open() failed", errno);
849  }
850  }
851  version_file_out.reset(new GlassVersion(fd));
852  } else {
853  fd = -1;
854  version_file_out.reset(new GlassVersion(destdir));
855  }
856 
857  version_file_out->create(block_size);
858  for (size_t i = 0; i != sources.size(); ++i) {
859  GlassDatabase * db = static_cast<GlassDatabase*>(sources[i]);
860  version_file_out->merge_stats(db->version_file);
861  }
862 
863  string fl_serialised;
864  if (single_file) {
865  GlassFreeList fl;
866  fl.set_first_unused_block(1); // FIXME: Assumption?
867  fl.pack(fl_serialised);
868  }
869 
870  vector<GlassTable *> tabs;
871  tabs.reserve(tables_end - tables);
872  off_t prev_size = block_size;
873  for (const table_list * t = tables; t < tables_end; ++t) {
874  // The postlist table requires an N-way merge, adjusting the
875  // headers of various blocks. The spelling and synonym tables also
876  // need special handling. The other tables have keys sorted in
877  // docid order, so we can merge them by simply copying all the keys
878  // from each source table in turn.
879  if (compactor)
880  compactor->set_status(t->name, string());
881 
882  string dest;
883  if (!single_file) {
884  dest = destdir;
885  dest += '/';
886  dest += t->name;
887  dest += '.';
888  }
889 
890  bool output_will_exist = !t->lazy;
891 
892  // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
893  // on certain systems).
894  bool bad_stat = false;
895 
896  // We can't currently report input sizes if there's a single file DB
897  // amongst the inputs.
898  bool single_file_in = false;
899 
900  off_t in_size = 0;
901 
902  vector<GlassTable*> inputs;
903  inputs.reserve(sources.size());
904  size_t inputs_present = 0;
905  for (auto src : sources) {
906  GlassDatabase * db = static_cast<GlassDatabase*>(src);
907  GlassTable * table;
908  switch (t->type) {
909  case Glass::POSTLIST:
910  table = &(db->postlist_table);
911  break;
912  case Glass::DOCDATA:
913  table = &(db->docdata_table);
914  break;
915  case Glass::TERMLIST:
916  table = &(db->termlist_table);
917  break;
918  case Glass::POSITION:
919  table = &(db->position_table);
920  break;
921  case Glass::SPELLING:
922  table = &(db->spelling_table);
923  break;
924  case Glass::SYNONYM:
925  table = &(db->synonym_table);
926  break;
927  default:
928  Assert(false);
929  return;
930  }
931 
932  if (db->single_file()) {
933  if (t->lazy && table->empty()) {
934  // Essentially doesn't exist.
935  } else {
936  // FIXME: Find actual size somehow?
937  // in_size += table->size() / 1024;
938  single_file_in = true;
939  output_will_exist = true;
940  ++inputs_present;
941  }
942  } else {
943  off_t db_size = file_size(table->get_path());
944  if (errno == 0) {
945  in_size += db_size / 1024;
946  output_will_exist = true;
947  ++inputs_present;
948  } else if (errno != ENOENT) {
949  // We get ENOENT for an optional table.
950  bad_stat = true;
951  output_will_exist = true;
952  ++inputs_present;
953  }
954  }
955  inputs.push_back(table);
956  }
957 
958  // If any inputs lack a termlist table, suppress it in the output.
959  if (t->type == Glass::TERMLIST && inputs_present != sources.size()) {
960  if (inputs_present != 0) {
961  if (compactor) {
962  string m = str(inputs_present);
963  m += " of ";
964  m += str(sources.size());
965  m += " inputs present, so suppressing output";
966  compactor->set_status(t->name, m);
967  }
968  continue;
969  }
970  output_will_exist = false;
971  }
972 
973  if (!output_will_exist) {
974  if (compactor)
975  compactor->set_status(t->name, "doesn't exist");
976  continue;
977  }
978 
979  GlassTable * out;
980  if (single_file) {
981  out = new GlassTable(t->name, fd, version_file_out->get_offset(),
982  false, false);
983  } else {
984  out = new GlassTable(t->name, dest, false, t->lazy);
985  }
986  tabs.push_back(out);
987  RootInfo * root_info = version_file_out->root_to_set(t->type);
988  if (single_file) {
989  root_info->set_free_list(fl_serialised);
990  out->open(FLAGS, version_file_out->get_root(t->type), version_file_out->get_revision());
991  } else {
992  out->create_and_open(FLAGS, *root_info);
993  }
994 
995  out->set_full_compaction(compaction != compactor->STANDARD);
996  if (compaction == compactor->FULLER) out->set_max_item_size(1);
997 
998  switch (t->type) {
999  case Glass::POSTLIST: {
1000  if (multipass && inputs.size() > 3) {
1001  multimerge_postlists(compactor, out, destdir,
1002  inputs, offset);
1003  } else {
1004  merge_postlists(compactor, out, offset.begin(),
1005  inputs.begin(), inputs.end());
1006  }
1007  break;
1008  }
1009  case Glass::SPELLING:
1010  merge_spellings(out, inputs.begin(), inputs.end());
1011  break;
1012  case Glass::SYNONYM:
1013  merge_synonyms(out, inputs.begin(), inputs.end());
1014  break;
1015  case Glass::POSITION:
1016  merge_positions(out, inputs, offset);
1017  break;
1018  default:
1019  // DocData, Termlist
1020  merge_docid_keyed(out, inputs, offset);
1021  break;
1022  }
1023 
1024  if (out->is_modified()) {
1025  // Commit as revision 1.
1026  out->flush_db();
1027  out->commit(1, root_info);
1028  out->sync();
1029  }
1030  if (single_file) fl_serialised = root_info->get_free_list();
1031 
1032  off_t out_size = 0;
1033  if (!bad_stat && !single_file_in) {
1034  off_t db_size;
1035  if (single_file) {
1036  db_size = file_size(fd);
1037  } else {
1038  db_size = file_size(dest + GLASS_TABLE_EXTENSION);
1039  }
1040  if (errno == 0) {
1041  if (single_file) {
1042  off_t old_prev_size = max(prev_size, off_t(block_size));
1043  prev_size = db_size;
1044  db_size = max(db_size, off_t(block_size));
1045  db_size -= old_prev_size;
1046  }
1047  out_size = db_size / 1024;
1048  } else {
1049  bad_stat = (errno != ENOENT);
1050  }
1051  }
1052  if (bad_stat) {
1053  if (compactor)
1054  compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
1055  } else if (single_file_in) {
1056  if (compactor)
1057  compactor->set_status(t->name, "Done (table sizes unknown for single file DB input)");
1058  } else {
1059  string status;
1060  if (out_size == in_size) {
1061  status = "Size unchanged (";
1062  } else {
1063  off_t delta;
1064  if (out_size < in_size) {
1065  delta = in_size - out_size;
1066  status = "Reduced by ";
1067  } else {
1068  delta = out_size - in_size;
1069  status = "INCREASED by ";
1070  }
1071  if (in_size) {
1072  status += str(100 * delta / in_size);
1073  status += "% ";
1074  }
1075  status += str(delta);
1076  status += "K (";
1077  status += str(in_size);
1078  status += "K -> ";
1079  }
1080  status += str(out_size);
1081  status += "K)";
1082  if (compactor)
1083  compactor->set_status(t->name, status);
1084  }
1085  }
1086 
1087  // If compacting to a single file output and all the tables are empty, pad
1088  // the output so that it isn't mistaken for a stub database when we try to
1089  // open it. For this it needs to be a multiple of 2KB in size.
1090  if (single_file && prev_size < off_t(block_size)) {
1091 #ifdef HAVE_FTRUNCATE
1092  if (ftruncate(fd, block_size) < 0) {
1093  throw Xapian::DatabaseError("Failed to set size of output database", errno);
1094  }
1095 #else
1096  const off_t off = block_size - 1;
1097  if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
1098  throw Xapian::DatabaseError("Failed to set size of output database", errno);
1099  }
1100 #endif
1101  }
1102 
1103  if (single_file) {
1104  if (lseek(fd, version_file_out->get_offset(), SEEK_SET) < 0) {
1105  throw Xapian::DatabaseError("lseek() failed", errno);
1106  }
1107  }
1108  version_file_out->set_last_docid(last_docid);
1109  string tmpfile = version_file_out->write(1, FLAGS);
1110  for (unsigned j = 0; j != tabs.size(); ++j) {
1111  tabs[j]->sync();
1112  }
1113  // Commit with revision 1.
1114  version_file_out->sync(tmpfile, 1, FLAGS);
1115  for (unsigned j = 0; j != tabs.size(); ++j) {
1116  delete tabs[j];
1117  }
1118 
1119  if (!single_file) lock.release();
1120 }
GlassVersion version_file
The file describing the Glass database.
void throw_databaselockerror(FlintLock::reason why, const std::string &db_dir, const std::string &explanation) const
Throw Xapian::DatabaseLockError.
Definition: flint_lock.cc:495
#define Assert(COND)
Definition: omassert.h:122
void release()
Release the lock.
Definition: flint_lock.cc:459
GlassVersion class.
typedefs for Xapian
MergeCursor(GlassTable *in)
Statistics about values.
GlassPositionListTable position_table
Table storing position lists.
void create_and_open(int flags_, const RootInfo &root_info)
Create a new empty btree structure on disk and open it at the initial revision.
table_type
Definition: glass_defs.h:53
static void merge_positions(GlassTable *out, const vector< GlassTable *> &inputs, const vector< Xapian::docid > &offset)
Allow oversize items to save more space (not recommended if you ever plan to update the compacted dat...
Definition: compactor.h:55
#define AssertRel(A, REL, B)
Definition: omassert.h:123
InvalidOperationError indicates the API was used in an invalid way.
Definition: error.h:283
bool empty() const
Return true if there are no entries in the table.
Definition: glass_table.h:681
Class managing a Btree table in a Glass database.
Definition: glass_table.h:425
GlassSynonymTable synonym_table
Table storing synonym data.
static void merge_synonyms(GlassTable *out, vector< GlassTable *>::const_iterator b, vector< GlassTable *>::const_iterator e)
Constants in the Xapian namespace.
The GlassVersion class manages the revision files.
Definition: glass_version.h:94
static void compact(Xapian::Compactor *compactor, const char *destdir, int fd, const std::vector< Xapian::Database::Internal *> &sources, const std::vector< Xapian::docid > &offset, size_t block_size, Xapian::Compactor::compaction_level compaction, unsigned flags, Xapian::docid last_docid)
#define O_BINARY
Definition: safefcntl.h:81
WritableDatabase open()
Construct a WritableDatabase object for a new, empty InMemory database.
Definition: dbfactory.h:104
Don&#39;t split items unnecessarily.
Definition: compactor.h:50
Compact a database, or merge and compact several.
bool next()
Advance to the next key.
static bool tags
STL namespace.
Definitions, types, etc for use inside glass.
Flint-compatible database locking.
void add(const std::string &key, const std::string &tag, bool already_compressed=false)
Add a key/tag pair to the table, replacing any existing pair with the same key.
static void merge_postlists(Xapian::Compactor *compactor, GlassTable *out, vector< Xapian::docid >::const_iterator offset, vector< GlassTable *>::const_iterator b, vector< GlassTable *>::const_iterator e)
bool after_end() const
Determine whether cursor is off the end of table.
Definition: glass_cursor.h:329
Utility functions for testing files.
#define O_CLOEXEC
Definition: safefcntl.h:90
string get_path() const
Definition: glass_table.h:728
GlassDocDataTable docdata_table
Table storing document data.
#define GLASS_TABLE_EXTENSION
Glass table extension.
Definition: glass_defs.h:27
bool read_tag(bool keep_compressed=false)
Read the tag from the table and store it in current_tag.
virtual void set_status(const std::string &table, const std::string &status)
Update progress.
Definition: compactor.cc:135
Hierarchy of classes which Xapian can throw as exceptions.
unsigned XAPIAN_TERMCOUNT_BASE_TYPE termcount
A counts of terms.
Definition: types.h:72
void flush_db()
Flush any outstanding changes to the DB file of the table.
RangeError indicates an attempt to access outside the bounds of a container.
Definition: error.h:971
void pack_uint_last(std::string &s, U value)
Append an encoded unsigned integer to a string as the last item.
Definition: pack.h:93
static void multimerge_postlists(Xapian::Compactor *compactor, GlassTable *out, const char *tmpdir, vector< GlassTable *> tmp, vector< Xapian::docid > off)
bool sync()
Definition: glass_table.h:535
string current_key
Current key pointed to by cursor.
Definition: glass_cursor.h:239
GlassPostListTable postlist_table
Table storing posting lists.
PostlistCursor(GlassTable *in, Xapian::docid offset_)
bool operator()(const PositionCursor *a, const PositionCursor *b) const
Return true if and only if a&#39;s key is strictly greater than b&#39;s key.
virtual std::string resolve_duplicate_metadata(const std::string &key, size_t num_tags, const std::string tags[])
Resolve multiple user metadata entries with the same key.
Definition: compactor.cc:142
bool is_modified() const
Determine whether the object contains uncommitted modifications.
Definition: glass_table.h:697
DatabaseCreateError indicates a failure to create a database.
Definition: error.h:451
void commit(glass_revision_number_t revision, RootInfo *root_info)
Commit any outstanding changes to the table.
string current_tag
Current tag pointed to by cursor.
Definition: glass_cursor.h:244
Compact a database, or merge and compact several.
Definition: compactor.h:42
static void merge_spellings(GlassTable *out, vector< GlassTable *>::const_iterator b, vector< GlassTable *>::const_iterator e)
static bool is_valuestats_key(const string &key)
string str(int value)
Convert int to std::string.
Definition: str.cc:90
GlassTermListTable termlist_table
Table storing term lists.
#define GLASS_DEFAULT_BLOCKSIZE
Default B-tree block size.
Definition: glass_defs.h:30
C++ class definition for glass database.
static void merge_docid_keyed(GlassTable *out, const vector< GlassTable *> &inputs, const vector< Xapian::docid > &offset)
static bool is_user_metadata_key(const string &key)
bool unpack_string_preserving_sort(const char **p, const char *end, std::string &result)
Decode a "sort preserved" std::string from a string.
Definition: pack.h:562
bool unpack_uint_preserving_sort(const char **p, const char *end, U *result)
Decode a "sort preserved" unsigned integer from a string.
Definition: pack.h:318
Btree implementation.
static string encode_valuestats(Xapian::doccount freq, const string &lbound, const string &ubound)
const int DB_DANGEROUS
Update the database in-place.
Definition: constants.h:103
void set_max_item_size(size_t block_capacity)
Set the maximum item size given the block capacity.
Definition: glass_table.h:704
A backend designed for efficient indexing and retrieval, using compressed posting lists and a btree s...
A cursor pointing to a position in a Btree table, for reading several entries in order, or finding approximate matches.
Definition: glass_cursor.h:147
const int DB_NO_SYNC
Don&#39;t attempt to ensure changes have hit disk.
Definition: constants.h:66
DatabaseCorruptError indicates database corruption was detected.
Definition: error.h:409
void open(int flags_, const RootInfo &root_info, glass_revision_number_t rev)
Open the btree.
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:382
void init(unsigned blocksize_, uint4 compress_min_)
bool single_file() const
unsigned get_blocksize() const
Definition: glass_version.h:64
unsigned XAPIAN_DOCID_BASE_TYPE doccount
A count of documents.
Definition: types.h:38
#define MAGIC_XOR_VALUE
void pack_string(std::string &s, const std::string &value)
Append an encoded std::string to a string.
Definition: pack.h:477
Interface to Btree cursors.
std::string pack_glass_postlist_key(const std::string &term)
Definition: pack.h:613
compaction_level
Compaction level.
Definition: compactor.h:48
Pack types into strings and unpack them again.
unsigned valueno
The number for a value slot in a document.
Definition: types.h:108
bool unpack_uint_last(const char **p, const char *end, U *result)
Decode an unsigned integer as the last item in a string.
Definition: pack.h:111
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
Definition: pack.h:413
PositionCursor(GlassTable *in, Xapian::docid offset_)
const int DBCOMPACT_MULTIPASS
If merging more than 3 databases, merge the postlists in multiple passes.
Definition: constants.h:262
reason lock(bool exclusive, bool wait, std::string &explanation)
Attempt to obtain the lock.
Definition: flint_lock.cc:125
off_t file_size(const char *path)
Returns the size of a file.
Definition: filetests.h:71
Definition: header.h:151
void pack(std::string &buf)
void append(const std::string &word)
bool unpack_string(const char **p, const char *end, std::string &result)
Decode a std::string from a string.
Definition: pack.h:504
bool find_entry(const string &key)
Position the cursor on the highest entry with key <= key.
unsigned XAPIAN_DOCID_BASE_TYPE docid
A unique identifier for a document.
Definition: types.h:52
DatabaseError indicates some sort of database related error.
Definition: error.h:367
static bool is_valuechunk_key(const string &key)
const int DBCOMPACT_SINGLE_FILE
Produce a single-file database.
Definition: constants.h:268
void set_first_unused_block(uint4 base)
const std::string & get_free_list() const
Definition: glass_version.h:69
void set_free_list(const std::string &s)
Definition: glass_version.h:80
bool operator()(const PostlistCursor *a, const PostlistCursor *b) const
Return true if and only if a&#39;s key is strictly greater than b&#39;s key.
GlassSpellingTable spelling_table
Table storing spelling correction data.
Types used internally.
Wrapper around standard unique_ptr template.
static bool is_doclenchunk_key(const string &key)
void set_full_compaction(bool parity)
void pack_string_preserving_sort(std::string &s, const std::string &value, bool last=false)
Append an encoded std::string to a string, preserving the sort order.
Definition: pack.h:539
void pack_uint_preserving_sort(std::string &s, U value)
Append an encoded unsigned integer to a string, preserving the sort order.
Definition: pack.h:269
bool operator()(const GlassCursor *a, const GlassCursor *b) const
Return true if and only if a&#39;s key is strictly greater than b&#39;s key.
const string & get_tag() const