42 # include <type_traits> 58 #define CHUNKSIZE 4096 74 XAPIAN_NORETURN(
static void throw_timeout(
const char*,
const string&));
83 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
86 ++overlapped.OffsetHigh;
91 const string & context_)
92 : fdin(fdin_), fdout(fdout_), context(context_)
95 memset(&overlapped, 0,
sizeof(overlapped));
96 overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
97 if (!overlapped.hEvent)
101 #elif defined USE_SO_NOSIGPIPE 118 if (setsockopt(
fdout, SOL_SOCKET, SO_NOSIGPIPE,
119 reinterpret_cast<char*>(&on),
sizeof(on)) < 0) {
126 if (errno != ENOTSOCK && errno != EBADF) {
131 #elif defined USE_MSG_NOSIGNAL 143 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
150 RemoteConnection::~RemoteConnection()
152 if (overlapped.hEvent)
153 CloseHandle(overlapped.hEvent);
160 LOGCALL(REMOTE,
bool,
"RemoteConnection::read_at_least", min_len | end_time);
165 HANDLE hin = fd_to_handle(
fdin);
169 BOOL ok = ReadFile(hin, buf,
sizeof(buf), &received, &overlapped);
171 int errcode = GetLastError();
172 if (errcode != ERROR_IO_PENDING)
176 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
177 if (waitrc != WAIT_OBJECT_0) {
178 LOGLINE(REMOTE,
"read: timeout has expired");
182 if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
184 context, -
int(GetLastError()));
191 buffer.append(buf, received);
194 update_overlapped_offset(overlapped, received);
195 }
while (
buffer.length() < min_len);
198 if (fcntl(
fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
205 ssize_t received = read(
fdin, buf,
sizeof(buf));
208 buffer.append(buf, received);
217 LOGLINE(REMOTE,
"read gave errno = " << errno);
218 if (errno == EINTR)
continue;
227 double time_diff = end_time -
now;
230 LOGLINE(REMOTE,
"read: timeout has expired");
239 int poll_result = poll(&fds, 1,
int(time_diff * 1000));
240 if (poll_result > 0)
break;
242 if (poll_result == 0)
247 if (errno != EINTR && errno != EAGAIN)
251 if (
fdin >= FD_SETSIZE) {
258 FD_SET(
fdin, &fdset);
262 int select_result = select(
fdin + 1, &fdset, 0, 0, &tv);
263 if (select_result > 0)
break;
265 if (select_result == 0)
272 if (errno != EINTR && errno != EAGAIN)
285 LOGCALL(REMOTE,
bool,
"RemoteConnection::ready_to_read", NO_ARGS);
296 RETURN(poll(&fds, 1, 0) > 0);
299 if (
fdin >= FD_SETSIZE) {
306 FD_SET(
fdin, &fdset);
311 RETURN(select(
fdin + 1, &fdset, 0, 0, &tv) > 0);
319 # ifdef USE_MSG_NOSIGNAL 321 ssize_t n = send(
fdout, p, len, send_flags);
322 if (
usual(n >= 0 || errno != ENOTSOCK))
return n;
329 return write(
fdout, p, len);
337 LOGCALL_VOID(REMOTE,
"RemoteConnection::send_message", type | message | end_time);
346 HANDLE hout = fd_to_handle(
fdout);
347 const string *
str = &header;
352 BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
354 int errcode = GetLastError();
355 if (errcode != ERROR_IO_PENDING)
359 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
360 if (waitrc != WAIT_OBJECT_0) {
361 LOGLINE(REMOTE,
"write: timeout has expired");
365 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
367 context, -
int(GetLastError()));
373 update_overlapped_offset(overlapped, n);
375 if (count == str->size()) {
376 if (str == &message || message.empty())
return;
383 if (fcntl(
fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
388 const string * str = &header;
394 ssize_t n =
send_or_write(str->data() + count, str->size() - count);
398 if (count == str->size()) {
399 if (str == &message || message.empty())
return;
406 LOGLINE(REMOTE,
"write gave errno = " << errno);
407 if (errno == EINTR)
continue;
413 double time_diff = end_time -
now;
415 LOGLINE(REMOTE,
"write: timeout has expired");
423 fds.events = POLLOUT;
424 int result = poll(&fds, 1,
int(time_diff * 1000));
425 # define POLLSELECT "poll" 427 if (
fdout >= FD_SETSIZE) {
435 FD_SET(
fdout, &fdset);
439 int result = select(
fdout + 1, 0, &fdset, 0, &tv);
440 # define POLLSELECT "select" 444 if (errno == EINTR || errno == EAGAIN) {
464 LOGCALL_VOID(REMOTE,
"RemoteConnection::send_file", type | fd | end_time);
478 c += enc_size.size();
481 memcpy(buf + 1, enc_size.data(), enc_size.size());
485 HANDLE hout = fd_to_handle(
fdout);
489 BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
491 int errcode = GetLastError();
492 if (errcode != ERROR_IO_PENDING)
496 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
497 if (waitrc != WAIT_OBJECT_0) {
498 LOGLINE(REMOTE,
"write: timeout has expired");
502 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
504 context, -
int(GetLastError()));
510 update_overlapped_offset(overlapped, n);
513 if (size == 0)
return;
517 res = read(fd, buf,
sizeof(buf));
518 }
while (res < 0 && errno == EINTR);
528 if (fcntl(
fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
542 if (size == 0)
return;
546 res = read(fd, buf,
sizeof(buf));
547 }
while (res < 0 && errno == EINTR);
557 LOGLINE(REMOTE,
"write gave errno = " << errno);
558 if (errno == EINTR)
continue;
564 double time_diff = end_time -
now;
566 LOGLINE(REMOTE,
"write: timeout has expired");
574 fds.events = POLLOUT;
575 int result = poll(&fds, 1,
int(time_diff * 1000));
576 # define POLLSELECT "poll" 578 if (
fdout >= FD_SETSIZE) {
586 FD_SET(
fdout, &fdset);
590 int result = select(
fdout + 1, 0, &fdset, 0, &tv);
591 # define POLLSELECT "select" 595 if (errno == EINTR || errno == EAGAIN) {
615 LOGCALL(REMOTE,
int,
"RemoteConnection::sniff_next_message_type", end_time);
621 unsigned char type =
buffer[0];
628 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message", result | end_time);
634 size_t len =
static_cast<unsigned char>(
buffer[1]);
638 result.assign(
buffer.data() + 2, len);
639 unsigned char type =
buffer[0];
644 string::const_iterator i =
buffer.begin() + 2;
648 if (i ==
buffer.end() || shift > 28) {
653 len |= size_t(ch & 0x7f) << shift;
655 }
while ((ch & 0x80) == 0);
657 size_t header_len = (i -
buffer.begin());
660 result.assign(
buffer.data() + header_len, len);
661 unsigned char type =
buffer[0];
662 buffer.erase(0, header_len + len);
669 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message_chunked", end_time);
676 uint_least64_t len =
static_cast<unsigned char>(
buffer[1]);
686 string::const_iterator i =
buffer.begin() + 2;
695 const int SHIFT_LIMIT = 63;
696 if (
rare(i ==
buffer.end() || shift >= SHIFT_LIMIT)) {
701 uint_least64_t bits = ch & 0x7f;
702 len |= bits << shift;
704 }
while ((ch & 0x80) == 0);
708 if (
sizeof(off_t) * CHAR_BIT < 63) {
715 unsigned char type =
buffer[0];
716 size_t header_len = (i -
buffer.begin());
717 buffer.erase(0, header_len);
725 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message_chunk", result | at_least | end_time);
729 if (at_least <= result.size())
RETURN(
true);
730 at_least -= result.size();
739 result.append(
buffer, 0, retlen);
751 ssize_t c = write(fd, p, n);
753 if (errno == EINTR)
continue;
764 LOGCALL(REMOTE,
int,
"RemoteConnection::receive_file", file | end_time);
780 buffer.erase(0, min_read);
788 LOGCALL_VOID(REMOTE,
"RemoteConnection::shutdown", NO_ARGS);
790 if (
fdin < 0)
return;
796 HANDLE hin = fd_to_handle(
fdin);
799 BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
800 if (!ok && GetLastError() == ERROR_IO_PENDING) {
802 (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
813 res = poll(&fds, 1, -1);
814 }
while (res < 0 && (errno == EINTR || errno == EAGAIN));
816 if (
fdin < FD_SETSIZE) {
819 FD_SET(
fdin, &fdset);
822 res = select(
fdin + 1, &fdset, 0, 0, NULL);
823 }
while (res < 0 && (errno == EINTR || errno == EAGAIN));
834 LOGCALL_VOID(REMOTE,
"RemoteConnection::do_close", NO_ARGS);
853 RemoteConnection::calc_read_wait_msecs(
double end_time)
862 if (time_diff < 0.0) {
865 return static_cast<DWORD
>(time_diff * 1000.0);
Define the XAPIAN_NORETURN macro.
RemoteConnection class used by the remote backend.
include <sys/select.h> with portability workarounds.
std::enable_if< std::is_unsigned< T1 >::value &&std::is_unsigned< T2 >::value &&std::is_unsigned< R >::value, bool >::type add_overflows(T1 a, T2 b, R &res)
Addition with overflow checking.
Indicates an attempt to access a closed database.
length encoded as a string
int get_message(std::string &result, double end_time)
Read one message from fdin.
off_t chunked_data_left
Remaining bytes of message data still to come over fdin for a chunked read.
#define AssertRel(A, REL, B)
RemoteConnection(const RemoteConnection &)
Don't allow copying.
Indicates a timeout expired while communicating with a remote database.
Provides wrappers with POSIXy semantics.
Arithmetic operations with overflow checks.
void send_file(char type, int fd, double end_time)
Send the contents of a file as a message.
void sleep(double t)
Sleep until the time represented by this object.
bool read_at_least(size_t min_len, double end_time)
Read until there are at least min_len bytes in buffer.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
ssize_t send_or_write(const void *p, size_t n)
Helper which calls send() or write().
Utility functions for testing files.
static void throw_database_closed()
std::string encode_length(T len)
Encode a length as a variable-length string.
std::string context
The context to report with errors.
static void throw_timeout(const char *, const string &)
Hierarchy of classes which Xapian can throw as exceptions.
static void throw_network_error_insane_message_length()
int fdin
The file descriptor used for reading.
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
string str(int value)
Convert int to std::string.
Wrapper class around a file descriptor to avoid leaks.
std::string buffer
Buffer to hold unprocessed input.
void close_fd_or_socket(int fd)
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
Socket handling utilities.
void shutdown()
Shutdown the connection.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
double now()
Return the current time.
Indicates a problem communicating with a remote database.
void to_timeval(double t, struct timeval *tv)
Fill in struct timeval from number of seconds in a double.
void send_message(char type, const std::string &s, double end_time)
Send a message.
<unistd.h>, but with compat.
off_t file_size(const char *path)
Returns the size of a file.
Various assertion macros.
Functions for handling a time or time interval in a double.
bool ready_to_read() const
See if there is data available to read.
int fdout
The file descriptor used for writing.
int sniff_next_message_type(double end_time)
Check what the next message type is.
void do_close()
Close the connection.
include <fcntl.h>, but working around broken platforms.
static void write_all(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)