From ac5a185edf16efb7738c00f4d35f00bc2f93bf04 Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 28 Nov 2012 10:46:24 -0800 Subject: [PATCH] streams2: Handle pipeChunkSize properly --- lib/_stream_readable.js | 45 ++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 20ce5ed2db3..53b920a7506 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -84,7 +84,7 @@ function ReadableState(options, stream) { // the number of writers that are awaiting a drain event in .pipe()s this.awaitDrain = 0; - this.flowChunkSize = null; + this.pipeChunkSize = null; this.decoder = null; if (options.encoding) { @@ -118,7 +118,7 @@ function howMuchToRead(n, state) { if (state.length === 0 && state.ended) return 0; - if (isNaN(n)) + if (isNaN(n) || n === null) return state.length; if (n <= 0) @@ -266,8 +266,8 @@ function onread(stream, er, chunk) { // if we haven't gotten enough to pass the lowWaterMark, // and we haven't ended, then don't bother telling the user - // that it's time to read more data. Otherwise, that'll - // probably kick off another stream.read(), which can trigger + // that it's time to read more data. Otherwise, emitting 'readable' + // probably will trigger another stream.read(), which can trigger // another _read(n,cb) before this one returns! if (state.length <= state.lowWaterMark) { state.reading = true; @@ -322,34 +322,41 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } if (pipeOpts && pipeOpts.chunkSize) - state.flowChunkSize = pipeOpts.chunkSize; + state.pipeChunkSize = pipeOpts.chunkSize; function onend() { dest.end(); } + // when the dest drains, it reduces the awaitDrain counter + // on the source. This would be more elegant with a .once() + // handler in flow(), but adding and removing repeatedly is + // too slow. + var ondrain = pipeOnDrain(src); + dest.on('drain', ondrain); + dest.on('unpipe', function(readable) { + if (readable === src) + dest.removeListener('drain', ondrain); + + // if the reader is waiting for a drain event from this + // specific writer, then it would cause it to never start + // flowing again. + // So, if this is awaiting a drain, then we just call it now. + // If we don't know, then assume that we are waiting for one. + if (!dest._writableState || dest._writableState.needDrain) + ondrain(); + }); + + // tell the dest that it's being piped to dest.emit('pipe', src); - // start the flow. + // start the flow if it hasn't been started already. if (!state.flowing) { // the handler that waits for readable events after all // the data gets sucked out in flow. // This would be easier to follow with a .once() handler // in flow(), but that is too slow. this.on('readable', pipeOnReadable); - var ondrain = pipeOnDrain(src); - dest.on('drain', ondrain); - dest.on('unpipe', function(readable) { - if (readable === src) - dest.removeListener('drain', ondrain); - - // if the reader is waiting for a drain event from this - // specific writer, then it would cause it to never start - // flowing again. - // So, if this is awaiting a drain, then we just call it now. - if (dest._writableState.needDrain) - ondrain(); - }); state.flowing = true; process.nextTick(function() {