xapian-core  1.4.25
remoteconnection.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2006-2023 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include "remoteconnection.h"
24 
25 #include <xapian/error.h>
26 
27 #include "safefcntl.h"
28 #include "safeunistd.h"
29 
30 #ifdef HAVE_POLL_H
31 # include <poll.h>
32 #else
33 # include "safesysselect.h"
34 #endif
35 
36 #include <algorithm>
37 #include <cerrno>
38 #include <climits>
39 #include <cstdint>
40 #include <string>
41 #ifdef __WIN32__
42 # include <type_traits>
43 #endif
44 
45 #include "debuglog.h"
46 #include "fd.h"
47 #include "filetests.h"
48 #include "noreturn.h"
49 #include "omassert.h"
50 #include "overflow.h"
51 #include "posixy_wrapper.h"
52 #include "realtime.h"
53 #include "length.h"
54 #include "socket_utils.h"
55 
56 using namespace std;
57 
58 #define CHUNKSIZE 4096
59 
60 XAPIAN_NORETURN(static void throw_database_closed());
61 static void
63 {
64  throw Xapian::DatabaseClosedError("Database has been closed");
65 }
66 
67 XAPIAN_NORETURN(static void throw_network_error_insane_message_length());
68 static void
70 {
71  throw Xapian::NetworkError("Insane message length specified!");
72 }
73 
74 XAPIAN_NORETURN(static void throw_timeout(const char*, const string&));
75 static void
76 throw_timeout(const char* msg, const string& context)
77 {
78  throw Xapian::NetworkTimeoutError(msg, context);
79 }
80 
#ifdef __WIN32__
/** Advance the file offset stored in an OVERLAPPED structure by n bytes.
 *
 *  The offset is split across the Offset and OffsetHigh members, so a
 *  wrap-around of Offset has to be carried into OffsetHigh by hand.
 *
 *  @param overlapped  Structure whose offset is updated in place.
 *  @param n           Number of bytes to advance by.
 */
