xapian-core  1.4.27
backendmanager_remotetcp.cc
Go to the documentation of this file.
1 
4 /* Copyright (C) 2006,2007,2008,2009,2013,2015 Olly Betts
5  * Copyright (C) 2008 Lemur Consulting Ltd
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License as
9  * published by the Free Software Foundation; either version 2 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <config.h>
23 
25 
26 #include <xapian.h>
27 
28 #include <stdio.h> // For fdopen().
29 #include <cerrno>
30 #include <cstring>
31 
32 #include "safesysexits.h"
33 
34 #ifdef HAVE_FORK
35 # include <signal.h>
36 # include <sys/types.h>
37 # include "safesyssocket.h"
38 # include <sys/wait.h>
39 # include <unistd.h>
40 // Some older systems had SIGCLD rather than SIGCHLD.
41 # if !defined SIGCHLD && defined SIGCLD
42 # define SIGCHLD SIGCLD
43 # endif
44 #endif
45 
46 #ifdef __WIN32__
47 # include <io.h> // For _open_osfhandle().
48 # include "safefcntl.h"
49 # include "safewindows.h"
50 # include <cstdlib> // For free().
51 #endif
52 
53 #include "errno_to_string.h"
54 #include "noreturn.h"
55 #include "str.h"
56 
57 #include <string>
58 #include <vector>
59 
60 #ifdef HAVE_VALGRIND
61 # include <valgrind/memcheck.h>
62 #endif
63 
64 using namespace std;
65 
66 // We've had problems on some hosts which run tinderbox tests with "localhost"
67 // not being set in /etc/hosts - using the IP address equivalent seems more
68 // reliable.
69 #define LOCALHOST "127.0.0.1"
70 
// Start at DEFAULT_PORT and try higher ports until one isn't already in use.
72 #define DEFAULT_PORT 1239
73 
74 #ifdef HAVE_FORK
75 
76 // We can't dynamically allocate memory for this because it confuses the leak
77 // detector. We only have 1-3 child fds open at once anyway, so a fixed size
78 // array isn't a problem, and linear scanning isn't a problem either.
79 struct pid_fd {
80  pid_t pid;
81  int fd;
82 };
83 
84 static pid_fd pid_to_fd[16];
85 
86 extern "C" {
87 
88 static void
89 on_SIGCHLD(int /*sig*/)
90 {
91  int status;
92  pid_t child;
93  while ((child = waitpid(-1, &status, WNOHANG)) > 0) {
94  for (unsigned i = 0; i < sizeof(pid_to_fd) / sizeof(pid_fd); ++i) {
95  if (pid_to_fd[i].pid == child) {
96  int fd = pid_to_fd[i].fd;
97  pid_to_fd[i].fd = 0;
98  pid_to_fd[i].pid = 0;
99  // NB close() *is* safe to use in a signal handler.
100  close(fd);
101  break;
102  }
103  }
104  }
105 }
106 
107 }
108 
109 static int
110 launch_xapian_tcpsrv(const string & args)
111 {
112  int port = DEFAULT_PORT;
113 
114  // We want to be able to get the exit status of the child process we fork
115  // if xapian-tcpsrv doesn't start listening successfully.
116  signal(SIGCHLD, SIG_DFL);
117 try_next_port:
118  string cmd = XAPIAN_TCPSRV " --one-shot --interface " LOCALHOST " --port ";
119  cmd += str(port);
120  cmd += " ";
121  cmd += args;
122 #ifdef HAVE_VALGRIND
123  if (RUNNING_ON_VALGRIND) cmd = "./runsrv " + cmd;
124 #endif
125  int fds[2];
126  if (socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, PF_UNSPEC, fds) < 0) {
127  string msg("Couldn't create socketpair: ");
128  errno_to_string(errno, msg);
129  throw msg;
130  }
131 
132  pid_t child = fork();
133  if (child == 0) {
134  // Put this process into its own process group so that we can kill the
135  // server itself easily by killing the process group. Just killing
136  // `child` only kills the /bin/sh and leaves the server running.
137  setpgid(0, 0);
138  // Child process.
139  close(fds[0]);
140  // Connect stdout and stderr to the socket.
141  //
142  // Make sure the socket isn't fd 1 or 2. We need to ensure that
143  // FD_CLOEXEC isn't set for stdout or stderr (which creating them with
144  // dup2() achieves), and that we close fds[1]. The cleanest way to
145  // address this seems to be to turn the unusual situation into the
146  // usual one.
147  if (fds[1] == 1 || fds[1] == 2) {
148  dup2(fds[1], 3);
149  fds[1] = 3;
150  }
151  dup2(fds[1], 1);
152  dup2(fds[1], 2);
153  close(fds[1]);
154  execl("/bin/sh", "/bin/sh", "-c", cmd.c_str(), static_cast<void*>(0));
155  _exit(-1);
156  }
157 
158  close(fds[1]);
159  if (child == -1) {
160  // Couldn't fork.
161  int fork_errno = errno;
162  close(fds[0]);
163  string msg("Couldn't fork: ");
164  errno_to_string(fork_errno, msg);
165  throw msg;
166  }
167 
168  // Parent process.
169 
170  // Wrap the file descriptor in a FILE * so we can read lines using fgets().
171  FILE * fh = fdopen(fds[0], "r");
172  if (fh == NULL) {
173  string msg("Failed to run command '");
174  msg += cmd;
175  msg += "': ";
176  errno_to_string(errno, msg);
177  throw msg;
178  }
179 
180  string output;
181  while (true) {
182  char buf[256];
183  if (fgets(buf, sizeof(buf), fh) == NULL) {
184  fclose(fh);
185  // Wait for the child to exit.
186  int status;
187  if (waitpid(child, &status, 0) == -1) {
188  string msg("waitpid failed: ");
189  errno_to_string(errno, msg);
190  throw msg;
191  }
192  if (++port < 65536 && status != 0) {
193  if (WIFEXITED(status) &&
194  WEXITSTATUS(status) == EX_UNAVAILABLE) {
195  // Exit code EX_UNAVAILABLE from xapian-tcpsrv means the
196  // specified port was in use.
197  goto try_next_port;
198  }
199  }
200  string msg("Failed to get 'Listening...' from command '");
201  msg += cmd;
202  msg += "' (output: ";
203  msg += output;
204  msg += ")";
205  throw msg;
206  }
207  if (strcmp(buf, "Listening...\n") == 0) break;
208  output += buf;
209  }
210 
211  // dup() the fd we wrapped with fdopen() so we can keep it open so the
212  // xapian-tcpsrv keeps running.
213  int tracked_fd = dup(fds[0]);
214 
215  // We must fclose() the FILE* to avoid valgrind detecting memory leaks from
216  // its buffers.
217  fclose(fh);
218 
219  // Find a slot to track the pid->fd mapping in. If we can't find a slot
220  // it just means we'll leak the fd, so don't worry about that too much.
221  for (unsigned i = 0; i < sizeof(pid_to_fd) / sizeof(pid_fd); ++i) {
222  if (pid_to_fd[i].pid == 0) {
223  pid_to_fd[i].fd = tracked_fd;
224  pid_to_fd[i].pid = child;
225  break;
226  }
227  }
228 
229  // Set a signal handler to clean up the xapian-tcpsrv child process when it
230  // finally exits.
231  signal(SIGCHLD, on_SIGCHLD);
232 
233  return port;
234 }
235 
236 #elif defined __WIN32__
237 
238 static HANDLE tcpsrv_handles[16];
239 static unsigned tcpsrv_handles_index = 0;
240 
241 static constexpr auto TCPSRV_HANDLES_INDEX_MAX =
242  sizeof(tcpsrv_handles) / sizeof(tcpsrv_handles[0]);
243 
244 XAPIAN_NORETURN(static void win32_throw_error_string(const char * str));
245 static void win32_throw_error_string(const char * str)
246 {
247  string msg(str);
248  char * error = 0;
249  DWORD len;
250  len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_ALLOCATE_BUFFER,
251  0, GetLastError(), 0, (CHAR*)&error, 0, 0);
252  if (error) {
253  // Remove any trailing \r\n from output of FormatMessage.
254  if (len >= 2 && error[len - 2] == '\r' && error[len - 1] == '\n')
255  len -= 2;
256  if (len) {
257  msg += ": ";
258  msg.append(error, len);
259  }
260  LocalFree(error);
261  }
262  throw msg;
263 }
264 
265 // This implementation uses the WIN32 API to start xapian-tcpsrv as a child
266 // process and read its output using a pipe.
267 static int
268 launch_xapian_tcpsrv(const string & args)
269 {
270  int port = DEFAULT_PORT;
271 
272 try_next_port:
273  string cmd = XAPIAN_TCPSRV " --one-shot --interface " LOCALHOST " --port ";
274  cmd += str(port);
275  cmd += " ";
276  cmd += args;
277 
278  // Create a pipe so we can read stdout/stderr from the child process.
279  HANDLE hRead, hWrite;
280  if (!CreatePipe(&hRead, &hWrite, 0, 0))
281  win32_throw_error_string("Couldn't create pipe");
282 
283  // Set the write handle to be inherited by the child process.
284  SetHandleInformation(hWrite, HANDLE_FLAG_INHERIT, 1);
285 
286  // Create the child process.
287  PROCESS_INFORMATION procinfo;
288  memset(&procinfo, 0, sizeof(PROCESS_INFORMATION));
289 
290  STARTUPINFO startupinfo;
291  memset(&startupinfo, 0, sizeof(STARTUPINFO));
292  startupinfo.cb = sizeof(STARTUPINFO);
293  startupinfo.hStdError = hWrite;
294  startupinfo.hStdOutput = hWrite;
295  startupinfo.hStdInput = INVALID_HANDLE_VALUE;
296  startupinfo.dwFlags |= STARTF_USESTDHANDLES;
297 
298  // For some reason Windows wants a modifiable command line string
299  // so pass a pointer to the first character rather than using c_str().
300  if (!CreateProcess(XAPIAN_TCPSRV, &cmd[0], 0, 0, TRUE, 0, 0, 0,
301  &startupinfo, &procinfo)) {
302  win32_throw_error_string("Couldn't create child process");
303  }
304 
305  CloseHandle(hWrite);
306  CloseHandle(procinfo.hThread);
307 
308  string output;
309  FILE *fh = fdopen(_open_osfhandle(intptr_t(hRead), O_RDONLY), "r");
310  while (true) {
311  char buf[256];
312  if (fgets(buf, sizeof(buf), fh) == NULL) {
313  fclose(fh);
314  DWORD rc;
315  // This doesn't seem to be necessary on the machine I tested on,
316  // but I guess it could be on a slow machine...
317  while (GetExitCodeProcess(procinfo.hProcess, &rc) && rc == STILL_ACTIVE) {
318  Sleep(100);
319  }
320  CloseHandle(procinfo.hProcess);
321  if (++port < 65536 && rc == EX_UNAVAILABLE) {
322  // Exit code EX_UNAVAILABLE from xapian-tcpsrv means the
323  // specified port was in use.
324  goto try_next_port;
325  }
326  string msg("Failed to get 'Listening...' from command '");
327  msg += cmd;
328  msg += "' (output: ";
329  msg += output;
330  msg += ")";
331  throw msg;
332  }
333  if (strcmp(buf, "Listening...\r\n") == 0) break;
334  output += buf;
335  }
336  fclose(fh);
337 
338  if (tcpsrv_handles_index < TCPSRV_HANDLES_INDEX_MAX) {
339  tcpsrv_handles[tcpsrv_handles_index++] = procinfo.hProcess;
340  }
341 
342  return port;
343 }
344 
345 #else
346 # error Neither HAVE_FORK nor __WIN32__ is defined
347 #endif
348 
351 }
352 
354 BackendManagerRemoteTcp::do_get_database(const vector<string> & files)
355 {
356  // Default to a long (5 minute) timeout so that tests won't fail just
357  // because the host is slow or busy.
358  return BackendManagerRemoteTcp::get_remote_database(files, 300000);
359 }
360 
363  const string & file)
364 {
365  string args = get_writable_database_args(name, file);
366  int port = launch_xapian_tcpsrv(args);
368 }
369 
371 BackendManagerRemoteTcp::get_remote_database(const vector<string> & files,
372  unsigned int timeout)
373 {
374  string args = get_remote_database_args(files, timeout);
375  int port = launch_xapian_tcpsrv(args);
376  return Xapian::Remote::open(LOCALHOST, port);
377 }
378 
381 {
382  string args = get_remote_database_args(path, 300000);
383  int port = launch_xapian_tcpsrv(args);
384  return Xapian::Remote::open(LOCALHOST, port);
385 }
386 
389 {
390  string args = get_writable_database_as_database_args();
391  int port = launch_xapian_tcpsrv(args);
392  return Xapian::Remote::open(LOCALHOST, port);
393 }
394 
397 {
398  string args = get_writable_database_again_args();
399  int port = launch_xapian_tcpsrv(args);
401 }
402 
403 void
405 {
406 #ifdef HAVE_FORK
407  signal(SIGCHLD, SIG_DFL);
408  for (unsigned i = 0; i < sizeof(pid_to_fd) / sizeof(pid_fd); ++i) {
409  pid_t child = pid_to_fd[i].pid;
410  if (child) {
411  int status;
412  while (waitpid(child, &status, 0) == -1 && errno == EINTR) { }
413  // Other possible error from waitpid is ECHILD, which it seems can
414  // only mean that the child has already exited and SIGCHLD was set
415  // to SIG_IGN. If we did somehow see that, the sanest response
416  // seems to be to close the fd and move on.
417  int fd = pid_to_fd[i].fd;
418  pid_to_fd[i].fd = 0;
419  pid_to_fd[i].pid = 0;
420  close(fd);
421  }
422  }
423 #elif defined __WIN32__
424  for (unsigned i = 0; i != tcpsrv_handles_index; ++i) {
425  WaitForSingleObject(tcpsrv_handles[i], INFINITE);
426  CloseHandle(tcpsrv_handles[i]);
427  }
428  tcpsrv_handles_index = 0;
429 #endif
430 }
int close(FD &fd)
Definition: fd.h:63
Xapian::WritableDatabase get_writable_database(const std::string &name, const std::string &file)
Create a RemoteTcp Xapian::WritableDatabase object indexing a single file.
Define the XAPIAN_NORETURN macro.
This class is used to access a database, or a group of databases.
Definition: database.h:68
Xapian::Database do_get_database(const std::vector< std::string > &files)
Create a Xapian::Database object indexing multiple files.
unsigned timeout
A timeout value in milliseconds.
Definition: types.h:100
include <sys/socket.h> with portability workarounds.
Convert errno value to std::string, thread-safe if possible.
#define XAPIAN_TCPSRV
STL namespace.
Convert types to std::string.
BackendManager subclass for remotetcp databases.
#define SOCK_CLOEXEC
Definition: safesyssocket.h:83
Xapian::Database get_writable_database_as_database()
Create a Database object for the last opened WritableDatabase.
This class provides read/write access to a database.
Definition: database.h:789
include <sysexits.h> with portability workarounds.
void errno_to_string(int e, string &s)
Database open(const std::string &host, unsigned int port, useconds_t timeout=10000, useconds_t connect_timeout=10000)
Construct a Database object for read-only access to a remote database accessed via a TCP connection...
Public interfaces for the Xapian library.
string str(int value)
Convert int to std::string.
Definition: str.cc:90
#define DEFAULT_PORT
Xapian::Database get_remote_database(const std::vector< std::string > &files, unsigned int timeout)
Create a RemoteTcp Xapian::Database with the specified timeout.
Xapian::Database get_database_by_path(const std::string &path)
Get a RemoteTcp Xapian::Database instance of the database at path.
include <windows.h> without all the bloat and damage.
Xapian::WritableDatabase get_writable_database_again()
Create a WritableDatabase object for the last opened WritableDatabase.
#define EX_UNAVAILABLE
Definition: safesysexits.h:33
#define LOCALHOST
Definition: header.h:151
WritableDatabase open_writable(const std::string &host, unsigned int port, useconds_t timeout=0, useconds_t connect_timeout=10000, int flags=0)
Construct a WritableDatabase object for update access to a remote database accessed via a TCP connect...
void clean_up()
Called after each test, to perform any necessary cleanup.
include <fcntl.h>, but working around broken platforms.