mirror of https://github.com/nodejs/node.git
stream: enable autoDestroy by default
PR-URL: https://github.com/nodejs/node/pull/30623 Refs: https://github.com/nodejs/node/issues/30621 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de> Reviewed-By: Rich Trott <rtrott@gmail.com>pull/31175/head
parent
c52ebc06da
commit
4bec6d13f9
|
@ -277,8 +277,8 @@ added: v0.9.4
|
|||
The `'error'` event is emitted if an error occurred while writing or piping
|
||||
data. The listener callback is passed a single `Error` argument when called.
|
||||
|
||||
The stream is not closed when the `'error'` event is emitted unless the
|
||||
[`autoDestroy`][writable-new] option was set to `true` when creating the
|
||||
The stream is closed when the `'error'` event is emitted unless the
|
||||
[`autoDestroy`][writable-new] option was set to `false` when creating the
|
||||
stream.
|
||||
|
||||
After `'error'`, no further events other than `'close'` *should* be emitted
|
||||
|
@ -1667,11 +1667,7 @@ const { Writable } = require('stream');
|
|||
|
||||
class MyWritable extends Writable {
|
||||
constructor({ highWaterMark, ...options }) {
|
||||
super({
|
||||
highWaterMark,
|
||||
autoDestroy: true,
|
||||
emitClose: true
|
||||
});
|
||||
super({ highWaterMark });
|
||||
// ...
|
||||
}
|
||||
}
|
||||
|
@ -1745,6 +1741,9 @@ changes:
|
|||
pr-url: https://github.com/nodejs/node/pull/22795
|
||||
description: Add `autoDestroy` option to automatically `destroy()` the
|
||||
stream when it emits `'finish'` or errors.
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/30623
|
||||
description: Change `autoDestroy` option default to `true`.
|
||||
-->
|
||||
|
||||
* `options` {Object}
|
||||
|
@ -1776,7 +1775,7 @@ changes:
|
|||
* `final` {Function} Implementation for the
|
||||
[`stream._final()`][stream-_final] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `false`.
|
||||
`.destroy()` on itself after ending. **Default:** `true`.
|
||||
|
||||
<!-- eslint-disable no-useless-constructor -->
|
||||
```js
|
||||
|
@ -2021,6 +2020,9 @@ changes:
|
|||
pr-url: https://github.com/nodejs/node/pull/22795
|
||||
description: Add `autoDestroy` option to automatically `destroy()` the
|
||||
stream when it emits `'end'` or errors.
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/30623
|
||||
description: Change `autoDestroy` option default to `true`.
|
||||
-->
|
||||
|
||||
* `options` {Object}
|
||||
|
@ -2039,7 +2041,7 @@ changes:
|
|||
* `destroy` {Function} Implementation for the
|
||||
[`stream._destroy()`][readable-_destroy] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `false`.
|
||||
`.destroy()` on itself after ending. **Default:** `true`.
|
||||
|
||||
<!-- eslint-disable no-useless-constructor -->
|
||||
```js
|
||||
|
|
|
@ -48,7 +48,7 @@ function IncomingMessage(socket) {
|
|||
};
|
||||
}
|
||||
|
||||
Stream.Readable.call(this, streamOptions);
|
||||
Stream.Readable.call(this, { autoDestroy: false, ...streamOptions });
|
||||
|
||||
this._readableState.readingMore = true;
|
||||
|
||||
|
|
|
@ -138,7 +138,7 @@ function ReadableState(options, stream, isDuplex) {
|
|||
this.emitClose = !options || options.emitClose !== false;
|
||||
|
||||
// Should .destroy() be called after 'end' (and potentially 'finish')
|
||||
this.autoDestroy = !!(options && options.autoDestroy);
|
||||
this.autoDestroy = !options || options.autoDestroy !== false;
|
||||
|
||||
// Has it been destroyed
|
||||
this.destroyed = false;
|
||||
|
@ -201,11 +201,7 @@ Readable.prototype._destroy = function(err, cb) {
|
|||
};
|
||||
|
||||
Readable.prototype[EE.captureRejectionSymbol] = function(err) {
|
||||
// TODO(mcollina): remove the destroyed if once errorEmitted lands in
|
||||
// Readable.
|
||||
if (!this.destroyed) {
|
||||
this.destroy(err);
|
||||
}
|
||||
this.destroy(err);
|
||||
};
|
||||
|
||||
// Manually shove something into the read() buffer.
|
||||
|
|
|
@ -167,7 +167,7 @@ function WritableState(options, stream, isDuplex) {
|
|||
this.emitClose = !options || options.emitClose !== false;
|
||||
|
||||
// Should .destroy() be called after 'finish' (and potentially 'end')
|
||||
this.autoDestroy = !!(options && options.autoDestroy);
|
||||
this.autoDestroy = !options || options.autoDestroy !== false;
|
||||
|
||||
// Indicates whether the stream has errored. When true all write() calls
|
||||
// should return false. This is needed since when autoDestroy
|
||||
|
|
|
@ -77,6 +77,9 @@ function ReadStream(path, options) {
|
|||
if (options.emitClose === undefined) {
|
||||
options.emitClose = false;
|
||||
}
|
||||
if (options.autoDestroy === undefined) {
|
||||
options.autoDestroy = false;
|
||||
}
|
||||
|
||||
this[kFs] = options.fs || fs;
|
||||
|
||||
|
@ -298,6 +301,9 @@ function WriteStream(path, options) {
|
|||
if (options.emitClose === undefined) {
|
||||
options.emitClose = false;
|
||||
}
|
||||
if (options.autoDestroy === undefined) {
|
||||
options.autoDestroy = false;
|
||||
}
|
||||
|
||||
this[kFs] = options.fs || fs;
|
||||
if (typeof this[kFs].open !== 'function') {
|
||||
|
|
|
@ -282,7 +282,7 @@ function onStreamTimeout(kind) {
|
|||
|
||||
class Http2ServerRequest extends Readable {
|
||||
constructor(stream, headers, options, rawHeaders) {
|
||||
super(options);
|
||||
super({ autoDestroy: false, ...options });
|
||||
this[kState] = {
|
||||
closed: false,
|
||||
didRead: false,
|
||||
|
|
|
@ -1782,6 +1782,7 @@ class Http2Stream extends Duplex {
|
|||
constructor(session, options) {
|
||||
options.allowHalfOpen = true;
|
||||
options.decodeStrings = false;
|
||||
options.autoDestroy = false;
|
||||
super(options);
|
||||
this[async_id_symbol] = -1;
|
||||
|
||||
|
|
|
@ -289,6 +289,7 @@ function Socket(options) {
|
|||
options.allowHalfOpen = true;
|
||||
// For backwards compat do not emit close on destroy.
|
||||
options.emitClose = false;
|
||||
options.autoDestroy = false;
|
||||
// Handle strings directly.
|
||||
options.decodeStrings = false;
|
||||
stream.Duplex.call(this, options);
|
||||
|
|
|
@ -259,7 +259,7 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
|
|||
}
|
||||
}
|
||||
|
||||
Transform.call(this, opts);
|
||||
Transform.call(this, { autoDestroy: false, ...opts });
|
||||
this._hadError = false;
|
||||
this.bytesWritten = 0;
|
||||
this._handle = handle;
|
||||
|
|
|
@ -62,8 +62,8 @@ const Stream = require('stream').Stream;
|
|||
const R = Stream.Readable;
|
||||
const W = Stream.Writable;
|
||||
|
||||
const r = new R();
|
||||
const w = new W();
|
||||
const r = new R({ autoDestroy: false });
|
||||
const w = new W({ autoDestroy: false });
|
||||
let removed = false;
|
||||
|
||||
r._read = common.mustCall(function() {
|
||||
|
|
|
@ -32,7 +32,7 @@ const assert = require('assert');
|
|||
|
||||
const stream = require('stream');
|
||||
const hwm = 10;
|
||||
const r = stream.Readable({ highWaterMark: hwm });
|
||||
const r = stream.Readable({ highWaterMark: hwm, autoDestroy: false });
|
||||
const chunks = 10;
|
||||
|
||||
const data = Buffer.allocUnsafe(chunks * hwm + Math.ceil(hwm / 2));
|
||||
|
|
|
@ -5,6 +5,9 @@ const assert = require('assert');
|
|||
const stream = require('stream');
|
||||
|
||||
class MyWritable extends stream.Writable {
|
||||
constructor(options) {
|
||||
super({ autoDestroy: false, ...options });
|
||||
}
|
||||
_write(chunk, encoding, callback) {
|
||||
assert.notStrictEqual(chunk, null);
|
||||
callback();
|
||||
|
|
|
@ -1,20 +1,20 @@
|
|||
'use strict';
|
||||
const common = require('../common');
|
||||
const { Writable } = require('stream');
|
||||
const assert = require('assert');
|
||||
|
||||
{
|
||||
// Sync + Sync
|
||||
const writable = new Writable({
|
||||
write: common.mustCall((buf, enc, cb) => {
|
||||
cb();
|
||||
assert.throws(cb, {
|
||||
code: 'ERR_MULTIPLE_CALLBACK',
|
||||
name: 'Error'
|
||||
});
|
||||
cb();
|
||||
})
|
||||
});
|
||||
writable.write('hi');
|
||||
writable.on('error', common.expectsError({
|
||||
code: 'ERR_MULTIPLE_CALLBACK',
|
||||
name: 'Error'
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -23,14 +23,15 @@ const assert = require('assert');
|
|||
write: common.mustCall((buf, enc, cb) => {
|
||||
cb();
|
||||
process.nextTick(() => {
|
||||
assert.throws(cb, {
|
||||
code: 'ERR_MULTIPLE_CALLBACK',
|
||||
name: 'Error'
|
||||
});
|
||||
cb();
|
||||
});
|
||||
})
|
||||
});
|
||||
writable.write('hi');
|
||||
writable.on('error', common.expectsError({
|
||||
code: 'ERR_MULTIPLE_CALLBACK',
|
||||
name: 'Error'
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -39,12 +40,13 @@ const assert = require('assert');
|
|||
write: common.mustCall((buf, enc, cb) => {
|
||||
process.nextTick(cb);
|
||||
process.nextTick(() => {
|
||||
assert.throws(cb, {
|
||||
code: 'ERR_MULTIPLE_CALLBACK',
|
||||
name: 'Error'
|
||||
});
|
||||
cb();
|
||||
});
|
||||
})
|
||||
});
|
||||
writable.write('hi');
|
||||
writable.on('error', common.expectsError({
|
||||
code: 'ERR_MULTIPLE_CALLBACK',
|
||||
name: 'Error'
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ const stream = require('stream');
|
|||
stream.Readable.prototype.unpipe.call(this, dest);
|
||||
};
|
||||
|
||||
const dest = new stream.Writable();
|
||||
const dest = new stream.Writable({ autoDestroy: false });
|
||||
dest._write = function(chunk, encoding, cb) {
|
||||
cb();
|
||||
};
|
||||
|
|
|
@ -275,7 +275,7 @@ const helloWorldBuffer = Buffer.from('hello world');
|
|||
|
||||
{
|
||||
// Verify writables cannot be piped
|
||||
const w = new W();
|
||||
const w = new W({ autoDestroy: false });
|
||||
w._write = common.mustNotCall();
|
||||
let gotError = false;
|
||||
w.on('error', function() {
|
||||
|
|
Loading…
Reference in New Issue