mirror of https://github.com/nodejs/node.git
stream: Fix stall in Transform under very specific conditions
The stall is exposed in the test, though the test itself asserts before it stalls. The test is constructed to replicate the stalling state of a complex Passthrough usecase since I was not able to reliable trigger the stall. Some of the preconditions for triggering the stall are: * rs.length >= rs.highWaterMark * !rs.needReadable * _transform() handler that can return empty transforms * multiple sync write() calls Combined this can trigger a case where rs.reading is not cleared when further progress requires this. The fix is to always clear rs.reading.pull/24507/merge
parent
bfd16de125
commit
c3aae9cf95
|
@ -101,6 +101,7 @@ function afterTransform(stream, er, data) {
|
|||
cb(er);
|
||||
|
||||
var rs = stream._readableState;
|
||||
rs.reading = false;
|
||||
if (rs.needReadable || rs.length < rs.highWaterMark) {
|
||||
stream._read(rs.highWaterMark);
|
||||
}
|
||||
|
|
|
@ -235,6 +235,42 @@ test('assymetric transform (compress)', function(t) {
|
|||
});
|
||||
});
|
||||
|
||||
// this tests for a stall when data is written to a full stream
|
||||
// that has empty transforms.
|
||||
test('complex transform', function(t) {
|
||||
var count = 0;
|
||||
var saved = null;
|
||||
var pt = new Transform({highWaterMark:3});
|
||||
pt._transform = function(c, e, cb) {
|
||||
if (count++ === 1)
|
||||
saved = c;
|
||||
else {
|
||||
if (saved) {
|
||||
pt.push(saved);
|
||||
saved = null;
|
||||
}
|
||||
pt.push(c);
|
||||
}
|
||||
|
||||
cb();
|
||||
};
|
||||
|
||||
pt.once('readable', function() {
|
||||
process.nextTick(function() {
|
||||
pt.write(new Buffer('d'));
|
||||
pt.write(new Buffer('ef'), function() {
|
||||
pt.end();
|
||||
t.end();
|
||||
});
|
||||
t.equal(pt.read().toString(), 'abc');
|
||||
t.equal(pt.read().toString(), 'def');
|
||||
t.equal(pt.read(), null);
|
||||
});
|
||||
});
|
||||
|
||||
pt.write(new Buffer('abc'));
|
||||
});
|
||||
|
||||
|
||||
test('passthrough event emission', function(t) {
|
||||
var pt = new PassThrough();
|
||||
|
|
Loading…
Reference in New Issue