From c38ce9bc0a143141abfa638289b458ddaaac26d6 Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 8 May 2013 12:54:29 -0700 Subject: [PATCH] stream: Guarantee ordering of 'finish' event In synchronous Writable streams (where the _write cb is called on the current tick), the 'finish' event (and thus the end() callback) can in some cases be called before all the write() callbacks are called. Use a counter, and have stream.Transform rely on the 'prefinish' event instead of the 'finish' event. This has zero effect on most streams, but it corrects an edge case and makes it perform more deterministically, which is a Good Thing. --- lib/_stream_transform.js | 2 +- lib/_stream_writable.js | 43 +++++++++++++++++++++++----- test/simple/test-stream2-writable.js | 16 +++++++++++ 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 8a00d343b60..958bfa11486 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -125,7 +125,7 @@ function Transform(options) { // sync guard flag. this._readableState.sync = false; - this.once('finish', function() { + this.once('prefinish', function() { if ('function' === typeof this._flush) this._flush(function(er) { done(stream, er); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9042df721be..f0034e3ba58 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -102,6 +102,14 @@ function WritableState(options, stream) { this.writelen = 0; this.buffer = []; + + // number of pending user-supplied write callbacks + // this must be 0 before 'finish' can be emitted + this.pendingcb = 0; + + // emit prefinish if the only thing we're waiting for is _write cbs + // This is relevant for synchronous Transform streams + this.prefinished = false; } function Writable(options) { @@ -171,8 +179,10 @@ Writable.prototype.write = function(chunk, encoding, cb) { if (state.ended) writeAfterEnd(this, state, cb); - else if (validChunk(this, state, chunk, cb)) + else if (validChunk(this, state, chunk, cb)) { + state.pendingcb++; ret = writeOrBuffer(this, state, chunk, encoding, cb); + } return ret; }; @@ -241,10 +251,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { function onwriteError(stream, state, sync, er, cb) { if (sync) process.nextTick(function() { + state.pendingcb--; cb(er); }); - else + else { + state.pendingcb--; cb(er); + } stream.emit('error', er); } @@ -289,9 +302,9 @@ function onwrite(stream, er) { function afterWrite(stream, state, finished, cb) { if (!finished) onwriteDrain(stream, state); + state.pendingcb--; cb(); - if (finished) - finishMaybe(stream, state); + finishMaybe(stream, state); } // Must force callback to be called on nextTick, so that we don't @@ -315,9 +328,14 @@ function clearBuffer(stream, state) { for (var c = 0; c < state.buffer.length; c++) cbs.push(state.buffer[c].callback); + // count the one we are adding, as well. + // TODO(isaacs) clean this up + state.pendingcb++; doWrite(stream, state, true, state.length, state.buffer, '', function(err) { - for (var i = 0; i < cbs.length; i++) + for (var i = 0; i < cbs.length; i++) { + state.pendingcb--; cbs[i](err); + } }); // Clear buffer @@ -390,11 +408,22 @@ function needFinish(stream, state) { !state.writing); } +function prefinish(stream, state) { + if (!state.prefinished) { + state.prefinished = true; + stream.emit('prefinish'); + } +} + function finishMaybe(stream, state) { var need = needFinish(stream, state); if (need) { - state.finished = true; - stream.emit('finish'); + if (state.pendingcb === 0) { + prefinish(stream, state); + state.finished = true; + stream.emit('finish'); + } else + prefinish(stream, state); } return need; } diff --git a/test/simple/test-stream2-writable.js b/test/simple/test-stream2-writable.js index e0f384cb2ad..704100c0da4 100644 --- a/test/simple/test-stream2-writable.js +++ b/test/simple/test-stream2-writable.js @@ -375,3 +375,19 @@ test('finish does not come before write cb', function(t) { w.write(Buffer(0)); w.end(); }); + +test('finish does not come before sync _write cb', function(t) { + var w = new W(); + var writeCb = false; + w._write = function(chunk, e, cb) { + cb(); + }; + w.on('finish', function() { + assert(writeCb); + t.end(); + }); + w.write(Buffer(0), function(er) { + writeCb = true; + }); + w.end(); +});