xapian-core  2.0.0
remoteconnection.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2006-2025 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, see
18  * <https://www.gnu.org/licenses/>.
19  */
20 
21 #include <config.h>
22 
23 #include "remoteconnection.h"
24 
25 #include <xapian/error.h>
26 
27 #include "safefcntl.h"
28 #include "safeunistd.h"
29 
30 #ifdef HAVE_POLL_H
31 # include <poll.h>
32 #else
33 # include "safesysselect.h"
34 #endif
35 
36 #include <algorithm>
37 #include <cerrno>
38 #include <climits>
39 #include <cstdint>
40 #include <string>
41 #ifdef __WIN32__
42 # include <type_traits>
43 #endif
44 
45 #include "debuglog.h"
46 #include "fd.h"
47 #include "filetests.h"
48 #include "omassert.h"
49 #include "overflow.h"
50 #include "pack.h"
51 #include "posixy_wrapper.h"
52 #include "realtime.h"
53 #include "socket_utils.h"
54 
55 using namespace std;
56 
57 #define CHUNKSIZE 4096
58 
59 [[noreturn]]
60 static void
62 {
63  throw Xapian::DatabaseClosedError("Database has been closed");
64 }
65 
66 [[noreturn]]
67 static void
69 {
70  throw Xapian::NetworkError("Message too long for size to fit in off_t");
71 }
72 
73 [[noreturn]]
74 static void
75 throw_timeout(const char* msg, const string& context)
76 {
77  throw Xapian::NetworkTimeoutError(msg, context);
78 }
79 
#ifdef __WIN32__
/** Advance the file offset stored in @a overlapped by @a n bytes.
 *
 *  The offset is held as two fields (Offset and OffsetHigh); when adding to
 *  Offset overflows, the carry is propagated into OffsetHigh.
 */
