xapian-core  1.4.22
backendmanager_remotetcp.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2006,2007,2008,2009,2013,2015 Olly Betts
5  * Copyright (C) 2008 Lemur Consulting Ltd
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License as
9  * published by the Free Software Foundation; either version 2 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <config.h>
23 
25 
26 #include <xapian.h>
27 
28 #include <stdio.h> // For fdopen().
29 #include <cerrno>
30 #include <cstring>
31 
32 #ifdef HAVE_FORK
33 # include <signal.h>
34 # include <sys/types.h>
35 # include "safesyssocket.h"
36 # include <sys/wait.h>
37 # include <unistd.h>
38 // Some older systems had SIGCLD rather than SIGCHLD.
39 # if !defined SIGCHLD && defined SIGCLD
40 # define SIGCHLD SIGCLD
41 # endif
42 #endif
43 
44 #ifdef __WIN32__
45 # include <io.h> // For _open_osfhandle().
46 # include "safefcntl.h"
47 # include "safewindows.h"
48 # include <cstdlib> // For free().
49 #endif
50 
51 #include "errno_to_string.h"
52 #include "noreturn.h"
53 #include "str.h"
54 
55 #include <string>
56 #include <vector>
57 
58 #ifdef HAVE_VALGRIND
59 # include <valgrind/memcheck.h>
60 #endif
61 
62 using namespace std;
63 
64 // We've had problems on some hosts which run tinderbox tests with "localhost"
65 // not being set in /etc/hosts - using the IP address equivalent seems more
66 // reliable.
67 #define LOCALHOST "127.0.0.1"
68 
69 // Start at DEFAULT port and try higher ports until one isn't already in use.
70 #define DEFAULT_PORT 1239
71 
72 #ifdef HAVE_FORK
73 
74 // We can't dynamically allocate memory for this because it confuses the leak
75 // detector. We only have 1-3 child fds open at once anyway, so a fixed size
76 // array isn't a problem, and linear scanning isn't a problem either.
77 struct pid_fd {
78  pid_t pid;
79  int fd;
80 };
81 
82 static pid_fd pid_to_fd[16];
83 
84 extern "C" {
85 
86 static void
87 on_SIGCHLD(int /*sig*/)
88 {
89  int status;
90  pid_t child;
91  while ((child = waitpid(-1, &status, WNOHANG)) > 0) {
92  for (unsigned i = 0; i < sizeof(pid_to_fd) / sizeof(pid_fd); ++i) {
93  if (pid_to_fd[i].pid == child) {
94  int fd = pid_to_fd[i].fd;
95  pid_to_fd[i].fd = 0;
96  pid_to_fd[i].pid = 0;
97  // NB close() *is* safe to use in a signal handler.
98  close(fd);
99  break;
100  }
101  }
102  }
103 }
104 
105 }
106 
107 static int
108 launch_xapian_tcpsrv(const string & args)
109 {
110  int port = DEFAULT_PORT;
111 
112  // We want to be able to get the exit status of the child process we fork
113  // if xapian-tcpsrv doesn't start listening successfully.
114  signal(SIGCHLD, SIG_DFL);
115 try_next_port:
116  string cmd = XAPIAN_TCPSRV " --one-shot --interface " LOCALHOST " --port ";
117  cmd += str(port);
118  cmd += " ";
119  cmd += args;
120 #ifdef HAVE_VALGRIND
121  if (RUNNING_ON_VALGRIND) cmd = "./runsrv " + cmd;
122 #endif
123  int fds[2];
124  if (socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, PF_UNSPEC, fds) < 0) {
125  string msg("Couldn't create socketpair: ");
126  errno_to_string(errno, msg);
127  throw msg;
128  }
129 
130  pid_t child = fork();
131  if (child == 0) {
132  // Child process.
133  close(fds[0]);
134  // Connect stdout and stderr to the socket.
135  //
136  // Make sure the socket isn't fd 1 or 2. We need to ensure that
137  // FD_CLOEXEC isn't set for stdout or stderr (which creating them with
138  // dup2() achieves), and that we close fds[1]. The cleanest way to
139  // address this seems to be to turn the unusual situation into the
140  // usual one.
141  if (fds[1] == 1 || fds[1] == 2) {
142  dup2(fds[1], 3);
143  fds[1] = 3;
144  }
145  dup2(fds[1], 1);
146  dup2(fds[1], 2);
147  close(fds[1]);
148  execl("/bin/sh", "/bin/sh", "-c", cmd.c_str(), static_cast<void*>(0));
149  _exit(-1);
150  }
151 
152  close(fds[1]);
153  if (child == -1) {
154  // Couldn't fork.
155  int fork_errno = errno;
156  close(fds[0]);
157  string msg("Couldn't fork: ");
158  errno_to_string(fork_errno, msg);
159  throw msg;
160  }
161 
162  // Parent process.
163 
164  // Wrap the file descriptor in a FILE * so we can read lines using fgets().
165  FILE * fh = fdopen(fds[0], "r");
166  if (fh == NULL) {
167  string msg("Failed to run command '");
168  msg += cmd;
169  msg += "': ";
170  errno_to_string(errno, msg);
171  throw msg;
172  }
173 
174  string output;
175  while (true) {
176  char buf[256];
177  if (fgets(buf, sizeof(buf), fh) == NULL) {
178  fclose(fh);
179  // Wait for the child to exit.
180  int status;
181  if (waitpid(child, &status, 0) == -1) {
182  string msg("waitpid failed: ");
183  errno_to_string(errno, msg);
184  throw msg;
185  }
186  if (++port < 65536 && status != 0) {
187  if (WIFEXITED(status) && WEXITSTATUS(status) == 69) {
188  // 69 is EX_UNAVAILABLE which xapian-tcpsrv exits
189  // with if (and only if) the port specified was
190  // in use.
191  goto try_next_port;
192  }
193  }
194  string msg("Failed to get 'Listening...' from command '");
195  msg += cmd;
196  msg += "' (output: ";
197  msg += output;
198  msg += ")";
199  throw msg;
200  }
201  if (strcmp(buf, "Listening...\n") == 0) break;
202  output += buf;
203  }
204 
205  // dup() the fd we wrapped with fdopen() so we can keep it open so the
206  // xapian-tcpsrv keeps running.
207  int tracked_fd = dup(fds[0]);
208 
209  // We must fclose() the FILE* to avoid valgrind detecting memory leaks from
210  // its buffers.
211  fclose(fh);
212 
213  // Find a slot to track the pid->fd mapping in. If we can't find a slot
214  // it just means we'll leak the fd, so don't worry about that too much.
215  for (unsigned i = 0; i < sizeof(pid_to_fd) / sizeof(pid_fd); ++i) {
216  if (pid_to_fd[i].pid == 0) {
217  pid_to_fd[i].fd = tracked_fd;
218  pid_to_fd[i].pid = child;
219  break;
220  }
221  }
222 
223  // Set a signal handler to clean up the xapian-tcpsrv child process when it
224  // finally exits.
225  signal(SIGCHLD, on_SIGCHLD);
226 
227  return port;
228 }
229 
230 #elif defined __WIN32__
231 
232 static HANDLE tcpsrv_handles[16];
233 static unsigned tcpsrv_handles_index = 0;
234 
235 static constexpr auto TCPSRV_HANDLES_INDEX_MAX =
236  sizeof(tcpsrv_handles) / sizeof(tcpsrv_handles[0]);
237 
238 XAPIAN_NORETURN(static void win32_throw_error_string(const char * str));
239 static void win32_throw_error_string(const char * str)
240 {
241  string msg(str);
242  char * error = 0;
243  DWORD len;
244  len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_ALLOCATE_BUFFER,
245  0, GetLastError(), 0, (CHAR*)&error, 0, 0);
246  if (error) {
247  // Remove any trailing \r\n from output of FormatMessage.
248  if (len >= 2 && error[len - 2] == '\r' && error[len - 1] == '\n')
249  len -= 2;
250  if (len) {
251  msg += ": ";
252  msg.append(error, len);
253  }
254  LocalFree(error);
255  }
256  throw msg;
257 }
258 
259 // This implementation uses the WIN32 API to start xapian-tcpsrv as a child
260 // process and read its output using a pipe.
261 static int
262 launch_xapian_tcpsrv(const string & args)
263 {
264  int port = DEFAULT_PORT;
265 
266 try_next_port:
267  string cmd = XAPIAN_TCPSRV " --one-shot --interface " LOCALHOST " --port ";
268  cmd += str(port);
269  cmd += " ";
270  cmd += args;
271 
272  // Create a pipe so we can read stdout/stderr from the child process.
273  HANDLE hRead, hWrite;
274  if (!CreatePipe(&hRead, &hWrite, 0, 0))
275  win32_throw_error_string("Couldn't create pipe");
276 
277  // Set the write handle to be inherited by the child process.
278  SetHandleInformation(hWrite, HANDLE_FLAG_INHERIT, 1);
279 
280  // Create the child process.
281  PROCESS_INFORMATION procinfo;
282  memset(&procinfo, 0, sizeof(PROCESS_INFORMATION));
283 
284  STARTUPINFO startupinfo;
285  memset(&startupinfo, 0, sizeof(STARTUPINFO));
286  startupinfo.cb = sizeof(STARTUPINFO);
287  startupinfo.hStdError = hWrite;
288  startupinfo.hStdOutput = hWrite;
289  startupinfo.hStdInput = INVALID_HANDLE_VALUE;
290  startupinfo.dwFlags |= STARTF_USESTDHANDLES;
291 
292  // For some reason Windows wants a modifiable copy!
293  BOOL ok;
294  char * cmdline = strdup(cmd.c_str());
295  ok = CreateProcess(0, cmdline, 0, 0, TRUE, 0, 0, 0, &startupinfo, &procinfo);
296  free(cmdline);
297  if (!ok)
298  win32_throw_error_string("Couldn't create child process");
299 
300  CloseHandle(hWrite);
301  CloseHandle(procinfo.hThread);
302 
303  string output;
304  FILE *fh = fdopen(_open_osfhandle(intptr_t(hRead), O_RDONLY), "r");
305  while (true) {
306  char buf[256];
307  if (fgets(buf, sizeof(buf), fh) == NULL) {
308  fclose(fh);
309  DWORD rc;
310  // This doesn't seem to be necessary on the machine I tested on,
311  // but I guess it could be on a slow machine...
312  while (GetExitCodeProcess(procinfo.hProcess, &rc) && rc == STILL_ACTIVE) {
313  Sleep(100);
314  }
315  CloseHandle(procinfo.hProcess);
316  if (++port < 65536 && rc == 69) {
317  // 69 is EX_UNAVAILABLE which xapian-tcpsrv exits
318  // with if (and only if) the port specified was
319  // in use.
320  goto try_next_port;
321  }
322  string msg("Failed to get 'Listening...' from command '");
323  msg += cmd;
324  msg += "' (output: ";
325  msg += output;
326  msg += ")";
327  throw msg;
328  }
329  if (strcmp(buf, "Listening...\r\n") == 0) break;
330  output += buf;
331  }
332  fclose(fh);
333 
334  if (tcpsrv_handles_index < TCPSRV_HANDLES_INDEX_MAX) {
335  tcpsrv_handles[tcpsrv_handles_index++] = procinfo.hProcess;
336  }
337 
338  return port;
339 }
340 
341 #else
342 # error Neither HAVE_FORK nor __WIN32__ is defined
343 #endif
344 
347 }
348 
349 std::string
351 {
352  return "remotetcp_" + sub_manager->get_dbtype();
353 }
354 
356 BackendManagerRemoteTcp::do_get_database(const vector<string> & files)
357 {
358  // Default to a long (5 minute) timeout so that tests won't fail just
359  // because the host is slow or busy.
360  return BackendManagerRemoteTcp::get_remote_database(files, 300000);
361 }
362 
365  const string & file)
366 {
367  string args = get_writable_database_args(name, file);
368  int port = launch_xapian_tcpsrv(args);
370 }
371 
373 BackendManagerRemoteTcp::get_remote_database(const vector<string> & files,
374  unsigned int timeout)
375 {
376  string args = get_remote_database_args(files, timeout);
377  int port = launch_xapian_tcpsrv(args);
378  return Xapian::Remote::open(LOCALHOST, port);
379 }
380 
383 {
384  string args = get_remote_database_args(path, 300000);
385  int port = launch_xapian_tcpsrv(args);
386  return Xapian::Remote::open(LOCALHOST, port);
387 }
388 
391 {
392  string args = get_writable_database_as_database_args();
393  int port = launch_xapian_tcpsrv(args);
394  return Xapian::Remote::open(LOCALHOST, port);
395 }
396 
399 {
400  string args = get_writable_database_again_args();
401  int port = launch_xapian_tcpsrv(args);
403 }
404 
405 void
407 {
408 #ifdef HAVE_FORK
409  signal(SIGCHLD, SIG_DFL);
410  for (unsigned i = 0; i < sizeof(pid_to_fd) / sizeof(pid_fd); ++i) {
411  pid_t child = pid_to_fd[i].pid;
412  if (child) {
413  int status;
414  while (waitpid(child, &status, 0) == -1 && errno == EINTR) { }
415  // Other possible error from waitpid is ECHILD, which it seems can
416  // only mean that the child has already exited and SIGCHLD was set
417  // to SIG_IGN. If we did somehow see that, the sanest response
418  // seems to be to close the fd and move on.
419  int fd = pid_to_fd[i].fd;
420  pid_to_fd[i].fd = 0;
421  pid_to_fd[i].pid = 0;
422  close(fd);
423  }
424  }
425 #elif defined __WIN32__
426  for (unsigned i = 0; i != tcpsrv_handles_index; ++i) {
427  WaitForSingleObject(tcpsrv_handles[i], INFINITE);
428  CloseHandle(tcpsrv_handles[i]);
429  }
430  tcpsrv_handles_index = 0;
431 #endif
432 }
int close(FD &fd)
Definition: fd.h:63
Xapian::WritableDatabase get_writable_database(const std::string &name, const std::string &file)
Create a RemoteTcp Xapian::WritableDatabase object indexing a single file.
Define the XAPIAN_NORETURN macro.
This class is used to access a database, or a group of databases.
Definition: database.h:68
Xapian::Database do_get_database(const std::vector< std::string > &files)
Create a Xapian::Database object indexing multiple files.
unsigned timeout
A timeout value in milliseconds.
Definition: types.h:100
std::string get_dbtype() const
Return a string representing the current database type.
include <sys/socket.h> with portability workarounds.
Convert errno value to std::string, thread-safe if possible.
#define XAPIAN_TCPSRV
STL namespace.
Convert types to std::string.
BackendManager subclass for remotetcp databases.
#define SOCK_CLOEXEC
Definition: safesyssocket.h:93
Xapian::Database get_writable_database_as_database()
Create a Database object for the last opened WritableDatabase.
This class provides read/write access to a database.
Definition: database.h:785
void errno_to_string(int e, string &s)
Database open(const std::string &host, unsigned int port, useconds_t timeout=10000, useconds_t connect_timeout=10000)
Construct a Database object for read-only access to a remote database accessed via a TCP connection...
Public interfaces for the Xapian library.
string str(int value)
Convert int to std::string.
Definition: str.cc:90
#define DEFAULT_PORT
Xapian::Database get_remote_database(const std::vector< std::string > &files, unsigned int timeout)
Create a RemoteTcp Xapian::Database with the specified timeout.
Xapian::Database get_database_by_path(const std::string &path)
Get a RemoteTcp Xapian::Database instance of the database at path.
include <windows.h> without all the bloat and damage.
Xapian::WritableDatabase get_writable_database_again()
Create a WritableDatabase object for the last opened WritableDatabase.
#define LOCALHOST
Definition: header.h:151
WritableDatabase open_writable(const std::string &host, unsigned int port, useconds_t timeout=0, useconds_t connect_timeout=10000, int flags=0)
Construct a WritableDatabase object for update access to a remote database accessed via a TCP connect...
void clean_up()
Called after each test, to perform any necessary cleanup.
include <fcntl.h>, but working around broken platforms.