stream: Guarantee ordering of 'finish' event

In synchronous Writable streams (where the _write cb is called on the
current tick), the 'finish' event (and thus the end() callback) can in
some cases be called before all the write() callbacks are called.

Use a counter, and have stream.Transform rely on the 'prefinish' event
instead of the 'finish' event.

This has zero effect on most streams, but it corrects an edge case and
makes it perform more deterministically, which is a Good Thing.
archived-io.js-v0.10
isaacs 2013-05-08 12:54:29 -07:00
parent 8a407f58b9
commit c38ce9bc0a
3 changed files with 53 additions and 8 deletions

View File

@ -125,7 +125,7 @@ function Transform(options) {
// sync guard flag.
this._readableState.sync = false;
this.once('finish', function() {
this.once('prefinish', function() {
if ('function' === typeof this._flush)
this._flush(function(er) {
done(stream, er);

View File

@ -102,6 +102,14 @@ function WritableState(options, stream) {
this.writelen = 0;
this.buffer = [];
// number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
this.pendingcb = 0;
// emit prefinish if the only thing we're waiting for is _write cbs
// This is relevant for synchronous Transform streams
this.prefinished = false;
}
function Writable(options) {
@ -171,8 +179,10 @@ Writable.prototype.write = function(chunk, encoding, cb) {
if (state.ended)
writeAfterEnd(this, state, cb);
else if (validChunk(this, state, chunk, cb))
else if (validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}
return ret;
};
@ -241,10 +251,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
function onwriteError(stream, state, sync, er, cb) {
if (sync)
process.nextTick(function() {
state.pendingcb--;
cb(er);
});
else
else {
state.pendingcb--;
cb(er);
}
stream.emit('error', er);
}
@ -289,9 +302,9 @@ function onwrite(stream, er) {
function afterWrite(stream, state, finished, cb) {
if (!finished)
onwriteDrain(stream, state);
state.pendingcb--;
cb();
if (finished)
finishMaybe(stream, state);
finishMaybe(stream, state);
}
// Must force callback to be called on nextTick, so that we don't
@ -315,9 +328,14 @@ function clearBuffer(stream, state) {
for (var c = 0; c < state.buffer.length; c++)
cbs.push(state.buffer[c].callback);
// count the one we are adding, as well.
// TODO(isaacs) clean this up
state.pendingcb++;
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++)
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
}
});
// Clear buffer
@ -390,11 +408,22 @@ function needFinish(stream, state) {
!state.writing);
}
function prefinish(stream, state) {
if (!state.prefinished) {
state.prefinished = true;
stream.emit('prefinish');
}
}
function finishMaybe(stream, state) {
var need = needFinish(stream, state);
if (need) {
state.finished = true;
stream.emit('finish');
if (state.pendingcb === 0) {
prefinish(stream, state);
state.finished = true;
stream.emit('finish');
} else
prefinish(stream, state);
}
return need;
}

View File

@ -375,3 +375,19 @@ test('finish does not come before write cb', function(t) {
w.write(Buffer(0));
w.end();
});
test('finish does not come before sync _write cb', function(t) {
var w = new W();
var writeCb = false;
w._write = function(chunk, e, cb) {
cb();
};
w.on('finish', function() {
assert(writeCb);
t.end();
});
w.write(Buffer(0), function(er) {
writeCb = true;
});
w.end();
});