static inline void
update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
{
    const bool low_word_wrapped =
        add_overflows(overlapped.Offset, n, overlapped.Offset);
    if (low_word_wrapped) {
        // Carry into the upper half of the offset.
        overlapped.OffsetHigh += 1;
    }
}
#endif
89 
90 RemoteConnection::RemoteConnection(int fdin_, int fdout_,
91  const string & context_)
92  : fdin(fdin_), fdout(fdout_), context(context_)
93 {
94 #ifdef __WIN32__
95  memset(&overlapped, 0, sizeof(overlapped));
96  overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
97  if (!overlapped.hEvent)
98  throw Xapian::NetworkError("Failed to setup OVERLAPPED",
99  context, -int(GetLastError()));
100 
101 #endif
102 }
103 
#ifdef __WIN32__
/// Release the event object created for overlapped I/O (if there is one).
RemoteConnection::~RemoteConnection()
{
    HANDLE event_handle = overlapped.hEvent;
    if (event_handle) {
        CloseHandle(event_handle);
    }
}
#endif
111 
112 bool
114 {
115  LOGCALL(REMOTE, bool, "RemoteConnection::read_at_least", min_len | end_time);
116 
117  if (buffer.length() >= min_len) RETURN(true);
118 
119 #ifdef __WIN32__
120  HANDLE hin = fd_to_handle(fdin);
121  do {
122  char buf[CHUNKSIZE];
123  DWORD received;
124  BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
125  if (!ok) {
126  int errcode = GetLastError();
127  if (errcode != ERROR_IO_PENDING)
128  throw Xapian::NetworkError("read failed", context, -errcode);
129  // Is asynch - just wait for the data to be received or a timeout.
130  DWORD waitrc;
131  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
132  if (waitrc != WAIT_OBJECT_0) {
133  LOGLINE(REMOTE, "read: timeout has expired");
134  throw_timeout("Timeout expired while trying to read", context);
135  }
136  // Get the final result of the read.
137  if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
138  throw Xapian::NetworkError("Failed to get overlapped result",
139  context, -int(GetLastError()));
140  }
141 
142  if (received == 0) {
143  RETURN(false);
144  }
145 
146  buffer.append(buf, received);
147 
148  // We must update the offset in the OVERLAPPED structure manually.
149  update_overlapped_offset(overlapped, received);
150  } while (buffer.length() < min_len);
151 #else
152  // If there's no end_time, just use blocking I/O.
153  if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
154  throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
155  context, errno);
156  }
157 
158  while (true) {
159  char buf[CHUNKSIZE];
160  ssize_t received = read(fdin, buf, sizeof(buf));
161 
162  if (received > 0) {
163  buffer.append(buf, received);
164  if (buffer.length() >= min_len) RETURN(true);
165  continue;
166  }
167 
168  if (received == 0) {
169  RETURN(false);
170  }
171 
172  LOGLINE(REMOTE, "read gave errno = " << errno);
173  if (errno == EINTR) continue;
174 
175  if (errno != EAGAIN)
176  throw Xapian::NetworkError("read failed", context, errno);
177 
178  Assert(end_time != 0.0);
179  while (true) {
180  // Calculate how far in the future end_time is.
181  double now = RealTime::now();
182  double time_diff = end_time - now;
183  // Check if the timeout has expired.
184  if (time_diff < 0) {
185  LOGLINE(REMOTE, "read: timeout has expired");
186  throw_timeout("Timeout expired while trying to read", context);
187  }
188 
189  // Wait until there is data, an error, or the timeout is reached.
190 # ifdef HAVE_POLL
191  struct pollfd fds;
192  fds.fd = fdin;
193  fds.events = POLLIN;
194  int poll_result = poll(&fds, 1, int(time_diff * 1000));
195  if (poll_result > 0) break;
196 
197  if (poll_result == 0)
198  throw_timeout("Timeout expired while trying to read", context);
199 
200  // EINTR means poll was interrupted by a signal. EAGAIN means that
201  // allocation of internal data structures failed.
202  if (errno != EINTR && errno != EAGAIN)
203  throw Xapian::NetworkError("poll failed during read",
204  context, errno);
205 # else
206  if (fdin >= FD_SETSIZE) {
207  // We can't block with a timeout, so just sleep and retry.
208  RealTime::sleep(now + min(0.001, time_diff / 4));
209  break;
210  }
211  fd_set fdset;
212  FD_ZERO(&fdset);
213  FD_SET(fdin, &fdset);
214 
215  struct timeval tv;
216  RealTime::to_timeval(time_diff, &tv);
217  int select_result = select(fdin + 1, &fdset, 0, 0, &tv);
218  if (select_result > 0) break;
219 
220  if (select_result == 0)
221  throw_timeout("Timeout expired while trying to read", context);
222 
223  // EINTR means select was interrupted by a signal. The Linux
224  // select(2) man page says: "Portable programs may wish to check
225  // for EAGAIN and loop, just as with EINTR" and that seems to be
226  // necessary for cygwin at least.
227  if (errno != EINTR && errno != EAGAIN)
228  throw Xapian::NetworkError("select failed during read",
229  context, errno);
230 # endif
231  }
232  }
233 #endif
234  RETURN(true);
235 }
236 
237 bool
239 {
240  LOGCALL(REMOTE, bool, "RemoteConnection::ready_to_read", NO_ARGS);
241  if (fdin == -1)
243 
244  if (!buffer.empty()) RETURN(true);
245 
246  // See if there's data available to be read.
247 #ifdef HAVE_POLL
248  struct pollfd fds;
249  fds.fd = fdin;
250  fds.events = POLLIN;
251  RETURN(poll(&fds, 1, 0) > 0);
252 #else
253 # ifndef __WIN32__
254  if (fdin >= FD_SETSIZE) {
255  // Not ideal, but OK for how this method is currently used.
256  RETURN(true);
257  }
258 # endif
259  fd_set fdset;
260  FD_ZERO(&fdset);
261  FD_SET(fdin, &fdset);
262 
263  struct timeval tv;
264  tv.tv_sec = 0;
265  tv.tv_usec = 0;
266  RETURN(select(fdin + 1, &fdset, 0, 0, &tv) > 0);
267 #endif
268 }
269 
270 void
271 RemoteConnection::send_message(char type, const string &message,
272  double end_time)
273 {
274  LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
275  if (fdout == -1)
277 
278  string header;
279  header += type;
280  header += encode_length(message.size());
281 
282 #ifdef __WIN32__
283  HANDLE hout = fd_to_handle(fdout);
284  const string * str = &header;
285 
286  size_t count = 0;
287  while (true) {
288  DWORD n;
289  BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
290  if (!ok) {
291  int errcode = GetLastError();
292  if (errcode != ERROR_IO_PENDING)
293  throw Xapian::NetworkError("write failed", context, -errcode);
294  // Just wait for the data to be sent, or a timeout.
295  DWORD waitrc;
296  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
297  if (waitrc != WAIT_OBJECT_0) {
298  LOGLINE(REMOTE, "write: timeout has expired");
299  throw_timeout("Timeout expired while trying to write", context);
300  }
301  // Get the final result.
302  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
303  throw Xapian::NetworkError("Failed to get overlapped result",
304  context, -int(GetLastError()));
305  }
306 
307  count += n;
308 
309  // We must update the offset in the OVERLAPPED structure manually.
310  update_overlapped_offset(overlapped, n);
311 
312  if (count == str->size()) {
313  if (str == &message || message.empty()) return;
314  str = &message;
315  count = 0;
316  }
317  }
318 #else
319  // If there's no end_time, just use blocking I/O.
320  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
321  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
322  context, errno);
323  }
324 
325  const string * str = &header;
326 
327  size_t count = 0;
328  while (true) {
329  // We've set write to non-blocking, so just try writing as there
330  // will usually be space.
331 #if defined MSG_NOSIGNAL && !defined SO_NOSIGPIPE
332  ssize_t n = send(fdout, str->data() + count, str->size() - count,
333  MSG_NOSIGNAL);
334  if (n < 0 && errno == ENOTSOCK) {
335  // In some testcases in the testsuite and in xapian-progsrv (in
336  // some cases) the fd won't be a socket.
337  n = write(fdout, str->data() + count, str->size() - count);
338  }
339 #else
340  ssize_t n = write(fdout, str->data() + count, str->size() - count);
341 #endif
342 
343  if (n >= 0) {
344  count += n;
345  if (count == str->size()) {
346  if (str == &message || message.empty()) return;
347  str = &message;
348  count = 0;
349  }
350  continue;
351  }
352 
353  LOGLINE(REMOTE, "write gave errno = " << errno);
354  if (errno == EINTR) continue;
355 
356  if (errno != EAGAIN)
357  throw Xapian::NetworkError("write failed", context, errno);
358 
359  double now = RealTime::now();
360  double time_diff = end_time - now;
361  if (time_diff < 0) {
362  LOGLINE(REMOTE, "write: timeout has expired");
363  throw_timeout("Timeout expired while trying to write", context);
364  }
365 
366  // Wait until there is space or the timeout is reached.
367 # ifdef HAVE_POLL
368  struct pollfd fds;
369  fds.fd = fdout;
370  fds.events = POLLOUT;
371  int result = poll(&fds, 1, int(time_diff * 1000));
372 # define POLLSELECT "poll"
373 # else
374  if (fdout >= FD_SETSIZE) {
375  // We can't block with a timeout, so just sleep and retry.
376  RealTime::sleep(now + min(0.001, time_diff / 4));
377  continue;
378  }
379 
380  fd_set fdset;
381  FD_ZERO(&fdset);
382  FD_SET(fdout, &fdset);
383 
384  struct timeval tv;
385  RealTime::to_timeval(time_diff, &tv);
386  int result = select(fdout + 1, 0, &fdset, 0, &tv);
387 # define POLLSELECT "select"
388 # endif
389 
390  if (result < 0) {
391  if (errno == EINTR || errno == EAGAIN) {
392  // EINTR/EAGAIN means select was interrupted by a signal.
393  // We could just retry the poll/select, but it's easier to just
394  // retry the write.
395  continue;
396  }
397  throw Xapian::NetworkError(POLLSELECT " failed during write",
398  context, errno);
399 # undef POLLSELECT
400  }
401 
402  if (result == 0)
403  throw_timeout("Timeout expired while trying to write", context);
404  }
405 #endif
406 }
407 
408 void
409 RemoteConnection::send_file(char type, int fd, double end_time)
410 {
411  LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
412  if (fdout == -1)
414 
415  off_t size = file_size(fd);
416  if (errno)
417  throw Xapian::NetworkError("Couldn't stat file to send", errno);
418  // FIXME: Use sendfile() or similar if available?
419 
420  char buf[CHUNKSIZE];
421  buf[0] = type;
422  size_t c = 1;
423  {
424  string enc_size = encode_length(size);
425  c += enc_size.size();
426  // An encoded length should be just a few bytes.
427  AssertRel(c, <=, sizeof(buf));
428  memcpy(buf + 1, enc_size.data(), enc_size.size());
429  }
430 
431 #ifdef __WIN32__
432  HANDLE hout = fd_to_handle(fdout);
433  size_t count = 0;
434  while (true) {
435  DWORD n;
436  BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
437  if (!ok) {
438  int errcode = GetLastError();
439  if (errcode != ERROR_IO_PENDING)
440  throw Xapian::NetworkError("write failed", context, -errcode);
441  // Just wait for the data to be sent, or a timeout.
442  DWORD waitrc;
443  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
444  if (waitrc != WAIT_OBJECT_0) {
445  LOGLINE(REMOTE, "write: timeout has expired");
446  throw_timeout("Timeout expired while trying to write", context);
447  }
448  // Get the final result.
449  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
450  throw Xapian::NetworkError("Failed to get overlapped result",
451  context, -int(GetLastError()));
452  }
453 
454  count += n;
455 
456  // We must update the offset in the OVERLAPPED structure manually.
457  update_overlapped_offset(overlapped, n);
458 
459  if (count == c) {
460  if (size == 0) return;
461 
462  ssize_t res;
463  do {
464  res = read(fd, buf, sizeof(buf));
465  } while (res < 0 && errno == EINTR);
466  if (res < 0) throw Xapian::NetworkError("read failed", errno);
467  c = size_t(res);
468 
469  size -= c;
470  count = 0;
471  }
472  }
473 #else
474  // If there's no end_time, just use blocking I/O.
475  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
476  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
477  context, errno);
478  }
479 
480  size_t count = 0;
481  while (true) {
482  // We've set write to non-blocking, so just try writing as there
483  // will usually be space.
484 #if defined MSG_NOSIGNAL && !defined SO_NOSIGPIPE
485  ssize_t n = send(fdout, buf + count, c - count, MSG_NOSIGNAL);
486  if (n < 0 && errno == ENOTSOCK) {
487  // In some testcases in the testsuite and in xapian-progsrv (in
488  // some cases) the fd won't be a socket.
489  n = write(fdout, buf + count, c - count);
490  }
491 #else
492  ssize_t n = write(fdout, buf + count, c - count);
493 #endif
494 
495  if (n >= 0) {
496  count += n;
497  if (count == c) {
498  if (size == 0) return;
499 
500  ssize_t res;
501  do {
502  res = read(fd, buf, sizeof(buf));
503  } while (res < 0 && errno == EINTR);
504  if (res < 0) throw Xapian::NetworkError("read failed", errno);
505  c = size_t(res);
506 
507  size -= c;
508  count = 0;
509  }
510  continue;
511  }
512 
513  LOGLINE(REMOTE, "write gave errno = " << errno);
514  if (errno == EINTR) continue;
515 
516  if (errno != EAGAIN)
517  throw Xapian::NetworkError("write failed", context, errno);
518 
519  double now = RealTime::now();
520  double time_diff = end_time - now;
521  if (time_diff < 0) {
522  LOGLINE(REMOTE, "write: timeout has expired");
523  throw_timeout("Timeout expired while trying to write", context);
524  }
525 
526  // Wait until there is space or the timeout is reached.
527 # ifdef HAVE_POLL
528  struct pollfd fds;
529  fds.fd = fdout;
530  fds.events = POLLOUT;
531  int result = poll(&fds, 1, int(time_diff * 1000));
532 # define POLLSELECT "poll"
533 # else
534  if (fdout >= FD_SETSIZE) {
535  // We can't block with a timeout, so just sleep and retry.
536  RealTime::sleep(now + min(0.001, time_diff / 4));
537  continue;
538  }
539 
540  fd_set fdset;
541  FD_ZERO(&fdset);
542  FD_SET(fdout, &fdset);
543 
544  struct timeval tv;
545  RealTime::to_timeval(time_diff, &tv);
546  int result = select(fdout + 1, 0, &fdset, 0, &tv);
547 # define POLLSELECT "select"
548 # endif
549 
550  if (result < 0) {
551  if (errno == EINTR || errno == EAGAIN) {
552  // EINTR/EAGAIN means select was interrupted by a signal.
553  // We could just retry the poll/select, but it's easier to just
554  // retry the write.
555  continue;
556  }
557  throw Xapian::NetworkError(POLLSELECT " failed during write",
558  context, errno);
559 # undef POLLSELECT
560  }
561 
562  if (result == 0)
563  throw_timeout("Timeout expired while trying to write", context);
564  }
565 #endif
566 }
567 
568 int
570 {
571  LOGCALL(REMOTE, int, "RemoteConnection::sniff_next_message_type", end_time);
572  if (fdin == -1)
574 
575  if (!read_at_least(1, end_time))
576  RETURN(-1);
577  unsigned char type = buffer[0];
578  RETURN(type);
579 }
580 
581 int
583 {
584  LOGCALL(REMOTE, int, "RemoteConnection::get_message", result | end_time);
585  if (fdin == -1)
587 
588  if (!read_at_least(2, end_time))
589  RETURN(-1);
590  size_t len = static_cast<unsigned char>(buffer[1]);
591  if (!read_at_least(len + 2, end_time))
592  RETURN(-1);
593  if (len != 0xff) {
594  result.assign(buffer.data() + 2, len);
595  unsigned char type = buffer[0];
596  buffer.erase(0, len + 2);
597  RETURN(type);
598  }
599  len = 0;
600  string::const_iterator i = buffer.begin() + 2;
601  unsigned char ch;
602  int shift = 0;
603  do {
604  if (i == buffer.end() || shift > 28) {
605  // Something is very wrong...
607  }
608  ch = *i++;
609  len |= size_t(ch & 0x7f) << shift;
610  shift += 7;
611  } while ((ch & 0x80) == 0);
612  len += 255;
613  size_t header_len = (i - buffer.begin());
614  if (!read_at_least(header_len + len, end_time))
615  RETURN(-1);
616  result.assign(buffer.data() + header_len, len);
617  unsigned char type = buffer[0];
618  buffer.erase(0, header_len + len);
619  RETURN(type);
620 }
621 
622 int
624 {
625  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunked", end_time);
626 
627  if (fdin == -1)
629 
630  if (!read_at_least(2, end_time))
631  RETURN(-1);
632  uint_least64_t len = static_cast<unsigned char>(buffer[1]);
633  if (len != 0xff) {
634  chunked_data_left = off_t(len);
635  char type = buffer[0];
636  buffer.erase(0, 2);
637  RETURN(type);
638  }
639  if (!read_at_least(len + 2, end_time))
640  RETURN(-1);
641  len = 0;
642  string::const_iterator i = buffer.begin() + 2;
643  unsigned char ch;
644  int shift = 0;
645  do {
646  // Allow at most 63 bits for message lengths - it's neatly a multiple
647  // of 7 bits and anything longer than this is almost certainly a
648  // corrupt value.
649  // The value also needs to be representable as an
650  // off_t (which is a signed type), so use that size if it is smaller.
651  const int SHIFT_LIMIT = 63;
652  if (rare(i == buffer.end() || shift >= SHIFT_LIMIT)) {
653  // Something is very wrong...
655  }
656  ch = *i++;
657  uint_least64_t bits = ch & 0x7f;
658  len |= bits << shift;
659  shift += 7;
660  } while ((ch & 0x80) == 0);
661  len += 255;
662 
663  chunked_data_left = off_t(len);
664  if (sizeof(off_t) * CHAR_BIT < 63) {
665  // Check that the value of len fits in an off_t without loss.
666  if (rare(uint_least64_t(chunked_data_left) != len)) {
668  }
669  }
670 
671  unsigned char type = buffer[0];
672  size_t header_len = (i - buffer.begin());
673  buffer.erase(0, header_len);
674  RETURN(type);
675 }
676 
677 int
678 RemoteConnection::get_message_chunk(string &result, size_t at_least,
679  double end_time)
680 {
681  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunk", result | at_least | end_time);
682  if (fdin == -1)
684 
685  if (at_least <= result.size()) RETURN(true);
686  at_least -= result.size();
687 
688  bool read_enough = (off_t(at_least) <= chunked_data_left);
689  if (!read_enough) at_least = size_t(chunked_data_left);
690 
691  if (!read_at_least(at_least, end_time))
692  RETURN(-1);
693 
694  size_t retlen = min(off_t(buffer.size()), chunked_data_left);
695  result.append(buffer, 0, retlen);
696  buffer.erase(0, retlen);
697  chunked_data_left -= retlen;
698 
699  RETURN(int(read_enough));
700 }
701 
703 static void
704 write_all(int fd, const char * p, size_t n)
705 {
706  while (n) {
707  ssize_t c = write(fd, p, n);
708  if (c < 0) {
709  if (errno == EINTR) continue;
710  throw Xapian::NetworkError("Error writing to file", errno);
711  }
712  p += c;
713  n -= c;
714  }
715 }
716 
717 int
718 RemoteConnection::receive_file(const string &file, double end_time)
719 {
720  LOGCALL(REMOTE, int, "RemoteConnection::receive_file", file | end_time);
721  if (fdin == -1)
723 
724  // FIXME: Do we want to be able to delete the file during writing?
725  FD fd(posixy_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666));
726  if (fd == -1)
727  throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
728 
729  int type = get_message_chunked(end_time);
730  do {
731  off_t min_read = min(chunked_data_left, off_t(CHUNKSIZE));
732  if (!read_at_least(min_read, end_time))
733  RETURN(-1);
734  write_all(fd, buffer.data(), min_read);
735  chunked_data_left -= min_read;
736  buffer.erase(0, min_read);
737  } while (chunked_data_left);
738  RETURN(type);
739 }
740 
741 void
743 {
744  LOGCALL_VOID(REMOTE, "RemoteConnection::shutdown", NO_ARGS);
745 
746  if (fdin < 0) return;
747 
748  // We can be called from a destructor, so we can't throw an exception.
749  try {
750  send_message(MSG_SHUTDOWN, string(), 0.0);
751 #ifdef __WIN32__
752  HANDLE hin = fd_to_handle(fdin);
753  char dummy;
754  DWORD received;
755  BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
756  if (!ok && GetLastError() == ERROR_IO_PENDING) {
757  // Wait for asynchronous read to complete.
758  (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
759  }
760 #else
761  // Wait for the connection to be closed - when this happens
762  // poll()/select() will report that a read won't block.
763 # ifdef HAVE_POLL
764  struct pollfd fds;
765  fds.fd = fdin;
766  fds.events = POLLIN;
767  int res;
768  do {
769  res = poll(&fds, 1, -1);
770  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
771 # else
772  if (fdin < FD_SETSIZE) {
773  fd_set fdset;
774  FD_ZERO(&fdset);
775  FD_SET(fdin, &fdset);
776  int res;
777  do {
778  res = select(fdin + 1, &fdset, 0, 0, NULL);
779  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
780  }
781 # endif
782 #endif
783  } catch (...) {
784  }
785 }
786 
787 void
789 {
790  LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", NO_ARGS);
791 
792  if (fdin >= 0) {
794 
795  // If the same fd is used in both directions, don't close it twice.
796  if (fdin == fdout) fdout = -1;
797 
798  fdin = -1;
799  }
800 
801  if (fdout >= 0) {
803  fdout = -1;
804  }
805 }
806 
#ifdef __WIN32__
/** Convert an absolute end time into a wait duration in milliseconds.
 *
 *  @param end_time  Absolute time at which to give up; 0.0 means there is
 *                   no timeout.
 *
 *  @return INFINITE if @a end_time is 0.0, otherwise the number of
 *          milliseconds from now until @a end_time.
 *
 *  Throws Xapian::NetworkTimeoutError if @a end_time has already passed.
 */
DWORD
RemoteConnection::calc_read_wait_msecs(double end_time)
{
    if (end_time != 0.0) {
        // Work out how many seconds remain until end_time.
        const double secs_left = end_time - RealTime::now();

        // DWORD is unsigned, so a negative value must never be converted.
        if (secs_left < 0.0) {
            throw_timeout("Timeout expired before starting read", context);
        }
        return static_cast<DWORD>(secs_left * 1000.0);
    }

    return INFINITE;
}
#endif
#define RETURN(A)
Definition: debuglog.h:493
#define Assert(COND)
Definition: omassert.h:122
Define the XAPIAN_NORETURN macro.
RemoteConnection class used by the remote backend.
include <sys/select.h> with portability workarounds.
std::enable_if< std::is_unsigned< T1 >::value &&std::is_unsigned< T2 >::value &&std::is_unsigned< R >::value, bool >::type add_overflows(T1 a, T2 b, R &res)
Addition with overflow checking.
Definition: overflow.h:58
Indicates an attempt to access a closed database.
Definition: error.h:1097
length encoded as a string
int get_message(std::string &result, double end_time)
Read one message from fdin.
off_t chunked_data_left
Remaining bytes of message data still to come over fdin for a chunked read.
#define AssertRel(A, REL, B)
Definition: omassert.h:123
RemoteConnection(const RemoteConnection &)
Don't allow copying.
Indicates a timeout expired while communicating with a remote database.
Definition: error.h:845
Provides wrappers with POSIXy semantics.
Arithmetic operations with overflow checks.
#define POLLSELECT
void send_file(char type, int fd, double end_time)
Send the contents of a file as a message.
void sleep(double t)
Sleep until the time represented by this object.
Definition: realtime.h:127
bool read_at_least(size_t min_len, double end_time)
Read until there are at least min_len bytes in buffer.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Definition: debuglog.h:488
STL namespace.
Utility functions for testing files.
static void throw_database_closed()
#define O_CLOEXEC
Definition: safefcntl.h:90
std::string encode_length(T len)
Encode a length as a variable-length string.
Definition: length.h:36
#define rare(COND)
Definition: config.h:565
std::string context
The context to report with errors.
static void throw_timeout(const char *, const string &)
Hierarchy of classes which Xapian can throw as exceptions.
const char * dummy[]
Definition: version_h.cc:7
static void throw_network_error_insane_message_length()
int fdin
The file descriptor used for reading.
Definition: fd.h:30
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
string str(int value)
Convert int to std::string.
Definition: str.cc:90
Wrapper class around a file descriptor to avoid leaks.
std::string buffer
Buffer to hold unprocessed input.
void close_fd_or_socket(int fd)
Definition: socket_utils.h:38
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
Socket handling utilities.
void shutdown()
Shutdown the connection.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
double now()
Return the current time.
Definition: realtime.h:49
Indicates a problem communicating with a remote database.
Definition: error.h:803
void to_timeval(double t, struct timeval *tv)
Fill in struct timeval from number of seconds in a double.
Definition: realtime.h:110
void send_message(char type, const std::string &s, double end_time)
Send a message.
#define posixy_open
<unistd.h>, but with compat.
off_t file_size(const char *path)
Returns the size of a file.
Definition: filetests.h:71
Various assertion macros.
Functions for handling a time or time interval in a double.
bool ready_to_read() const
See if there is data available to read.
#define LOGLINE(a, b)
Definition: debuglog.h:494
int fdout
The file descriptor used for writing.
int sniff_next_message_type(double end_time)
Check what the next message type is.
void do_close()
Close the connection.
include <fcntl.h>, but working around broken platforms.
static void write_all(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
Debug logging macros.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:487
#define CHUNKSIZE