diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 8a00d343b60..958bfa11486 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -125,7 +125,7 @@ function Transform(options) { // sync guard flag. this._readableState.sync = false; - this.once('finish', function() { + this.once('prefinish', function() { if ('function' === typeof this._flush) this._flush(function(er) { done(stream, er); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9042df721be..f0034e3ba58 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -102,6 +102,14 @@ function WritableState(options, stream) { this.writelen = 0; this.buffer = []; + + // number of pending user-supplied write callbacks + // this must be 0 before 'finish' can be emitted + this.pendingcb = 0; + + // emit prefinish if the only thing we're waiting for is _write cbs + // This is relevant for synchronous Transform streams + this.prefinished = false; } function Writable(options) { @@ -171,8 +179,10 @@ Writable.prototype.write = function(chunk, encoding, cb) { if (state.ended) writeAfterEnd(this, state, cb); - else if (validChunk(this, state, chunk, cb)) + else if (validChunk(this, state, chunk, cb)) { + state.pendingcb++; ret = writeOrBuffer(this, state, chunk, encoding, cb); + } return ret; }; @@ -241,10 +251,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { function onwriteError(stream, state, sync, er, cb) { if (sync) process.nextTick(function() { + state.pendingcb--; cb(er); }); - else + else { + state.pendingcb--; cb(er); + } stream.emit('error', er); } @@ -289,9 +302,9 @@ function onwrite(stream, er) { function afterWrite(stream, state, finished, cb) { if (!finished) onwriteDrain(stream, state); + state.pendingcb--; cb(); - if (finished) - finishMaybe(stream, state); + finishMaybe(stream, state); } // Must force callback to be called on nextTick, so that we don't @@ -315,9 +328,14 @@ function clearBuffer(stream, state) { for (var c = 0; c < state.buffer.length; c++) cbs.push(state.buffer[c].callback); + // count the one we are adding, as well. + // TODO(isaacs) clean this up + state.pendingcb++; doWrite(stream, state, true, state.length, state.buffer, '', function(err) { - for (var i = 0; i < cbs.length; i++) + for (var i = 0; i < cbs.length; i++) { + state.pendingcb--; cbs[i](err); + } }); // Clear buffer @@ -390,11 +408,22 @@ function needFinish(stream, state) { !state.writing); } +function prefinish(stream, state) { + if (!state.prefinished) { + state.prefinished = true; + stream.emit('prefinish'); + } +} + function finishMaybe(stream, state) { var need = needFinish(stream, state); if (need) { - state.finished = true; - stream.emit('finish'); + if (state.pendingcb === 0) { + prefinish(stream, state); + state.finished = true; + stream.emit('finish'); + } else + prefinish(stream, state); } return need; } diff --git a/test/simple/test-stream2-writable.js b/test/simple/test-stream2-writable.js index e0f384cb2ad..704100c0da4 100644 --- a/test/simple/test-stream2-writable.js +++ b/test/simple/test-stream2-writable.js @@ -375,3 +375,19 @@ test('finish does not come before write cb', function(t) { w.write(Buffer(0)); w.end(); }); + +test('finish does not come before sync _write cb', function(t) { + var w = new W(); + var writeCb = false; + w._write = function(chunk, e, cb) { + cb(); + }; + w.on('finish', function() { + assert(writeCb); + t.end(); + }); + w.write(Buffer(0), function(er) { + writeCb = true; + }); + w.end(); +});