stream: allow transfer of readable byte streams

Updates the `ReadableStream` constructor to mark byte streams as
transferable. When transferred, byte streams become regular streams.

Refs: https://github.com/nodejs/node/pull/39062
Refs: https://streams.spec.whatwg.org/#rs-transfer
PR-URL: https://github.com/nodejs/node/pull/45955
Reviewed-By: Daeyeon Jeong <daeyeon.dev@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
pull/46499/head
MrBBot 2023-02-04 19:30:15 +00:00 committed by GitHub
parent 806a516851
commit 03854f6487
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 9 deletions

View File

@ -281,17 +281,16 @@ class ReadableStream {
this,
source,
extractHighWaterMark(highWaterMark, 0));
return;
} else {
if (type !== undefined)
throw new ERR_INVALID_ARG_VALUE('source.type', type);
setupReadableStreamDefaultControllerFromSource(
this,
source,
extractHighWaterMark(highWaterMark, 1),
extractSizeAlgorithm(size));
}
if (type !== undefined)
throw new ERR_INVALID_ARG_VALUE('source.type', type);
setupReadableStreamDefaultControllerFromSource(
this,
source,
extractHighWaterMark(highWaterMark, 1),
extractSizeAlgorithm(size));
// eslint-disable-next-line no-constructor-return
return makeTransferable(this);
}

View File

@ -15,6 +15,7 @@ const {
const {
isReadableStream,
isReadableByteStreamController,
} = require('internal/webstreams/readablestream');
const {
@ -25,6 +26,10 @@ const {
isTransformStream,
} = require('internal/webstreams/transformstream');
const {
kState,
} = require('internal/webstreams/util');
const {
makeTransferable,
kClone,
@ -107,6 +112,56 @@ const theData = 'hello';
assert(readable.locked);
}
{
const { port1, port2 } = new MessageChannel();
port1.onmessageerror = common.mustNotCall();
port2.onmessageerror = common.mustNotCall();
// This test repeats the test above, but with a readable byte stream.
// Note transferring a readable byte stream results in a regular
// value-oriented stream on the other side:
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
const theByteData = new Uint8Array([1, 2, 3]);
const readable = new ReadableStream({
type: 'bytes',
start: common.mustCall((controller) => {
// `enqueue` will detach its argument's buffer, so clone first
controller.enqueue(theByteData.slice());
controller.close();
}),
});
assert(isReadableByteStreamController(readable[kState].controller));
port2.onmessage = common.mustCall(({ data }) => {
assert(isReadableStream(data));
assert(!isReadableByteStreamController(data[kState].controller));
const reader = data.getReader();
reader.read().then(common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, { done: false, value: theByteData });
}));
port2.close();
});
port1.onmessage = common.mustCall(({ data }) => {
assert(isReadableStream(data));
assert(!isReadableByteStreamController(data[kState].controller));
assert(!data.locked);
port1.postMessage(data, [data]);
assert(data.locked);
});
assert.throws(() => port2.postMessage(readable), {
code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
});
port2.postMessage(readable, [readable]);
assert(readable.locked);
}
{
const { port1, port2 } = new MessageChannel();
port1.onmessageerror = common.mustNotCall();