00001
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <config.h>
00022
00023 #include "remoteconnection.h"
00024
00025 #include <xapian/error.h>
00026
00027 #include "safeerrno.h"
00028 #include "safefcntl.h"
00029 #include "safesysselect.h"
00030 #include "safesysstat.h"
00031 #include "safeunistd.h"
00032
00033 #include <algorithm>
00034 #include <string>
00035
00036 #include "debuglog.h"
00037 #include "omassert.h"
00038 #include "realtime.h"
00039 #include "serialise.h"
00040 #include "socket_utils.h"
00041 #include "utils.h"
00042
00043 #ifdef __WIN32__
00044 # include "msvc_posix_wrapper.h"
00045 #endif
00046
00047 using namespace std;
00048
00049 #define CHUNKSIZE 4096
00050
00051 #ifdef __WIN32__
00052 inline void
00053 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
00054 {
00055 STATIC_ASSERT_UNSIGNED_TYPE(DWORD);
00056 overlapped.Offset += n;
00057 if (overlapped.Offset < n) ++overlapped.OffsetHigh;
00058 }
00059 #endif
00060
00061 RemoteConnection::RemoteConnection(int fdin_, int fdout_,
00062 const string & context_)
00063 : fdin(fdin_), fdout(fdout_), context(context_)
00064 {
00065 #ifdef __WIN32__
00066 memset(&overlapped, 0, sizeof(overlapped));
00067 overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
00068 if (!overlapped.hEvent)
00069 throw Xapian::NetworkError("Failed to setup OVERLAPPED",
00070 context, -(int)GetLastError());
00071
00072 #endif
00073 }
00074
00075 RemoteConnection::~RemoteConnection()
00076 {
00077 #ifdef __WIN32__
00078 if (overlapped.hEvent)
00079 CloseHandle(overlapped.hEvent);
00080 #endif
00081 }
00082
00083 void
00084 RemoteConnection::read_at_least(size_t min_len, double end_time)
00085 {
00086 LOGCALL_VOID(REMOTE, "RemoteConnection::read_at_least", min_len | end_time);
00087
00088 if (buffer.length() >= min_len) return;
00089
00090 #ifdef __WIN32__
00091 HANDLE hin = fd_to_handle(fdin);
00092 do {
00093 char buf[CHUNKSIZE];
00094 DWORD received;
00095 BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
00096 if (!ok) {
00097 int errcode = GetLastError();
00098 if (errcode != ERROR_IO_PENDING)
00099 throw Xapian::NetworkError("read failed", context, -errcode);
00100
00101 DWORD waitrc;
00102 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
00103 if (waitrc != WAIT_OBJECT_0) {
00104 LOGLINE(REMOTE, "read: timeout has expired");
00105 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
00106 }
00107
00108 if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
00109 throw Xapian::NetworkError("Failed to get overlapped result",
00110 context, -(int)GetLastError());
00111 }
00112
00113 if (received == 0)
00114 throw Xapian::NetworkError("Received EOF", context);
00115
00116 buffer.append(buf, received);
00117
00118
00119 update_overlapped_offset(overlapped, received);
00120 } while (buffer.length() < min_len);
00121 #else
00122
00123 if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
00124 throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
00125 context, errno);
00126 }
00127
00128 while (true) {
00129 char buf[CHUNKSIZE];
00130 ssize_t received = read(fdin, buf, sizeof(buf));
00131
00132 if (received > 0) {
00133 buffer.append(buf, received);
00134 if (buffer.length() >= min_len) return;
00135 continue;
00136 }
00137
00138 if (received == 0)
00139 throw Xapian::NetworkError("Received EOF", context);
00140
00141 LOGLINE(REMOTE, "read gave errno = " << strerror(errno));
00142 if (errno == EINTR) continue;
00143
00144 if (errno != EAGAIN)
00145 throw Xapian::NetworkError("read failed", context, errno);
00146
00147 Assert(end_time != 0.0);
00148 while (true) {
00149
00150 double time_diff = end_time - RealTime::now();
00151
00152 if (time_diff < 0) {
00153 LOGLINE(REMOTE, "read: timeout has expired");
00154 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
00155 }
00156
00157
00158 fd_set fdset;
00159 FD_ZERO(&fdset);
00160 FD_SET(fdin, &fdset);
00161
00162 struct timeval tv;
00163 tv.tv_sec = long(time_diff);
00164 tv.tv_usec = long(fmod(time_diff, 1.0) * 1000000);
00165
00166 int select_result = select(fdin + 1, &fdset, 0, &fdset, &tv);
00167 if (select_result > 0) break;
00168
00169 if (select_result == 0)
00170 throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
00171
00172
00173 if (errno != EINTR)
00174 throw Xapian::NetworkError("select failed during read", context, errno);
00175 }
00176 }
00177 #endif
00178 }
00179
00180 bool
00181 RemoteConnection::ready_to_read() const
00182 {
00183 LOGCALL(REMOTE, bool, "RemoteConnection::ready_to_read", NO_ARGS);
00184 if (fdin == -1) {
00185 throw Xapian::DatabaseError("Database has been closed");
00186 }
00187
00188 if (!buffer.empty()) RETURN(true);
00189
00190
00191 fd_set fdset;
00192 FD_ZERO(&fdset);
00193 FD_SET(fdin, &fdset);
00194
00195
00196
00197
00198 struct timeval tv;
00199 tv.tv_sec = 0;
00200 tv.tv_usec = 100000;
00201 RETURN(select(fdin + 1, &fdset, 0, &fdset, &tv) > 0);
00202 }
00203
00204 void
00205 RemoteConnection::send_message(char type, const string &message,
00206 double end_time)
00207 {
00208 LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
00209 if (fdout == -1) {
00210 throw Xapian::DatabaseError("Database has been closed");
00211 }
00212
00213 string header;
00214 header += type;
00215 header += encode_length(message.size());
00216
00217 #ifdef __WIN32__
00218 HANDLE hout = fd_to_handle(fdout);
00219 const string * str = &header;
00220
00221 size_t count = 0;
00222 while (true) {
00223 DWORD n;
00224 BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
00225 if (!ok) {
00226 int errcode = GetLastError();
00227 if (errcode != ERROR_IO_PENDING)
00228 throw Xapian::NetworkError("write failed", context, -errcode);
00229
00230 DWORD waitrc;
00231 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
00232 if (waitrc != WAIT_OBJECT_0) {
00233 LOGLINE(REMOTE, "write: timeout has expired");
00234 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
00235 }
00236
00237 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
00238 throw Xapian::NetworkError("Failed to get overlapped result",
00239 context, -(int)GetLastError());
00240 }
00241
00242 count += n;
00243
00244
00245 update_overlapped_offset(overlapped, n);
00246
00247 if (count == str->size()) {
00248 if (str == &message || message.empty()) return;
00249 str = &message;
00250 count = 0;
00251 }
00252 }
00253 #else
00254
00255 if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
00256 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
00257 context, errno);
00258 }
00259
00260 const string * str = &header;
00261
00262 fd_set fdset;
00263 size_t count = 0;
00264 while (true) {
00265
00266
00267 ssize_t n = write(fdout, str->data() + count, str->size() - count);
00268
00269 if (n >= 0) {
00270 count += n;
00271 if (count == str->size()) {
00272 if (str == &message || message.empty()) return;
00273 str = &message;
00274 count = 0;
00275 }
00276 continue;
00277 }
00278
00279 LOGLINE(REMOTE, "write gave errno = " << strerror(errno));
00280 if (errno == EINTR) continue;
00281
00282 if (errno != EAGAIN)
00283 throw Xapian::NetworkError("write failed", context, errno);
00284
00285
00286 FD_ZERO(&fdset);
00287 FD_SET(fdout, &fdset);
00288
00289 double time_diff = end_time - RealTime::now();
00290 if (time_diff < 0) {
00291 LOGLINE(REMOTE, "write: timeout has expired");
00292 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
00293 }
00294
00295 struct timeval tv;
00296 tv.tv_sec = long(time_diff);
00297 tv.tv_usec = long(fmod(time_diff, 1.0) * 1000000);
00298
00299 int select_result = select(fdout + 1, 0, &fdset, &fdset, &tv);
00300
00301 if (select_result < 0) {
00302 if (errno == EINTR) {
00303
00304
00305
00306 continue;
00307 }
00308 throw Xapian::NetworkError("select failed during write", context, errno);
00309 }
00310
00311 if (select_result == 0)
00312 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
00313 }
00314 #endif
00315 }
00316
00317 void
00318 RemoteConnection::send_file(char type, int fd, double end_time)
00319 {
00320 LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
00321 if (fdout == -1) {
00322 throw Xapian::DatabaseError("Database has been closed");
00323 }
00324
00325 off_t size;
00326 {
00327 struct stat sb;
00328 if (fstat(fd, &sb) == -1)
00329 throw Xapian::NetworkError("Couldn't stat file to send", errno);
00330 size = sb.st_size;
00331 }
00332
00333
00334 char buf[CHUNKSIZE];
00335 buf[0] = type;
00336 size_t c = 1;
00337 {
00338 string enc_size = encode_length(size);
00339 c += enc_size.size();
00340
00341 AssertRel(c, <=, sizeof(buf));
00342 memcpy(buf + 1, enc_size.data(), enc_size.size());
00343 }
00344
00345 #ifdef __WIN32__
00346 HANDLE hout = fd_to_handle(fdout);
00347 size_t count = 0;
00348 while (true) {
00349 DWORD n;
00350 BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
00351 if (!ok) {
00352 int errcode = GetLastError();
00353 if (errcode != ERROR_IO_PENDING)
00354 throw Xapian::NetworkError("write failed", context, -errcode);
00355
00356 DWORD waitrc;
00357 waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
00358 if (waitrc != WAIT_OBJECT_0) {
00359 LOGLINE(REMOTE, "write: timeout has expired");
00360 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
00361 }
00362
00363 if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
00364 throw Xapian::NetworkError("Failed to get overlapped result",
00365 context, -(int)GetLastError());
00366 }
00367
00368 count += n;
00369
00370
00371 update_overlapped_offset(overlapped, n);
00372
00373 if (count == c) {
00374 if (size == 0) return;
00375
00376 ssize_t res;
00377 do {
00378 res = read(fd, buf, sizeof(buf));
00379 } while (res < 0 && errno == EINTR);
00380 if (res < 0) throw Xapian::NetworkError("read failed", errno);
00381 c = size_t(res);
00382
00383 size -= c;
00384 count = 0;
00385 }
00386 }
00387 #else
00388
00389 if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
00390 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
00391 context, errno);
00392 }
00393
00394 fd_set fdset;
00395 size_t count = 0;
00396 while (true) {
00397
00398
00399 ssize_t n = write(fdout, buf + count, c - count);
00400
00401 if (n >= 0) {
00402 count += n;
00403 if (count == c) {
00404 if (size == 0) return;
00405
00406 ssize_t res;
00407 do {
00408 res = read(fd, buf, sizeof(buf));
00409 } while (res < 0 && errno == EINTR);
00410 if (res < 0) throw Xapian::NetworkError("read failed", errno);
00411 c = size_t(res);
00412
00413 size -= c;
00414 count = 0;
00415 }
00416 continue;
00417 }
00418
00419 LOGLINE(REMOTE, "write gave errno = " << strerror(errno));
00420 if (errno == EINTR) continue;
00421
00422 if (errno != EAGAIN)
00423 throw Xapian::NetworkError("write failed", context, errno);
00424
00425
00426 FD_ZERO(&fdset);
00427 FD_SET(fdout, &fdset);
00428
00429 double time_diff = end_time - RealTime::now();
00430 if (time_diff < 0) {
00431 LOGLINE(REMOTE, "write: timeout has expired");
00432 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
00433 }
00434
00435 struct timeval tv;
00436 tv.tv_sec = long(time_diff);
00437 tv.tv_usec = long(fmod(time_diff, 1.0) * 1000000);
00438
00439 int select_result = select(fdout + 1, 0, &fdset, &fdset, &tv);
00440
00441 if (select_result < 0) {
00442 if (errno == EINTR) {
00443
00444
00445
00446 continue;
00447 }
00448 throw Xapian::NetworkError("select failed during write", context, errno);
00449 }
00450
00451 if (select_result == 0)
00452 throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
00453 }
00454 #endif
00455 }
00456
00457 char
00458 RemoteConnection::sniff_next_message_type(double end_time)
00459 {
00460 LOGCALL(REMOTE, char, "RemoteConnection::sniff_next_message_type", end_time);
00461 if (fdin == -1) {
00462 throw Xapian::DatabaseError("Database has been closed");
00463 }
00464
00465 read_at_least(1, end_time);
00466 char type = buffer[0];
00467 RETURN(type);
00468 }
00469
00470 char
00471 RemoteConnection::get_message(string &result, double end_time)
00472 {
00473 LOGCALL(REMOTE, char, "RemoteConnection::get_message", result | end_time);
00474 if (fdin == -1) {
00475 throw Xapian::DatabaseError("Database has been closed");
00476 }
00477
00478 read_at_least(2, end_time);
00479 size_t len = static_cast<unsigned char>(buffer[1]);
00480 read_at_least(len + 2, end_time);
00481 if (len != 0xff) {
00482 result.assign(buffer.data() + 2, len);
00483 char type = buffer[0];
00484 buffer.erase(0, len + 2);
00485 RETURN(type);
00486 }
00487 len = 0;
00488 string::const_iterator i = buffer.begin() + 2;
00489 unsigned char ch;
00490 int shift = 0;
00491 do {
00492 if (i == buffer.end() || shift > 28) {
00493
00494 throw Xapian::NetworkError("Insane message length specified!");
00495 }
00496 ch = *i++;
00497 len |= size_t(ch & 0x7f) << shift;
00498 shift += 7;
00499 } while ((ch & 0x80) == 0);
00500 len += 255;
00501 size_t header_len = (i - buffer.begin());
00502 read_at_least(header_len + len, end_time);
00503 result.assign(buffer.data() + header_len, len);
00504 char type = buffer[0];
00505 buffer.erase(0, header_len + len);
00506 RETURN(type);
00507 }
00508
00509 char
00510 RemoteConnection::get_message_chunked(double end_time)
00511 {
00512 LOGCALL(REMOTE, char, "RemoteConnection::get_message_chunked", end_time);
00513 if (fdin == -1) {
00514 throw Xapian::DatabaseError("Database has been closed");
00515 }
00516
00517 read_at_least(2, end_time);
00518 off_t len = static_cast<unsigned char>(buffer[1]);
00519 if (len != 0xff) {
00520 chunked_data_left = len;
00521 char type = buffer[0];
00522 buffer.erase(0, 2);
00523 RETURN(type);
00524 }
00525 read_at_least(len + 2, end_time);
00526 len = 0;
00527 string::const_iterator i = buffer.begin() + 2;
00528 unsigned char ch;
00529 int shift = 0;
00530 do {
00531
00532
00533 if (i == buffer.end() || shift > 63) {
00534
00535 throw Xapian::NetworkError("Insane message length specified!");
00536 }
00537 ch = *i++;
00538 len |= off_t(ch & 0x7f) << shift;
00539 shift += 7;
00540 } while ((ch & 0x80) == 0);
00541 len += 255;
00542 chunked_data_left = len;
00543 char type = buffer[0];
00544 size_t header_len = (i - buffer.begin());
00545 buffer.erase(0, header_len);
00546 RETURN(type);
00547 }
00548
00549 bool
00550 RemoteConnection::get_message_chunk(string &result, size_t at_least,
00551 double end_time)
00552 {
00553 LOGCALL(REMOTE, bool, "RemoteConnection::get_message_chunk", result | at_least | end_time);
00554 if (fdin == -1) {
00555 throw Xapian::DatabaseError("Database has been closed");
00556 }
00557
00558 if (at_least <= result.size()) RETURN(true);
00559 at_least -= result.size();
00560
00561 bool read_enough = (off_t(at_least) <= chunked_data_left);
00562 if (!read_enough) at_least = size_t(chunked_data_left);
00563
00564 read_at_least(at_least, end_time);
00565
00566 size_t retlen(min(off_t(buffer.size()), chunked_data_left));
00567 result.append(buffer, 0, retlen);
00568 buffer.erase(0, retlen);
00569 chunked_data_left -= retlen;
00570
00571 RETURN(read_enough);
00572 }
00573
00575 static void
00576 write_all(int fd, const char * p, size_t n)
00577 {
00578 while (n) {
00579 ssize_t c = write(fd, p, n);
00580 if (c < 0) {
00581 if (errno == EINTR) continue;
00582 throw Xapian::NetworkError("Error writing to file", errno);
00583 }
00584 p += c;
00585 n -= c;
00586 }
00587 }
00588
00589 char
00590 RemoteConnection::receive_file(const string &file, double end_time)
00591 {
00592 LOGCALL(REMOTE, char, "RemoteConnection::receive_file", file | end_time);
00593 if (fdin == -1) {
00594 throw Xapian::DatabaseError("Database has been closed");
00595 }
00596
00597 #ifdef __WIN32__
00598
00599 int fd = msvc_posix_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC);
00600 #else
00601 int fd = open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0666);
00602 #endif
00603 if (fd == -1) throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
00604 fdcloser closefd(fd);
00605
00606 read_at_least(2, end_time);
00607 size_t len = static_cast<unsigned char>(buffer[1]);
00608 read_at_least(len + 2, end_time);
00609 if (len != 0xff) {
00610 write_all(fd, buffer.data() + 2, len);
00611 char type = buffer[0];
00612 buffer.erase(0, len + 2);
00613 RETURN(type);
00614 }
00615 len = 0;
00616 string::const_iterator i = buffer.begin() + 2;
00617 unsigned char ch;
00618 int shift = 0;
00619 do {
00620 if (i == buffer.end() || shift > 28) {
00621
00622 throw Xapian::NetworkError("Insane message length specified!");
00623 }
00624 ch = *i++;
00625 len |= size_t(ch & 0x7f) << shift;
00626 shift += 7;
00627 } while ((ch & 0x80) == 0);
00628 len += 255;
00629 size_t header_len = (i - buffer.begin());
00630 size_t remainlen(min(buffer.size() - header_len, len));
00631 write_all(fd, buffer.data() + header_len, remainlen);
00632 len -= remainlen;
00633 char type = buffer[0];
00634 buffer.erase(0, header_len + remainlen);
00635 while (len > 0) {
00636 read_at_least(min(len, size_t(CHUNKSIZE)), end_time);
00637 remainlen = min(buffer.size(), len);
00638 write_all(fd, buffer.data(), remainlen);
00639 len -= remainlen;
00640 buffer.erase(0, remainlen);
00641 }
00642 RETURN(type);
00643 }
00644
00645 void
00646 RemoteConnection::do_close(bool wait)
00647 {
00648 LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", wait);
00649
00650 if (fdin >= 0) {
00651 if (wait) {
00652
00653
00654 try {
00655 send_message(MSG_SHUTDOWN, string(), 0.0);
00656 } catch (...) {
00657 }
00658 #ifdef __WIN32__
00659 HANDLE hin = fd_to_handle(fdin);
00660 char dummy;
00661 DWORD received;
00662 BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
00663 if (!ok && GetLastError() == ERROR_IO_PENDING) {
00664
00665 (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
00666 }
00667 #else
00668
00669
00670 fd_set fdset;
00671 FD_ZERO(&fdset);
00672 FD_SET(fdin, &fdset);
00673 int res;
00674 do {
00675 res = select(fdin + 1, &fdset, 0, &fdset, NULL);
00676 } while (res < 0 && errno == EINTR);
00677 #endif
00678 }
00679 close_fd_or_socket(fdin);
00680
00681
00682 if (fdin == fdout) fdout = -1;
00683
00684 fdin = -1;
00685 }
00686
00687 if (fdout >= 0) {
00688 close_fd_or_socket(fdout);
00689 fdout = -1;
00690 }
00691 }
00692
00693 #ifdef __WIN32__
00694 DWORD
00695 RemoteConnection::calc_read_wait_msecs(double end_time)
00696 {
00697 if (end_time == 0.0)
00698 return INFINITE;
00699
00700
00701 double time_diff = end_time - RealTime::now();
00702
00703
00704 if (time_diff < 0.0) {
00705 throw Xapian::NetworkTimeoutError("Timeout expired before starting read", context);
00706 }
00707 return static_cast<DWORD>(time_diff * 1000.0);
00708 }
00709 #endif