Dumper net.js integration

v0.7.4-release
Ryan Dahl 2010-11-05 13:03:32 -07:00
parent 913789da3e
commit dcc547d798
2 changed files with 59 additions and 213 deletions

View File

@ -827,8 +827,8 @@ function connectionListener (socket) {
// No more messages to be pushed out.
// HACK: need way to do this with socket interface
if (socket._writeQueue.length) {
socket.__destroyOnDrain = true; //socket.end();
if (socket._writeWatcher.firstBucket) {
socket._eof = true;
} else {
socket.destroy();
}

View File

@ -54,6 +54,24 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
return new IOWatcher();
});
IOWatcher.prototype.ondrain = function () {
assert(this.socket);
if (this.writable || this.readable) {
require('timers').active(this.socket);
this.socket.emit('drain');
}
if (this.socket._eof) this.socket._shutdown();
};
IOWatcher.prototype.onerror = function (errno) {
assert(this.socket);
this.socket.destroy(errnoException(errno, 'write'));
};
exports.isIP = binding.isIP;
exports.isIPv4 = function (input) {
@ -92,16 +110,6 @@ function setImplmentationMethods (self) {
};
if (self.type == 'unix') {
self._writeImpl = function (buf, off, len, fd, flags) {
// Detect and disallow zero-byte writes wth an attached file
// descriptor. This is an implementation limitation of sendmsg(2).
if (fd && noData(buf, off, len)) {
throw new Error('File descriptors can only be written with data');
}
return sendMsg(self.fd, buf, off, len, fd, flags);
};
self._readImpl = function (buf, off, len) {
var bytesRead = recvMsg(self.fd, buf, off, len);
@ -123,25 +131,10 @@ function setImplmentationMethods (self) {
return bytesRead;
};
} else {
self._writeImpl = function (buf, off, len, fd, flags) {
// XXX: TLS support requires that 0-byte writes get processed
// by the kernel for some reason. Otherwise, we'd just
// fast-path return here.
// Drop 'fd' and 'flags' as these are not supported by the write(2)
// system call
return write(self.fd, buf, off, len);
};
self._readImpl = function (buf, off, len) {
return read(self.fd, buf, off, len);
};
}
self._shutdownImpl = function () {
shutdown(self.fd, 'write');
};
};
@ -168,11 +161,6 @@ function initStream (self) {
self._readWatcher.callback = onReadable;
self.readable = false;
// Queue of buffers and string that need to be written to socket.
self._writeQueue = [];
self._writeQueueEncoding = [];
self._writeQueueFD = [];
self._writeWatcher = ioWatchers.alloc();
self._writeWatcher.socket = self;
self._writeWatcher.callback = onWritable;
@ -255,182 +243,44 @@ Object.defineProperty(Stream.prototype, 'readyState', {
});
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
Stream.prototype.write = function (data, encoding, fd) {
if (this._connecting || (this._writeQueue && this._writeQueue.length)) {
if (!this._writeQueue) {
this._writeQueue = [];
this._writeQueueEncoding = [];
this._writeQueueFD = [];
}
Stream.prototype._appendBucket = function (data, encoding, fd) {
var newBucket = { data: data };
if (encoding) newBucket.encoding = encoding;
if (fd) newBucket.fd = fd;
// Slow. There is already a write queue, so let's append to it.
if (this._writeQueueLast() === END_OF_FILE) {
throw new Error('Stream.end() called already; cannot write.');
}
var queueSize = data.length;
if (typeof data == 'string' &&
this._writeQueue.length &&
typeof this._writeQueue[this._writeQueue.length-1] === 'string' &&
this._writeQueueEncoding[this._writeQueueEncoding.length-1] === encoding) {
// optimization - concat onto last
this._writeQueue[this._writeQueue.length-1] += data;
} else {
this._writeQueue.push(data);
this._writeQueueEncoding.push(encoding);
}
// TODO properly calculate queueSize
if (fd != undefined) {
this._writeQueueFD.push(fd);
}
return false;
if (this._writeWatcher.lastBucket) {
this._writeWatcher.lastBucket.next = newBucket;
} else {
// Fast.
// The most common case. There is no write queue. Just push the data
// directly to the socket.
return this._writeOut(data, encoding, fd);
this._writeWatcher.firstBucket = newBucket;
}
this._writeWatcher.lastBucket = newBucket;
return queueSize;
};
// Directly writes the data to socket.
//
// Steps:
// 1. If it's a string, write it to the `pool`. (If not space remains
// on the pool make a new one.)
// 2. Write data to socket. Return true if flushed.
// 3. Slice out remaining
// 4. Unshift remaining onto _writeQueue. Return false.
Stream.prototype._writeOut = function (data, encoding, fd) {
Stream.prototype.write = function (data, encoding, fd) {
if (this._eof) {
throw new Error('Stream.end() called already; cannot write.');
}
if (!this.writable) {
throw new Error('Stream is not writable');
}
var buffer, off, len;
var bytesWritten, charsWritten;
var queuedData = false;
var queueSize = this._appendBucket(data, encoding, fd);
if (typeof data != 'string') {
// 'data' is a buffer, ignore 'encoding'
buffer = data;
off = 0;
len = data.length;
} else {
assert(typeof data == 'string');
if (!pool || pool.length - pool.used < kMinPoolSpace) {
pool = null;
allocNewPool();
}
if (!encoding || encoding == 'utf8' || encoding == 'utf-8') {
// default to utf8
bytesWritten = pool.write(data, 'utf8', pool.used);
charsWritten = Buffer._charsWritten;
} else {
bytesWritten = pool.write(data, encoding, pool.used);
charsWritten = bytesWritten;
}
if (encoding && data.length > 0) {
assert(bytesWritten > 0);
}
buffer = pool;
len = bytesWritten;
off = pool.used;
pool.used += bytesWritten;
debug('wrote ' + bytesWritten + ' bytes to pool');
if (charsWritten != data.length) {
//debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n");
// Unshift whatever didn't fit onto the buffer
this._writeQueue.unshift(data.slice(charsWritten));
this._writeQueueEncoding.unshift(encoding);
this._writeWatcher.start();
queuedData = true;
}
}
try {
bytesWritten = this._writeImpl(buffer, off, len, fd, 0);
} catch (e) {
this.destroy(e);
return false;
}
debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n");
if (this._connecting) return false;
this._onWritable(); // Insert writeWatcher into the dumpQueue
require('timers').active(this);
if (bytesWritten == len) {
// awesome. sent to buffer.
if (buffer === pool) {
// If we're just writing from the pool then we can make a little
// optimization and save the space.
buffer.used -= len;
}
if (queuedData) {
return false;
} else {
return true;
}
}
// Didn't write the entire thing to buffer.
// Need to wait for the socket to become available before trying again.
this._writeWatcher.start();
// Slice out the data left.
var leftOver = buffer.slice(off + bytesWritten, off + len);
leftOver.used = leftOver.length; // used the whole thing...
// util.error('data.used = ' + data.used);
//if (!this._writeQueue) initWriteStream(this);
// data should be the next thing to write.
this._writeQueue.unshift(leftOver);
this._writeQueueEncoding.unshift(null);
// If didn't successfully write any bytes, enqueue our fd and try again
if (!bytesWritten) {
this._writeQueueFD.unshift(fd);
}
return false;
};
// Flushes the write buffer out.
// Returns true if the entire buffer was flushed.
Stream.prototype.flush = function () {
while (this._writeQueue && this._writeQueue.length) {
var data = this._writeQueue.shift();
var encoding = this._writeQueueEncoding.shift();
var fd = this._writeQueueFD.shift();
if (data === END_OF_FILE) {
this._shutdown();
return true;
}
var flushed = this._writeOut(data,encoding,fd);
if (!flushed) return false;
}
if (this._writeWatcher) this._writeWatcher.stop();
return true;
};
Stream.prototype._writeQueueLast = function () {
return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1]
: null;
return queueSize < (64*1024);
};
@ -481,7 +331,7 @@ Stream.prototype._onConnect = function () {
}
if (this._writeQueue && this._writeQueue.length) {
if (this._writeWatcher.firstBucket) {
// Flush this in case any writes are queued up while connecting.
this._onWritable();
}
@ -493,11 +343,10 @@ Stream.prototype._onConnect = function () {
Stream.prototype._onWritable = function () {
// Stream becomes writable on connect() but don't flush if there's
// nothing actually to write
if (this.flush()) {
if (this._events && this._events['drain']) this.emit("drain");
if (this.ondrain) this.ondrain(); // Optimization
// Stick it into the dumpQueue
if (!this._writeWatcher.next) {
this._writeWatcher.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = this._writeWatcher;
}
};
@ -648,15 +497,13 @@ Stream.prototype.destroy = function (exception) {
// pool is shared between sockets, so don't need to free it here.
var self = this;
// TODO would like to set _writeQueue to null to avoid extra object alloc,
// but lots of code assumes this._writeQueue is always an array.
this._writeQueue = [];
this.readable = this.writable = false;
if (this._writeWatcher) {
this._writeWatcher.stop();
this._writeWatcher.socket = null;
this._writeWatcher.firstBucket = null;
this._writeWatcher.lastBucket = null;
ioWatchers.free(this._writeWatcher);
this._writeWatcher = null;
}
@ -695,7 +542,7 @@ Stream.prototype._shutdown = function () {
this.writable = false;
try {
this._shutdownImpl();
shutdown(this.fd, 'write');
} catch (e) {
this.destroy(e);
}
@ -708,15 +555,14 @@ Stream.prototype._shutdown = function () {
Stream.prototype.end = function (data, encoding) {
if (this.writable) {
if (this._writeQueueLast() !== END_OF_FILE) {
if (data) this.write(data, encoding);
this._writeQueue.push(END_OF_FILE);
if (!this._connecting) {
this.flush();
}
}
}
if (!this.writable) return; // TODO this should throw error
if (this._eof) return; // TODO this should also throw error
if (data) this._appendBucket(data, encoding);
this._eof = true;
// If this isn't in the dumpQueue then we shutdown now.
if (!this._writeWatcher.next) this._shutdown();
};