diff --git a/deps/coupling/coupling.c b/deps/coupling/coupling.c index 57eca1c8ceb..f6b03f38eb3 100644 --- a/deps/coupling/coupling.c +++ b/deps/coupling/coupling.c @@ -143,98 +143,176 @@ ring_buffer_push (ring_buffer *ring, int fd) return r; } +/* PULL PUMP + * + * This is used to read data from a blocking file descriptor and pump it into + * a non-blocking pipe (or other non-blocking fd). The algorithm is this: + * + * while (true) { + * read(STDIN_FILENO) // blocking + * + * while (!ring.empty) { + * write(pipe) // non-blocking + * select(pipe, writable) + * } + * } + * + */ static void -pump (int is_pull, int pullfd, int pushfd) +pull_pump (int pullfd, int pushfd) { int r; ring_buffer ring; - fd_set readfds, writefds, exceptfds; + + fd_set writefds, exceptfds; + FD_ZERO(&exceptfds); + FD_ZERO(&writefds); + FD_SET(pushfd, &exceptfds); + FD_SET(pushfd, &writefds); ring_buffer_init(&ring); - int maxfd; + while (pullfd >= 0) { + /* Blocking read from STDIN_FILENO */ + r = ring_buffer_pull(&ring, pullfd); - while (pushfd >= 0 && (pullfd >= 0 || !ring_buffer_empty_p(&ring))) { - FD_ZERO(&exceptfds); - FD_ZERO(&readfds); - FD_ZERO(&writefds); - - maxfd = -1; - - if (is_pull) { - if (!ring_buffer_empty_p(&ring)) { - maxfd = pushfd; - FD_SET(pushfd, &exceptfds); - FD_SET(pushfd, &writefds); - } - } else { - if (pullfd >= 0) { - if (!ring_buffer_filled_p(&ring)) { - maxfd = pullfd; - FD_SET(pullfd, &exceptfds); - FD_SET(pullfd, &readfds); - } - } + if (r == 0) { + /* eof */ + close(pullfd); + pullfd = -1; + } else if (r < 0 && errno != EINTR && errno != EAGAIN) { + /* error */ + perror("pull_pump read()"); + close(pullfd); + pullfd = -1; } - if (maxfd >= 0) { - r = select(maxfd+1, &readfds, &writefds, &exceptfds, NULL); + /* Push all of the data in the ring buffer out. */ + while (!ring_buffer_empty_p(&ring)) { + /* non-blocking write() to the pipe */ + r = ring_buffer_push(&ring, pushfd); - if (r < 0 || (pullfd >= 0 && FD_ISSET(pushfd, &exceptfds))) { + if (r < 0 && errno != EAGAIN && errno != EINTR) { + if (errno == EPIPE) { + /* This happens if someone closes the other end of the pipe. This + * is a normal forced close of STDIN. Hopefully there wasn't data + * in the ring buffer. Just close both ends and exit. + */ + close(pushfd); + close(pullfd); + pushfd = pullfd = -1; + } else { + perror("pull_pump write()"); + close(pushfd); + close(pullfd); + } + return; + } + + /* Select for writablity on the pipe end. + * Very rarely will this stick. + */ + r = select(pushfd+1, NULL, &writefds, &exceptfds, NULL); + + if (r < 0 || FD_ISSET(pushfd, &exceptfds)) { close(pushfd); close(pullfd); pushfd = pullfd = -1; return; } } + } + assert(pullfd < 0); + assert(ring_buffer_empty_p(&ring)); + close(pushfd); +} - if (pullfd >= 0 && FD_ISSET(pullfd, &exceptfds)) { +/* PUSH PUMP + * + * This is used to push data out to a blocking file descriptor. It pulls + * data from a non-blocking pipe (pullfd) and pushes to STDOUT_FILENO + * (pushfd). + * When the pipe is closed, then the rest of the data is pushed out and then + * STDOUT_FILENO is closed. + * + * The algorithm looks roughly like this: + * + * while (true) { + * r = read(pipe) // nonblocking + * + * while (!ring.empty) { + * write(STDOUT_FILENO) // blocking + * } + * + * select(pipe, readable); + * } + */ +static void +push_pump (int pullfd, int pushfd) +{ + int r; + ring_buffer ring; + + fd_set readfds, exceptfds; + FD_ZERO(&exceptfds); + FD_ZERO(&readfds); + FD_SET(pullfd, &exceptfds); + FD_SET(pullfd, &readfds); + + ring_buffer_init(&ring); + + /* The pipe is open or there is data left to be pushed out + * NOTE: if pushfd (STDOUT_FILENO) ever errors out, then we just exit the + * loop. + */ + while (pullfd >= 0 || !ring_buffer_empty_p(&ring)) { + + /* Pull from the non-blocking pipe */ + r = ring_buffer_pull(&ring, pullfd); + + if (r == 0) { + /* eof */ close(pullfd); pullfd = -1; + } else if (r < 0 && errno != EINTR && errno != EAGAIN) { + perror("push_pump read()"); + close(pullfd); + pullfd = -1; + return; } - if (pullfd >= 0 && (is_pull || FD_ISSET(pullfd, &readfds))) { - r = ring_buffer_pull(&ring, pullfd); - if (r == 0) { - /* eof */ - close(pullfd); - pullfd = -1; + /* Push everything out to STDOUT */ + while (!ring_buffer_empty_p(&ring)) { + /* Blocking write() to pushfd (STDOUT_FILENO) */ + r = ring_buffer_push(&ring, pushfd); - } else if (r < 0) { - if (errno != EINTR && errno != EAGAIN) goto error; + /* If there was a problem, just exit the entire function */ + + if (r < 0 && errno != EINTR) { + close(pushfd); + close(pullfd); + pushfd = pullfd = -1; + return; } } + + if (pullfd >= 0) { + /* select for readability on the pullfd */ + r = select(pullfd+1, &readfds, NULL, &exceptfds, NULL); - if (!is_pull || FD_ISSET(pushfd, &writefds)) { - r = ring_buffer_push(&ring, pushfd); - if (r < 0) { - switch (errno) { - case EINTR: - case EAGAIN: - continue; - - case EPIPE: - /* TODO catch SIGPIPE? */ - close(pushfd); - close(pullfd); - pushfd = pullfd = -1; - return; - - default: - goto error; - } + if (r < 0 || FD_ISSET(pullfd, &exceptfds)) { + close(pushfd); + close(pullfd); + pushfd = pullfd = -1; + return; } } } - + /* If we got here then we got eof on pullfd and pushed all the data out. + * so now just close pushfd */ + assert(pullfd < 0); + assert(ring_buffer_empty_p(&ring)); close(pushfd); - close(pullfd); - return; - -error: - close(pushfd); - close(pullfd); - perror("(coupling) pump"); } static inline int @@ -262,7 +340,11 @@ pump_thread (void *data) { struct coupling *c = (struct coupling*)data; - pump(c->is_pull, c->pullfd, c->pushfd); + if (c->is_pull) { + pull_pump(c->pullfd, c->pushfd); + } else { + push_pump(c->pullfd, c->pushfd); + } return NULL; } diff --git a/test/mjsunit/fixtures/echo.js b/test/mjsunit/fixtures/echo.js index 49f435cc7a7..0aed0ff84ce 100644 --- a/test/mjsunit/fixtures/echo.js +++ b/test/mjsunit/fixtures/echo.js @@ -1,5 +1,12 @@ process.mixin(require("../common")); process.stdio.open(); + +print("hello world\r\n"); + process.stdio.addListener("data", function (data) { - puts(data); -}); \ No newline at end of file + print(data); +}); + +process.stdio.addListener("close", function () { + process.stdio.close(); +}); diff --git a/test/mjsunit/test-stdio.js b/test/mjsunit/test-stdio.js index 9570539d7d3..e8bc6842d0a 100644 --- a/test/mjsunit/test-stdio.js +++ b/test/mjsunit/test-stdio.js @@ -2,20 +2,34 @@ process.mixin(require("./common")); var sub = path.join(fixturesDir, 'echo.js'); -var result = false; - -var child = process.createChildProcess(path.join(libDir, "../bin/node"), [sub]); +var gotHelloWorld = false; +var gotEcho = false; + +var child = process.createChildProcess(process.argv[0], [sub]); + child.addListener("error", function (data){ puts("parent stderr: " + data); }); + child.addListener("output", function (data){ - if (data && data[0] == 't') { - result = true; + if (data) { + puts('child said: ' + JSON.stringify(data)); + if (!gotHelloWorld) { + assert.equal("hello world\r\n", data); + gotHelloWorld = true; + child.write('echo me\r\n'); + } else { + assert.equal("echo me\r\n", data); + gotEcho = true; + child.close(); + } + } else { + puts('child eof'); } }); -setTimeout(function () { - child.write('t\r\n'); -}, 100); -setTimeout(function (){ - assert.ok(result); -}, 500) + + +process.addListener('exit', function () { + assert.ok(gotHelloWorld); + assert.ok(gotEcho); +});