xapian-core  1.4.19
remoteconnection.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include "remoteconnection.h"
24 
25 #include <xapian/error.h>
26 
27 #include "safefcntl.h"
28 #include "safeunistd.h"
29 
30 #ifdef HAVE_POLL_H
31 # include <poll.h>
32 #else
33 # include "safesysselect.h"
34 #endif
35 
36 #include <algorithm>
37 #include <cerrno>
38 #include <climits>
39 #include <cstdint>
40 #include <string>
41 #ifdef __WIN32__
42 # include <type_traits>
43 #endif
44 
45 #include "debuglog.h"
46 #include "fd.h"
47 #include "filetests.h"
48 #include "noreturn.h"
49 #include "omassert.h"
50 #include "overflow.h"
51 #include "posixy_wrapper.h"
52 #include "realtime.h"
53 #include "length.h"
54 #include "socket_utils.h"
55 
56 using namespace std;
57 
58 #define CHUNKSIZE 4096
59 
60 XAPIAN_NORETURN(static void throw_database_closed());
61 static void
63 {
64  throw Xapian::DatabaseClosedError("Database has been closed");
65 }
66 
67 XAPIAN_NORETURN(static void throw_network_error_insane_message_length());
68 static void
70 {
71  throw Xapian::NetworkError("Insane message length specified!");
72 }
73 
74 XAPIAN_NORETURN(static void throw_timeout(const char*, const string&));
75 static void
76 throw_timeout(const char* msg, const string& context)
77 {
78  throw Xapian::NetworkTimeoutError(msg, context);
79 }
80 
81 #ifdef __WIN32__
82 static inline void
83 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
84 {
85  if (add_overflows(overlapped.Offset, n, overlapped.Offset))
86  ++overlapped.OffsetHigh;
87 }
88 #endif
89 
90 RemoteConnection::RemoteConnection(int fdin_, int fdout_,
91  const string & context_)
92  : fdin(fdin_), fdout(fdout_), context(context_)
93 {
94 #ifdef __WIN32__
95  memset(&overlapped, 0, sizeof(overlapped));
96  overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
97  if (!overlapped.hEvent)
98  throw Xapian::NetworkError("Failed to setup OVERLAPPED",
99  context, -int(GetLastError()));
100 
101 #endif
102 }
103 
104 #ifdef __WIN32__
105 RemoteConnection::~RemoteConnection()
106 {
107  if (overlapped.hEvent)
108  CloseHandle(overlapped.hEvent);
109 }
110 #endif
111 
112 bool
114 {
115  LOGCALL(REMOTE, bool, "RemoteConnection::read_at_least", min_len | end_time);
116 
117  if (buffer.length() >= min_len) RETURN(true);
118 
119 #ifdef __WIN32__
120  HANDLE hin = fd_to_handle(fdin);
121  do {
122  char buf[CHUNKSIZE];
123  DWORD received;
124  BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
125  if (!ok) {
126  int errcode = GetLastError();
127  if (errcode != ERROR_IO_PENDING)
128  throw Xapian::NetworkError("read failed", context, -errcode);
129  // Is asynch - just wait for the data to be received or a timeout.
130  DWORD waitrc;
131  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
132  if (waitrc != WAIT_OBJECT_0) {
133  LOGLINE(REMOTE, "read: timeout has expired");
134  throw_timeout("Timeout expired while trying to read", context);
135  }
136  // Get the final result of the read.
137  if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
138  throw Xapian::NetworkError("Failed to get overlapped result",
139  context, -int(GetLastError()));
140  }
141 
142  if (received == 0) {
143  RETURN(false);
144  }
145 
146  buffer.append(buf, received);
147 
148  // We must update the offset in the OVERLAPPED structure manually.
149  update_overlapped_offset(overlapped, received);
150  } while (buffer.length() < min_len);
151 #else
152  // If there's no end_time, just use blocking I/O.
153  if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
154  throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
155  context, errno);
156  }
157 
158  while (true) {
159  char buf[CHUNKSIZE];
160  ssize_t received = read(fdin, buf, sizeof(buf));
161 
162  if (received > 0) {
163  buffer.append(buf, received);
164  if (buffer.length() >= min_len) RETURN(true);
165  continue;
166  }
167 
168  if (received == 0) {
169  RETURN(false);
170  }
171 
172  LOGLINE(REMOTE, "read gave errno = " << errno);
173  if (errno == EINTR) continue;
174 
175  if (errno != EAGAIN)
176  throw Xapian::NetworkError("read failed", context, errno);
177 
178  Assert(end_time != 0.0);
179  while (true) {
180  // Calculate how far in the future end_time is.
181  double now = RealTime::now();
182  double time_diff = end_time - now;
183  // Check if the timeout has expired.
184  if (time_diff < 0) {
185  LOGLINE(REMOTE, "read: timeout has expired");
186  throw_timeout("Timeout expired while trying to read", context);
187  }
188 
189  // Wait until there is data, an error, or the timeout is reached.
190 # ifdef HAVE_POLL
191  struct pollfd fds;
192  fds.fd = fdin;
193  fds.events = POLLIN;
194  int poll_result = poll(&fds, 1, int(time_diff * 1000));
195  if (poll_result > 0) break;
196 
197  if (poll_result == 0)
198  throw_timeout("Timeout expired while trying to read", context);
199 
200  // EINTR means poll was interrupted by a signal. EAGAIN means that
201  // allocation of internal data structures failed.
202  if (errno != EINTR && errno != EAGAIN)
203  throw Xapian::NetworkError("poll failed during read",
204  context, errno);
205 # else
206  if (fdin >= FD_SETSIZE) {
207  // We can't block with a timeout, so just sleep and retry.
208  RealTime::sleep(now + min(0.001, time_diff / 4));
209  break;
210  }
211  fd_set fdset;
212  FD_ZERO(&fdset);
213  FD_SET(fdin, &fdset);
214 
215  struct timeval tv;
216  RealTime::to_timeval(time_diff, &tv);
217  int select_result = select(fdin + 1, &fdset, 0, 0, &tv);
218  if (select_result > 0) break;
219 
220  if (select_result == 0)
221  throw_timeout("Timeout expired while trying to read", context);
222 
223  // EINTR means select was interrupted by a signal. The Linux
224  // select(2) man page says: "Portable programs may wish to check
225  // for EAGAIN and loop, just as with EINTR" and that seems to be
226  // necessary for cygwin at least.
227  if (errno != EINTR && errno != EAGAIN)
228  throw Xapian::NetworkError("select failed during read",
229  context, errno);
230 # endif
231  }
232  }
233 #endif
234  RETURN(true);
235 }
236 
237 bool
239 {
240  LOGCALL(REMOTE, bool, "RemoteConnection::ready_to_read", NO_ARGS);
241  if (fdin == -1)
243 
244  if (!buffer.empty()) RETURN(true);
245 
246  // See if there's data available to be read.
247 #ifdef HAVE_POLL
248  struct pollfd fds;
249  fds.fd = fdin;
250  fds.events = POLLIN;
251  RETURN(poll(&fds, 1, 0) > 0);
252 #else
253 # ifndef __WIN32__
254  if (fdin >= FD_SETSIZE) {
255  // Not ideal, but OK for how this method is currently used.
256  RETURN(true);
257  }
258 # endif
259  fd_set fdset;
260  FD_ZERO(&fdset);
261  FD_SET(fdin, &fdset);
262 
263  struct timeval tv;
264  tv.tv_sec = 0;
265  tv.tv_usec = 0;
266  RETURN(select(fdin + 1, &fdset, 0, 0, &tv) > 0);
267 #endif
268 }
269 
270 void
271 RemoteConnection::send_message(char type, const string &message,
272  double end_time)
273 {
274  LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
275  if (fdout == -1)
277 
278  string header;
279  header += type;
280  header += encode_length(message.size());
281 
282 #ifdef __WIN32__
283  HANDLE hout = fd_to_handle(fdout);
284  const string * str = &header;
285 
286  size_t count = 0;
287  while (true) {
288  DWORD n;
289  BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
290  if (!ok) {
291  int errcode = GetLastError();
292  if (errcode != ERROR_IO_PENDING)
293  throw Xapian::NetworkError("write failed", context, -errcode);
294  // Just wait for the data to be sent, or a timeout.
295  DWORD waitrc;
296  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
297  if (waitrc != WAIT_OBJECT_0) {
298  LOGLINE(REMOTE, "write: timeout has expired");
299  throw_timeout("Timeout expired while trying to write", context);
300  }
301  // Get the final result.
302  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
303  throw Xapian::NetworkError("Failed to get overlapped result",
304  context, -int(GetLastError()));
305  }
306 
307  count += n;
308 
309  // We must update the offset in the OVERLAPPED structure manually.
310  update_overlapped_offset(overlapped, n);
311 
312  if (count == str->size()) {
313  if (str == &message || message.empty()) return;
314  str = &message;
315  count = 0;
316  }
317  }
318 #else
319  // If there's no end_time, just use blocking I/O.
320  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
321  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
322  context, errno);
323  }
324 
325  const string * str = &header;
326 
327  size_t count = 0;
328  while (true) {
329  // We've set write to non-blocking, so just try writing as there
330  // will usually be space.
331  ssize_t n = write(fdout, str->data() + count, str->size() - count);
332 
333  if (n >= 0) {
334  count += n;
335  if (count == str->size()) {
336  if (str == &message || message.empty()) return;
337  str = &message;
338  count = 0;
339  }
340  continue;
341  }
342 
343  LOGLINE(REMOTE, "write gave errno = " << errno);
344  if (errno == EINTR) continue;
345 
346  if (errno != EAGAIN)
347  throw Xapian::NetworkError("write failed", context, errno);
348 
349  double now = RealTime::now();
350  double time_diff = end_time - now;
351  if (time_diff < 0) {
352  LOGLINE(REMOTE, "write: timeout has expired");
353  throw_timeout("Timeout expired while trying to write", context);
354  }
355 
356  // Wait until there is space or the timeout is reached.
357 # ifdef HAVE_POLL
358  struct pollfd fds;
359  fds.fd = fdout;
360  fds.events = POLLOUT;
361  int result = poll(&fds, 1, int(time_diff * 1000));
362 # define POLLSELECT "poll"
363 # else
364  if (fdout >= FD_SETSIZE) {
365  // We can't block with a timeout, so just sleep and retry.
366  RealTime::sleep(now + min(0.001, time_diff / 4));
367  continue;
368  }
369 
370  fd_set fdset;
371  FD_ZERO(&fdset);
372  FD_SET(fdout, &fdset);
373 
374  struct timeval tv;
375  RealTime::to_timeval(time_diff, &tv);
376  int result = select(fdout + 1, 0, &fdset, 0, &tv);
377 # define POLLSELECT "select"
378 # endif
379 
380  if (result < 0) {
381  if (errno == EINTR || errno == EAGAIN) {
382  // EINTR/EAGAIN means select was interrupted by a signal.
383  // We could just retry the poll/select, but it's easier to just
384  // retry the write.
385  continue;
386  }
387  throw Xapian::NetworkError(POLLSELECT " failed during write",
388  context, errno);
389 # undef POLLSELECT
390  }
391 
392  if (result == 0)
393  throw_timeout("Timeout expired while trying to write", context);
394  }
395 #endif
396 }
397 
398 void
399 RemoteConnection::send_file(char type, int fd, double end_time)
400 {
401  LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
402  if (fdout == -1)
404 
405  off_t size = file_size(fd);
406  if (errno)
407  throw Xapian::NetworkError("Couldn't stat file to send", errno);
408  // FIXME: Use sendfile() or similar if available?
409 
410  char buf[CHUNKSIZE];
411  buf[0] = type;
412  size_t c = 1;
413  {
414  string enc_size = encode_length(size);
415  c += enc_size.size();
416  // An encoded length should be just a few bytes.
417  AssertRel(c, <=, sizeof(buf));
418  memcpy(buf + 1, enc_size.data(), enc_size.size());
419  }
420 
421 #ifdef __WIN32__
422  HANDLE hout = fd_to_handle(fdout);
423  size_t count = 0;
424  while (true) {
425  DWORD n;
426  BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
427  if (!ok) {
428  int errcode = GetLastError();
429  if (errcode != ERROR_IO_PENDING)
430  throw Xapian::NetworkError("write failed", context, -errcode);
431  // Just wait for the data to be sent, or a timeout.
432  DWORD waitrc;
433  waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
434  if (waitrc != WAIT_OBJECT_0) {
435  LOGLINE(REMOTE, "write: timeout has expired");
436  throw_timeout("Timeout expired while trying to write", context);
437  }
438  // Get the final result.
439  if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
440  throw Xapian::NetworkError("Failed to get overlapped result",
441  context, -int(GetLastError()));
442  }
443 
444  count += n;
445 
446  // We must update the offset in the OVERLAPPED structure manually.
447  update_overlapped_offset(overlapped, n);
448 
449  if (count == c) {
450  if (size == 0) return;
451 
452  ssize_t res;
453  do {
454  res = read(fd, buf, sizeof(buf));
455  } while (res < 0 && errno == EINTR);
456  if (res < 0) throw Xapian::NetworkError("read failed", errno);
457  c = size_t(res);
458 
459  size -= c;
460  count = 0;
461  }
462  }
463 #else
464  // If there's no end_time, just use blocking I/O.
465  if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
466  throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
467  context, errno);
468  }
469 
470  size_t count = 0;
471  while (true) {
472  // We've set write to non-blocking, so just try writing as there
473  // will usually be space.
474  ssize_t n = write(fdout, buf + count, c - count);
475 
476  if (n >= 0) {
477  count += n;
478  if (count == c) {
479  if (size == 0) return;
480 
481  ssize_t res;
482  do {
483  res = read(fd, buf, sizeof(buf));
484  } while (res < 0 && errno == EINTR);
485  if (res < 0) throw Xapian::NetworkError("read failed", errno);
486  c = size_t(res);
487 
488  size -= c;
489  count = 0;
490  }
491  continue;
492  }
493 
494  LOGLINE(REMOTE, "write gave errno = " << errno);
495  if (errno == EINTR) continue;
496 
497  if (errno != EAGAIN)
498  throw Xapian::NetworkError("write failed", context, errno);
499 
500  double now = RealTime::now();
501  double time_diff = end_time - now;
502  if (time_diff < 0) {
503  LOGLINE(REMOTE, "write: timeout has expired");
504  throw_timeout("Timeout expired while trying to write", context);
505  }
506 
507  // Wait until there is space or the timeout is reached.
508 # ifdef HAVE_POLL
509  struct pollfd fds;
510  fds.fd = fdout;
511  fds.events = POLLOUT;
512  int result = poll(&fds, 1, int(time_diff * 1000));
513 # define POLLSELECT "poll"
514 # else
515  if (fdout >= FD_SETSIZE) {
516  // We can't block with a timeout, so just sleep and retry.
517  RealTime::sleep(now + min(0.001, time_diff / 4));
518  continue;
519  }
520 
521  fd_set fdset;
522  FD_ZERO(&fdset);
523  FD_SET(fdout, &fdset);
524 
525  struct timeval tv;
526  RealTime::to_timeval(time_diff, &tv);
527  int result = select(fdout + 1, 0, &fdset, 0, &tv);
528 # define POLLSELECT "select"
529 # endif
530 
531  if (result < 0) {
532  if (errno == EINTR || errno == EAGAIN) {
533  // EINTR/EAGAIN means select was interrupted by a signal.
534  // We could just retry the poll/select, but it's easier to just
535  // retry the write.
536  continue;
537  }
538  throw Xapian::NetworkError(POLLSELECT " failed during write",
539  context, errno);
540 # undef POLLSELECT
541  }
542 
543  if (result == 0)
544  throw_timeout("Timeout expired while trying to write", context);
545  }
546 #endif
547 }
548 
549 int
551 {
552  LOGCALL(REMOTE, int, "RemoteConnection::sniff_next_message_type", end_time);
553  if (fdin == -1)
555 
556  if (!read_at_least(1, end_time))
557  RETURN(-1);
558  unsigned char type = buffer[0];
559  RETURN(type);
560 }
561 
562 int
564 {
565  LOGCALL(REMOTE, int, "RemoteConnection::get_message", result | end_time);
566  if (fdin == -1)
568 
569  if (!read_at_least(2, end_time))
570  RETURN(-1);
571  size_t len = static_cast<unsigned char>(buffer[1]);
572  if (!read_at_least(len + 2, end_time))
573  RETURN(-1);
574  if (len != 0xff) {
575  result.assign(buffer.data() + 2, len);
576  unsigned char type = buffer[0];
577  buffer.erase(0, len + 2);
578  RETURN(type);
579  }
580  len = 0;
581  string::const_iterator i = buffer.begin() + 2;
582  unsigned char ch;
583  int shift = 0;
584  do {
585  if (i == buffer.end() || shift > 28) {
586  // Something is very wrong...
588  }
589  ch = *i++;
590  len |= size_t(ch & 0x7f) << shift;
591  shift += 7;
592  } while ((ch & 0x80) == 0);
593  len += 255;
594  size_t header_len = (i - buffer.begin());
595  if (!read_at_least(header_len + len, end_time))
596  RETURN(-1);
597  result.assign(buffer.data() + header_len, len);
598  unsigned char type = buffer[0];
599  buffer.erase(0, header_len + len);
600  RETURN(type);
601 }
602 
603 int
605 {
606  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunked", end_time);
607 
608  if (fdin == -1)
610 
611  if (!read_at_least(2, end_time))
612  RETURN(-1);
613  uint_least64_t len = static_cast<unsigned char>(buffer[1]);
614  if (len != 0xff) {
615  chunked_data_left = off_t(len);
616  char type = buffer[0];
617  buffer.erase(0, 2);
618  RETURN(type);
619  }
620  if (!read_at_least(len + 2, end_time))
621  RETURN(-1);
622  len = 0;
623  string::const_iterator i = buffer.begin() + 2;
624  unsigned char ch;
625  int shift = 0;
626  do {
627  // Allow at most 63 bits for message lengths - it's neatly a multiple
628  // of 7 bits and anything longer than this is almost certainly a
629  // corrupt value.
630  // The value also needs to be representable as an
631  // off_t (which is a signed type), so use that size if it is smaller.
632  const int SHIFT_LIMIT = 63;
633  if (rare(i == buffer.end() || shift >= SHIFT_LIMIT)) {
634  // Something is very wrong...
636  }
637  ch = *i++;
638  uint_least64_t bits = ch & 0x7f;
639  len |= bits << shift;
640  shift += 7;
641  } while ((ch & 0x80) == 0);
642  len += 255;
643 
644  chunked_data_left = off_t(len);
645  if (sizeof(off_t) * CHAR_BIT < 63) {
646  // Check that the value of len fits in an off_t without loss.
647  if (rare(uint_least64_t(chunked_data_left) != len)) {
649  }
650  }
651 
652  unsigned char type = buffer[0];
653  size_t header_len = (i - buffer.begin());
654  buffer.erase(0, header_len);
655  RETURN(type);
656 }
657 
658 int
659 RemoteConnection::get_message_chunk(string &result, size_t at_least,
660  double end_time)
661 {
662  LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunk", result | at_least | end_time);
663  if (fdin == -1)
665 
666  if (at_least <= result.size()) RETURN(true);
667  at_least -= result.size();
668 
669  bool read_enough = (off_t(at_least) <= chunked_data_left);
670  if (!read_enough) at_least = size_t(chunked_data_left);
671 
672  if (!read_at_least(at_least, end_time))
673  RETURN(-1);
674 
675  size_t retlen = min(off_t(buffer.size()), chunked_data_left);
676  result.append(buffer, 0, retlen);
677  buffer.erase(0, retlen);
678  chunked_data_left -= retlen;
679 
680  RETURN(int(read_enough));
681 }
682 
684 static void
685 write_all(int fd, const char * p, size_t n)
686 {
687  while (n) {
688  ssize_t c = write(fd, p, n);
689  if (c < 0) {
690  if (errno == EINTR) continue;
691  throw Xapian::NetworkError("Error writing to file", errno);
692  }
693  p += c;
694  n -= c;
695  }
696 }
697 
698 int
699 RemoteConnection::receive_file(const string &file, double end_time)
700 {
701  LOGCALL(REMOTE, int, "RemoteConnection::receive_file", file | end_time);
702  if (fdin == -1)
704 
705  // FIXME: Do we want to be able to delete the file during writing?
706  FD fd(posixy_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666));
707  if (fd == -1)
708  throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
709 
710  int type = get_message_chunked(end_time);
711  do {
712  off_t min_read = min(chunked_data_left, off_t(CHUNKSIZE));
713  if (!read_at_least(min_read, end_time))
714  RETURN(-1);
715  write_all(fd, buffer.data(), min_read);
716  chunked_data_left -= min_read;
717  buffer.erase(0, min_read);
718  } while (chunked_data_left);
719  RETURN(type);
720 }
721 
722 void
724 {
725  LOGCALL_VOID(REMOTE, "RemoteConnection::shutdown", NO_ARGS);
726 
727  if (fdin < 0) return;
728 
729  // We can be called from a destructor, so we can't throw an exception.
730  try {
731  send_message(MSG_SHUTDOWN, string(), 0.0);
732 #ifdef __WIN32__
733  HANDLE hin = fd_to_handle(fdin);
734  char dummy;
735  DWORD received;
736  BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
737  if (!ok && GetLastError() == ERROR_IO_PENDING) {
738  // Wait for asynchronous read to complete.
739  (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
740  }
741 #else
742  // Wait for the connection to be closed - when this happens
743  // poll()/select() will report that a read won't block.
744 # ifdef HAVE_POLL
745  struct pollfd fds;
746  fds.fd = fdin;
747  fds.events = POLLIN;
748  int res;
749  do {
750  res = poll(&fds, 1, -1);
751  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
752 # else
753  if (fdin < FD_SETSIZE) {
754  fd_set fdset;
755  FD_ZERO(&fdset);
756  FD_SET(fdin, &fdset);
757  int res;
758  do {
759  res = select(fdin + 1, &fdset, 0, 0, NULL);
760  } while (res < 0 && (errno == EINTR || errno == EAGAIN));
761  }
762 # endif
763 #endif
764  } catch (...) {
765  }
766 }
767 
768 void
770 {
771  LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", NO_ARGS);
772 
773  if (fdin >= 0) {
775 
776  // If the same fd is used in both directions, don't close it twice.
777  if (fdin == fdout) fdout = -1;
778 
779  fdin = -1;
780  }
781 
782  if (fdout >= 0) {
784  fdout = -1;
785  }
786 }
787 
788 #ifdef __WIN32__
789 DWORD
790 RemoteConnection::calc_read_wait_msecs(double end_time)
791 {
792  if (end_time == 0.0)
793  return INFINITE;
794 
795  // Calculate how far in the future end_time is.
796  double time_diff = end_time - RealTime::now();
797 
798  // DWORD is unsigned, so we mustn't try and return a negative value.
799  if (time_diff < 0.0) {
800  throw_timeout("Timeout expired before starting read", context);
801  }
802  return static_cast<DWORD>(time_diff * 1000.0);
803 }
804 #endif
#define RETURN(A)
Definition: debuglog.h:482
#define Assert(COND)
Definition: omassert.h:122
Define the XAPIAN_NORETURN macro.
RemoteConnection class used by the remote backend.
include <sys/select.h> with portability workarounds.
std::enable_if< std::is_unsigned< T1 >::value &&std::is_unsigned< T2 >::value &&std::is_unsigned< R >::value, bool >::type add_overflows(T1 a, T2 b, R &res)
Addition with overflow checking.
Definition: overflow.h:51
Indicates an attempt to access a closed database.
Definition: error.h:1097
length encoded as a string
int get_message(std::string &result, double end_time)
Read one message from fdin.
off_t chunked_data_left
Remaining bytes of message data still to come over fdin for a chunked read.
#define AssertRel(A, REL, B)
Definition: omassert.h:123
RemoteConnection(const RemoteConnection &)
Don't allow copying.
Indicates a timeout expired while communicating with a remote database.
Definition: error.h:845
Provides wrappers with POSIXy semantics.
Arithmetic operations with overflow checks.
#define POLLSELECT
void send_file(char type, int fd, double end_time)
Send the contents of a file as a message.
void sleep(double t)
Sleep until the time represented by this object.
Definition: realtime.h:127
bool read_at_least(size_t min_len, double end_time)
Read until there are at least min_len bytes in buffer.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
Definition: realtime.h:95
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Definition: debuglog.h:477
STL namespace.
Utility functions for testing files.
static void throw_database_closed()
#define O_CLOEXEC
Definition: safefcntl.h:90
std::string encode_length(T len)
Encode a length as a variable-length string.
Definition: length.h:36
#define rare(COND)
Definition: config.h:543
std::string context
The context to report with errors.
static void throw_timeout(const char *, const string &)
Hierarchy of classes which Xapian can throw as exceptions.
const char * dummy[]
Definition: version_h.cc:7
static void throw_network_error_insane_message_length()
int fdin
The file descriptor used for reading.
Definition: fd.h:30
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
string str(int value)
Convert int to std::string.
Definition: str.cc:90
Wrapper class around a file descriptor to avoid leaks.
std::string buffer
Buffer to hold unprocessed input.
void close_fd_or_socket(int fd)
Definition: socket_utils.h:38
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
Socket handling utilities.
void shutdown()
Shutdown the connection.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
double now()
Return the current time.
Definition: realtime.h:49
Indicates a problem communicating with a remote database.
Definition: error.h:803
void to_timeval(double t, struct timeval *tv)
Fill in struct timeval from number of seconds in a double.
Definition: realtime.h:110
void send_message(char type, const std::string &s, double end_time)
Send a message.
#define posixy_open
<unistd.h>, but with compat.
off_t file_size(const char *path)
Returns the size of a file.
Definition: filetests.h:71
Various assertion macros.
Functions for handling a time or time interval in a double.
bool ready_to_read() const
See if there is data available to read.
#define LOGLINE(a, b)
Definition: debuglog.h:483
int fdout
The file descriptor used for writing.
int sniff_next_message_type(double end_time)
Check what the next message type is.
void do_close()
Close the connection.
include <fcntl.h>, but working around broken platforms.
static void write_all(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
Debug logging macros.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
Definition: debuglog.h:476
#define CHUNKSIZE