From 03c5772ce40f5415d25273aa3344bc68a91816cc Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 13:10:00 +0200 Subject: [PATCH] Get stdin/stdout working. Add process->Close(). --- src/process.cc | 263 ++++++++++++++++++++++++++++++++++++++++++++----- src/process.h | 10 +- 2 files changed, 247 insertions(+), 26 deletions(-) diff --git a/src/process.cc b/src/process.cc index fddc4ca2070..7e7495477cd 100644 --- a/src/process.cc +++ b/src/process.cc @@ -2,6 +2,8 @@ #include "process.h" #include +#include +#include #include #include #include @@ -9,6 +11,10 @@ using namespace v8; using namespace node; +#define ON_ERROR_SYMBOL String::NewSymbol("onError") +#define ON_OUTPUT_SYMBOL String::NewSymbol("onOutput") +#define ON_EXIT_SYMBOL String::NewSymbol("onExit") + Persistent Process::constructor_template; void @@ -20,10 +26,8 @@ Process::Initialize (Handle target) constructor_template = Persistent::New(t); constructor_template->InstanceTemplate()->SetInternalFieldCount(1); -#if 0 - NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", Timer::Start); - NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", Timer::Stop); -#endif + NODE_SET_PROTOTYPE_METHOD(constructor_template, "write", Process::Write); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close); target->Set(String::NewSymbol("Process"), constructor_template->GetFunction()); } @@ -48,13 +52,113 @@ Process::New (const Arguments& args) return args.This(); } +static void +free_buf (oi_buf *b) +{ + V8::AdjustAmountOfExternalAllocatedMemory(-b->len); + free(b); +} + +static oi_buf * +new_buf (size_t size) +{ + size_t total = sizeof(oi_buf) + size; + void *p = malloc(total); + if (p == NULL) return NULL; + + oi_buf *b = static_cast(p); + b->base = static_cast(p) + sizeof(oi_buf); + + b->len = size; + b->release = free_buf; + V8::AdjustAmountOfExternalAllocatedMemory(total); + + return b; +} + +Handle +Process::Write (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + +#if 0 + if ( connection->ReadyState() != OPEN + && connection->ReadyState() != WRITE_ONLY + ) + return ThrowException(String::New("Socket is not open for writing")); +#endif + + // XXX + // A lot of improvement can be made here. First of all we're allocating + // oi_bufs for every send which is clearly inefficent - it should use a + // memory pool or ring buffer. Of course, expressing binary data as an + // array of integers is extremely inefficent. This can improved when v8 + // bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been + // addressed. + + oi_buf *buf; + size_t len; + + if (args[0]->IsString()) { + enum encoding enc = ParseEncoding(args[1]); + Local s = args[0]->ToString(); + len = s->Utf8Length(); + buf = new_buf(len); + switch (enc) { + case RAW: + case ASCII: + s->WriteAscii(buf->base, 0, len); + break; + + case UTF8: + s->WriteUtf8(buf->base, len); + break; + + default: + assert(0 && "unhandled string encoding"); + } + + } else if (args[0]->IsArray()) { + Handle array = Handle::Cast(args[0]); + len = array->Length(); + buf = new_buf(len); + for (size_t i = 0; i < len; i++) { + Local int_value = array->Get(Integer::New(i)); + buf->base[i] = int_value->IntegerValue(); + } + + } else return ThrowException(String::New("Bad argument")); + + if (process->Write(buf) != 0) { + return ThrowException(String::New("Pipe already closed")); + } + + return Undefined(); +} + +Handle +Process::Close (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + + if (process->Close() != 0) { + return ThrowException(String::New("Pipe already closed.")); + } + + return Undefined(); +} + Process::Process (Handle handle) : ObjectWrap(handle) { ev_init(&stdout_watcher_, Process::OnOutput); stdout_watcher_.data = this; - ev_init(&stderr_watcher_, Process::OnError); + ev_init(&stderr_watcher_, Process::OnOutput); stderr_watcher_.data = this; ev_init(&stdin_watcher_, Process::OnWritable); @@ -70,7 +174,11 @@ Process::Process (Handle handle) stdin_pipe_[0] = -1; stdin_pipe_[1] = -1; + got_close_ = false; + pid_ = 0; + + oi_queue_init(&out_stream_); } Process::~Process () @@ -108,7 +216,7 @@ Process::Shutdown () Detach(); } -static int +static inline int SetNonBlocking (int fd) { int flags = fcntl(fd, F_GETFL, 0); @@ -152,6 +260,8 @@ Process::Spawn (const char *command) return -4; case 0: // Child. + //printf("child process!\n"); + close(stdout_pipe_[0]); // close read end dup2(stdout_pipe_[1], STDOUT_FILENO); @@ -161,7 +271,10 @@ Process::Spawn (const char *command) close(stdin_pipe_[1]); // close write end dup2(stdin_pipe_[0], STDIN_FILENO); - execl("/bin/sh", "-c", command, (char *)NULL); + //printf("child process!\n"); + + execl("/bin/sh", "sh", "-c", command, (char *)NULL); + //execl(_PATH_BSHELL, "sh", "-c", program, (char *)NULL); _exit(127); } @@ -171,23 +284,21 @@ Process::Spawn (const char *command) ev_child_start(EV_DEFAULT_UC_ &child_watcher_); SetNonBlocking(stdout_pipe_[0]); - SetNonBlocking(stderr_pipe_[0]); - SetNonBlocking(stdin_pipe_[1]); - ev_io_set(&stdout_watcher_, stdout_pipe_[0], EV_READ); - ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); - ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); - ev_io_start(EV_DEFAULT_UC_ &stdout_watcher_); - ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); - ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); - close(stdout_pipe_[1]); // close write end - close(stderr_pipe_[1]); // close write end - close(stdin_pipe_[0]); // close read end - stdout_pipe_[1] = -1; + + SetNonBlocking(stderr_pipe_[0]); + ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); + ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); + close(stderr_pipe_[1]); // close write end stderr_pipe_[1] = -1; + + SetNonBlocking(stdin_pipe_[1]); + ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + close(stdin_pipe_[0]); // close read end stdin_pipe_[0] = -1; Attach(); @@ -198,22 +309,96 @@ Process::Spawn (const char *command) void Process::OnOutput (EV_P_ ev_io *watcher, int revents) { - Process *process = static_cast(watcher->data); - assert(revents == EV_READ); -} + int r; + char buf[16*1024]; + size_t buf_size = 16*1024; -void -Process::OnError (EV_P_ ev_io *watcher, int revents) -{ Process *process = static_cast(watcher->data); + + bool is_stdout = (&process->stdout_watcher_ == watcher); + int fd = is_stdout ? process->stdout_pipe_[0] : process->stderr_pipe_[0]; + assert(revents == EV_READ); + assert(fd >= 0); + + HandleScope scope; + Handle callback_v = + process->handle_->Get(is_stdout ? ON_OUTPUT_SYMBOL : ON_ERROR_SYMBOL); + Handle callback; + if (callback_v->IsFunction()) { + callback = Handle::Cast(callback_v); + } + Handle argv[1]; + + for (;;) { + r = read(fd, buf, buf_size); + + if (r < 0) { + if (errno != EAGAIN) perror("IPC pipe read error"); + break; + } + + if (!callback.IsEmpty()) { + if (r == 0) { + argv[0] = Null(); + } else { + // TODO multiple encodings + argv[0] = String::New((const char*)buf, r); + } + + TryCatch try_catch; + callback->Call(process->handle_, 1, argv); + if (try_catch.HasCaught()) { + FatalException(try_catch); + return; + } + } + + if (r == 0) { + ev_io_stop(EV_DEFAULT_UC_ watcher); + break; + } + } } void Process::OnWritable (EV_P_ ev_io *watcher, int revents) { Process *process = static_cast(watcher->data); + int sent; + assert(revents == EV_WRITE); + assert(process->stdin_pipe_[1] >= 0); + + while (!oi_queue_empty(&process->out_stream_)) { + oi_queue *q = oi_queue_last(&process->out_stream_); + oi_buf *to_write = (oi_buf*) oi_queue_data(q, oi_buf, queue); + + sent = write( process->stdin_pipe_[1] + , to_write->base + to_write->written + , to_write->len - to_write->written + ); + if (sent < 0) { + if (errno == EAGAIN) break; + perror("IPC pipe write error"); + break; + } + + to_write->written += sent; + + if (to_write->written == to_write->len) { + oi_queue_remove(q); + if (to_write->release) to_write->release(to_write); + } + } + + if (oi_queue_empty(&process->out_stream_)) { + ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_); + if (process->got_close_) { + close(process->stdin_pipe_[1]); + process->stdin_pipe_[1] = -1; + } + } } void @@ -221,5 +406,33 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents) { ev_child_stop(EV_A_ watcher); Process *process = static_cast(watcher->data); + assert(revents == EV_CHILD); + assert(process->pid_ == watcher->rpid); + assert(&process->child_watcher_ == watcher); + + // Call onExit ( watcher->rstatus ) + printf("OnCHLD with status %d\n", watcher->rstatus); } + +int +Process::Write (oi_buf *buf) +{ + if (stdin_pipe_[1] < 0 || got_close_) + return -1; + oi_queue_insert_head(&out_stream_, &buf->queue); + buf->written = 0; + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + return 0; +} + +int +Process::Close () +{ + if (stdin_pipe_[1] < 0 || got_close_) + return -1; + got_close_ = true; + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + return 0; +} + diff --git a/src/process.h b/src/process.h index 5e2a25eb76f..8cbb444b538 100644 --- a/src/process.h +++ b/src/process.h @@ -4,24 +4,28 @@ #include "node.h" #include #include +#include namespace node { class Process : ObjectWrap { public: static void Initialize (v8::Handle target); - virtual size_t size (void) { return sizeof(Process); } protected: static v8::Persistent constructor_template; static v8::Handle New (const v8::Arguments& args); + static v8::Handle Write (const v8::Arguments& args); + static v8::Handle Close (const v8::Arguments& args); Process(v8::Handle handle); ~Process(); void Shutdown (); int Spawn (const char *command); + int Write (oi_buf *buf); + int Close (); private: static void OnOutput (EV_P_ ev_io *watcher, int revents); @@ -39,6 +43,10 @@ class Process : ObjectWrap { int stdin_pipe_[2]; pid_t pid_; + + bool got_close_; + + oi_queue out_stream_; }; } // namespace node