streams2: setEncoding and abstract out endReadable

pull/24504/head
isaacs 2012-10-03 16:52:14 -07:00
parent 51a52c43a2
commit 9b5abe5bfe
1 changed files with 71 additions and 16 deletions

View File

@ -24,6 +24,7 @@ module.exports = Readable;
var Stream = require('stream');
var util = require('util');
var assert = require('assert');
var StringDecoder;
util.inherits(Readable, Stream);
@ -41,12 +42,20 @@ function ReadableState(options, stream) {
this.pipes = [];
this.flowing = false;
this.ended = false;
this.endEmitted = false;
this.stream = stream;
this.reading = false;
// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.decoder = null;
if (options.encoding) {
if (!StringDecoder)
StringDecoder = require('string_decoder').StringDecoder;
this.decoder = new StringDecoder(options.encoding);
}
}
function Readable(options) {
@ -54,12 +63,19 @@ function Readable(options) {
Stream.apply(this);
}
// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder)
StringDecoder = require('string_decoder').StringDecoder;
this._readableState.decoder = new StringDecoder(enc);
};
// you can override either this method, or _read(n, cb) below.
Readable.prototype.read = function(n) {
var state = this._readableState;
if (state.length === 0 && state.ended) {
process.nextTick(this.emit.bind(this, 'end'));
endReadable(this);
return null;
}
@ -85,7 +101,12 @@ Readable.prototype.read = function(n) {
n = state.length;
}
var ret = n > 0 ? fromList(n, state.buffer, state.length) : null;
var ret;
if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
else
ret = null;
if (ret === null || ret.length === 0)
state.needReadable = true;
@ -108,13 +129,26 @@ Readable.prototype.read = function(n) {
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
this.emit('readable');
else
endReadable(this);
return;
}
if (state.decoder)
chunk = state.decoder.write(chunk);
state.length += chunk.length;
state.buffer.push(chunk);
if (state.length < state.lowWaterMark)
// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, that'll
// probably kick off another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length < state.lowWaterMark) {
this._read(state.bufferSize, onread.bind(this));
return;
}
// now we have something to call this.read() to get.
if (state.needReadable) {
@ -309,7 +343,7 @@ Readable.prototype.wrap = function(stream) {
stream.on('end', function() {
state.ended = true;
if (state.length === 0)
this.emit('end');
endReadable(this);
}.bind(this));
stream.on('data', function(chunk) {
@ -360,7 +394,7 @@ Readable.prototype.wrap = function(stream) {
n = state.length;
}
var ret = fromList(n, state.buffer, state.length);
var ret = fromList(n, state.buffer, state.length, !!state.decoder);
state.length -= n;
if (state.length < state.lowWaterMark && paused) {
@ -369,7 +403,7 @@ Readable.prototype.wrap = function(stream) {
}
if (state.length === 0 && state.ended)
process.nextTick(this.emit.bind(this, 'end'));
endReadable(this);
return ret;
};
@ -382,8 +416,7 @@ Readable._fromList = fromList;
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// If there's no data, then
function fromList(n, list, length) {
function fromList(n, list, length, stringMode) {
var ret;
// nothing in the list, definitely empty.
@ -391,16 +424,20 @@ function fromList(n, list, length) {
return null;
}
if (length === 0) {
if (length === 0)
ret = null;
} else if (!n || n >= length) {
else if (!n || n >= length) {
// read it all, truncate the array.
ret = Buffer.concat(list, length);
if (stringMode)
ret = list.join('');
else
ret = Buffer.concat(list, length);
list.length = 0;
} else {
// read just some of it.
if (n < list[0].length) {
// just take a part of the first list item.
// slice is the same for buffers and strings.
var buf = list[0];
ret = buf.slice(0, n);
list[0] = buf.slice(n);
@ -410,17 +447,26 @@ function fromList(n, list, length) {
} else {
// complex case.
// we have enough to cover it, but it spans past the first buffer.
ret = new Buffer(n);
if (stringMode)
ret = '';
else
ret = new Buffer(n);
var c = 0;
for (var i = 0, l = list.length; i < l && c < n; i++) {
var buf = list[0];
var cpy = Math.min(n - c, buf.length);
buf.copy(ret, c, 0, cpy);
if (cpy < buf.length) {
if (stringMode)
ret += buf.slice(0, cpy);
else
buf.copy(ret, c, 0, cpy);
if (cpy < buf.length)
list[0] = buf.slice(cpy);
} else {
else
list.shift();
}
c += cpy;
}
}
@ -428,3 +474,12 @@ function fromList(n, list, length) {
return ret;
}
function endReadable(stream) {
var state = stream._readableState;
if (state.endEmitted)
return;
state.ended = true;
state.endEmitted = true;
process.nextTick(stream.emit.bind(stream, 'end'));
}