xapian-core  1.4.25
glass_compact.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License as
8  * published by the Free Software Foundation; either version 2 of the
9  * License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19  * USA
20  */
21 
22 #include <config.h>
23 
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
28 
29 #include "autoptr.h"
30 #include <algorithm>
31 #include <queue>
32 
33 #include <cerrno>
34 #include <cstdio>
35 
36 #include "backends/flint_lock.h"
37 #include "glass_database.h"
38 #include "glass_defs.h"
39 #include "glass_table.h"
40 #include "glass_cursor.h"
41 #include "glass_version.h"
42 #include "filetests.h"
43 #include "internaltypes.h"
44 #include "pack.h"
45 #include "backends/valuestats.h"
46 
47 #include "../byte_length_strings.h"
48 #include "../prefix_compressed_strings.h"
49 
50 using namespace std;
51 
52 // Put all the helpers in a namespace to avoid symbols colliding with those of
53 // the same name in other flint-derived backends.
54 namespace GlassCompact {
55 
56 static inline bool
57 is_user_metadata_key(const string & key)
58 {
59  return key.size() > 1 && key[0] == '\0' && key[1] == '\xc0';
60 }
61 
62 static inline bool
63 is_valuestats_key(const string & key)
64 {
65  return key.size() > 1 && key[0] == '\0' && key[1] == '\xd0';
66 }
67 
68 static inline bool
69 is_valuechunk_key(const string & key)
70 {
71  return key.size() > 1 && key[0] == '\0' && key[1] == '\xd8';
72 }
73 
74 static inline bool
75 is_doclenchunk_key(const string & key)
76 {
77  return key.size() > 1 && key[0] == '\0' && key[1] == '\xe0';
78 }
79 
80 class PostlistCursor : private GlassCursor {
82 
83  public:
84  string key, tag;
87 
89  : GlassCursor(in), offset(offset_), firstdid(0)
90  {
91  find_entry(string());
92  next();
93  }
94 
95  bool next() {
96  if (!GlassCursor::next()) return false;
97  // We put all chunks into the non-initial chunk form here, then fix up
98  // the first chunk for each term in the merged database as we merge.
99  read_tag();
100  key = current_key;
101  tag = current_tag;
102  tf = cf = 0;
103  if (is_user_metadata_key(key)) return true;
104  if (is_valuestats_key(key)) return true;
105  if (is_valuechunk_key(key)) {
106  const char * p = key.data();
107  const char * end = p + key.length();
108  p += 2;
109  Xapian::valueno slot;
110  if (!unpack_uint(&p, end, &slot))
111  throw Xapian::DatabaseCorruptError("bad value key");
112  Xapian::docid did;
113  if (!unpack_uint_preserving_sort(&p, end, &did))
114  throw Xapian::DatabaseCorruptError("bad value key");
115  did += offset;
116 
117  key.assign("\0\xd8", 2);
118  pack_uint(key, slot);
119  pack_uint_preserving_sort(key, did);
120  return true;
121  }
122 
123  // Adjust key if this is *NOT* an initial chunk.
124  // key is: pack_string_preserving_sort(key, tname)
125  // plus optionally: pack_uint_preserving_sort(key, did)
126  const char * d = key.data();
127  const char * e = d + key.size();
128  if (is_doclenchunk_key(key)) {
129  d += 2;
130  } else {
131  string tname;
132  if (!unpack_string_preserving_sort(&d, e, tname))
133  throw Xapian::DatabaseCorruptError("Bad postlist key");
134  }
135 
136  if (d == e) {
137  // This is an initial chunk for a term, so adjust tag header.
138  d = tag.data();
139  e = d + tag.size();
140  if (!unpack_uint(&d, e, &tf) ||
141  !unpack_uint(&d, e, &cf) ||
142  !unpack_uint(&d, e, &firstdid)) {
143  throw Xapian::DatabaseCorruptError("Bad postlist key");
144  }
145  ++firstdid;
146  tag.erase(0, d - tag.data());
147  } else {
148  // Not an initial chunk, so adjust key.
149  size_t tmp = d - key.data();
150  if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
151  throw Xapian::DatabaseCorruptError("Bad postlist key");
152  if (is_doclenchunk_key(key)) {
153  key.erase(tmp);
154  } else {
155  key.erase(tmp - 1);
156  }
157  }
158  firstdid += offset;
159  return true;
160  }
161 };
162 
164  public:
167  bool operator()(const PostlistCursor *a, const PostlistCursor *b) const {
168  if (a->key > b->key) return true;
169  if (a->key != b->key) return false;
170  return (a->firstdid > b->firstdid);
171  }
172 };
173 
174 static string
176  const string & lbound, const string & ubound)
177 {
178  string value;
179  pack_uint(value, freq);
180  pack_string(value, lbound);
181  // We don't store or count empty values, so neither of the bounds
182  // can be empty. So we can safely store an empty upper bound when
183  // the bounds are equal.
184  if (lbound != ubound) value += ubound;
185  return value;
186 }
187 
188 static void
190  GlassTable * out, vector<Xapian::docid>::const_iterator offset,
191  vector<GlassTable*>::const_iterator b,
192  vector<GlassTable*>::const_iterator e)
193 {
194  priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
195  for ( ; b != e; ++b, ++offset) {
196  GlassTable *in = *b;
197  if (in->empty()) {
198  // Skip empty tables.
199  continue;
200  }
201 
202  pq.push(new PostlistCursor(in, *offset));
203  }
204 
205  string last_key;
206  {
207  // Merge user metadata.
208  vector<string> tags;
209  while (!pq.empty()) {
210  PostlistCursor * cur = pq.top();
211  const string& key = cur->key;
212  if (!is_user_metadata_key(key)) break;
213 
214  if (key != last_key) {
215  if (!tags.empty()) {
216  if (tags.size() > 1 && compactor) {
217  Assert(!last_key.empty());
218  // FIXME: It would be better to merge all duplicates
219  // for a key in one call, but currently we don't in
220  // multipass mode.
221  const string & resolved_tag =
222  compactor->resolve_duplicate_metadata(last_key,
223  tags.size(),
224  &tags[0]);
225  if (!resolved_tag.empty())
226  out->add(last_key, resolved_tag);
227  } else {
228  Assert(!last_key.empty());
229  out->add(last_key, tags[0]);
230  }
231  tags.resize(0);
232  }
233  last_key = key;
234  }
235  tags.push_back(cur->tag);
236 
237  pq.pop();
238  if (cur->next()) {
239  pq.push(cur);
240  } else {
241  delete cur;
242  }
243  }
244  if (!tags.empty()) {
245  if (tags.size() > 1 && compactor) {
246  Assert(!last_key.empty());
247  const string & resolved_tag =
248  compactor->resolve_duplicate_metadata(last_key,
249  tags.size(),
250  &tags[0]);
251  if (!resolved_tag.empty())
252  out->add(last_key, resolved_tag);
253  } else {
254  Assert(!last_key.empty());
255  out->add(last_key, tags[0]);
256  }
257  }
258  }
259 
260  {
261  // Merge valuestats.
262  Xapian::doccount freq = 0;
263  string lbound, ubound;
264 
265  while (!pq.empty()) {
266  PostlistCursor * cur = pq.top();
267  const string& key = cur->key;
268  if (!is_valuestats_key(key)) break;
269  if (key != last_key) {
270  // For the first valuestats key, last_key will be the previous
271  // key we wrote, which we don't want to overwrite. This is the
272  // only time that freq will be 0, so check that.
273  if (freq) {
274  out->add(last_key, encode_valuestats(freq, lbound, ubound));
275  freq = 0;
276  }
277  last_key = key;
278  }
279 
280  const string & tag = cur->tag;
281 
282  const char * pos = tag.data();
283  const char * end = pos + tag.size();
284 
286  string l, u;
287  if (!unpack_uint(&pos, end, &f)) {
288  if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
289  throw Xapian::RangeError("Frequency statistic in value table is too large");
290  }
291  if (!unpack_string(&pos, end, l)) {
292  if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
293  throw Xapian::RangeError("Lower bound in value table is too large");
294  }
295  size_t len = end - pos;
296  if (len == 0) {
297  u = l;
298  } else {
299  u.assign(pos, len);
300  }
301  if (freq == 0) {
302  freq = f;
303  lbound = l;
304  ubound = u;
305  } else {
306  freq += f;
307  if (l < lbound) lbound = l;
308  if (u > ubound) ubound = u;
309  }
310 
311  pq.pop();
312  if (cur->next()) {
313  pq.push(cur);
314  } else {
315  delete cur;
316  }
317  }
318 
319  if (freq) {
320  out->add(last_key, encode_valuestats(freq, lbound, ubound));
321  }
322  }
323 
324  // Merge valuestream chunks.
325  while (!pq.empty()) {
326  PostlistCursor * cur = pq.top();
327  const string & key = cur->key;
328  if (!is_valuechunk_key(key)) break;
330  out->add(key, cur->tag);
331  pq.pop();
332  if (cur->next()) {
333  pq.push(cur);
334  } else {
335  delete cur;
336  }
337  }
338 
339  Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
340  vector<pair<Xapian::docid, string>> tags;
341  while (true) {
342  PostlistCursor * cur = NULL;
343  if (!pq.empty()) {
344  cur = pq.top();
345  pq.pop();
346  }
347  Assert(cur == NULL || !is_user_metadata_key(cur->key));
348  if (cur == NULL || cur->key != last_key) {
349  if (!tags.empty()) {
350  string first_tag;
351  pack_uint(first_tag, tf);
352  pack_uint(first_tag, cf);
353  pack_uint(first_tag, tags[0].first - 1);
354  string tag = tags[0].second;
355  tag[0] = (tags.size() == 1) ? '1' : '0';
356  first_tag += tag;
357  out->add(last_key, first_tag);
358 
359  string term;
360  if (!is_doclenchunk_key(last_key)) {
361  const char * p = last_key.data();
362  const char * end = p + last_key.size();
363  if (!unpack_string_preserving_sort(&p, end, term) || p != end)
364  throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
365  }
366 
367  auto i = tags.begin();
368  while (++i != tags.end()) {
369  tag = i->second;
370  tag[0] = (i + 1 == tags.end()) ? '1' : '0';
371  out->add(pack_glass_postlist_key(term, i->first), tag);
372  }
373  }
374  tags.clear();
375  if (cur == NULL) break;
376  tf = cf = 0;
377  last_key = cur->key;
378  }
379  tf += cur->tf;
380  cf += cur->cf;
381  tags.push_back(make_pair(cur->firstdid, cur->tag));
382  if (cur->next()) {
383  pq.push(cur);
384  } else {
385  delete cur;
386  }
387  }
388 }
389 
390 struct MergeCursor : public GlassCursor {
391  explicit MergeCursor(GlassTable *in) : GlassCursor(in) {
392  find_entry(string());
393  next();
394  }
395 };
396 
397 struct CursorGt {
399  bool operator()(const GlassCursor *a, const GlassCursor *b) const {
400  if (b->after_end()) return false;
401  if (a->after_end()) return true;
402  return (a->current_key > b->current_key);
403  }
404 };
405 
406 static void
408  vector<GlassTable*>::const_iterator b,
409  vector<GlassTable*>::const_iterator e)
410 {
411  priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
412  for ( ; b != e; ++b) {
413  GlassTable *in = *b;
414  if (!in->empty()) {
415  pq.push(new MergeCursor(in));
416  }
417  }
418 
419  while (!pq.empty()) {
420  MergeCursor * cur = pq.top();
421  pq.pop();
422 
423  string key = cur->current_key;
424  if (pq.empty() || pq.top()->current_key > key) {
425  // No need to merge the tags, just copy the (possibly compressed)
426  // tag value.
427  bool compressed = cur->read_tag(true);
428  out->add(key, cur->current_tag, compressed);
429  if (cur->next()) {
430  pq.push(cur);
431  } else {
432  delete cur;
433  }
434  continue;
435  }
436 
437  // Merge tag values with the same key:
438  string tag;
439  if (key[0] != 'W') {
440  // We just want the union of words, so copy over the first instance
441  // and skip any identical ones.
442  priority_queue<PrefixCompressedStringItor *,
443  vector<PrefixCompressedStringItor *>,
445  // Stick all the MergeCursor pointers in a vector because their
446  // current_tag members must remain valid while we're merging their
447  // tags, but we need to call next() on them all afterwards.
448  vector<MergeCursor *> vec;
449  vec.reserve(pq.size());
450 
451  while (true) {
452  cur->read_tag();
453  pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
454  vec.push_back(cur);
455  if (pq.empty() || pq.top()->current_key != key) break;
456  cur = pq.top();
457  pq.pop();
458  }
459 
461  string lastword;
462  while (!pqtag.empty()) {
463  PrefixCompressedStringItor * it = pqtag.top();
464  pqtag.pop();
465  string word = **it;
466  if (word != lastword) {
467  lastword = word;
468  wr.append(lastword);
469  }
470  ++*it;
471  if (!it->at_end()) {
472  pqtag.push(it);
473  } else {
474  delete it;
475  }
476  }
477 
478  vector<MergeCursor *>::const_iterator i;
479  for (i = vec.begin(); i != vec.end(); ++i) {
480  cur = *i;
481  if (cur->next()) {
482  pq.push(cur);
483  } else {
484  delete cur;
485  }
486  }
487  } else {
488  // We want to sum the frequencies from tags for the same key.
489  Xapian::termcount tot_freq = 0;
490  while (true) {
491  cur->read_tag();
492  Xapian::termcount freq;
493  const char * p = cur->current_tag.data();
494  const char * end = p + cur->current_tag.size();
495  if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
496  throw Xapian::DatabaseCorruptError("Bad spelling word freq");
497  }
498  tot_freq += freq;
499  if (cur->next()) {
500  pq.push(cur);
501  } else {
502  delete cur;
503  }
504  if (pq.empty() || pq.top()->current_key != key) break;
505  cur = pq.top();
506  pq.pop();
507  }
508  tag.resize(0);
509  pack_uint_last(tag, tot_freq);
510  }
511  out->add(key, tag);
512  }
513 }
514 
515 static void
517  vector<GlassTable*>::const_iterator b,
518  vector<GlassTable*>::const_iterator e)
519 {
520  priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
521  for ( ; b != e; ++b) {
522  GlassTable *in = *b;
523  if (!in->empty()) {
524  pq.push(new MergeCursor(in));
525  }
526  }
527 
528  while (!pq.empty()) {
529  MergeCursor * cur = pq.top();
530  pq.pop();
531 
532  string key = cur->current_key;
533  if (pq.empty() || pq.top()->current_key > key) {
534  // No need to merge the tags, just copy the (possibly compressed)
535  // tag value.
536  bool compressed = cur->read_tag(true);
537  out->add(key, cur->current_tag, compressed);
538  if (cur->next()) {
539  pq.push(cur);
540  } else {
541  delete cur;
542  }
543  continue;
544  }
545 
546  // Merge tag values with the same key:
547  string tag;
548 
549  // We just want the union of words, so copy over the first instance
550  // and skip any identical ones.
551  priority_queue<ByteLengthPrefixedStringItor *,
552  vector<ByteLengthPrefixedStringItor *>,
554  vector<MergeCursor *> vec;
555 
556  while (true) {
557  cur->read_tag();
558  pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
559  vec.push_back(cur);
560  if (pq.empty() || pq.top()->current_key != key) break;
561  cur = pq.top();
562  pq.pop();
563  }
564 
565  string lastword;
566  while (!pqtag.empty()) {
567  ByteLengthPrefixedStringItor * it = pqtag.top();
568  pqtag.pop();
569  if (**it != lastword) {
570  lastword = **it;
571  tag += uint8_t(lastword.size() ^ MAGIC_XOR_VALUE);
572  tag += lastword;
573  }
574  ++*it;
575  if (!it->at_end()) {
576  pqtag.push(it);
577  } else {
578  delete it;
579  }
580  }
581 
582  vector<MergeCursor *>::const_iterator i;
583  for (i = vec.begin(); i != vec.end(); ++i) {
584  cur = *i;
585  if (cur->next()) {
586  pq.push(cur);
587  } else {
588  delete cur;
589  }
590  }
591 
592  out->add(key, tag);
593  }
594 }
595 
596 static void
598  GlassTable * out, const char * tmpdir,
599  vector<GlassTable *> tmp,
600  vector<Xapian::docid> off)
601 {
602  unsigned int c = 0;
603  while (tmp.size() > 3) {
604  vector<GlassTable *> tmpout;
605  tmpout.reserve(tmp.size() / 2);
606  vector<Xapian::docid> newoff;
607  newoff.resize(tmp.size() / 2);
608  for (unsigned int i = 0, j; i < tmp.size(); i = j) {
609  j = i + 2;
610  if (j == tmp.size() - 1) ++j;
611 
612  string dest = tmpdir;
613  char buf[64];
614  sprintf(buf, "/tmp%u_%u.", c, i / 2);
615  dest += buf;
616 
617  GlassTable * tmptab = new GlassTable("postlist", dest, false);
618 
619  // Use maximum blocksize for temporary tables. And don't compress
620  // entries in temporary tables, even if the final table would do
621  // so. Any already compressed entries will get copied in
622  // compressed form.
623  RootInfo root_info;
624  root_info.init(65536, 0);
625  const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
626  tmptab->create_and_open(flags, root_info);
627 
628  merge_postlists(compactor, tmptab, off.begin() + i,
629  tmp.begin() + i, tmp.begin() + j);
630  if (c > 0) {
631  for (unsigned int k = i; k < j; ++k) {
632  unlink(tmp[k]->get_path().c_str());
633  delete tmp[k];
634  tmp[k] = NULL;
635  }
636  }
637  tmpout.push_back(tmptab);
638  tmptab->flush_db();
639  tmptab->commit(1, &root_info);
640  AssertRel(root_info.get_blocksize(),==,65536);
641  }
642  swap(tmp, tmpout);
643  swap(off, newoff);
644  ++c;
645  }
646  merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
647  if (c > 0) {
648  for (size_t k = 0; k < tmp.size(); ++k) {
649  unlink(tmp[k]->get_path().c_str());
650  delete tmp[k];
651  tmp[k] = NULL;
652  }
653  }
654 }
655 
656 class PositionCursor : private GlassCursor {
658 
659  public:
660  string key;
662 
664  : GlassCursor(in), offset(offset_), firstdid(0) {
665  find_entry(string());
666  next();
667  }
668 
669  bool next() {
670  if (!GlassCursor::next()) return false;
671  read_tag();
672  const char * d = current_key.data();
673  const char * e = d + current_key.size();
674  string term;
675  Xapian::docid did;
676  if (!unpack_string_preserving_sort(&d, e, term) ||
677  !unpack_uint_preserving_sort(&d, e, &did) ||
678  d != e) {
679  throw Xapian::DatabaseCorruptError("Bad position key");
680  }
681 
682  key.resize(0);
683  pack_string_preserving_sort(key, term);
684  pack_uint_preserving_sort(key, did + offset);
685  return true;
686  }
687 
688  const string & get_tag() const {
689  return current_tag;
690  }
691 };
692 
694  public:
697  bool operator()(const PositionCursor *a, const PositionCursor *b) const {
698  return a->key > b->key;
699  }
700 };
701 
702 static void
703 merge_positions(GlassTable *out, const vector<GlassTable*> & inputs,
704  const vector<Xapian::docid> & offset)
705 {
706  priority_queue<PositionCursor *, vector<PositionCursor *>, PositionCursorGt> pq;
707  for (size_t i = 0; i < inputs.size(); ++i) {
708  GlassTable *in = inputs[i];
709  if (in->empty()) {
710  // Skip empty tables.
711  continue;
712  }
713 
714  pq.push(new PositionCursor(in, offset[i]));
715  }
716 
717  while (!pq.empty()) {
718  PositionCursor * cur = pq.top();
719  pq.pop();
720  out->add(cur->key, cur->get_tag());
721  if (cur->next()) {
722  pq.push(cur);
723  } else {
724  delete cur;
725  }
726  }
727 }
728 
729 static void
730 merge_docid_keyed(GlassTable *out, const vector<GlassTable*> & inputs,
731  const vector<Xapian::docid> & offset)
732 {
733  for (size_t i = 0; i < inputs.size(); ++i) {
734  Xapian::docid off = offset[i];
735 
736  GlassTable * in = inputs[i];
737  if (in->empty()) continue;
738 
739  GlassCursor cur(in);
740  cur.find_entry(string());
741 
742  string key;
743  while (cur.next()) {
744  // Adjust the key if this isn't the first database.
745  if (off) {
746  Xapian::docid did;
747  const char * d = cur.current_key.data();
748  const char * e = d + cur.current_key.size();
749  if (!unpack_uint_preserving_sort(&d, e, &did)) {
750  string msg = "Bad key in ";
751  msg += inputs[i]->get_path();
752  throw Xapian::DatabaseCorruptError(msg);
753  }
754  did += off;
755  key.resize(0);
756  pack_uint_preserving_sort(key, did);
757  if (d != e) {
758  // Copy over anything extra in the key (e.g. the zero byte
759  // at the end of "used value slots" in the termlist table).
760  key.append(d, e - d);
761  }
762  } else {
763  key = cur.current_key;
764  }
765  bool compressed = cur.read_tag(true);
766  out->add(key, cur.current_tag, compressed);
767  }
768  }
769 }
770 
771 }
772 
773 using namespace GlassCompact;
774 
775 void
777  const char * destdir,
778  int fd,
779  const vector<Xapian::Database::Internal*> & sources,
780  const vector<Xapian::docid> & offset,
781  size_t block_size,
783  unsigned flags,
784  Xapian::docid last_docid)
785 {
786  struct table_list {
787  // The "base name" of the table.
788  char name[9];
789  // The type.
790  Glass::table_type type;
791  // Create tables after position lazily.
792  bool lazy;
793  };
794 
795  static const table_list tables[] = {
796  // name type lazy
797  { "postlist", Glass::POSTLIST, false },
798  { "docdata", Glass::DOCDATA, true },
799  { "termlist", Glass::TERMLIST, false },
800  { "position", Glass::POSITION, true },
801  { "spelling", Glass::SPELLING, true },
802  { "synonym", Glass::SYNONYM, true }
803  };
804  const table_list * tables_end = tables +
805  (sizeof(tables) / sizeof(tables[0]));
806 
807  const int FLAGS = Xapian::DB_DANGEROUS;
808 
809  bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
810  bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
811  if (single_file) {
812  // FIXME: Support this combination - we need to put temporary files
813  // somewhere.
814  multipass = false;
815  }
816 
817  for (size_t i = 0; i != sources.size(); ++i) {
818  auto db = static_cast<const GlassDatabase*>(sources[i]);
819  if (db->has_uncommitted_changes()) {
820  const char * m =
821  "Can't compact from a WritableDatabase with uncommitted "
822  "changes - either call commit() first, or create a new "
823  "Database object from the filename on disk";
825  }
826  }
827 
828  if (block_size < 2048 || block_size > 65536 ||
829  (block_size & (block_size - 1)) != 0) {
830  block_size = GLASS_DEFAULT_BLOCKSIZE;
831  }
832 
833  FlintLock lock(destdir ? destdir : "");
834  if (!single_file) {
835  string explanation;
836  FlintLock::reason why = lock.lock(true, false, explanation);
837  if (why != FlintLock::SUCCESS) {
838  lock.throw_databaselockerror(why, destdir, explanation);
839  }
840  }
841 
842  AutoPtr<GlassVersion> version_file_out;
843  if (single_file) {
844  if (destdir) {
845  fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
846  if (fd < 0) {
847  throw Xapian::DatabaseCreateError("open() failed", errno);
848  }
849  }
850  version_file_out.reset(new GlassVersion(fd));
851  } else {
852  fd = -1;
853  version_file_out.reset(new GlassVersion(destdir));
854  }
855 
856  version_file_out->create(block_size);
857  for (size_t i = 0; i != sources.size(); ++i) {
858  GlassDatabase * db = static_cast<GlassDatabase*>(sources[i]);
859  version_file_out->merge_stats(db->version_file);
860  }
861 
862  string fl_serialised;
863  if (single_file) {
864  GlassFreeList fl;
865  fl.set_first_unused_block(1); // FIXME: Assumption?
866  fl.pack(fl_serialised);
867  }
868 
869  vector<GlassTable *> tabs;
870  tabs.reserve(tables_end - tables);
871  off_t prev_size = block_size;
872  for (const table_list * t = tables; t < tables_end; ++t) {
873  // The postlist table requires an N-way merge, adjusting the
874  // headers of various blocks. The spelling and synonym tables also
875  // need special handling. The other tables have keys sorted in
876  // docid order, so we can merge them by simply copying all the keys
877  // from each source table in turn.
878  if (compactor)
879  compactor->set_status(t->name, string());
880 
881  string dest;
882  if (!single_file) {
883  dest = destdir;
884  dest += '/';
885  dest += t->name;
886  dest += '.';
887  }
888 
889  bool output_will_exist = !t->lazy;
890 
891  // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
892  // on certain systems).
893  bool bad_stat = false;
894 
895  // We can't currently report input sizes if there's a single file DB
896  // amongst the inputs.
897  bool single_file_in = false;
898 
899  off_t in_size = 0;
900 
901  vector<GlassTable*> inputs;
902  inputs.reserve(sources.size());
903  size_t inputs_present = 0;
904  for (auto src : sources) {
905  GlassDatabase * db = static_cast<GlassDatabase*>(src);
906  GlassTable * table;
907  switch (t->type) {
908  case Glass::POSTLIST:
909  table = &(db->postlist_table);
910  break;
911  case Glass::DOCDATA:
912  table = &(db->docdata_table);
913  break;
914  case Glass::TERMLIST:
915  table = &(db->termlist_table);
916  break;
917  case Glass::POSITION:
918  table = &(db->position_table);
919  break;
920  case Glass::SPELLING:
921  table = &(db->spelling_table);
922  break;
923  case Glass::SYNONYM:
924  table = &(db->synonym_table);
925  break;
926  default:
927  Assert(false);
928  return;
929  }
930 
931  if (db->single_file()) {
932  if (t->lazy && table->empty()) {
933  // Essentially doesn't exist.
934  } else {
935  // FIXME: Find actual size somehow?
936  // in_size += table->size() / 1024;
937  single_file_in = true;
938  output_will_exist = true;
939  ++inputs_present;
940  }
941  } else {
942  off_t db_size = file_size(table->get_path());
943  if (errno == 0) {
944  in_size += db_size / 1024;
945  output_will_exist = true;
946  ++inputs_present;
947  } else if (errno != ENOENT) {
948  // We get ENOENT for an optional table.
949  bad_stat = true;
950  output_will_exist = true;
951  ++inputs_present;
952  }
953  }
954  inputs.push_back(table);
955  }
956 
957  // If any inputs lack a termlist table, suppress it in the output.
958  if (t->type == Glass::TERMLIST && inputs_present != sources.size()) {
959  if (inputs_present != 0) {
960  if (compactor) {
961  string m = str(inputs_present);
962  m += " of ";
963  m += str(sources.size());
964  m += " inputs present, so suppressing output";
965  compactor->set_status(t->name, m);
966  }
967  continue;
968  }
969  output_will_exist = false;
970  }
971 
972  if (!output_will_exist) {
973  if (compactor)
974  compactor->set_status(t->name, "doesn't exist");
975  continue;
976  }
977 
978  GlassTable * out;
979  if (single_file) {
980  out = new GlassTable(t->name, fd, version_file_out->get_offset(),
981  false, false);
982  } else {
983  out = new GlassTable(t->name, dest, false, t->lazy);
984  }
985  tabs.push_back(out);
986  RootInfo * root_info = version_file_out->root_to_set(t->type);
987  if (single_file) {
988  root_info->set_free_list(fl_serialised);
989  out->open(FLAGS, version_file_out->get_root(t->type), version_file_out->get_revision());
990  } else {
991  out->create_and_open(FLAGS, *root_info);
992  }
993 
994  out->set_full_compaction(compaction != compactor->STANDARD);
995  if (compaction == compactor->FULLER) out->set_max_item_size(1);
996 
997  switch (t->type) {
998  case Glass::POSTLIST: {
999  if (multipass && inputs.size() > 3) {
1000  multimerge_postlists(compactor, out, destdir,
1001  inputs, offset);
1002  } else {
1003  merge_postlists(compactor, out, offset.begin(),
1004  inputs.begin(), inputs.end());
1005  }
1006  break;
1007  }
1008  case Glass::SPELLING:
1009  merge_spellings(out, inputs.begin(), inputs.end());
1010  break;
1011  case Glass::SYNONYM:
1012  merge_synonyms(out, inputs.begin(), inputs.end());
1013  break;
1014  case Glass::POSITION:
1015  merge_positions(out, inputs, offset);
1016  break;
1017  default:
1018  // DocData, Termlist
1019  merge_docid_keyed(out, inputs, offset);
1020  break;
1021  }
1022 
1023  if (out->is_modified()) {
1024  // Commit as revision 1.
1025  out->flush_db();
1026  out->commit(1, root_info);
1027  out->sync();
1028  }
1029  if (single_file) fl_serialised = root_info->get_free_list();
1030 
1031  off_t out_size = 0;
1032  if (!bad_stat && !single_file_in) {
1033  off_t db_size;
1034  if (single_file) {
1035  db_size = file_size(fd);
1036  } else {
1037  db_size = file_size(dest + GLASS_TABLE_EXTENSION);
1038  }
1039  if (errno == 0) {
1040  if (single_file) {
1041  off_t old_prev_size = max(prev_size, off_t(block_size));
1042  prev_size = db_size;
1043  db_size = max(db_size, off_t(block_size));
1044  db_size -= old_prev_size;
1045  }
1046  out_size = db_size / 1024;
1047  } else {
1048  bad_stat = (errno != ENOENT);
1049  }
1050  }
1051  if (bad_stat) {
1052  if (compactor)
1053  compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
1054  } else if (single_file_in) {
1055  if (compactor)
1056  compactor->set_status(t->name, "Done (table sizes unknown for single file DB input)");
1057  } else {
1058  string status;
1059  if (out_size == in_size) {
1060  status = "Size unchanged (";
1061  } else {
1062  off_t delta;
1063  if (out_size < in_size) {
1064  delta = in_size - out_size;
1065  status = "Reduced by ";
1066  } else {
1067  delta = out_size - in_size;
1068  status = "INCREASED by ";
1069  }
1070  if (in_size) {
1071  status += str(100 * delta / in_size);
1072  status += "% ";
1073  }
1074  status += str(delta);
1075  status += "K (";
1076  status += str(in_size);
1077  status += "K -> ";
1078  }
1079  status += str(out_size);
1080  status += "K)";
1081  if (compactor)
1082  compactor->set_status(t->name, status);
1083  }
1084  }
1085 
1086  // If compacting to a single file output and all the tables are empty, pad
1087  // the output so that it isn't mistaken for a stub database when we try to
1088  // open it. For this it needs to be a multiple of 2KB in size.
1089  if (single_file && prev_size < off_t(block_size)) {
1090 #ifdef HAVE_FTRUNCATE
1091  if (ftruncate(fd, block_size) < 0) {
1092  throw Xapian::DatabaseError("Failed to set size of output database", errno);
1093  }
1094 #else
1095  const off_t off = block_size - 1;
1096  if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
1097  throw Xapian::DatabaseError("Failed to set size of output database", errno);
1098  }
1099 #endif
1100  }
1101 
1102  if (single_file) {
1103  if (lseek(fd, version_file_out->get_offset(), SEEK_SET) < 0) {
1104  throw Xapian::DatabaseError("lseek() failed", errno);
1105  }
1106  }
1107  version_file_out->set_last_docid(last_docid);
1108  string tmpfile = version_file_out->write(1, FLAGS);
1109  for (unsigned j = 0; j != tabs.size(); ++j) {
1110  tabs[j]->sync();
1111  }
1112  // Commit with revision 1.
1113  version_file_out->sync(tmpfile, 1, FLAGS);
1114  for (unsigned j = 0; j != tabs.size(); ++j) {
1115  delete tabs[j];
1116  }
1117 
1118  if (!single_file) lock.release();
1119 }
GlassVersion version_file
The file describing the Glass database.
void throw_databaselockerror(FlintLock::reason why, const std::string &db_dir, const std::string &explanation) const
Throw Xapian::DatabaseLockError.
Definition: flint_lock.cc:499
#define Assert(COND)
Definition: omassert.h:122
void release()
Release the lock.
Definition: flint_lock.cc:463
GlassVersion class.
typedefs for Xapian
MergeCursor(GlassTable *in)
Statistics about values.
GlassPositionListTable position_table
Table storing position lists.
void create_and_open(int flags_, const RootInfo &root_info)
Create a new empty btree structure on disk and open it at the initial revision.
table_type
Definition: glass_defs.h:53
static void merge_positions(GlassTable *out, const vector< GlassTable *> &inputs, const vector< Xapian::docid > &offset)
Allow oversize items to save more space (not recommended if you ever plan to update the compacted dat...
Definition: compactor.h:55
#define AssertRel(A, REL, B)
Definition: omassert.h:123
InvalidOperationError indicates the API was used in an invalid way.
Definition: error.h:283
bool empty() const
Return true if there are no entries in the table.
Definition: glass_table.h:681
Class managing a Btree table in a Glass database.
Definition: glass_table.h:425
GlassSynonymTable synonym_table
Table storing synonym data.
static void merge_synonyms(GlassTable *out, vector< GlassTable *>::const_iterator b, vector< GlassTable *>::const_iterator e)
Constants in the Xapian namespace.
The GlassVersion class manages the revision files.
Definition: glass_version.h:94
static void compact(Xapian::Compactor *compactor, const char *destdir, int fd, const std::vector< Xapian::Database::Internal *> &sources, const std::vector< Xapian::docid > &offset, size_t block_size, Xapian::Compactor::compaction_level compaction, unsigned flags, Xapian::docid last_docid)
#define O_BINARY
Definition: safefcntl.h:81
WritableDatabase open()
Construct a WritableDatabase object for a new, empty InMemory database.
Definition: dbfactory.h:104
Don&#39;t split items unnecessarily.
Definition: compactor.h:50
Compact a database, or merge and compact several.
bool next()
Advance to the next key.
static bool tags
STL namespace.
Definitions, types, etc for use inside glass.
Flint-compatible database locking.
void add(const std::string &key, const std::string &tag, bool already_compressed=false)
Add a key/tag pair to the table, replacing any existing pair with the same key.
static void merge_postlists(Xapian::Compactor *compactor, GlassTable *out, vector< Xapian::docid >::const_iterator offset, vector< GlassTable *>::const_iterator b, vector< GlassTable *>::const_iterator e)
bool after_end() const
Determine whether cursor is off the end of table.
Definition: glass_cursor.h:329
Utility functions for testing files.
#define O_CLOEXEC
Definition: safefcntl.h:90
string get_path() const
Definition: glass_table.h:728
GlassDocDataTable docdata_table
Table storing document data.
#define GLASS_TABLE_EXTENSION
Glass table extension.
Definition: glass_defs.h:27
bool read_tag(bool keep_compressed=false)
Read the tag from the table and store it in current_tag.
virtual void set_status(const std::string &table, const std::string &status)
Update progress.
Definition: compactor.cc:135
Hierarchy of classes which Xapian can throw as exceptions.
unsigned XAPIAN_TERMCOUNT_BASE_TYPE termcount
A counts of terms.
Definition: types.h:72
void flush_db()
Flush any outstanding changes to the DB file of the table.
RangeError indicates an attempt to access outside the bounds of a container.
Definition: error.h:971
void pack_uint_last(std::string &s, U value)
Append an encoded unsigned integer to a string as the last item.
Definition: pack.h:93
static void multimerge_postlists(Xapian::Compactor *compactor, GlassTable *out, const char *tmpdir, vector< GlassTable *> tmp, vector< Xapian::docid > off)
bool sync()
Definition: glass_table.h:535
string current_key
Current key pointed to by cursor.
Definition: glass_cursor.h:239
GlassPostListTable postlist_table
Table storing posting lists.
PostlistCursor(GlassTable *in, Xapian::docid offset_)
bool operator()(const PositionCursor *a, const PositionCursor *b) const
Return true if and only if a&#39;s key is strictly greater than b&#39;s key.
virtual std::string resolve_duplicate_metadata(const std::string &key, size_t num_tags, const std::string tags[])
Resolve multiple user metadata entries with the same key.
Definition: compactor.cc:142
bool is_modified() const
Determine whether the object contains uncommitted modifications.
Definition: glass_table.h:697
DatabaseCreateError indicates a failure to create a database.
Definition: error.h:451
void commit(glass_revision_number_t revision, RootInfo *root_info)
Commit any outstanding changes to the table.
string current_tag
Current tag pointed to by cursor.
Definition: glass_cursor.h:244
Compact a database, or merge and compact several.
Definition: compactor.h:42
static void merge_spellings(GlassTable *out, vector< GlassTable *>::const_iterator b, vector< GlassTable *>::const_iterator e)
static bool is_valuestats_key(const string &key)
string str(int value)
Convert int to std::string.
Definition: str.cc:90
GlassTermListTable termlist_table
Table storing term lists.
#define GLASS_DEFAULT_BLOCKSIZE
Default B-tree block size.
Definition: glass_defs.h:30
C++ class definition for glass database.
static void merge_docid_keyed(GlassTable *out, const vector< GlassTable *> &inputs, const vector< Xapian::docid > &offset)
static bool is_user_metadata_key(const string &key)
bool unpack_string_preserving_sort(const char **p, const char *end, std::string &result)
Decode a "sort preserved" std::string from a string.
Definition: pack.h:562
bool unpack_uint_preserving_sort(const char **p, const char *end, U *result)
Decode a "sort preserved" unsigned integer from a string.
Definition: pack.h:318
Btree implementation.
static string encode_valuestats(Xapian::doccount freq, const string &lbound, const string &ubound)
const int DB_DANGEROUS
Update the database in-place.
Definition: constants.h:103
void set_max_item_size(size_t block_capacity)
Set the maximum item size given the block capacity.
Definition: glass_table.h:704
A backend designed for efficient indexing and retrieval, using compressed posting lists and a btree s...
A cursor pointing to a position in a Btree table, for reading several entries in order, or finding approximate matches.
Definition: glass_cursor.h:147
const int DB_NO_SYNC
Don&#39;t attempt to ensure changes have hit disk.
Definition: constants.h:66
DatabaseCorruptError indicates database corruption was detected.
Definition: error.h:409
void open(int flags_, const RootInfo &root_info, glass_revision_number_t rev)
Open the btree.
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:382
void init(unsigned blocksize_, uint4 compress_min_)
bool single_file() const
unsigned get_blocksize() const
Definition: glass_version.h:64
unsigned XAPIAN_DOCID_BASE_TYPE doccount
A count of documents.
Definition: types.h:38
#define MAGIC_XOR_VALUE
void pack_string(std::string &s, const std::string &value)
Append an encoded std::string to a string.
Definition: pack.h:477
Interface to Btree cursors.
std::string pack_glass_postlist_key(const std::string &term)
Definition: pack.h:613
compaction_level
Compaction level.
Definition: compactor.h:48
Pack types into strings and unpack them again.
unsigned valueno
The number for a value slot in a document.
Definition: types.h:108
bool unpack_uint_last(const char **p, const char *end, U *result)
Decode an unsigned integer as the last item in a string.
Definition: pack.h:111
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
Definition: pack.h:413
PositionCursor(GlassTable *in, Xapian::docid offset_)
const int DBCOMPACT_MULTIPASS
If merging more than 3 databases, merge the postlists in multiple passes.
Definition: constants.h:262
reason lock(bool exclusive, bool wait, std::string &explanation)
Attempt to obtain the lock.
Definition: flint_lock.cc:125
off_t file_size(const char *path)
Returns the size of a file.
Definition: filetests.h:71
Definition: header.h:151
void pack(std::string &buf)
void append(const std::string &word)
bool unpack_string(const char **p, const char *end, std::string &result)
Decode a std::string from a string.
Definition: pack.h:504
bool find_entry(const string &key)
Position the cursor on the highest entry with key <= key.
unsigned XAPIAN_DOCID_BASE_TYPE docid
A unique identifier for a document.
Definition: types.h:52
DatabaseError indicates some sort of database related error.
Definition: error.h:367
static bool is_valuechunk_key(const string &key)
const int DBCOMPACT_SINGLE_FILE
Produce a single-file database.
Definition: constants.h:268
void set_first_unused_block(uint4 base)
const std::string & get_free_list() const
Definition: glass_version.h:69
void set_free_list(const std::string &s)
Definition: glass_version.h:80
bool operator()(const PostlistCursor *a, const PostlistCursor *b) const
Return true if and only if a&#39;s key is strictly greater than b&#39;s key.
GlassSpellingTable spelling_table
Table storing spelling correction data.
Types used internally.
Wrapper around standard unique_ptr template.
static bool is_doclenchunk_key(const string &key)
void set_full_compaction(bool parity)
void pack_string_preserving_sort(std::string &s, const std::string &value, bool last=false)
Append an encoded std::string to a string, preserving the sort order.
Definition: pack.h:539
void pack_uint_preserving_sort(std::string &s, U value)
Append an encoded unsigned integer to a string, preserving the sort order.
Definition: pack.h:269
bool operator()(const GlassCursor *a, const GlassCursor *b) const
Return true if and only if a&#39;s key is strictly greater than b&#39;s key.
const string & get_tag() const