Maintain queueSize for each socket

v0.7.4-release
Ryan Dahl 2010-11-12 11:24:30 -08:00
parent 5a84461e46
commit 7c3c5c6861
2 changed files with 17 additions and 5 deletions

View File

@ -254,8 +254,6 @@ Stream.prototype._appendBucket = function (data, encoding, fd) {
if (encoding) newBucket.encoding = encoding;
if (fd) newBucket.fd = fd;
var queueSize = data.length;
// TODO properly calculate queueSize
if (this._writeWatcher.lastBucket) {
@ -266,7 +264,13 @@ Stream.prototype._appendBucket = function (data, encoding, fd) {
this._writeWatcher.lastBucket = newBucket;
return queueSize;
if (this._writeWatcher.queueSize === undefined) {
this._writeWatcher.queueSize = 0;
}
assert(this._writeWatcher.queueSize >= 0);
this._writeWatcher.queueSize += data.length;
return this._writeWatcher.queueSize;
};

View File

@ -37,6 +37,7 @@ static Persistent<String> fd_sym;
static Persistent<String> is_unix_socket_sym;
static Persistent<String> first_bucket_sym;
static Persistent<String> last_bucket_sym;
static Persistent<String> queue_size_sym;
void IOWatcher::Initialize(Handle<Object> target) {
@ -62,6 +63,7 @@ void IOWatcher::Initialize(Handle<Object> target) {
onerror_sym = NODE_PSYMBOL("onerror");
first_bucket_sym = NODE_PSYMBOL("firstBucket");
last_bucket_sym = NODE_PSYMBOL("lastBucket");
queue_size_sym = NODE_PSYMBOL("queueSize");
offset_sym = NODE_PSYMBOL("offset");
fd_sym = NODE_PSYMBOL("fd");
is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket");
@ -423,6 +425,9 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
// what about written == 0 ?
size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
assert(queue_size >= offset);
// Now drop the buckets that have been written.
bucket_index = 0;
@ -451,15 +456,15 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
// serialized onto a buffer.
size_t bucket_len = Buffer::Length(data_v->ToObject());
if (unix_socket && bucket->Has(fd_sym)) {
bucket->Set(fd_sym, Null());
}
assert(bucket_len > offset);
DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
queue_size -= written;
// Only on the first bucket does is the offset > 0.
if (offset + written < bucket_len) {
// we have not written the entire bucket
@ -488,6 +493,9 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
watcher->Set(first_bucket_sym, bucket->Get(next_sym));
}
// Set the queue size.
watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
// Finished dumping the buckets.
//
// If our list of buckets is empty, we can emit 'drain' and forget about