42 # include <type_traits> 58 #define CHUNKSIZE 4096 74 XAPIAN_NORETURN(
static void throw_timeout(
const char*,
const string&));
83 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
86 ++overlapped.OffsetHigh;
91 const string & context_)
92 : fdin(fdin_), fdout(fdout_), context(context_)
95 memset(&overlapped, 0,
sizeof(overlapped));
96 overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
97 if (!overlapped.hEvent)
105 RemoteConnection::~RemoteConnection()
107 if (overlapped.hEvent)
108 CloseHandle(overlapped.hEvent);
115 LOGCALL(REMOTE,
bool,
"RemoteConnection::read_at_least", min_len | end_time);
120 HANDLE hin = fd_to_handle(
fdin);
124 BOOL ok = ReadFile(hin, buf,
sizeof(buf), &received, &overlapped);
126 int errcode = GetLastError();
127 if (errcode != ERROR_IO_PENDING)
131 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
132 if (waitrc != WAIT_OBJECT_0) {
133 LOGLINE(REMOTE,
"read: timeout has expired");
137 if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
139 context, -
int(GetLastError()));
146 buffer.append(buf, received);
149 update_overlapped_offset(overlapped, received);
150 }
while (
buffer.length() < min_len);
153 if (fcntl(
fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
160 ssize_t received = read(
fdin, buf,
sizeof(buf));
163 buffer.append(buf, received);
172 LOGLINE(REMOTE,
"read gave errno = " << errno);
173 if (errno == EINTR)
continue;
182 double time_diff = end_time -
now;
185 LOGLINE(REMOTE,
"read: timeout has expired");
194 int poll_result = poll(&fds, 1,
int(time_diff * 1000));
195 if (poll_result > 0)
break;
197 if (poll_result == 0)
202 if (errno != EINTR && errno != EAGAIN)
206 if (
fdin >= FD_SETSIZE) {
213 FD_SET(
fdin, &fdset);
217 int select_result = select(
fdin + 1, &fdset, 0, 0, &tv);
218 if (select_result > 0)
break;
220 if (select_result == 0)
227 if (errno != EINTR && errno != EAGAIN)
240 LOGCALL(REMOTE,
bool,
"RemoteConnection::ready_to_read", NO_ARGS);
251 RETURN(poll(&fds, 1, 0) > 0);
254 if (
fdin >= FD_SETSIZE) {
261 FD_SET(
fdin, &fdset);
266 RETURN(select(
fdin + 1, &fdset, 0, 0, &tv) > 0);
274 LOGCALL_VOID(REMOTE,
"RemoteConnection::send_message", type | message | end_time);
283 HANDLE hout = fd_to_handle(
fdout);
284 const string *
str = &header;
289 BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
291 int errcode = GetLastError();
292 if (errcode != ERROR_IO_PENDING)
296 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
297 if (waitrc != WAIT_OBJECT_0) {
298 LOGLINE(REMOTE,
"write: timeout has expired");
302 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
304 context, -
int(GetLastError()));
310 update_overlapped_offset(overlapped, n);
312 if (count == str->size()) {
313 if (str == &message || message.empty())
return;
320 if (fcntl(
fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
325 const string * str = &header;
331 #if defined MSG_NOSIGNAL && !defined SO_NOSIGPIPE 332 ssize_t n = send(
fdout, str->data() + count, str->size() - count,
334 if (n < 0 && errno == ENOTSOCK) {
337 n = write(
fdout, str->data() + count, str->size() - count);
340 ssize_t n = write(
fdout, str->data() + count, str->size() - count);
345 if (count == str->size()) {
346 if (str == &message || message.empty())
return;
353 LOGLINE(REMOTE,
"write gave errno = " << errno);
354 if (errno == EINTR)
continue;
360 double time_diff = end_time -
now;
362 LOGLINE(REMOTE,
"write: timeout has expired");
370 fds.events = POLLOUT;
371 int result = poll(&fds, 1,
int(time_diff * 1000));
372 # define POLLSELECT "poll" 374 if (
fdout >= FD_SETSIZE) {
382 FD_SET(
fdout, &fdset);
386 int result = select(
fdout + 1, 0, &fdset, 0, &tv);
387 # define POLLSELECT "select" 391 if (errno == EINTR || errno == EAGAIN) {
411 LOGCALL_VOID(REMOTE,
"RemoteConnection::send_file", type | fd | end_time);
425 c += enc_size.size();
428 memcpy(buf + 1, enc_size.data(), enc_size.size());
432 HANDLE hout = fd_to_handle(
fdout);
436 BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
438 int errcode = GetLastError();
439 if (errcode != ERROR_IO_PENDING)
443 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
444 if (waitrc != WAIT_OBJECT_0) {
445 LOGLINE(REMOTE,
"write: timeout has expired");
449 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
451 context, -
int(GetLastError()));
457 update_overlapped_offset(overlapped, n);
460 if (size == 0)
return;
464 res = read(fd, buf,
sizeof(buf));
465 }
while (res < 0 && errno == EINTR);
475 if (fcntl(
fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
484 #if defined MSG_NOSIGNAL && !defined SO_NOSIGPIPE 485 ssize_t n = send(
fdout, buf + count, c - count, MSG_NOSIGNAL);
486 if (n < 0 && errno == ENOTSOCK) {
489 n = write(
fdout, buf + count, c - count);
492 ssize_t n = write(
fdout, buf + count, c - count);
498 if (size == 0)
return;
502 res = read(fd, buf,
sizeof(buf));
503 }
while (res < 0 && errno == EINTR);
513 LOGLINE(REMOTE,
"write gave errno = " << errno);
514 if (errno == EINTR)
continue;
520 double time_diff = end_time -
now;
522 LOGLINE(REMOTE,
"write: timeout has expired");
530 fds.events = POLLOUT;
531 int result = poll(&fds, 1,
int(time_diff * 1000));
532 # define POLLSELECT "poll" 534 if (
fdout >= FD_SETSIZE) {
542 FD_SET(
fdout, &fdset);
546 int result = select(
fdout + 1, 0, &fdset, 0, &tv);
547 # define POLLSELECT "select" 551 if (errno == EINTR || errno == EAGAIN) {
571 LOGCALL(REMOTE,
int,
"RemoteConnection::sniff_next_message_type", end_time);
577 unsigned char type =
buffer[0];
584 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message", result | end_time);
590 size_t len =
static_cast<unsigned char>(
buffer[1]);
594 result.assign(
buffer.data() + 2, len);
595 unsigned char type =
buffer[0];
600 string::const_iterator i =
buffer.begin() + 2;
604 if (i ==
buffer.end() || shift > 28) {
609 len |= size_t(ch & 0x7f) << shift;
611 }
while ((ch & 0x80) == 0);
613 size_t header_len = (i -
buffer.begin());
616 result.assign(
buffer.data() + header_len, len);
617 unsigned char type =
buffer[0];
618 buffer.erase(0, header_len + len);
625 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message_chunked", end_time);
632 uint_least64_t len =
static_cast<unsigned char>(
buffer[1]);
642 string::const_iterator i =
buffer.begin() + 2;
651 const int SHIFT_LIMIT = 63;
652 if (
rare(i ==
buffer.end() || shift >= SHIFT_LIMIT)) {
657 uint_least64_t bits = ch & 0x7f;
658 len |= bits << shift;
660 }
while ((ch & 0x80) == 0);
664 if (
sizeof(off_t) * CHAR_BIT < 63) {
671 unsigned char type =
buffer[0];
672 size_t header_len = (i -
buffer.begin());
673 buffer.erase(0, header_len);
681 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message_chunk", result | at_least | end_time);
685 if (at_least <= result.size())
RETURN(
true);
686 at_least -= result.size();
695 result.append(
buffer, 0, retlen);
707 ssize_t c = write(fd, p, n);
709 if (errno == EINTR)
continue;
720 LOGCALL(REMOTE,
int,
"RemoteConnection::receive_file", file | end_time);
736 buffer.erase(0, min_read);
744 LOGCALL_VOID(REMOTE,
"RemoteConnection::shutdown", NO_ARGS);
746 if (
fdin < 0)
return;
752 HANDLE hin = fd_to_handle(
fdin);
755 BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
756 if (!ok && GetLastError() == ERROR_IO_PENDING) {
758 (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
769 res = poll(&fds, 1, -1);
770 }
while (res < 0 && (errno == EINTR || errno == EAGAIN));
772 if (
fdin < FD_SETSIZE) {
775 FD_SET(
fdin, &fdset);
778 res = select(
fdin + 1, &fdset, 0, 0, NULL);
779 }
while (res < 0 && (errno == EINTR || errno == EAGAIN));
790 LOGCALL_VOID(REMOTE,
"RemoteConnection::do_close", NO_ARGS);
809 RemoteConnection::calc_read_wait_msecs(
double end_time)
818 if (time_diff < 0.0) {
821 return static_cast<DWORD
>(time_diff * 1000.0);
Define the XAPIAN_NORETURN macro.
RemoteConnection class used by the remote backend.
include <sys/select.h> with portability workarounds.
std::enable_if< std::is_unsigned< T1 >::value &&std::is_unsigned< T2 >::value &&std::is_unsigned< R >::value, bool >::type add_overflows(T1 a, T2 b, R &res)
Addition with overflow checking.
Indicates an attempt to access a closed database.
length encoded as a string
int get_message(std::string &result, double end_time)
Read one message from fdin.
off_t chunked_data_left
Remaining bytes of message data still to come over fdin for a chunked read.
#define AssertRel(A, REL, B)
RemoteConnection(const RemoteConnection &)
Don't allow copying.
Indicates a timeout expired while communicating with a remote database.
Provides wrappers with POSIXy semantics.
Arithmetic operations with overflow checks.
void send_file(char type, int fd, double end_time)
Send the contents of a file as a message.
void sleep(double t)
Sleep until the time represented by this object.
bool read_at_least(size_t min_len, double end_time)
Read until there are at least min_len bytes in buffer.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Utility functions for testing files.
static void throw_database_closed()
std::string encode_length(T len)
Encode a length as a variable-length string.
std::string context
The context to report with errors.
static void throw_timeout(const char *, const string &)
Hierarchy of classes which Xapian can throw as exceptions.
static void throw_network_error_insane_message_length()
int fdin
The file descriptor used for reading.
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
string str(int value)
Convert int to std::string.
Wrapper class around a file descriptor to avoid leaks.
std::string buffer
Buffer to hold unprocessed input.
void close_fd_or_socket(int fd)
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
Socket handling utilities.
void shutdown()
Shutdown the connection.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
double now()
Return the current time.
Indicates a problem communicating with a remote database.
void to_timeval(double t, struct timeval *tv)
Fill in struct timeval from number of seconds in a double.
void send_message(char type, const std::string &s, double end_time)
Send a message.
<unistd.h>, but with compat.
off_t file_size(const char *path)
Returns the size of a file.
Various assertion macros.
Functions for handling a time or time interval in a double.
bool ready_to_read() const
See if there is data available to read.
int fdout
The file descriptor used for writing.
int sniff_next_message_type(double end_time)
Check what the next message type is.
void do_close()
Close the connection.
include <fcntl.h>, but working around broken platforms.
static void write_all(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)