42 # include <type_traits>
57 #define CHUNKSIZE 4096
82 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
85 ++overlapped.OffsetHigh;
90 const string & context_)
91 : fdin(fdin_), fdout(fdout_), context(context_)
94 memset(&overlapped, 0,
sizeof(overlapped));
95 overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
96 if (!overlapped.hEvent)
100 #elif defined USE_SO_NOSIGPIPE
117 if (setsockopt(
fdout, SOL_SOCKET, SO_NOSIGPIPE,
118 reinterpret_cast<char*
>(&on),
sizeof(on)) < 0) {
125 if (errno != ENOTSOCK && errno != EBADF) {
130 #elif defined USE_MSG_NOSIGNAL
142 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
149 RemoteConnection::~RemoteConnection()
151 if (overlapped.hEvent)
152 CloseHandle(overlapped.hEvent);
159 LOGCALL(REMOTE,
bool,
"RemoteConnection::read_at_least", min_len |
end_time);
164 HANDLE hin = fd_to_handle(
fdin);
168 BOOL ok = ReadFile(hin, buf,
sizeof(buf), &received, &overlapped);
170 int errcode = GetLastError();
171 if (errcode != ERROR_IO_PENDING)
175 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(
end_time));
176 if (waitrc != WAIT_OBJECT_0) {
177 LOGLINE(REMOTE,
"read: timeout has expired");
181 if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
183 context, -
int(GetLastError()));
190 buffer.append(buf, received);
193 update_overlapped_offset(overlapped, received);
194 }
while (
buffer.length() < min_len);
197 if (fcntl(
fdin, F_SETFL, (
end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
204 ssize_t received = read(
fdin, buf,
sizeof(buf));
207 buffer.append(buf, received);
216 LOGLINE(REMOTE,
"read gave errno = " << errno);
217 if (errno == EINTR)
continue;
229 LOGLINE(REMOTE,
"read: timeout has expired");
238 int poll_result = poll(&fds, 1,
int(time_diff * 1000));
239 if (poll_result > 0)
break;
241 if (poll_result == 0)
246 if (errno != EINTR && errno != EAGAIN)
250 if (
fdin >= FD_SETSIZE) {
257 FD_SET(
fdin, &fdset);
261 int select_result = select(
fdin + 1, &fdset, 0, 0, &tv);
262 if (select_result > 0)
break;
264 if (select_result == 0)
271 if (errno != EINTR && errno != EAGAIN)
285 # ifdef USE_MSG_NOSIGNAL
287 ssize_t n = send(
fdout,
p, len, send_flags);
288 if (
usual(n >= 0 || errno != ENOTSOCK))
return n;
295 return write(
fdout,
p, len);
309 string_view header_view = header;
312 HANDLE hout = fd_to_handle(
fdout);
313 const string_view*
str = &header_view;
318 BOOL ok = WriteFile(hout,
str->data() + count,
str->size() - count, &n, &overlapped);
320 int errcode = GetLastError();
321 if (errcode != ERROR_IO_PENDING)
325 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(
end_time));
326 if (waitrc != WAIT_OBJECT_0) {
327 LOGLINE(REMOTE,
"write: timeout has expired");
331 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
333 context, -
int(GetLastError()));
339 update_overlapped_offset(overlapped, n);
341 if (count ==
str->size()) {
342 if (
str == &message || message.empty())
return;
349 if (fcntl(
fdout, F_SETFL, (
end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
354 const string_view*
str = &header_view;
364 if (count ==
str->size()) {
365 if (
str == &message || message.empty())
return;
372 LOGLINE(REMOTE,
"write gave errno = " << errno);
373 if (errno == EINTR)
continue;
381 LOGLINE(REMOTE,
"write: timeout has expired");
389 fds.events = POLLOUT;
390 int result = poll(&fds, 1,
int(time_diff * 1000));
391 # define POLLSELECT "poll"
393 if (
fdout >= FD_SETSIZE) {
401 FD_SET(
fdout, &fdset);
405 int result = select(
fdout + 1, 0, &fdset, 0, &tv);
406 # define POLLSELECT "select"
410 if (errno == EINTR || errno == EAGAIN) {
445 c += enc_size.size();
448 memcpy(buf + 1, enc_size.data(), enc_size.size());
452 HANDLE hout = fd_to_handle(
fdout);
456 BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
458 int errcode = GetLastError();
459 if (errcode != ERROR_IO_PENDING)
463 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(
end_time));
464 if (waitrc != WAIT_OBJECT_0) {
465 LOGLINE(REMOTE,
"write: timeout has expired");
469 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
471 context, -
int(GetLastError()));
477 update_overlapped_offset(overlapped, n);
480 if (size == 0)
return;
484 res = read(fd, buf,
sizeof(buf));
485 }
while (res < 0 && errno == EINTR);
495 if (fcntl(
fdout, F_SETFL, (
end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
509 if (size == 0)
return;
513 res = read(fd, buf,
sizeof(buf));
514 }
while (res < 0 && errno == EINTR);
524 LOGLINE(REMOTE,
"write gave errno = " << errno);
525 if (errno == EINTR)
continue;
533 LOGLINE(REMOTE,
"write: timeout has expired");
541 fds.events = POLLOUT;
542 int result = poll(&fds, 1,
int(time_diff * 1000));
543 # define POLLSELECT "poll"
545 if (
fdout >= FD_SETSIZE) {
553 FD_SET(
fdout, &fdset);
557 int result = select(
fdout + 1, 0, &fdset, 0, &tv);
558 # define POLLSELECT "select"
562 if (errno == EINTR || errno == EAGAIN) {
582 LOGCALL(REMOTE,
int,
"RemoteConnection::sniff_next_message_type",
end_time);
588 unsigned char type =
buffer[0];
595 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message", result |
end_time);
603 size_t len =
static_cast<unsigned char>(
buffer[1]);
607 result.assign(
buffer.data() + 2, len);
608 unsigned char type =
buffer[0];
618 const char* p_end =
p +
buffer.size();
623 size_t header_len = (
p -
buffer.data());
626 result.assign(
buffer.data() + header_len, len);
627 unsigned char type =
buffer[0];
628 buffer.erase(0, header_len + len);
635 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message_chunked",
end_time);
644 uint_least64_t len =
static_cast<unsigned char>(
buffer[1]);
657 const char* p_end =
p +
buffer.size();
667 size_t header_len = (
p -
buffer.data());
668 unsigned char type =
buffer[0];
669 buffer.erase(0, header_len);
677 LOGCALL(REMOTE,
int,
"RemoteConnection::get_message_chunk", result | at_least |
end_time);
681 if (at_least <= result.size())
RETURN(
true);
682 at_least -= result.size();
691 result.append(
buffer, 0, retlen);
703 ssize_t c = write(fd,
p, n);
705 if (errno == EINTR)
continue;
716 LOGCALL(REMOTE,
int,
"RemoteConnection::receive_file", file |
end_time);
732 buffer.erase(0, min_read);
740 LOGCALL_VOID(REMOTE,
"RemoteConnection::shutdown", NO_ARGS);
742 if (
fdin < 0)
return;
748 HANDLE hin = fd_to_handle(
fdin);
751 BOOL ok = ReadFile(hin, &
dummy, 1, &received, &overlapped);
752 if (!ok && GetLastError() == ERROR_IO_PENDING) {
754 (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
765 res = poll(&fds, 1, -1);
766 }
while (res < 0 && (errno == EINTR || errno == EAGAIN));
768 if (
fdin < FD_SETSIZE) {
771 FD_SET(
fdin, &fdset);
774 res = select(
fdin + 1, &fdset, 0, 0, NULL);
775 }
while (res < 0 && (errno == EINTR || errno == EAGAIN));
786 LOGCALL_VOID(REMOTE,
"RemoteConnection::do_close", NO_ARGS);
805 RemoteConnection::calc_read_wait_msecs(
double end_time)
814 if (time_diff < 0.0) {
817 return static_cast<DWORD
>(time_diff * 1000.0);
void send_message(char type, std::string_view s, double end_time)
Send a message.
int fdin
The file descriptor used for reading.
std::string buffer
Buffer to hold unprocessed input.
RemoteConnection(const RemoteConnection &)
Don't allow copying.
int get_message_chunk(std::string &result, size_t at_least, double end_time)
Read a chunk of a message from fdin.
bool read_at_least(size_t min_len, double end_time)
Read until there are at least min_len bytes in buffer.
int fdout
The file descriptor used for writing.
int receive_file(const std::string &file, double end_time)
Save the contents of a message as a file.
off_t chunked_data_left
Remaining bytes of message data still to come over fdin for a chunked read.
void do_close()
Close the connection.
int get_message(std::string &result, double end_time)
Read one message from fdin.
void shutdown()
Shutdown the connection.
int sniff_next_message_type(double end_time)
Check what the next message type is.
int get_message_chunked(double end_time)
Prepare to read one message from fdin in chunks.
std::string context
The context to report with errors.
ssize_t send_or_write(const void *p, size_t n)
Helper which calls send() or write().
void send_file(char type, int fd, double end_time)
Send the contents of a file as a message.
Indicates an attempt to access a closed database.
Indicates a problem communicating with a remote database.
Indicates a timeout expired while communicating with a remote database.
#define LOGCALL(CATEGORY, TYPE, FUNC, PARAMS)
#define LOGCALL_VOID(CATEGORY, FUNC, PARAMS)
Hierarchy of classes which Xapian can throw as exceptions.
Wrapper class around a file descriptor to avoid leaks.
Utility functions for testing files.
file_size_type file_size(const char *path)
Returns the size of a file.
double end_time(double timeout)
Return the end time for a timeout in timeout seconds.
double now()
Return the current time.
void to_timeval(double t, struct timeval *tv)
Fill in struct timeval from number of seconds in a double.
void sleep(double t)
Sleep until the time represented by this object.
string str(int value)
Convert int to std::string.
Various assertion macros.
#define AssertRel(A, REL, B)
Arithmetic operations with overflow checks.
std::enable_if_t< std::is_unsigned_v< T1 > &&std::is_unsigned_v< T2 > &&std::is_unsigned_v< R >, bool > add_overflows(T1 a, T2 b, R &res)
Addition with overflow checking.
Pack types into strings and unpack them again.
bool unpack_uint(const char **p, const char *end, U *result)
Decode an unsigned integer from a string.
void pack_uint(std::string &s, U value)
Append an encoded unsigned integer to a string.
Provides wrappers with POSIXy semantics.
Functions for handling a time or time interval in a double.
static void throw_database_closed()
static void throw_timeout(const char *msg, const string &context)
static void throw_network_error_message_too_long_for_off_t()
static void write_all(int fd, const char *p, size_t n)
Write n bytes from block pointed to by p to file descriptor fd.
RemoteConnection class used by the remote backend.
include <fcntl.h>, but working around broken platforms.
include <sys/select.h> with portability workarounds.
<unistd.h>, but with compat.
Socket handling utilities.
void close_fd_or_socket(int fd)