From b43e544140ccf68580c02e71c56d19b82e1e1d32 Mon Sep 17 00:00:00 2001 From: isaacs Date: Thu, 10 Jan 2013 13:49:19 -0800 Subject: [PATCH] stream: Use push() for Transform._output() This also slightly changes the semantics, in that a 'readable' event may be triggered by the first write() call, even if a user has not yet called read(). This happens because the Transform _write() handler is calling read(0) to start the flow of data. Technically, the new behavior is more 'correct', since it is more in line with the semantics of the 'readable' event in other streams. --- lib/_stream_transform.js | 44 ++------------------------- test/simple/test-stream2-transform.js | 26 ++++++++++------ 2 files changed, 19 insertions(+), 51 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index b0819de29a5..020dc8d76a6 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -75,7 +75,7 @@ function TransformState(stream) { this.transforming = false; this.pendingReadCb = null; this.output = function(chunk) { - stream._output(chunk); + stream.push(chunk); }; } @@ -148,9 +148,6 @@ Transform.prototype._read = function(n, readcb) { var rs = this._readableState; var ts = this._transformState; - if (ts.pendingReadCb) - throw new Error('_read while _read already in progress'); - ts.pendingReadCb = readcb; // if there's nothing pending, then we just wait. @@ -173,31 +170,6 @@ Transform.prototype._read = function(n, readcb) { }); }; -Transform.prototype._output = function(chunk) { - if (!chunk || !chunk.length) - return; - - // if we've got a pending readcb, then just call that, - // and let Readable take care of it. If not, then we fill - // the readable buffer ourselves, and emit whatever's needed. - var ts = this._transformState; - var readcb = ts.pendingReadCb; - if (readcb) { - ts.pendingReadCb = null; - readcb(null, chunk); - return; - } - - // otherwise, it's up to us to fill the rs buffer. - var rs = this._readableState; - var len = rs.length; - rs.buffer.push(chunk); - rs.length += chunk.length; - if (rs.needReadable) { - rs.needReadable = false; - this.emit('readable'); - } -}; function done(stream, er) { if (er) @@ -215,17 +187,5 @@ function done(stream, er) { if (ts.transforming) throw new Error('calling transform done when still transforming'); - // if we were waiting on a read, let them know that it isn't coming. - var readcb = ts.pendingReadCb; - if (readcb) - return readcb(); - - rs.ended = true; - // we may have gotten a 'null' read before, and since there is - // no more data coming from the writable side, we need to emit - // now so that the consumer knows to pick up the tail bits. - if (rs.length && rs.needReadable) - stream.emit('readable'); - else if (rs.length === 0) - stream.emit('end'); + return stream.push(null); } diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 2bc008517cc..90870807518 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -215,35 +215,40 @@ test('passthrough event emission', function(t) { var i = 0; pt.write(new Buffer('foog')); + + console.error('need emit 0'); pt.write(new Buffer('bark')); + console.error('should have emitted readable now 1 === %d', emits); + t.equal(emits, 1); + t.equal(pt.read(5).toString(), 'foogb'); t.equal(pt.read(5) + '', 'null'); - console.error('need emit 0'); + console.error('need emit 1'); pt.write(new Buffer('bazy')); console.error('should have emitted, but not again'); pt.write(new Buffer('kuel')); - console.error('should have emitted readable now 1 === %d', emits); - t.equal(emits, 1); + console.error('should have emitted readable now 2 === %d', emits); + t.equal(emits, 2); t.equal(pt.read(5).toString(), 'arkba'); t.equal(pt.read(5).toString(), 'zykue'); t.equal(pt.read(5), null); - console.error('need emit 1'); + console.error('need emit 2'); pt.end(); - t.equal(emits, 2); + t.equal(emits, 3); t.equal(pt.read(5).toString(), 'l'); t.equal(pt.read(5), null); console.error('should not have emitted again'); - t.equal(emits, 2); + t.equal(emits, 3); t.end(); }); @@ -256,25 +261,28 @@ test('passthrough event emission reordered', function(t) { }); pt.write(new Buffer('foog')); + console.error('need emit 0'); pt.write(new Buffer('bark')); + console.error('should have emitted readable now 1 === %d', emits); + t.equal(emits, 1); t.equal(pt.read(5).toString(), 'foogb'); t.equal(pt.read(5), null); - console.error('need emit 0'); + console.error('need emit 1'); pt.once('readable', function() { t.equal(pt.read(5).toString(), 'arkba'); t.equal(pt.read(5), null); - console.error('need emit 1'); + console.error('need emit 2'); pt.once('readable', function() { t.equal(pt.read(5).toString(), 'zykue'); t.equal(pt.read(5), null); pt.once('readable', function() { t.equal(pt.read(5).toString(), 'l'); t.equal(pt.read(5), null); - t.equal(emits, 3); + t.equal(emits, 4); t.end(); }); pt.end();