xapian-core  1.4.27
remoteconnection.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2006-2024 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include "remoteconnection.h"
24 
25 #include <xapian/error.h>
26 
27 #include "safefcntl.h"
28 #include "safeunistd.h"
29 
30 #ifdef HAVE_POLL_H
31 # include <poll.h>
32 #else
33 # include "safesysselect.h"
34 #endif
35 
36 #include <algorithm>
37 #include <cerrno>
38 #include <climits>
39 #include <cstdint>
40 #include <string>
41 #ifdef __WIN32__
42 # include <type_traits>
43 #endif
44 
45 #include "debuglog.h"
46 #include "fd.h"
47 #include "filetests.h"
48 #include "noreturn.h"
49 #include "omassert.h"
50 #include "overflow.h"
51 #include "posixy_wrapper.h"
52 #include "realtime.h"
53 #include "length.h"
54 #include "socket_utils.h"
55 
56 using namespace std;
57 
58 #define CHUNKSIZE 4096
59 
60 XAPIAN_NORETURN(static void throw_database_closed());
61 static void
63 {
64  throw Xapian::DatabaseClosedError("Database has been closed");
65 }
66 
67 XAPIAN_NORETURN(static void throw_network_error_insane_message_length());
68 static void
70 {
71  throw Xapian::NetworkError("Insane message length specified!");
72 }
73 
74 XAPIAN_NORETURN(static void throw_timeout(const char*, const string&));
75 static void
76 throw_timeout(const char* msg, const string& context)
77 {
78  throw Xapian::NetworkTimeoutError(msg, context);
79 }
80 
81 #ifdef __WIN32__
82 static inline void
83 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
84 {
85  if (add_overflows(overlapped.Offset, n, overlapped.Offset))
86  ++overlapped.OffsetHigh;
87 }
88 #endif
89 
90 RemoteConnection::RemoteConnection(int fdin_, int fdout_,
91  const string & context_)
92  : fdin(fdin_), fdout(fdout_), context(context_)
93 {
94 #ifdef __WIN32__
95  memset(&overlapped, 0, sizeof(overlapped));
96  overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
97  if (!overlapped.hEvent)
98  throw Xapian::NetworkError("Failed to setup OVERLAPPED",
99  context, -int(GetLastError()));
100 
101 #elif defined USE_SO_NOSIGPIPE
102  // SO_NOSIGPIPE is a non-standardised socket option supported by a number
103  // of platforms - at least DragonFlyBSD, FreeBSD, macOS (not older
104  // versions, e.g. 10.15 apparently lacks it), Solaris; notably not
105  // supported by Linux or OpenBSD though.
106  //
107  // We use it where supported due to one big advantage over POSIX's
108  // MSG_NOSIGNAL which is that we can just set it once for a socket whereas
109  // with MSG_NOSIGNAL we need to call send(..., MSG_NOSIGNAL) instead of
110  // write(...), but send() only works on sockets, so with MSG_NOSIGNAL any
111  // code which might be working with files or pipes as well as sockets needs
112  // conditional handling depending on whether the fd is a socket or not.
113  //
114  // SO_NOSIGPIPE is present on NetBSD, but it seems when using it we still
115  // get SIGPIPE (reproduced on NetBSD 9.3 and 10.0) so we avoid using it
116  // there.
117  int on = 1;
118  if (setsockopt(fdout, SOL_SOCKET, SO_NOSIGPIPE,
119  reinterpret_cast<char*>(&on), sizeof(on)) < 0) {
120  // Some platforms (including FreeBSD, macOS, DragonflyBSD) seem to
121  // fail with EBADF instead of ENOTSOCK when passed a non-socket so
122  // allow either. If the descriptor is actually not valid we'll report
123  // it the next time we try to use it (as we would when not trying to
124  // use SO_NOSIGPIPE so this actually gives a more consistent error
125  // across platforms.
126  if (errno != ENOTSOCK && errno != EBADF) {
127  throw Xapian::NetworkError("Couldn't set SO_NOSIGPIPE on socket",
128  errno);
129  }
130  }
131 #elif defined USE_MSG_NOSIGNAL
132  // We can use send(..., MSG_NOSIGNAL) to avoid generating SIGPIPE
133  // (MSG_NOSIGNAL was added in POSIX.1-2008). This seems to be pretty much
134  // universally supported by current Unix-like platforms, but older macOS
135  // and Solaris apparently didn't have it.
136  //
137  // If fdout is not a socket, we'll set send_flags = 0 when the first send()
138  // fails with ENOTSOCK and use write() instead from then on.
139 #else
140  // It's simplest to just ignore SIGPIPE. Not ideal, but it seems only old
141  // versions of macOS and of Solaris will end up here so let's not bother
142  // trying to do any clever trickery.
143  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
144  throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
145  }
146 #endif
147 }
148 
149 #ifdef __WIN32__
150 RemoteConnection::~RemoteConnection()
151 {
152  if (overlapped.hEvent)
153  CloseHandle(overlapped.hEvent);
154 }
155 #endif
156 
157 bool
159 {
160  LOGCALL(REMOTE, bool, "RemoteConnection::read_at_least", min_len | end_time);
161 
162  if (buffer.length() >= min_len) RETURN(true);
163 
164 #ifdef __WIN32__
165  HANDLE hin = fd_to_handle(fdin);
166  do {
167  char buf[CHUNKSIZE];
168  DWORD received;
169  BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
170  if (!ok) {
171  int errcode = GetLastError();
172  if (errcode != ERROR_IO_PENDING)
173  throw Xapian::NetworkError("read failed", context, -errcode);
174  // Is asynch - just wait for the data to be received or a timeout.
175  DWORD waitrc;
176  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
177  if (waitrc != WAIT_OBJECT_0) {
178  LOGLINE(REMOTE, "read: timeout has expired");
179  throw_timeout("Timeout expired while trying to read", context);
180  }
181  // Get the final result of the read.
182  if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
183  throw Xapian::NetworkError("Failed to get overlapped result",
184  context, -int(GetLastError()));
185  }
186 
187  if (received == 0) {
188  RETURN(false);
189  }
190 
191  buffer.append(buf, received);
192 
193  // We must update the offset in the OVERLAPPED structure manually.
194  update_overlapped_offset(overlapped, received);
195  } while (buffer.length() < min_len);
196 #else
197  // If there's no end_time, just use blocking I/O.
198  if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
199  throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
200  context, errno);
201  }
202 
203  while (true) {
204  char buf[CHUNKSIZE];
205  ssize_t received = read(fdin, buf, sizeof(buf));
206 
207  if (received > 0) {
208  buffer.append(buf, received);
209  if (buffer.length() >= min_len) RETURN(true);
210  continue;
211  }
212 
213  if (received == 0) {
214  RETURN(false);
215  }
216 
217  LOGLINE(REMOTE, "read gave errno = " << errno);
218  if (errno == EINTR) continue;
219 
220  if (errno != EAGAIN)
221  throw Xapian::NetworkError("read failed", context, errno);
222 
223  Assert(end_time != 0.0);
224  while (true) {
225  // Calculate how far in the future end_time is.
226  double now = RealTime::now();
227  double time_diff = end_time - now;
228  // Check if the timeout has expired.
229  if (time_diff < 0) {
230  LOGLINE(REMOTE, "read: timeout has expired");
231  throw_timeout("Timeout expired while trying to read", context);
232  }
233 
234  // Wait until there is data, an error, or the timeout is reached.
235 # ifdef HAVE_POLL
236  struct pollfd fds;
237  fds.fd = fdin;
238  fds.events = POLLIN;
239  int poll_result = poll(&fds, 1, int(time_diff * 1000));
240  if (poll_result > 0) break;
241 
242  if (poll_result == 0)
243  throw_timeout("Timeout expired while trying to read", context);
244 
245  // EINTR means poll was interrupted by a signal. EAGAIN means that
246  // allocation of internal data structures failed.
247  if (errno != EINTR && errno != EAGAIN)
248  throw Xapian::NetworkError("poll failed during read",
249  context, errno);
250 # else
251  if (fdin >= FD_SETSIZE) {
252  // We can't block with a timeout, so just sleep and retry.
253  RealTime::sleep(now + min(0.001, time_diff / 4));
254  break;
255  }
256  fd_set fdset;
257  FD_ZERO(&fdset);
258  FD_SET(fdin, &fdset);
259 
260  struct timeval tv;
261  RealTime::to_timeval(time_diff, &tv);
262  int select_result = select(fdin + 1, &fdset, 0, 0, &tv);
263  if (select_result > 0) break;
264 
265  if (select_result == 0)
266  throw_timeout("Timeout expired while trying to read", context);
267 
268  // EINTR means select was interrupted by a signal. The Linux
269  // select(2) man page says: "Portable programs may wish to check
270  // for EAGAIN and loop, just as with EINTR" and that seems to be
271  // necessary for cygwin at least.
272  if (errno != EINTR && errno != EAGAIN)
273  throw Xapian::NetworkError("select failed during read",
274  context, errno);
275 # endif
276  }
277  }
278 #endif
279  RETURN(true);
280 }
281 
282 bool
284 {
285  LOGCALL(REMOTE, bool, "RemoteConnection::ready_to_read", NO_ARGS);
286  if (fdin == -1)
288 
289  if (!buffer.empty()) RETURN(true);
290 
291  // See if there's data available to be read.
292 #ifdef HAVE_POLL
293  struct pollfd fds;
294  fds.fd = fdin;
295  fds.events = POLLIN;
296  RETURN(poll(&fds, 1, 0) > 0);
297 #else
298 # ifndef __WIN32__
299  if (fdin >= FD_SETSIZE) {
300  // Not ideal, but OK for how this method is currently used.
301  RETURN(true);
302  }
303 # endif
304  fd_set fdset;
305  FD_ZERO(&fdset);
306  FD_SET(fdin, &fdset);
307 
308  struct timeval tv;
309  tv.tv_sec = 0;
310  tv.tv_usec = 0;
311  RETURN(select(fdin + 1, &fdset, 0, 0, &tv) > 0);
312 #endif
313 }
314 
315 #ifndef __WIN32__
316 ssize_t
317 RemoteConnection::send_or_write(const void* p, size_t len)
318 {
319 # ifdef USE_MSG_NOSIGNAL
320  if (send_flags) {
321  ssize_t n = send(fdout, p, len, send_flags);
322  if (usual(n >= 0 || errno != ENOTSOCK)) return n;
323  // In some testcases in the testsuite and in xapian-progsrv (in some
324  // cases) fdout won't be a socket. Clear send_flags so we only try
325  // send() once in this case.
326  send_flags = 0;
327  }
328 # endif
329  return write(fdout, p, len);
330 }
331 #endif
332 
333 void
334 RemoteConnection::send_message(char type, const string &message,
335  double end_time)
336 {
337  LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
338  if (fdout == -1)
340 
341  string header;
342  header += type;
343  header += encode_length(message.size());
344 
345 #ifdef __WIN32__
346  HANDLE hout = fd_to_handle(fdout);
347  const string * str = &header;
348 
349  size_t count = 0;
350  while (true) {
351  DWORD n;
352  BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
353  if (!ok) {
354  int errcode = GetLastError();
355  if (errcode != ERROR_IO_PENDING)
356  throw Xapian::NetworkError("write failed", context, -errcode);
357  // Just wait for the data to be sent, or a timeout.
358  DWORD waitrc;
359  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
360  if (waitrc != WAIT_OBJECT_0) {
361  LOGLINE(REMOTE, "write: timeout has expired");
362  throw_timeout("Timeout expired while trying to write", context);
363  }
364  // Get the final result.
365  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
366  throw Xapian::NetworkError("Failed to get overlapped result",
367  context, -int(GetLastError()));
368  }
369 
370  count += n;
371 
372  // We must update the offset in the OVERLAPPED structure manually.
373  update_overlapped_offset(overlapped, n);
374 
375  if (count == str->size()) {
376  if (str == &message || message.empty()) return;
377  str = &message;
378  count = 0;
379  }
380  }
381 #else
382  // If there's no end_time, just use blocking I/O.
383  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
384  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
385  context, errno);
386  }
387 
388  const string * str = &header;
389 
390  size_t count = 0;
391  while (true) {
392  // We've set write to non-blocking, so just try writing as there
393  // will usually be space.
394  ssize_t n = send_or_write(str->data() + count, str->size() - count);
395 
396  if (n >= 0) {
397  count += n;
398  if (count == str->size()) {
399  if (str == &message || message.empty()) return;
400  str = &message;
401  count = 0;
402  }
403  continue;
404  }
405 
406  LOGLINE(REMOTE, "write gave errno = " << errno);
407  if (errno == EINTR) continue;
408 
409  if (errno != EAGAIN)
410  throw Xapian::NetworkError("write failed", context, errno);
411 
412  double now = RealTime::now();
413  double time_diff = end_time - now;
414  if (time_diff < 0) {
415  LOGLINE(REMOTE, "write: timeout has expired");
416  throw_timeout("Timeout expired while trying to write", context);
417  }
418 
419  // Wait until there is space or the timeout is reached.
420 # ifdef HAVE_POLL
421  struct pollfd fds;
422  fds.fd = fdout;
423  fds.events = POLLOUT;
424  int result = poll(&fds, 1, int(time_diff * 1000));
425 # define POLLSELECT "poll"
426 # else
427  if (fdout >= FD_SETSIZE) {
428  // We can't block with a timeout, so just sleep and retry.
429  RealTime::sleep(now + min(0.001, time_diff / 4));
430  continue;
431  }
432 
433  fd_set fdset;
434  FD_ZERO(&fdset);
435  FD_SET(fdout, &fdset);
436 
437  struct timeval tv;
438  RealTime::to_timeval(time_diff, &tv);
439  int result = select(fdout + 1, 0, &fdset, 0, &tv);
440 # define POLLSELECT "select"
441 # endif
442 
443  if (result < 0) {
444  if (errno == EINTR || errno == EAGAIN) {
445  // EINTR/EAGAIN means select was interrupted by a signal.
446  // We could just retry the poll/select, but it's easier to just
447  // retry the write.
448  continue;
449  }
450  throw Xapian::NetworkError(POLLSELECT " failed during write",
451  context, errno);
452 # undef POLLSELECT
453  }
454 
455  if (result == 0)
456  throw_timeout("Timeout expired while trying to write", context);
457  }
458 #endif
459 }
460 
461 void
462 RemoteConnection::send_file(char type, int fd, double end_time)
463 {
464  LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
465  if (fdout == -1)
467 
468  off_t size = file_size(fd);
469  if (errno)
470  throw Xapian::NetworkError("Couldn't stat file to send", errno);
471  // FIXME: Use sendfile() or similar if available?
472 
473  char buf[CHUNKSIZE];
474  buf[0] = type;
475  size_t c = 1;
476  {
477  string enc_size = encode_length(size);
478  c += enc_size.size();
479  // An encoded length should be just a few bytes.
480  AssertRel(c, <=, sizeof(buf));
481  memcpy(buf + 1, enc_size.data(), enc_size.size());
482  }
483 
484 #ifdef __WIN32__
485  HANDLE hout = fd_to_handle(fdout);
486  size_t count = 0;
487  while (true) {
488  DWORD n;
489  BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
490  if (!ok) {
491  int errcode = GetLastError();
492  if (errcode != ERROR_IO_PENDING)
493  throw Xapian::NetworkError("write failed", context, -errcode);
494  // Just wait for the data to be sent, or a timeout.
495  DWORD waitrc;
496  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
497  if (waitrc != WAIT_OBJECT_0) {
498  LOGLINE(REMOTE, "write: timeout has expired");
499  throw_timeout("Timeout expired while trying to write", context);
500  }
501  // Get the final result.
502  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
503  throw Xapian::NetworkError("Failed to get overlapped result",
504  context, -int(GetLastError()));
505  }
506 
507  count += n;
508 
509  // We must update the offset in the OVERLAPPED structure manually.
510  update_overlapped_offset(overlapped, n);
511 
512  if (count == c) {
513  if (size == 0) return;
514 
515  ssize_t res;
516  do {
517  res = read(fd, buf, sizeof(buf));
518  } while (res < 0 && errno == EINTR);
519  if (res < 0) throw Xapian::NetworkError("read failed", errno);
520  c = size_t(res);
521 
522  size -= c;
523  count = 0;
524  }
525  }
526 #else
527  // If there's no end_time, just use blocking I/O.
528  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
529  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
530  context, errno);
531  }
532 
533  size_t count = 0;
534  while (true) {
535  // We've set write to non-blocking, so just try writing as there
536  // will usually be space.
537  ssize_t n = send_or_write(buf + count, c - count);
538 
539  if (n >= 0) {
540  count += n;
541  if (count == c) {
542  if (size == 0) return;
543 
544  ssize_t res;
545  do {
546  res = read(fd, buf, sizeof(buf));
547  } while (res < 0 && errno == EINTR);
548  if (res < 0) throw Xapian::NetworkError("read failed", errno);
549  c = size_t(res);
550 
551  size -= c;
552  count = 0;
553  }
554  continue;
555  }
556 
557  LOGLINE(REMOTE, "write gave errno = " << errno);
558  if (errno == EINTR) continue;
559 
560  if (errno != EAGAIN)
561  throw Xapian::NetworkError("write failed", context, errno);
562 
563  double now = RealTime::now();
564  double time_diff = end_time - now;
565  if (time_diff < 0) {
566  LOGLINE(REMOTE, "write: timeout has expired");
567  throw_timeout("Timeout expired while trying to write", context);
568  }
569 
570  // Wait until there is space or the timeout is reached.
571 # ifdef HAVE_POLL
572  struct pollfd fds;
573  fds.fd = fdout;
574  fds.events = POLLOUT;
575  int result = poll(&fds, 1, int(time_diff * 1000));
576 # define POLLSELECT "poll"
577 # else
578  if (fdout >= FD_SETSIZE) {
579  // We can't block with a timeout, so just sleep and retry.
580  RealTime::sleep(now + min(0.001, time_diff / 4));
581  continue;
582  }
583 
584  fd_set fdset;
585  FD_ZERO(&fdset);
586  FD_SET(fdout, &fdset);
587 
588  struct timeval tv;
589  RealTime::to_timeval(time_diff, &tv);
590  int result = select(fdout + 1, 0, &fdset, 0, &tv);
591 # define POLLSELECT "select"
592 # endif
593 
594  if (result < 0) {
595  if (errno == EINTR || errno == EAGAIN) {
596  // EINTR/EAGAIN means select was interrupted by a signal.
597  // We could just retry the poll/select, but it's easier to just
598  // retry the write.
599  continue;
600  }
601  throw Xapian::NetworkError(POLLSELECT " failed during write",
602  context, errno);
603 # undef POLLSELECT
604  }
605 
606  if (result == 0)
607  throw_timeout("Timeout expired while trying to write", context);
608  }
609 #endif
610 }
611 
612 int
614 {
615  LOGCALL(REMOTE, int, "RemoteConnection::sniff_next_message_type", end_time);
616  if (fdin == -1)
618 
619  if (!read_at_least(1, end_time))
620  RETURN(-1);
621  unsigned char type = buffer[0];
622  RETURN(type);
623 }
624 
625 int
627 {
628  LOGCALL(REMOTE, int, "RemoteConnection::get_message", result | end_time);
629  if (fdin == -1)
631 
632  if (!read_at_least(2, end_time))
633  RETURN(-1);
634  size_t len = static_cast<unsigned char>(buffer[1]);
635  if (!read_at_least(len + 2, end_time))
636  RETURN(-1);
637  if (len != 0xff) {
638  result.assign(buffer.data() + 2, len);
639  unsigned char type = buffer[0];
640  buffer.erase(0, len + 2);
641  RETURN(type);
642  }
643  len = 0;
644  string::const_iterator i = buffer.begin() + 2;
645  unsigned char ch;
646  int shift = 0;
647  do {
648  if (i == buffer.end() || shift > 28) {
649  // Something is very wrong...
651  }
652  ch = *i++;
653  len |= size_t(ch & 0x7f) << shift;
654  shift += 7;
655  } while ((ch & 0x80) == 0);
656  len += 255;
657  size_t header_len = (i - buffer.begin());
658  if (!read_at_least(header_len + len, end_time))
659  RETURN(-1);
660  result.assign(buffer.data() + header_len, len);
661  unsigned char type = buffer[0];
662  buffer.erase(0, header_len + len);
663  RETURN(type);
664 }
665 
666 int
668 {
669  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunked", end_time);
670 
671  if (fdin == -1)
673 
674  if (!read_at_least(2, end_time))
675  RETURN(-1);
676  uint_least64_t len = static_cast<unsigned char>(buffer[1]);
677  if (len != 0xff) {
678  chunked_data_left = off_t(len);
679  char type = buffer[0];
680  buffer.erase(0, 2);
681  RETURN(type);
682  }
683  if (!read_at_least(len + 2, end_time))
684  RETURN(-1);
685  len = 0;
686  string::const_iterator i = buffer.begin() + 2;
687  unsigned char ch;
688  int shift = 0;
689  do {
690  // Allow at most 63 bits for message lengths - it's neatly a multiple
691  // of 7 bits and anything longer than this is almost certainly a
692  // corrupt value.
693  // The value also needs to be representable as an
694  // off_t (which is a signed type), so use that size if it is smaller.
695  const int SHIFT_LIMIT = 63;
696  if (rare(i == buffer.end() || shift >= SHIFT_LIMIT)) {
697  // Something is very wrong...
699  }
700  ch = *i++;
701  uint_least64_t bits = ch & 0x7f;
702  len |= bits << shift;
703  shift += 7;
704  } while ((ch & 0x80) == 0);
705  len += 255;
706 
707  chunked_data_left = off_t(len);
708  if (sizeof(off_t) * CHAR_BIT < 63) {
709  // Check that the value of len fits in an off_t without loss.
710  if (rare(uint_least64_t(chunked_data_left) != len)) {
712  }
713  }
714 
715  unsigned char type = buffer[0];
716  size_t header_len = (i - buffer.begin());
717  buffer.erase(0, header_len);
718  RETURN(type);
719 }
720 
721 int
722 RemoteConnection::get_message_chunk(string &result, size_t at_least,
723  double end_time)
724 {
725  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunk", result | at_least | end_time);
726  if (fdin == -1)
728 
729  if (at_least <= result.size()) RETURN(true);
730  at_least -= result.size();
731 
732  bool read_enough = (off_t(at_least) <= chunked_data_left);
733  if (!read_enough) at_least = size_t(chunked_data_left);
734 
735  if (!read_at_least(at_least, end_time))
736  RETURN(-1);
737 
738  size_t retlen = min(off_t(buffer.size()), chunked_data_left);
739  result.append(buffer, 0, retlen);
740  buffer.erase(0, retlen);
741  chunked_data_left -= retlen;
742 
743  RETURN(int(read_enough));
744 }
745 
747 static void
748 write_all(int fd, const char * p, size_t n)
749 {
750  while (n) {
751  ssize_t c = write(fd, p, n);
752  if (c < 0) {
753  if (errno == EINTR) continue;
754  throw Xapian::NetworkError("Error writing to file", errno);
755  }
756  p += c;
757  n -= c;
758  }
759 }
760 
761 int
762 RemoteConnection::receive_file(const string &file, double end_time)
763 {
764  LOGCALL(REMOTE, int, "RemoteConnection::receive_file", file | end_time);
765  if (fdin == -1)
767 
768  // FIXME: Do we want to be able to delete the file during writing?
769  FD fd(posixy_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666));
770  if (fd == -1)
771  throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
772 
773  int type = get_message_chunked(end_time);
774  do {
775  off_t min_read = min(chunked_data_left, off_t(CHUNKSIZE));
776  if (!read_at_least(min_read, end_time))
777  RETURN(-1);
778  write_all(fd, buffer.data(), min_read);
779  chunked_data_left -= min_read;
780  buffer.erase(0, min_read);
781  } while (chunked_data_left);
782  RETURN(type);
783 }
784 
785 void
787 {
788  LOGCALL_VOID(REMOTE, "RemoteConnection::shutdown", NO_ARGS);
789 
790  if (fdin < 0) return;
791 
792  // We can be called from a destructor, so we can't throw an exception.
793  try {
794  send_message(MSG_SHUTDOWN, string(), 0.0);
795 #ifdef __WIN32__
796  HANDLE hin = fd_to_handle(fdin);
797  char dummy;
798  DWORD received;
799  BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
800  if (!ok && GetLastError() == ERROR_IO_PENDING) {
801  // Wait for asynchronous read to complete.
802  (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
803  }
804 #else
805  // Wait for the connection to be closed - when this happens
806  // poll()/select() will report that a read won't block.
807 # ifdef HAVE_POLL
808  struct pollfd fds;
809  fds.fd = fdin;
810  fds.events = POLLIN;
811  int res;
812  do {
813  res = poll(&fds, 1, -1);
814  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
815 # else
816  if (fdin < FD_SETSIZE) {
817  fd_set fdset;
818  FD_ZERO(&fdset);
819  FD_SET(fdin, &fdset);
820  int res;
821  do {
822  res = select(fdin + 1, &fdset, 0, 0, NULL);
823  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
824  }
825 # endif
826 #endif
827  } catch (...) {
828  }
829 }
830 
831 void
833 {
834  LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", NO_ARGS);
835 
836  if (fdin >= 0) {
838 
839  // If the same fd is used in both directions, don't close it twice.
840  if (fdin == fdout) fdout = -1;
841 
842  fdin = -1;
843  }
844 
845  if (fdout >= 0) {
847  fdout = -1;
848  }
849 }
850 
851 #ifdef __WIN32__
852 DWORD
853 RemoteConnection::calc_read_wait_msecs(double end_time)
854 {
855  if (end_time == 0.0)
856  return INFINITE;
857 
858  // Calculate how far in the future end_time is.
859  double time_diff = end_time - RealTime::now();
860 
861  // DWORD is unsigned, so we mustn't try and return a negative value.
862  if (time_diff < 0.0) {
863  throw_timeout("Timeout expired before starting read", context);
864  }
865  return static_cast<DWORD>(time_diff * 1000.0);
866 }
867 #endif
#define RETURN(A)
Definition: debuglog.h:493
#define Assert(COND)
Definition: omassert.h:122
Define the XAPIAN_NORETURN macro.
RemoteConnection class used by the remote backend.
include <sys/select.h> with portability workarounds.
std::enable_if< std::is_unsigned< T1 >::value &&std::is_unsigned< T2 >::value &&std::is_unsigned< R >::value, bool >::type add_overflows(T1 a, T2 b, R &res)
Addition with overflow checking.
Definition: overflow.h:58
Indicates an attempt to access a closed database.
Definition: error.h:1097
length encoded as a string
int get_message(std::string &result, double end_time)
Read one message from fdin.
off_t chunked_data_left
Remaining bytes of message data still to come over fdin for a chunked read.
#define AssertRel(A, REL, B)
Definition: omassert.h:123
#define usual(COND)
Definition: config.h:576
RemoteConnection(const RemoteConnection &)
Don&#39;t allow copying.
Indicates a timeout expired while communicating with a remote database.
Definition: error.h:845
Provides wrappers with POSIXy semantics.
Arithmetic operations with overflow checks.
#define POLLSELECT
void send_file(char type, int fd, double end_time)
Send the contents of a file as a message.
void sleep(double t)
Sleep until the time represented by this object.
Definition: realtime.h:127
bool read_at_least(size_t min_len, double end_time)
Read until there are at least min_len bytes in buffer.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Definition: debuglog.h:488
STL namespace.
ssize_t send_or_write(const void *p, size_t n)
Helper which calls send() or write().
Utility functions for testing files.
static void throw_database_closed()
#define O_CLOEXEC
Definition: safefcntl.h:90
std::string encode_length(T len)
Encode a length as a variable-length string.
Definition: length.h:36
#define rare(COND)
Definition: config.h:575
std::string context
The context to report with errors.
static void throw_timeout(const char *, const string &)
Hierarchy of classes which Xapian can throw as exceptions.
const char * dummy[]
Definition: version_h.cc:7
static void throw_network_error_insane_message_length()
int fdin
The file descriptor used for reading.
Definition: fd.h:30
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
string str(int value)
Convert int to std::string.
Definition: str.cc:90
Wrapper class around a file descriptor to avoid leaks.
std::string buffer
Buffer to hold unprocessed input.
void close_fd_or_socket(int fd)
Definition: socket_utils.h:38
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
Socket handling utilities.
void shutdown()
Shutdown the connection.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
double now()
Return the current time.
Definition: realtime.h:49
Indicates a problem communicating with a remote database.
Definition: error.h:803
void to_timeval(double t, struct timeval *tv)
Fill in struct timeval from number of seconds in a double.
Definition: realtime.h:110
void send_message(char type, const std::string &s, double end_time)
Send a message.
#define posixy_open
<unistd.h>, but with compat.
off_t file_size(const char *path)
Returns the size of a file.
Definition: filetests.h:71
Various assertion macros.
Functions for handling a time or time interval in a double.
bool ready_to_read() const
See if there is data available to read.
#define LOGLINE(a, b)
Definition: debuglog.h:494
int fdout
The file descriptor used for writing.
int sniff_next_message_type(double end_time)
Check what the next message type is.
void do_close()
Close the connection.
include <fcntl.h>, but working around broken platforms.
static void write_all(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
Debug logging macros.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:487
#define CHUNKSIZE