diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index abb23e41c70..9c242990d4f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -81,6 +81,9 @@ function ReadableState(options, stream) { // when piping, we only care about 'readable' events that happen // after read()ing all the bytes and not getting any pushback. this.ranOut = false; + + // the number of writers that are awaiting a drain event in .pipe()s + this.awaitDrain = 0; this.flowChunkSize = null; this.decoder = null; @@ -330,6 +333,19 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // This would be easier to follow with a .once() handler // in flow(), but that is too slow. this.on('readable', pipeOnReadable); + var ondrain = pipeOnDrain(src); + dest.on('drain', ondrain); + dest.on('unpipe', function(readable) { + if (readable === src) + dest.removeListener('drain', ondrain); + + // if the reader is waiting for a drain event from this + // specific writer, then it would cause it to never start + // flowing again. + // So, if this is awaiting a drain, then we just call it now. + if (dest._writableState.needDrain) + ondrain(); + }); state.flowing = true; process.nextTick(function() { @@ -340,22 +356,25 @@ Readable.prototype.pipe = function(dest, pipeOpts) { return dest; }; +function pipeOnDrain(src) { + return function() { + var dest = this; + var state = src._readableState; + state.awaitDrain --; + if (state.awaitDrain === 0) + flow(src); + }; +} + function flow(src) { var state = src._readableState; var chunk; - var needDrain = 0; - - function ondrain() { - needDrain--; - if (needDrain === 0) - flow(src); - } + state.awaitDrain = 0; function write(dest, i, list) { var written = dest.write(chunk); if (false === written) { - needDrain++; - dest.once('drain', ondrain); + state.awaitDrain++; } } @@ -370,7 +389,7 @@ function flow(src) { src.emit('data', chunk); // if anyone needs a drain, then we have to wait for that. - if (needDrain > 0) + if (state.awaitDrain > 0) return; }