static inline void
update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
{
    const bool carry = add_overflows(overlapped.Offset, n, overlapped.Offset);
    if (carry) {
        overlapped.OffsetHigh += 1;
    }
}
#endif
88 
89 RemoteConnection::RemoteConnection(int fdin_, int fdout_,
90  const string & context_)
91  : fdin(fdin_), fdout(fdout_), context(context_)
92 {
93 #ifdef __WIN32__
94  memset(&overlapped, 0, sizeof(overlapped));
95  overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
96  if (!overlapped.hEvent)
97  throw Xapian::NetworkError("Failed to setup OVERLAPPED",
98  context, -int(GetLastError()));
99 
100 #elif defined USE_SO_NOSIGPIPE
101  // SO_NOSIGPIPE is a non-standardised socket option supported by a number
102  // of platforms - at least DragonFlyBSD, FreeBSD, macOS (not older
103  // versions, e.g. 10.15 apparently lacks it), Solaris; notably not
104  // supported by Linux or OpenBSD though.
105  //
106  // We use it where supported due to one big advantage over POSIX's
107  // MSG_NOSIGNAL which is that we can just set it once for a socket whereas
108  // with MSG_NOSIGNAL we need to call send(..., MSG_NOSIGNAL) instead of
109  // write(...), but send() only works on sockets, so with MSG_NOSIGNAL any
110  // code which might be working with files or pipes as well as sockets needs
111  // conditional handling depending on whether the fd is a socket or not.
112  //
113  // SO_NOSIGPIPE is present on NetBSD, but it seems when using it we still
114  // get SIGPIPE (reproduced on NetBSD 9.3 and 10.0) so we avoid using it
115  // there.
116  int on = 1;
117  if (setsockopt(fdout, SOL_SOCKET, SO_NOSIGPIPE,
118  reinterpret_cast<char*>(&on), sizeof(on)) < 0) {
119  // Some platforms (including FreeBSD, macOS, DragonflyBSD) seem to
120  // fail with EBADF instead of ENOTSOCK when passed a non-socket so
121  // allow either. If the descriptor is actually not valid we'll report
122  // it the next time we try to use it (as we would when not trying to
123  // use SO_NOSIGPIPE so this actually gives a more consistent error
124  // across platforms.
125  if (errno != ENOTSOCK && errno != EBADF) {
126  throw Xapian::NetworkError("Couldn't set SO_NOSIGPIPE on socket",
127  errno);
128  }
129  }
130 #elif defined USE_MSG_NOSIGNAL
131  // We can use send(..., MSG_NOSIGNAL) to avoid generating SIGPIPE
132  // (MSG_NOSIGNAL was added in POSIX.1-2008). This seems to be pretty much
133  // universally supported by current Unix-like platforms, but older macOS
134  // and Solaris apparently didn't have it.
135  //
136  // If fdout is not a socket, we'll set send_flags = 0 when the first send()
137  // fails with ENOTSOCK and use write() instead from then on.
138 #else
139  // It's simplest to just ignore SIGPIPE. Not ideal, but it seems only old
140  // versions of macOS and of Solaris will end up here so let's not bother
141  // trying to do any clever trickery.
142  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
143  throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
144  }
145 #endif
146 }
147 
#ifdef __WIN32__
/// Release the event handle created for overlapped I/O, if there is one.
RemoteConnection::~RemoteConnection()
{
    HANDLE event_handle = overlapped.hEvent;
    if (!event_handle) return;
    CloseHandle(event_handle);
}
#endif
155 
156 bool
158 {
159  LOGCALL(REMOTE, bool, "RemoteConnection::read_at_least", min_len | end_time);
160 
161  if (buffer.length() >= min_len) RETURN(true);
162 
163 #ifdef __WIN32__
164  HANDLE hin = fd_to_handle(fdin);
165  do {
166  char buf[CHUNKSIZE];
167  DWORD received;
168  BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
169  if (!ok) {
170  int errcode = GetLastError();
171  if (errcode != ERROR_IO_PENDING)
172  throw Xapian::NetworkError("read failed", context, -errcode);
173  // Is asynch - just wait for the data to be received or a timeout.
174  DWORD waitrc;
175  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
176  if (waitrc != WAIT_OBJECT_0) {
177  LOGLINE(REMOTE, "read: timeout has expired");
178  throw_timeout("Timeout expired while trying to read", context);
179  }
180  // Get the final result of the read.
181  if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
182  throw Xapian::NetworkError("Failed to get overlapped result",
183  context, -int(GetLastError()));
184  }
185 
186  if (received == 0) {
187  RETURN(false);
188  }
189 
190  buffer.append(buf, received);
191 
192  // We must update the offset in the OVERLAPPED structure manually.
193  update_overlapped_offset(overlapped, received);
194  } while (buffer.length() < min_len);
195 #else
196  // If there's no end_time, just use blocking I/O.
197  if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
198  throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
199  context, errno);
200  }
201 
202  while (true) {
203  char buf[CHUNKSIZE];
204  ssize_t received = read(fdin, buf, sizeof(buf));
205 
206  if (received > 0) {
207  buffer.append(buf, received);
208  if (buffer.length() >= min_len) RETURN(true);
209  continue;
210  }
211 
212  if (received == 0) {
213  RETURN(false);
214  }
215 
216  LOGLINE(REMOTE, "read gave errno = " << errno);
217  if (errno == EINTR) continue;
218 
219  if (errno != EAGAIN)
220  throw Xapian::NetworkError("read failed", context, errno);
221 
222  Assert(end_time != 0.0);
223  while (true) {
224  // Calculate how far in the future end_time is.
225  double now = RealTime::now();
226  double time_diff = end_time - now;
227  // Check if the timeout has expired.
228  if (time_diff < 0) {
229  LOGLINE(REMOTE, "read: timeout has expired");
230  throw_timeout("Timeout expired while trying to read", context);
231  }
232 
233  // Wait until there is data, an error, or the timeout is reached.
234 # ifdef HAVE_POLL
235  struct pollfd fds;
236  fds.fd = fdin;
237  fds.events = POLLIN;
238  int poll_result = poll(&fds, 1, int(time_diff * 1000));
239  if (poll_result > 0) break;
240 
241  if (poll_result == 0)
242  throw_timeout("Timeout expired while trying to read", context);
243 
244  // EINTR means poll was interrupted by a signal. EAGAIN means that
245  // allocation of internal data structures failed.
246  if (errno != EINTR && errno != EAGAIN)
247  throw Xapian::NetworkError("poll failed during read",
248  context, errno);
249 # else
250  if (fdin >= FD_SETSIZE) {
251  // We can't block with a timeout, so just sleep and retry.
252  RealTime::sleep(now + min(0.001, time_diff / 4));
253  break;
254  }
255  fd_set fdset;
256  FD_ZERO(&fdset);
257  FD_SET(fdin, &fdset);
258 
259  struct timeval tv;
260  RealTime::to_timeval(time_diff, &tv);
261  int select_result = select(fdin + 1, &fdset, 0, 0, &tv);
262  if (select_result > 0) break;
263 
264  if (select_result == 0)
265  throw_timeout("Timeout expired while trying to read", context);
266 
267  // EINTR means select was interrupted by a signal. The Linux
268  // select(2) man page says: "Portable programs may wish to check
269  // for EAGAIN and loop, just as with EINTR" and that seems to be
270  // necessary for cygwin at least.
271  if (errno != EINTR && errno != EAGAIN)
272  throw Xapian::NetworkError("select failed during read",
273  context, errno);
274 # endif
275  }
276  }
277 #endif
278  RETURN(true);
279 }
280 
281 #ifndef __WIN32__
282 ssize_t
283 RemoteConnection::send_or_write(const void* p, size_t len)
284 {
285 # ifdef USE_MSG_NOSIGNAL
286  if (send_flags) {
287  ssize_t n = send(fdout, p, len, send_flags);
288  if (usual(n >= 0 || errno != ENOTSOCK)) return n;
289  // In some testcases in the testsuite and in xapian-progsrv (in some
290  // cases) fdout won't be a socket. Clear send_flags so we only try
291  // send() once in this case.
292  send_flags = 0;
293  }
294 # endif
295  return write(fdout, p, len);
296 }
297 #endif
298 
299 void
300 RemoteConnection::send_message(char type, string_view message, double end_time)
301 {
302  LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
303  if (fdout == -1)
305 
306  string header;
307  header += type;
308  pack_uint(header, message.size());
309  string_view header_view = header;
310 
311 #ifdef __WIN32__
312  HANDLE hout = fd_to_handle(fdout);
313  const string_view* str = &header_view;
314 
315  size_t count = 0;
316  while (true) {
317  DWORD n;
318  BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
319  if (!ok) {
320  int errcode = GetLastError();
321  if (errcode != ERROR_IO_PENDING)
322  throw Xapian::NetworkError("write failed", context, -errcode);
323  // Just wait for the data to be sent, or a timeout.
324  DWORD waitrc;
325  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
326  if (waitrc != WAIT_OBJECT_0) {
327  LOGLINE(REMOTE, "write: timeout has expired");
328  throw_timeout("Timeout expired while trying to write", context);
329  }
330  // Get the final result.
331  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
332  throw Xapian::NetworkError("Failed to get overlapped result",
333  context, -int(GetLastError()));
334  }
335 
336  count += n;
337 
338  // We must update the offset in the OVERLAPPED structure manually.
339  update_overlapped_offset(overlapped, n);
340 
341  if (count == str->size()) {
342  if (str == &message || message.empty()) return;
343  str = &message;
344  count = 0;
345  }
346  }
347 #else
348  // If there's no end_time, just use blocking I/O.
349  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
350  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
351  context, errno);
352  }
353 
354  const string_view* str = &header_view;
355 
356  size_t count = 0;
357  while (true) {
358  // We've set write to non-blocking, so just try writing as there
359  // will usually be space.
360  ssize_t n = send_or_write(str->data() + count, str->size() - count);
361 
362  if (n >= 0) {
363  count += n;
364  if (count == str->size()) {
365  if (str == &message || message.empty()) return;
366  str = &message;
367  count = 0;
368  }
369  continue;
370  }
371 
372  LOGLINE(REMOTE, "write gave errno = " << errno);
373  if (errno == EINTR) continue;
374 
375  if (errno != EAGAIN)
376  throw Xapian::NetworkError("write failed", context, errno);
377 
378  double now = RealTime::now();
379  double time_diff = end_time - now;
380  if (time_diff < 0) {
381  LOGLINE(REMOTE, "write: timeout has expired");
382  throw_timeout("Timeout expired while trying to write", context);
383  }
384 
385  // Wait until there is space or the timeout is reached.
386 # ifdef HAVE_POLL
387  struct pollfd fds;
388  fds.fd = fdout;
389  fds.events = POLLOUT;
390  int result = poll(&fds, 1, int(time_diff * 1000));
391 # define POLLSELECT "poll"
392 # else
393  if (fdout >= FD_SETSIZE) {
394  // We can't block with a timeout, so just sleep and retry.
395  RealTime::sleep(now + min(0.001, time_diff / 4));
396  continue;
397  }
398 
399  fd_set fdset;
400  FD_ZERO(&fdset);
401  FD_SET(fdout, &fdset);
402 
403  struct timeval tv;
404  RealTime::to_timeval(time_diff, &tv);
405  int result = select(fdout + 1, 0, &fdset, 0, &tv);
406 # define POLLSELECT "select"
407 # endif
408 
409  if (result < 0) {
410  if (errno == EINTR || errno == EAGAIN) {
411  // EINTR/EAGAIN means select was interrupted by a signal.
412  // We could just retry the poll/select, but it's easier to just
413  // retry the write.
414  continue;
415  }
416  throw Xapian::NetworkError(POLLSELECT " failed during write",
417  context, errno);
418 # undef POLLSELECT
419  }
420 
421  if (result == 0)
422  throw_timeout("Timeout expired while trying to write", context);
423  }
424 #endif
425 }
426 
427 void
428 RemoteConnection::send_file(char type, int fd, double end_time)
429 {
430  LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
431  if (fdout == -1)
433 
434  auto size = file_size(fd);
435  if (errno)
436  throw Xapian::NetworkError("Couldn't stat file to send", errno);
437  // FIXME: Use sendfile() or similar if available?
438 
439  char buf[CHUNKSIZE];
440  buf[0] = type;
441  size_t c = 1;
442  {
443  string enc_size;
444  pack_uint(enc_size, size);
445  c += enc_size.size();
446  // An encoded length should be just a few bytes.
447  AssertRel(c, <=, sizeof(buf));
448  memcpy(buf + 1, enc_size.data(), enc_size.size());
449  }
450 
451 #ifdef __WIN32__
452  HANDLE hout = fd_to_handle(fdout);
453  size_t count = 0;
454  while (true) {
455  DWORD n;
456  BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
457  if (!ok) {
458  int errcode = GetLastError();
459  if (errcode != ERROR_IO_PENDING)
460  throw Xapian::NetworkError("write failed", context, -errcode);
461  // Just wait for the data to be sent, or a timeout.
462  DWORD waitrc;
463  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
464  if (waitrc != WAIT_OBJECT_0) {
465  LOGLINE(REMOTE, "write: timeout has expired");
466  throw_timeout("Timeout expired while trying to write", context);
467  }
468  // Get the final result.
469  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
470  throw Xapian::NetworkError("Failed to get overlapped result",
471  context, -int(GetLastError()));
472  }
473 
474  count += n;
475 
476  // We must update the offset in the OVERLAPPED structure manually.
477  update_overlapped_offset(overlapped, n);
478 
479  if (count == c) {
480  if (size == 0) return;
481 
482  ssize_t res;
483  do {
484  res = read(fd, buf, sizeof(buf));
485  } while (res < 0 && errno == EINTR);
486  if (res < 0) throw Xapian::NetworkError("read failed", errno);
487  c = size_t(res);
488 
489  size -= c;
490  count = 0;
491  }
492  }
493 #else
494  // If there's no end_time, just use blocking I/O.
495  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
496  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
497  context, errno);
498  }
499 
500  size_t count = 0;
501  while (true) {
502  // We've set write to non-blocking, so just try writing as there
503  // will usually be space.
504  ssize_t n = send_or_write(buf + count, c - count);
505 
506  if (n >= 0) {
507  count += n;
508  if (count == c) {
509  if (size == 0) return;
510 
511  ssize_t res;
512  do {
513  res = read(fd, buf, sizeof(buf));
514  } while (res < 0 && errno == EINTR);
515  if (res < 0) throw Xapian::NetworkError("read failed", errno);
516  c = size_t(res);
517 
518  size -= c;
519  count = 0;
520  }
521  continue;
522  }
523 
524  LOGLINE(REMOTE, "write gave errno = " << errno);
525  if (errno == EINTR) continue;
526 
527  if (errno != EAGAIN)
528  throw Xapian::NetworkError("write failed", context, errno);
529 
530  double now = RealTime::now();
531  double time_diff = end_time - now;
532  if (time_diff < 0) {
533  LOGLINE(REMOTE, "write: timeout has expired");
534  throw_timeout("Timeout expired while trying to write", context);
535  }
536 
537  // Wait until there is space or the timeout is reached.
538 # ifdef HAVE_POLL
539  struct pollfd fds;
540  fds.fd = fdout;
541  fds.events = POLLOUT;
542  int result = poll(&fds, 1, int(time_diff * 1000));
543 # define POLLSELECT "poll"
544 # else
545  if (fdout >= FD_SETSIZE) {
546  // We can't block with a timeout, so just sleep and retry.
547  RealTime::sleep(now + min(0.001, time_diff / 4));
548  continue;
549  }
550 
551  fd_set fdset;
552  FD_ZERO(&fdset);
553  FD_SET(fdout, &fdset);
554 
555  struct timeval tv;
556  RealTime::to_timeval(time_diff, &tv);
557  int result = select(fdout + 1, 0, &fdset, 0, &tv);
558 # define POLLSELECT "select"
559 # endif
560 
561  if (result < 0) {
562  if (errno == EINTR || errno == EAGAIN) {
563  // EINTR/EAGAIN means select was interrupted by a signal.
564  // We could just retry the poll/select, but it's easier to just
565  // retry the write.
566  continue;
567  }
568  throw Xapian::NetworkError(POLLSELECT " failed during write",
569  context, errno);
570 # undef POLLSELECT
571  }
572 
573  if (result == 0)
574  throw_timeout("Timeout expired while trying to write", context);
575  }
576 #endif
577 }
578 
579 int
581 {
582  LOGCALL(REMOTE, int, "RemoteConnection::sniff_next_message_type", end_time);
583  if (fdin == -1)
585 
586  if (!read_at_least(1, end_time))
587  RETURN(-1);
588  unsigned char type = buffer[0];
589  RETURN(type);
590 }
591 
592 int
594 {
595  LOGCALL(REMOTE, int, "RemoteConnection::get_message", result | end_time);
596  if (fdin == -1)
598 
599  if (!read_at_least(2, end_time))
600  RETURN(-1);
601  // This code assume things about the pack_uint() encoding in order to
602  // handle partial reads.
603  size_t len = static_cast<unsigned char>(buffer[1]);
604  if (len < 128) {
605  if (!read_at_least(len + 2, end_time))
606  RETURN(-1);
607  result.assign(buffer.data() + 2, len);
608  unsigned char type = buffer[0];
609  buffer.erase(0, len + 2);
610  RETURN(type);
611  }
612 
613  // We know the message payload is at least 128 bytes of data, and if we
614  // read that much we'll definitely have the whole of the length.
615  if (!read_at_least(128 + 2, end_time))
616  RETURN(-1);
617  const char* p = buffer.data();
618  const char* p_end = p + buffer.size();
619  ++p;
620  if (!unpack_uint(&p, p_end, &len)) {
621  RETURN(-1);
622  }
623  size_t header_len = (p - buffer.data());
624  if (!read_at_least(header_len + len, end_time))
625  RETURN(-1);
626  result.assign(buffer.data() + header_len, len);
627  unsigned char type = buffer[0];
628  buffer.erase(0, header_len + len);
629  RETURN(type);
630 }
631 
632 int
634 {
635  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunked", end_time);
636 
637  if (fdin == -1)
639 
640  if (!read_at_least(2, end_time))
641  RETURN(-1);
642  // This code assume things about the pack_uint() encoding in order to
643  // handle partial reads.
644  uint_least64_t len = static_cast<unsigned char>(buffer[1]);
645  if (len < 128) {
646  chunked_data_left = off_t(len);
647  char type = buffer[0];
648  buffer.erase(0, 2);
649  RETURN(type);
650  }
651 
652  // We know the message payload is at least 128 bytes of data, and if we
653  // read that much we'll definitely have the whole of the length.
654  if (!read_at_least(128 + 2, end_time))
655  RETURN(-1);
656  const char* p = buffer.data();
657  const char* p_end = p + buffer.size();
658  ++p;
659  if (!unpack_uint(&p, p_end, &len)) {
660  RETURN(-1);
661  }
662  chunked_data_left = off_t(len);
663  // Check that the value of len fits in an off_t without loss.
664  if (rare(uint_least64_t(chunked_data_left) != len)) {
666  }
667  size_t header_len = (p - buffer.data());
668  unsigned char type = buffer[0];
669  buffer.erase(0, header_len);
670  RETURN(type);
671 }
672 
673 int
674 RemoteConnection::get_message_chunk(string &result, size_t at_least,
675  double end_time)
676 {
677  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunk", result | at_least | end_time);
678  if (fdin == -1)
680 
681  if (at_least <= result.size()) RETURN(true);
682  at_least -= result.size();
683 
684  bool read_enough = (off_t(at_least) <= chunked_data_left);
685  if (!read_enough) at_least = size_t(chunked_data_left);
686 
687  if (!read_at_least(at_least, end_time))
688  RETURN(-1);
689 
690  size_t retlen = min(off_t(buffer.size()), chunked_data_left);
691  result.append(buffer, 0, retlen);
692  buffer.erase(0, retlen);
693  chunked_data_left -= retlen;
694 
695  RETURN(int(read_enough));
696 }
697 
699 static void
700 write_all(int fd, const char * p, size_t n)
701 {
702  while (n) {
703  ssize_t c = write(fd, p, n);
704  if (c < 0) {
705  if (errno == EINTR) continue;
706  throw Xapian::NetworkError("Error writing to file", errno);
707  }
708  p += c;
709  n -= c;
710  }
711 }
712 
713 int
714 RemoteConnection::receive_file(const string &file, double end_time)
715 {
716  LOGCALL(REMOTE, int, "RemoteConnection::receive_file", file | end_time);
717  if (fdin == -1)
719 
720  // FIXME: Do we want to be able to delete the file during writing?
721  FD fd(posixy_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666));
722  if (fd == -1)
723  throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
724 
725  int type = get_message_chunked(end_time);
726  do {
727  off_t min_read = min(chunked_data_left, off_t(CHUNKSIZE));
728  if (!read_at_least(min_read, end_time))
729  RETURN(-1);
730  write_all(fd, buffer.data(), min_read);
731  chunked_data_left -= min_read;
732  buffer.erase(0, min_read);
733  } while (chunked_data_left);
734  RETURN(type);
735 }
736 
737 void
739 {
740  LOGCALL_VOID(REMOTE, "RemoteConnection::shutdown", NO_ARGS);
741 
742  if (fdin < 0) return;
743 
744  // We can be called from a destructor, so we can't throw an exception.
745  try {
746  send_message(MSG_SHUTDOWN, {}, 0.0);
747 #ifdef __WIN32__
748  HANDLE hin = fd_to_handle(fdin);
749  char dummy;
750  DWORD received;
751  BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
752  if (!ok && GetLastError() == ERROR_IO_PENDING) {
753  // Wait for asynchronous read to complete.
754  (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
755  }
756 #else
757  // Wait for the connection to be closed - when this happens
758  // poll()/select() will report that a read won't block.
759 # ifdef HAVE_POLL
760  struct pollfd fds;
761  fds.fd = fdin;
762  fds.events = POLLIN;
763  int res;
764  do {
765  res = poll(&fds, 1, -1);
766  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
767 # else
768  if (fdin < FD_SETSIZE) {
769  fd_set fdset;
770  FD_ZERO(&fdset);
771  FD_SET(fdin, &fdset);
772  int res;
773  do {
774  res = select(fdin + 1, &fdset, 0, 0, NULL);
775  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
776  }
777 # endif
778 #endif
779  } catch (...) {
780  }
781 }
782 
783 void
785 {
786  LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", NO_ARGS);
787 
788  if (fdin >= 0) {
790 
791  // If the same fd is used in both directions, don't close it twice.
792  if (fdin == fdout) fdout = -1;
793 
794  fdin = -1;
795  }
796 
797  if (fdout >= 0) {
799  fdout = -1;
800  }
801 }
802 
#ifdef __WIN32__
/** Convert an end time into a wait duration in milliseconds.
 *
 *  @param end_time  Time at which to give up; 0.0 means wait forever.
 *
 *  @return INFINITE if @a end_time is 0.0, otherwise the number of
 *          milliseconds from now until @a end_time.
 *
 *  @exception Xapian::NetworkTimeoutError if @a end_time has already passed.
 */
DWORD
RemoteConnection::calc_read_wait_msecs(double end_time)
{
    if (end_time != 0.0) {
        // Work out how long remains until end_time.
        const double remaining = end_time - RealTime::now();

        // DWORD is unsigned, so a negative duration can't be returned.
        if (remaining < 0.0) {
            throw_timeout("Timeout expired before starting read", context);
        }
        return static_cast<DWORD>(remaining * 1000.0);
    }
    return INFINITE;
}
#endif
Definition: fd.h:30
void send_message(char type, std::string_view s, double end_time)
Send a message.
int fdin
The file descriptor used for reading.
std::string buffer
Buffer to hold unprocessed input.
RemoteConnection(const RemoteConnection &)
Don't allow copying.
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
bool read_at_least(size_t min_len, double end_time)
Read until there are at least min_len bytes in buffer.
int fdout
The file descriptor used for writing.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
off_t chunked_data_left
Remaining bytes of message data still to come over fdin for a chunked read.
void do_close()
Close the connection.
int get_message(std::string &result, double end_time)
Read one message from fdin.
void shutdown()
Shutdown the connection.
int sniff_next_message_type(double end_time)
Check what the next message type is.
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
std::string context
The context to report with errors.
ssize_t send_or_write(const void *p, size_t n)
Helper which calls send() or write().
void send_file(char type, int fd, double end_time)
Send the contents of a file as a message.
Indicates an attempt to access a closed database.
Definition: error.h:1085
Indicates a problem communicating with a remote database.
Definition: error.h:791
Indicates a timeout expired while communicating with a remote database.
Definition: error.h:833
#define usual(COND)
Definition: config.h:608
#define rare(COND)
Definition: config.h:607
PositionList * p
Debug logging macros.
#define RETURN(...)
Definition: debuglog.h:484
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:478
#define LOGLINE(a, b)
Definition: debuglog.h:485
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Definition: debuglog.h:479
Hierarchy of classes which Xapian can throw as exceptions.
Wrapper class around a file descriptor to avoid leaks.
Utility functions for testing files.
file_size_type file_size(const char *path)
Returns the size of a file.
Definition: filetests.h:76
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
double now()
Return the current time.
Definition: realtime.h:49
void to_timeval(double t, struct timeval *tv)
Fill in struct timeval from number of seconds in a double.
Definition: realtime.h:110
void sleep(double t)
Sleep until the time represented by this object.
Definition: realtime.h:127
string str(int value)
Convert int to std::string.
Definition: str.cc:91
Various assertion macros.
#define AssertRel(A, REL, B)
Definition: omassert.h:123
#define Assert(COND)
Definition: omassert.h:122
Arithmetic operations with overflow checks.
std::enable_if_t< std::is_unsigned_v< T1 > &&std::is_unsigned_v< T2 > &&std::is_unsigned_v< R >, bool > add_overflows(T1 a, T2 b, R &res)
Addition with overflow checking.
Definition: overflow.h:58
Pack types into strings and unpack them again.
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
Definition: pack.h:346
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Definition: pack.h:315
Provides wrappers with POSIXy semantics.
#define posixy_open
Functions for handling a time or time interval in a double.
#define CHUNKSIZE
#define POLLSELECT
static void throw_database_closed()
static void throw_timeout(const char *msg, const string &context)
static void throw_network_error_message_too_long_for_off_t()
static void write_all(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
RemoteConnection class used by the remote backend.
@ MSG_SHUTDOWN
include <fcntl.h>, but working around broken platforms.
#define O_CLOEXEC
Definition: safefcntl.h:89
include <sys/select.h> with portability workarounds.
<unistd.h>, but with compat.
Socket handling utilities.
void close_fd_or_socket(int fd)
Definition: socket_utils.h:119
const char * dummy[]
Definition: version_h.cc:7