From c3aae9cf95749949194f5288bad7468a995b1162 Mon Sep 17 00:00:00 2001 From: Gil Pedersen Date: Sun, 17 Mar 2013 15:04:01 +0100 Subject: [PATCH] stream: Fix stall in Transform under very specific conditions The stall is exposed in the test, though the test itself asserts before it stalls. The test is constructed to replicate the stalling state of a complex Passthrough usecase since I was not able to reliable trigger the stall. Some of the preconditions for triggering the stall are: * rs.length >= rs.highWaterMark * !rs.needReadable * _transform() handler that can return empty transforms * multiple sync write() calls Combined this can trigger a case where rs.reading is not cleared when further progress requires this. The fix is to always clear rs.reading. --- lib/_stream_transform.js | 1 + test/simple/test-stream2-transform.js | 36 +++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 1b6bbb00c9d..ec2b46a9534 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -101,6 +101,7 @@ function afterTransform(stream, er, data) { cb(er); var rs = stream._readableState; + rs.reading = false; if (rs.needReadable || rs.length < rs.highWaterMark) { stream._read(rs.highWaterMark); } diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 500c48b8ac0..7a32a3cce8b 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -235,6 +235,42 @@ test('assymetric transform (compress)', function(t) { }); }); +// this tests for a stall when data is written to a full stream +// that has empty transforms. +test('complex transform', function(t) { + var count = 0; + var saved = null; + var pt = new Transform({highWaterMark:3}); + pt._transform = function(c, e, cb) { + if (count++ === 1) + saved = c; + else { + if (saved) { + pt.push(saved); + saved = null; + } + pt.push(c); + } + + cb(); + }; + + pt.once('readable', function() { + process.nextTick(function() { + pt.write(new Buffer('d')); + pt.write(new Buffer('ef'), function() { + pt.end(); + t.end(); + }); + t.equal(pt.read().toString(), 'abc'); + t.equal(pt.read().toString(), 'def'); + t.equal(pt.read(), null); + }); + }); + + pt.write(new Buffer('abc')); +}); + test('passthrough event emission', function(t) { var pt = new PassThrough();