diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index e75e201e02d..9193f4f27b9 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -105,21 +105,51 @@ function Readable(options) { // similar to how Writable.write() returns true if you should // write() some more. Readable.prototype.push = function(chunk) { - var rs = this._readableState; - rs.onread(null, chunk); - - // if it's past the high water mark, we can push in some more. - // Also, if we have no data yet, we can stand some - // more bytes. This is to work around cases where hwm=0, - // such as the repl. Also, if the push() triggered a - // readable event, and the user called read(largeNumber) such that - // needReadable was set, then we ought to push more, so that another - // 'readable' event will be triggered. - return rs.needReadable || - rs.length < rs.highWaterMark || - rs.length === 0; + var state = this._readableState; + return readableAddChunk(this, state, chunk); }; +function readableAddChunk(stream, state, chunk) { + state.reading = false; + + var er = chunkInvalid(state, chunk); + if (er) { + stream.emit('error', er); + } else if (chunk === null || chunk === undefined) { + onreadEof(stream, state); + } else if (state.objectMode || chunk && chunk.length > 0) { + if (state.decoder) + chunk = state.decoder.write(chunk); + + // update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + state.buffer.push(chunk); + + if (state.needReadable) + emitReadable(stream); + + maybeReadMore(stream, state); + } + + return needMoreData(state); +} + + + +// if it's past the high water mark, we can push in some more. +// Also, if we have no data yet, we can stand some +// more bytes. This is to work around cases where hwm=0, +// such as the repl. Also, if the push() triggered a +// readable event, and the user called read(largeNumber) such that +// needReadable was set, then we ought to push more, so that another +// 'readable' event will be triggered. +function needMoreData(state) { + return !state.ended && + (state.needReadable || + state.length < state.highWaterMark || + state.length === 0); +} + // backwards compatibility. Readable.prototype.setEncoding = function(enc) { if (!StringDecoder) @@ -263,15 +293,20 @@ Readable.prototype.read = function(n) { return ret; }; +// This is the function passed to _read(n,cb) as the callback. +// It should be called exactly once for every _read() call. function onread(stream, er, chunk) { var state = stream._readableState; var sync = state.sync; - // If we get something that is not a buffer, string, null, or undefined, - // and we're not in objectMode, then that's an error. - // Otherwise stream chunks are all considered to be of length=1, and the - // watermarks determine how many objects to keep in the buffer, rather than - // how many bytes or characters. + if (er) + stream.emit('error', er); + else + stream.push(chunk); +} + +function chunkInvalid(state, chunk) { + var er = null; if (!Buffer.isBuffer(chunk) && 'string' !== typeof chunk && chunk !== null && @@ -280,68 +315,26 @@ function onread(stream, er, chunk) { !er) { er = new TypeError('Invalid non-string/buffer chunk'); } + return er; +} - state.reading = false; - if (er) - return stream.emit('error', er); - if (chunk === null || chunk === undefined) { - // eof - state.ended = true; - if (state.decoder) { - chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } +function onreadEof(stream, state) { + state.ended = true; + if (state.decoder) { + var chunk = state.decoder.end(); + if (chunk && chunk.length) { + state.buffer.push(chunk); + state.length += state.objectMode ? 1 : chunk.length; } - - // if we've ended and we have some data left, then emit - // 'readable' now to make sure it gets picked up. - if (state.length > 0) - emitReadable(stream); - else - endReadable(stream); - return; } - // at this point, if we got a zero-length buffer or string, - // and we're not in object-mode, then there's really no point - // continuing. it means that there is nothing to read right - // now, but as we have not received the EOF-signaling null, - // we're not ended. we've already unset the reading flag, - // so just get out of here. - if (!state.objectMode && - (chunk || typeof chunk === 'string') && - 0 === chunk.length) - return; - - if (state.decoder) - chunk = state.decoder.write(chunk); - - // update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - state.buffer.push(chunk); - - // if we haven't gotten any data, - // and we haven't ended, then don't bother telling the user - // that it's time to read more data. Otherwise, emitting 'readable' - // probably will trigger another stream.read(), which can trigger - // another _read(n,cb) before this one returns! - if (state.length === 0) { - state.reading = true; - stream._read(state.bufferSize, state.onread); - return; - } - - if (state.needReadable) + // if we've ended and we have some data left, then emit + // 'readable' now to make sure it gets picked up. + if (state.length > 0) emitReadable(stream); - else if (state.sync) - process.nextTick(function() { - maybeReadMore(stream, state); - }); else - maybeReadMore(stream, state); + endReadable(stream); } // Don't emit readable right away in sync mode, because this can trigger @@ -365,17 +358,26 @@ function emitReadable(stream) { function emitReadable_(stream) { var state = stream._readableState; stream.emit('readable'); - maybeReadMore(stream, state); } + +// at this point, the user has presumably seen the 'readable' event, +// and called read() to consume some data. that may have triggered +// in turn another _read(n,cb) call, in which case reading = true if +// it's in progress. +// However, if we're not ended, or reading, and the length < hwm, +// then go ahead and try to read some more right now preemptively. function maybeReadMore(stream, state) { - // at this point, the user has presumably seen the 'readable' event, - // and called read() to consume some data. that may have triggered - // in turn another _read(n,cb) call, in which case reading = true if - // it's in progress. - // However, if we're not ended, or reading, and the length < hwm, - // then go ahead and try to read some more right now preemptively. - if (!state.reading && !state.ending && !state.ended && + if (state.sync) + process.nextTick(function() { + maybeReadMore_(stream, state); + }); + else + maybeReadMore_(stream, state); +} + +function maybeReadMore_(stream, state) { + if (!state.reading && !state.ended && state.length < state.highWaterMark) { stream.read(0); }