diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 7b2d76c5348..db76ab73b81 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -24,6 +24,7 @@ module.exports = Readable; var Stream = require('stream'); var util = require('util'); var assert = require('assert'); +var StringDecoder; util.inherits(Readable, Stream); @@ -41,12 +42,20 @@ function ReadableState(options, stream) { this.pipes = []; this.flowing = false; this.ended = false; + this.endEmitted = false; this.stream = stream; this.reading = false; // whenever we return null, then we set a flag to say // that we're awaiting a 'readable' event emission. this.needReadable = false; + + this.decoder = null; + if (options.encoding) { + if (!StringDecoder) + StringDecoder = require('string_decoder').StringDecoder; + this.decoder = new StringDecoder(options.encoding); + } } function Readable(options) { @@ -54,12 +63,19 @@ function Readable(options) { Stream.apply(this); } +// backwards compatibility. +Readable.prototype.setEncoding = function(enc) { + if (!StringDecoder) + StringDecoder = require('string_decoder').StringDecoder; + this._readableState.decoder = new StringDecoder(enc); +}; + // you can override either this method, or _read(n, cb) below. Readable.prototype.read = function(n) { var state = this._readableState; if (state.length === 0 && state.ended) { - process.nextTick(this.emit.bind(this, 'end')); + endReadable(this); return null; } @@ -85,7 +101,12 @@ Readable.prototype.read = function(n) { n = state.length; } - var ret = n > 0 ? fromList(n, state.buffer, state.length) : null; + + var ret; + if (n > 0) + ret = fromList(n, state.buffer, state.length, !!state.decoder); + else + ret = null; if (ret === null || ret.length === 0) state.needReadable = true; @@ -108,13 +129,26 @@ Readable.prototype.read = function(n) { // 'readable' now to make sure it gets picked up. if (state.length > 0) this.emit('readable'); + else + endReadable(this); return; } + if (state.decoder) + chunk = state.decoder.write(chunk); + state.length += chunk.length; state.buffer.push(chunk); - if (state.length < state.lowWaterMark) + + // if we haven't gotten enough to pass the lowWaterMark, + // and we haven't ended, then don't bother telling the user + // that it's time to read more data. Otherwise, that'll + // probably kick off another stream.read(), which can trigger + // another _read(n,cb) before this one returns! + if (state.length < state.lowWaterMark) { this._read(state.bufferSize, onread.bind(this)); + return; + } // now we have something to call this.read() to get. if (state.needReadable) { @@ -309,7 +343,7 @@ Readable.prototype.wrap = function(stream) { stream.on('end', function() { state.ended = true; if (state.length === 0) - this.emit('end'); + endReadable(this); }.bind(this)); stream.on('data', function(chunk) { @@ -360,7 +394,7 @@ Readable.prototype.wrap = function(stream) { n = state.length; } - var ret = fromList(n, state.buffer, state.length); + var ret = fromList(n, state.buffer, state.length, !!state.decoder); state.length -= n; if (state.length < state.lowWaterMark && paused) { @@ -369,7 +403,7 @@ Readable.prototype.wrap = function(stream) { } if (state.length === 0 && state.ended) - process.nextTick(this.emit.bind(this, 'end')); + endReadable(this); return ret; }; @@ -382,8 +416,7 @@ Readable._fromList = fromList; // Pluck off n bytes from an array of buffers. // Length is the combined lengths of all the buffers in the list. -// If there's no data, then -function fromList(n, list, length) { +function fromList(n, list, length, stringMode) { var ret; // nothing in the list, definitely empty. @@ -391,16 +424,20 @@ function fromList(n, list, length) { return null; } - if (length === 0) { + if (length === 0) ret = null; - } else if (!n || n >= length) { + else if (!n || n >= length) { // read it all, truncate the array. - ret = Buffer.concat(list, length); + if (stringMode) + ret = list.join(''); + else + ret = Buffer.concat(list, length); list.length = 0; } else { // read just some of it. if (n < list[0].length) { // just take a part of the first list item. + // slice is the same for buffers and strings. var buf = list[0]; ret = buf.slice(0, n); list[0] = buf.slice(n); @@ -410,17 +447,26 @@ function fromList(n, list, length) { } else { // complex case. // we have enough to cover it, but it spans past the first buffer. - ret = new Buffer(n); + if (stringMode) + ret = ''; + else + ret = new Buffer(n); + var c = 0; for (var i = 0, l = list.length; i < l && c < n; i++) { var buf = list[0]; var cpy = Math.min(n - c, buf.length); - buf.copy(ret, c, 0, cpy); - if (cpy < buf.length) { + + if (stringMode) + ret += buf.slice(0, cpy); + else + buf.copy(ret, c, 0, cpy); + + if (cpy < buf.length) list[0] = buf.slice(cpy); - } else { + else list.shift(); - } + c += cpy; } } @@ -428,3 +474,12 @@ function fromList(n, list, length) { return ret; } + +function endReadable(stream) { + var state = stream._readableState; + if (state.endEmitted) + return; + state.ended = true; + state.endEmitted = true; + process.nextTick(stream.emit.bind(stream, 'end')); +}