streams2: Refactor out .once() usage from Readable.pipe()

pull/24504/head
isaacs 2012-11-28 01:25:39 -08:00
parent 38e2b0053a
commit 4b4ff2dff1
1 changed files with 29 additions and 10 deletions

View File

@ -81,6 +81,9 @@ function ReadableState(options, stream) {
// when piping, we only care about 'readable' events that happen
// after read()ing all the bytes and not getting any pushback.
this.ranOut = false;
// the number of writers that are awaiting a drain event in .pipe()s
this.awaitDrain = 0;
this.flowChunkSize = null;
this.decoder = null;
@ -330,6 +333,19 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// This would be easier to follow with a .once() handler
// in flow(), but that is too slow.
this.on('readable', pipeOnReadable);
var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
dest.on('unpipe', function(readable) {
if (readable === src)
dest.removeListener('drain', ondrain);
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
if (dest._writableState.needDrain)
ondrain();
});
state.flowing = true;
process.nextTick(function() {
@ -340,22 +356,25 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
return dest;
};
function pipeOnDrain(src) {
return function() {
var dest = this;
var state = src._readableState;
state.awaitDrain --;
if (state.awaitDrain === 0)
flow(src);
};
}
function flow(src) {
var state = src._readableState;
var chunk;
var needDrain = 0;
function ondrain() {
needDrain--;
if (needDrain === 0)
flow(src);
}
state.awaitDrain = 0;
function write(dest, i, list) {
var written = dest.write(chunk);
if (false === written) {
needDrain++;
dest.once('drain', ondrain);
state.awaitDrain++;
}
}
@ -370,7 +389,7 @@ function flow(src) {
src.emit('data', chunk);
// if anyone needs a drain, then we have to wait for that.
if (needDrain > 0)
if (state.awaitDrain > 0)
return;
}