mirror of https://github.com/nodejs/node.git
stream: add readableDidRead
Adds readableDidRead to streams and applies usage to http, http2 and quic. PR-URL: https://github.com/nodejs/node/pull/36820 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>pull/39362/head
parent
4a9fcb3534
commit
8306051001
|
@ -87,6 +87,7 @@ function IncomingMessage(socket) {
|
|||
this.statusMessage = null;
|
||||
this.client = socket;
|
||||
|
||||
// TODO: Deprecate and remove.
|
||||
this._consuming = false;
|
||||
// Flag for when we decide that this message cannot possibly be
|
||||
// read by the user, so there's no point continuing to handle it.
|
||||
|
|
|
@ -802,7 +802,7 @@ function resOnFinish(req, res, socket, state, server) {
|
|||
// If the user never called req.read(), and didn't pipe() or
|
||||
// .resume() or .on('data'), then we call req._dump() so that the
|
||||
// bytes will be pulled off the wire.
|
||||
if (!req._consuming && !req._readableState.resumeScheduled)
|
||||
if (!req.readableDidRead)
|
||||
req._dump();
|
||||
|
||||
// Make sure the requestTimeout is cleared before finishing.
|
||||
|
|
|
@ -171,6 +171,8 @@ function ReadableState(options, stream, isDuplex) {
|
|||
// If true, a maybeReadMore has been scheduled.
|
||||
this.readingMore = false;
|
||||
|
||||
this.didRead = false;
|
||||
|
||||
this.decoder = null;
|
||||
this.encoding = null;
|
||||
if (options && options.encoding) {
|
||||
|
@ -542,6 +544,8 @@ Readable.prototype.read = function(n) {
|
|||
if (ret !== null)
|
||||
this.emit('data', ret);
|
||||
|
||||
state.didRead = true;
|
||||
|
||||
return ret;
|
||||
};
|
||||
|
||||
|
@ -845,7 +849,9 @@ function pipeOnDrain(src, dest) {
|
|||
|
||||
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
|
||||
EE.listenerCount(src, 'data')) {
|
||||
// TODO(ronag): Call resume() instead?
|
||||
state.flowing = true;
|
||||
state.didRead = true;
|
||||
flow(src);
|
||||
}
|
||||
};
|
||||
|
@ -993,6 +999,7 @@ Readable.prototype.resume = function() {
|
|||
function resume(stream, state) {
|
||||
if (!state.resumeScheduled) {
|
||||
state.resumeScheduled = true;
|
||||
state.didRead = true;
|
||||
process.nextTick(resume_, stream, state);
|
||||
}
|
||||
}
|
||||
|
@ -1177,6 +1184,13 @@ ObjectDefineProperties(Readable.prototype, {
|
|||
}
|
||||
},
|
||||
|
||||
readableDidRead: {
|
||||
enumerable: false,
|
||||
get: function() {
|
||||
return this._readableState.didRead;
|
||||
}
|
||||
},
|
||||
|
||||
readableHighWaterMark: {
|
||||
enumerable: false,
|
||||
get: function() {
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
'use strict';
|
||||
require('../common');
|
||||
const assert = require('assert');
|
||||
const Readable = require('stream').Readable;
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
read: () => {}
|
||||
});
|
||||
|
||||
assert.strictEqual(readable.readableDidRead, false);
|
||||
readable.read();
|
||||
assert.strictEqual(readable.readableDidRead, true);
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
read: () => {}
|
||||
});
|
||||
|
||||
assert.strictEqual(readable.readableDidRead, false);
|
||||
readable.resume();
|
||||
assert.strictEqual(readable.readableDidRead, true);
|
||||
}
|
Loading…
Reference in New Issue