2020-06-28 16:29:01 +08:00
|
|
|
'use strict';
|
|
|
|
|
|
|
|
const {
|
2021-02-13 23:15:23 +08:00
|
|
|
ArrayPrototypePop,
|
2020-06-28 16:29:01 +08:00
|
|
|
Promise,
|
|
|
|
} = primordials;
|
|
|
|
|
2021-02-13 23:15:23 +08:00
|
|
|
const {
|
|
|
|
addAbortSignalNoValidate,
|
|
|
|
} = require('internal/streams/add-abort-signal');
|
|
|
|
|
|
|
|
const {
|
|
|
|
validateAbortSignal,
|
|
|
|
} = require('internal/validators');
|
|
|
|
|
2021-02-25 03:30:50 +08:00
|
|
|
const {
|
|
|
|
isIterable,
|
|
|
|
isStream,
|
|
|
|
} = require('internal/streams/utils');
|
|
|
|
|
2021-05-20 02:32:18 +08:00
|
|
|
const pl = require('internal/streams/pipeline');
|
|
|
|
const eos = require('internal/streams/end-of-stream');
|
2020-06-28 16:29:01 +08:00
|
|
|
|
|
|
|
function pipeline(...streams) {
|
|
|
|
return new Promise((resolve, reject) => {
|
2021-02-13 23:15:23 +08:00
|
|
|
let signal;
|
|
|
|
const lastArg = streams[streams.length - 1];
|
|
|
|
if (lastArg && typeof lastArg === 'object' &&
|
|
|
|
!isStream(lastArg) && !isIterable(lastArg)) {
|
|
|
|
const options = ArrayPrototypePop(streams);
|
|
|
|
signal = options.signal;
|
|
|
|
validateAbortSignal(signal, 'options.signal');
|
|
|
|
}
|
|
|
|
|
|
|
|
const pipe = pl(...streams, (err, value) => {
|
2020-06-28 16:29:01 +08:00
|
|
|
if (err) {
|
|
|
|
reject(err);
|
|
|
|
} else {
|
|
|
|
resolve(value);
|
|
|
|
}
|
|
|
|
});
|
2021-02-13 23:15:23 +08:00
|
|
|
if (signal) {
|
|
|
|
addAbortSignalNoValidate(signal, pipe);
|
|
|
|
}
|
2020-06-28 16:29:01 +08:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
function finished(stream, opts) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
eos(stream, opts, (err) => {
|
|
|
|
if (err) {
|
|
|
|
reject(err);
|
|
|
|
} else {
|
|
|
|
resolve();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = {
|
|
|
|
finished,
|
|
|
|
pipeline,
|
|
|
|
};
|