From db3c4efd1d95e6c7fc47b9c07216beb7029cf7bc Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Thu, 19 Jan 2012 16:52:23 -0800 Subject: [PATCH] support for sharing streams accross isolates --- lib/child_process.js | 42 ++++++--------- lib/net.js | 23 ++++++++ src/node.js | 25 +++++++-- src/node_isolate.cc | 123 +++++++++++++++++++++++++++++++++++++------ 4 files changed, 165 insertions(+), 48 deletions(-) diff --git a/lib/child_process.js b/lib/child_process.js index a1eac7744dd..9723730ce77 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -68,29 +68,13 @@ function mergeOptions(target, overrides) { function setupChannel(target, channel) { - var isWindows = process.platform === 'win32'; target._channel = channel; var jsonBuffer = ''; - - if (isWindows) { - var setSimultaneousAccepts = function(handle) { - var simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS && - process.env.NODE_MANY_ACCEPTS != '0') ? true : false; - - if (handle._simultaneousAccepts != simultaneousAccepts) { - handle.setSimultaneousAccepts(simultaneousAccepts); - handle._simultaneousAccepts = simultaneousAccepts; - } - } - } - channel.buffering = false; channel.onread = function(pool, offset, length, recvHandle) { - if (recvHandle && setSimultaneousAccepts) { - // Update simultaneous accepts on Windows - setSimultaneousAccepts(recvHandle); - } + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(recvHandle); if (pool) { jsonBuffer += pool.toString('ascii', offset, offset + length); @@ -140,10 +124,8 @@ function setupChannel(target, channel) { var buffer = Buffer(JSON.stringify(message) + '\n'); - if (sendHandle && setSimultaneousAccepts) { - // Update simultaneous accepts on Windows - setSimultaneousAccepts(sendHandle); - } + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(sendHandle); var writeReq = channel.write(buffer, 0, buffer.length, sendHandle); @@ -582,9 +564,13 @@ Isolate.prototype.spawn = function(options) { self._handle = isolates.create(options.args, options.options); if (!self._handle) throw new Error('Cannot create isolate.'); - self._handle.onmessage = function(msg) { + self._handle.onmessage = function(msg, recvHandle) { msg = JSON.parse('' + msg); - self.emit('message', msg); + + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(recvHandle); + + self.emit('message', msg, recvHandle); }; self._handle.onexit = function() { @@ -600,10 +586,14 @@ Isolate.prototype.kill = function(sig) { }; -Isolate.prototype.send = function(msg) { +Isolate.prototype.send = function(msg, sendHandle) { if (typeof msg === 'undefined') throw new TypeError('Bad argument.'); if (!this._handle) throw new Error('Isolate not running.'); msg = JSON.stringify(msg); msg = new Buffer(msg); - return this._handle.send(msg); + + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(sendHandle); + + return this._handle.send(msg, sendHandle); }; diff --git a/lib/net.js b/lib/net.js index a14a00c0a80..006036aaf29 100644 --- a/lib/net.js +++ b/lib/net.js @@ -942,3 +942,26 @@ exports.isIPv4 = function(input) { exports.isIPv6 = function(input) { return exports.isIP(input) === 6; }; + + +if (process.platform === 'win32') { + var simultaneousAccepts; + + exports._setSimultaneousAccepts = function(handle) { + if (typeof handle === 'undefined') { + return; + } + + if (typeof simultaneousAccepts === 'undefined') { + simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS && + process.env.NODE_MANY_ACCEPTS != '0') ? true : false; + } + + if (handle._simultaneousAccepts != simultaneousAccepts) { + handle.setSimultaneousAccepts(simultaneousAccepts); + handle._simultaneousAccepts = simultaneousAccepts; + } + } +} else { + exports._setSimultaneousAccepts = function(handle) {} +} diff --git a/src/node.js b/src/node.js index 8ca0da48d01..403101a8908 100644 --- a/src/node.js +++ b/src/node.js @@ -123,17 +123,27 @@ if (process.tid === 1) return; + var net = NativeModule.require('net'); + // isolate initialization - process.send = function(msg) { + process.send = function(msg, sendHandle) { if (typeof msg === 'undefined') throw new TypeError('Bad argument.'); msg = JSON.stringify(msg); msg = new Buffer(msg); - return process._send(msg); + + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(sendHandle); + + return process._send(msg, sendHandle); }; - process._onmessage = function(msg) { + process._onmessage = function(msg, recvHandle) { msg = JSON.parse('' + msg); - process.emit('message', msg); + + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(recvHandle); + + process.emit('message', msg, recvHandle); }; process.exit = process._exit; @@ -439,10 +449,15 @@ // Load tcp_wrap to avoid situation where we might immediately receive // a message. // FIXME is this really necessary? - process.binding('tcp_wrap') + process.binding('tcp_wrap'); cp._forkChild(); assert(process.send); + } else if (process.tid !== 1) { + // Load tcp_wrap to avoid situation where we might immediately receive + // a message. + // FIXME is this really necessary? + process.binding('tcp_wrap'); } } diff --git a/src/node_isolate.cc b/src/node_isolate.cc index ce980881a6c..5a1d2db4d58 100644 --- a/src/node_isolate.cc +++ b/src/node_isolate.cc @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -34,6 +35,8 @@ #define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor) +#define ISOLATEMESSAGE_SHARED_STREAM 0x0001 + namespace node { @@ -166,23 +169,35 @@ private: struct IsolateMessage { - size_t size_; - char* data_; + int flags; + struct { + size_t size_; + char* buffer_; + } data_; + uv_stream_info_t shared_stream_info_; + + IsolateMessage(const char* buffer, size_t size, + uv_stream_info_t* shared_stream_info) { + flags = 0; - IsolateMessage(const char* data, size_t size) { // make a copy for now - size_ = size; - data_ = new char[size]; - memcpy(data_, data, size); + data_.size_ = size; + data_.buffer_ = new char[size]; + memcpy(data_.buffer_, buffer, size); + + if (shared_stream_info) { + flags |= ISOLATEMESSAGE_SHARED_STREAM; + shared_stream_info_ = *shared_stream_info; + } } ~IsolateMessage() { - delete[] data_; + delete[] data_.buffer_; } static void Free(char* data, void* arg) { IsolateMessage* msg = static_cast(arg); - assert(data == msg->data_); + assert(data == msg->data_.buffer_); delete msg; } }; @@ -208,7 +223,23 @@ Handle Isolate::Send(const Arguments& args) { const char* data = Buffer::Data(obj); size_t size = Buffer::Length(obj); - IsolateMessage* msg = new IsolateMessage(data, size); + IsolateMessage* msg; + + if (args[1]->IsObject()) { + uv_stream_info_t stream_info; + + Local send_stream_obj = args[1]->ToObject(); + assert(send_stream_obj->InternalFieldCount() > 0); + StreamWrap* send_stream_wrap = static_cast( + send_stream_obj->GetPointerFromInternalField(0)); + uv_stream_t* send_stream = send_stream_wrap->GetStream(); + int r = uv_export(send_stream, &stream_info); + assert(r == 0); + msg = new IsolateMessage(data, size, &stream_info); + } else { + msg = new IsolateMessage(data, size, NULL); + } + isolate->send_channel_->Send(msg); return Undefined(); @@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) { Isolate* self = static_cast(arg); NODE_ISOLATE_CHECK(self); - Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg); - Handle argv[] = { buf->handle_ }; - MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv); + Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_, + IsolateMessage::Free, msg); + + int argc = 1; + Handle argv[2] = { + buf->handle_ + }; + + if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) { + // Instantiate the client javascript object and handle. + Local pending_obj = TCPWrap::Instantiate(); + + // Unwrap the client javascript object. + assert(pending_obj->InternalFieldCount() > 0); + TCPWrap* pending_wrap = + static_cast(pending_obj->GetPointerFromInternalField(0)); + + int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_); + assert(r == 0); + + argv[1] = pending_obj; + argc++; + } + + MakeCallback(self->globals_.process, "_onmessage", argc, argv); } @@ -442,9 +495,30 @@ private: NODE_ISOLATE_CHECK(parent_isolate_); HandleScope scope; Buffer* buf = Buffer::New( - msg->data_, msg->size_, IsolateMessage::Free, msg); - Handle argv[] = { buf->handle_ }; - MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv); + msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg); + + int argc = 1; + Handle argv[2] = { + buf->handle_ + }; + + if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) { + // Instantiate the client javascript object and handle. + Local pending_obj = TCPWrap::Instantiate(); + + // Unwrap the client javascript object. + assert(pending_obj->InternalFieldCount() > 0); + TCPWrap* pending_wrap = + static_cast(pending_obj->GetPointerFromInternalField(0)); + + int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_); + assert(r == 0); + + argv[1] = pending_obj; + argc++; + } + + MakeCallback(handle_, "onmessage", argc, argv); } // TODO merge with Isolate::Send(), it's almost identical @@ -457,9 +531,24 @@ private: const char* data = Buffer::Data(obj); size_t size = Buffer::Length(obj); - IsolateMessage* msg = new IsolateMessage(data, size); - self->send_channel_->Send(msg); + IsolateMessage* msg; + if (args[1]->IsObject()) { + uv_stream_info_t stream_info; + + Local send_stream_obj = args[1]->ToObject(); + assert(send_stream_obj->InternalFieldCount() > 0); + StreamWrap* send_stream_wrap = static_cast( + send_stream_obj->GetPointerFromInternalField(0)); + uv_stream_t* send_stream = send_stream_wrap->GetStream(); + int r = uv_export(send_stream, &stream_info); + assert(r == 0); + msg = new IsolateMessage(data, size, &stream_info); + } else { + msg = new IsolateMessage(data, size, NULL); + } + + self->send_channel_->Send(msg); return Undefined(); }