node/lib/_stream_wrap.js

217 lines
4.5 KiB
JavaScript

'use strict';
const assert = require('assert');
const util = require('util');
const Socket = require('net').Socket;
const JSStream = process.binding('js_stream').JSStream;
const uv = process.binding('uv');
const debug = util.debuglog('stream_wrap');
function StreamWrap(stream) {
const handle = new JSStream();
this.stream = stream;
this._list = null;
const self = this;
handle.close = function(cb) {
debug('close');
self.doClose(cb);
};
handle.isAlive = function() {
return self.isAlive();
};
handle.isClosing = function() {
return self.isClosing();
};
handle.onreadstart = function() {
return self.readStart();
};
handle.onreadstop = function() {
return self.readStop();
};
handle.onshutdown = function(req) {
return self.doShutdown(req);
};
handle.onwrite = function(req, bufs) {
return self.doWrite(req, bufs);
};
this.stream.pause();
this.stream.on('error', function(err) {
self.emit('error', err);
});
this.stream.on('data', function(chunk) {
debug('data', chunk.length);
if (self._handle)
self._handle.readBuffer(chunk);
});
this.stream.once('end', function() {
debug('end');
if (self._handle)
self._handle.emitEOF();
});
Socket.call(this, {
handle: handle
});
}
util.inherits(StreamWrap, Socket);
module.exports = StreamWrap;
// require('_stream_wrap').StreamWrap
StreamWrap.StreamWrap = StreamWrap;
StreamWrap.prototype.isAlive = function isAlive() {
return true;
};
StreamWrap.prototype.isClosing = function isClosing() {
return !this.readable || !this.writable;
};
StreamWrap.prototype.readStart = function readStart() {
this.stream.resume();
return 0;
};
StreamWrap.prototype.readStop = function readStop() {
this.stream.pause();
return 0;
};
StreamWrap.prototype.doShutdown = function doShutdown(req) {
const self = this;
const handle = this._handle;
const item = this._enqueue('shutdown', req);
this.stream.end(function() {
// Ensure that write was dispatched
setImmediate(function() {
if (!self._dequeue(item))
return;
handle.finishShutdown(req, 0);
});
});
return 0;
};
StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
const self = this;
const handle = self._handle;
var pending = bufs.length;
// Queue the request to be able to cancel it
const item = self._enqueue('write', req);
self.stream.cork();
bufs.forEach(function(buf) {
self.stream.write(buf, done);
});
self.stream.uncork();
function done(err) {
if (!err && --pending !== 0)
return;
// Ensure that this is called once in case of error
pending = 0;
// Ensure that write was dispatched
setImmediate(function() {
// Do not invoke callback twice
if (!self._dequeue(item))
return;
var errCode = 0;
if (err) {
if (err.code && uv['UV_' + err.code])
errCode = uv['UV_' + err.code];
else
errCode = uv.UV_EPIPE;
}
handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
});
}
return 0;
};
function QueueItem(type, req) {
this.type = type;
this.req = req;
this.prev = this;
this.next = this;
}
StreamWrap.prototype._enqueue = function enqueue(type, req) {
const item = new QueueItem(type, req);
if (this._list === null) {
this._list = item;
return item;
}
item.next = this._list.next;
item.prev = this._list;
item.next.prev = item;
item.prev.next = item;
return item;
};
StreamWrap.prototype._dequeue = function dequeue(item) {
assert(item instanceof QueueItem);
var next = item.next;
var prev = item.prev;
if (next === null && prev === null)
return false;
item.next = null;
item.prev = null;
if (next === item) {
prev = null;
next = null;
} else {
prev.next = next;
next.prev = prev;
}
if (this._list === item)
this._list = next;
return true;
};
StreamWrap.prototype.doClose = function doClose(cb) {
const self = this;
const handle = self._handle;
setImmediate(function() {
while (self._list !== null) {
const item = self._list;
const req = item.req;
self._dequeue(item);
const errCode = uv.UV_ECANCELED;
if (item.type === 'write') {
handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
} else if (item.type === 'shutdown') {
handle.finishShutdown(req, errCode);
}
}
// Should be already set by net.js
assert(self._handle === null);
cb();
});
};