timers: introduce setInterval async iterator

Added setInterval async generator to timers\promises.
Utilises async generators to provide an iterator compatible with
`for await`.

Co-Authored-By: Fabian Cook <hello@fabiancook.dev>

fix message

PR-URL: https://github.com/nodejs/node/pull/37153
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
pull/37156/head
linkgoron 2021-01-31 01:16:18 +02:00 committed by Benjamin Gruenbaum
parent 8b14046f77
commit fefc639016
3 changed files with 331 additions and 17 deletions

View File

@ -363,6 +363,38 @@ added: v15.0.0
* `signal` {AbortSignal} An optional `AbortSignal` that can be used to
cancel the scheduled `Immediate`.
### `timersPromises.setInterval([delay[, value[, options]]])`
<!-- YAML
added: REPLACEME
-->
Returns an async iterator that generates values in an interval of `delay` ms.
* `delay` {number} The number of milliseconds to wait between iterations.
**Default**: `1`.
* `value` {any} A value with which the iterator returns.
* `options` {Object}
* `ref` {boolean} Set to `false` to indicate that the scheduled `Timeout`
between iterations should not require the Node.js event loop to
remain active.
**Default**: `true`.
* `signal` {AbortSignal} An optional `AbortSignal` that can be used to
cancel the scheduled `Timeout` between operations.
```js
(async function() {
const { setInterval } = require('timers/promises');
const interval = 100;
for await (const startTime of setInterval(interval, Date.now())) {
const now = Date.now();
console.log(now);
if ((now - startTime) > 1000)
break;
}
console.log(Date.now());
})();
```
[Event Loop]: https://nodejs.org/en/docs/guides/event-loop-timers-and-nexttick/#setimmediate-vs-settimeout
[`AbortController`]: globals.md#globals_class_abortcontroller
[`TypeError`]: errors.md#errors_class_typeerror

View File

@ -18,7 +18,11 @@ const {
codes: { ERR_INVALID_ARG_TYPE }
} = require('internal/errors');
const { validateAbortSignal } = require('internal/validators');
const {
validateAbortSignal,
validateBoolean,
validateObject,
} = require('internal/validators');
function cancelListenerHandler(clear, reject) {
if (!this._destroyed) {
@ -111,7 +115,59 @@ function setImmediate(value, options = {}) {
() => signal.removeEventListener('abort', oncancel)) : ret;
}
async function* setInterval(after, value, options = {}) {
validateObject(options, 'options');
const { signal, ref = true } = options;
validateAbortSignal(signal, 'options.signal');
validateBoolean(ref, 'options.ref');
if (signal?.aborted)
throw new AbortError();
let onCancel;
let interval;
try {
let notYielded = 0;
let callback;
interval = new Timeout(() => {
notYielded++;
if (callback) {
callback();
callback = undefined;
}
}, after, undefined, true, true);
if (!ref) interval.unref();
insert(interval, interval._idleTimeout);
if (signal) {
onCancel = () => {
// eslint-disable-next-line no-undef
clearInterval(interval);
if (callback) {
callback(PromiseReject(new AbortError()));
callback = undefined;
}
};
signal.addEventListener('abort', onCancel, { once: true });
}
while (!signal?.aborted) {
if (notYielded === 0) {
await new Promise((resolve) => callback = resolve);
}
for (; notYielded > 0; notYielded--) {
yield value;
}
}
throw new AbortError();
} finally {
// eslint-disable-next-line no-undef
clearInterval(interval);
signal?.removeEventListener('abort', onCancel);
}
}
module.exports = {
setTimeout,
setImmediate,
setInterval,
};

View File

@ -19,6 +19,7 @@ const exec = promisify(child_process.exec);
assert.strictEqual(setTimeout, timerPromises.setTimeout);
assert.strictEqual(setImmediate, timerPromises.setImmediate);
const { setInterval } = timerPromises;
process.on('multipleResolves', common.mustNotCall());
@ -50,10 +51,51 @@ process.on('multipleResolves', common.mustNotCall());
}));
}
{
const iterable = setInterval(1, undefined);
const iterator = iterable[Symbol.asyncIterator]();
const promise = iterator.next();
promise.then(common.mustCall((result) => {
assert.ok(!result.done, 'iterator was wrongly marked as done');
assert.strictEqual(result.value, undefined);
return iterator.return();
})).then(common.mustCall());
}
{
const iterable = setInterval(1, 'foobar');
const iterator = iterable[Symbol.asyncIterator]();
const promise = iterator.next();
promise.then(common.mustCall((result) => {
assert.ok(!result.done, 'iterator was wronly marked as done');
assert.strictEqual(result.value, 'foobar');
return iterator.return();
})).then(common.mustCall());
}
{
const iterable = setInterval(1, 'foobar');
const iterator = iterable[Symbol.asyncIterator]();
const promise = iterator.next();
promise
.then(common.mustCall((result) => {
assert.ok(!result.done, 'iterator was wronly marked as done');
assert.strictEqual(result.value, 'foobar');
return iterator.next();
}))
.then(common.mustCall((result) => {
assert.ok(!result.done, 'iterator was wrongly marked as done');
assert.strictEqual(result.value, 'foobar');
return iterator.return();
}))
.then(common.mustCall());
}
{
const ac = new AbortController();
const signal = ac.signal;
assert.rejects(setTimeout(10, undefined, { signal }), /AbortError/);
assert.rejects(setTimeout(10, undefined, { signal }), /AbortError/)
.then(common.mustCall());
ac.abort();
}
@ -61,13 +103,15 @@ process.on('multipleResolves', common.mustNotCall());
const ac = new AbortController();
const signal = ac.signal;
ac.abort(); // Abort in advance
assert.rejects(setTimeout(10, undefined, { signal }), /AbortError/);
assert.rejects(setTimeout(10, undefined, { signal }), /AbortError/)
.then(common.mustCall());
}
{
const ac = new AbortController();
const signal = ac.signal;
assert.rejects(setImmediate(10, { signal }), /AbortError/);
assert.rejects(setImmediate(10, { signal }), /AbortError/)
.then(common.mustCall());
ac.abort();
}
@ -75,23 +119,85 @@ process.on('multipleResolves', common.mustNotCall());
const ac = new AbortController();
const signal = ac.signal;
ac.abort(); // Abort in advance
assert.rejects(setImmediate(10, { signal }), /AbortError/);
assert.rejects(setImmediate(10, { signal }), /AbortError/)
.then(common.mustCall());
}
{
const ac = new AbortController();
const { signal } = ac;
ac.abort(); // Abort in advance
const iterable = setInterval(1, undefined, { signal });
const iterator = iterable[Symbol.asyncIterator]();
assert.rejects(iterator.next(), /AbortError/).then(common.mustCall());
}
{
const ac = new AbortController();
const { signal } = ac;
const iterable = setInterval(100, undefined, { signal });
const iterator = iterable[Symbol.asyncIterator]();
// This promise should take 100 seconds to resolve, so now aborting it should
// mean we abort early
const promise = iterator.next();
ac.abort(); // Abort in after we have a next promise
assert.rejects(promise, /AbortError/).then(common.mustCall());
}
{
// Check aborting after getting a value.
const ac = new AbortController();
const { signal } = ac;
const iterable = setInterval(100, undefined, { signal });
const iterator = iterable[Symbol.asyncIterator]();
const promise = iterator.next();
const abortPromise = promise.then(common.mustCall(() => ac.abort()))
.then(() => iterator.next());
assert.rejects(abortPromise, /AbortError/).then(common.mustCall());
}
{
// Check that aborting after resolve will not reject.
const ac = new AbortController();
const signal = ac.signal;
setTimeout(10, undefined, { signal }).then(() => {
assert.doesNotReject(setTimeout(10, undefined, { signal })
.then(common.mustCall(() => {
ac.abort();
}))).then(common.mustCall());
}
{
// Check that aborting after resolve will not reject.
const ac = new AbortController();
const signal = ac.signal;
assert.doesNotReject(setImmediate(10, { signal }).then(common.mustCall(() => {
ac.abort();
}))).then(common.mustCall());
}
{
[1, '', Infinity, null, {}].forEach((ref) => {
const iterable = setInterval(10, undefined, { ref });
assert.rejects(() => iterable[Symbol.asyncIterator]().next(), /ERR_INVALID_ARG_TYPE/)
.then(common.mustCall());
});
}
{
// Check that aborting after resolve will not reject.
const ac = new AbortController();
const signal = ac.signal;
setImmediate(10, { signal }).then(() => {
ac.abort();
[1, '', Infinity, null, {}].forEach((signal) => {
const iterable = setInterval(10, undefined, { signal });
assert.rejects(() => iterable[Symbol.asyncIterator]().next(), /ERR_INVALID_ARG_TYPE/)
.then(common.mustCall());
});
[1, '', Infinity, null, true, false].forEach((options) => {
const iterable = setInterval(10, undefined, options);
assert.rejects(() => iterable[Symbol.asyncIterator]().next(), /ERR_INVALID_ARG_TYPE/)
.then(common.mustCall());
});
}
@ -113,6 +219,43 @@ process.on('multipleResolves', common.mustNotCall());
}));
}
{
// Check that timer adding signals does not leak handlers
const signal = new NodeEventTarget();
signal.aborted = false;
const iterator = setInterval(1, undefined, { signal });
iterator.next().then(common.mustCall(() => {
assert.strictEqual(signal.listenerCount('abort'), 1);
iterator.return();
})).finally(common.mustCall(() => {
assert.strictEqual(signal.listenerCount('abort'), 0);
}));
}
{
// Check that break removes the signal listener
const signal = new NodeEventTarget();
signal.aborted = false;
async function tryBreak() {
const iterator = setInterval(10, undefined, { signal });
let i = 0;
// eslint-disable-next-line no-unused-vars
for await (const _ of iterator) {
if (i === 0) {
assert.strictEqual(signal.listenerCount('abort'), 1);
}
i++;
if (i === 2) {
break;
}
}
assert.strictEqual(i, 2);
assert.strictEqual(signal.listenerCount('abort'), 0);
}
tryBreak().then(common.mustCall());
}
{
Promise.all(
[1, '', false, Infinity].map((i) => assert.rejects(setImmediate(10, i)), {
@ -152,16 +295,99 @@ process.on('multipleResolves', common.mustNotCall());
{
exec(`${process.execPath} -pe "const assert = require('assert');` +
'require(\'timers/promises\').setTimeout(1000, null, { ref: false }).' +
'then(assert.fail)"').then(common.mustCall(({ stderr }) => {
'require(\'timers/promises\').setTimeout(1000, null, { ref: false }).' +
'then(assert.fail)"').then(common.mustCall(({ stderr }) => {
assert.strictEqual(stderr, '');
}));
}
{
exec(`${process.execPath} -pe "const assert = require('assert');` +
'require(\'timers/promises\').setImmediate(null, { ref: false }).' +
'then(assert.fail)"').then(common.mustCall(({ stderr }) => {
'require(\'timers/promises\').setImmediate(null, { ref: false }).' +
'then(assert.fail)"').then(common.mustCall(({ stderr }) => {
assert.strictEqual(stderr, '');
}));
}
{
exec(`${process.execPath} -pe "const assert = require('assert');` +
'const interval = require(\'timers/promises\')' +
'.setInterval(1000, null, { ref: false });' +
'interval[Symbol.asyncIterator]().next()' +
'.then(assert.fail)"').then(common.mustCall(({ stderr }) => {
assert.strictEqual(stderr, '');
}));
}
{
async function runInterval(fn, intervalTime, signal) {
const input = 'foobar';
const interval = setInterval(intervalTime, input, { signal });
let iteration = 0;
for await (const value of interval) {
assert.strictEqual(value, input);
iteration++;
await fn(iteration);
}
}
{
// Check that we call the correct amount of times.
const controller = new AbortController();
const { signal } = controller;
let loopCount = 0;
const delay = 20;
const timeoutLoop = runInterval(() => {
loopCount++;
if (loopCount === 5) controller.abort();
if (loopCount > 5) throw new Error('ran too many times');
}, delay, signal);
assert.rejects(timeoutLoop, /AbortError/).then(common.mustCall(() => {
assert.strictEqual(loopCount, 5);
}));
}
{
// Check that if we abort when we have some callbacks left,
// we actually call them.
const controller = new AbortController();
const { signal } = controller;
const delay = 10;
let totalIterations = 0;
const timeoutLoop = runInterval(async (iterationNumber) => {
if (iterationNumber === 2) {
await setTimeout(delay * 2);
controller.abort();
}
if (iterationNumber > totalIterations) {
totalIterations = iterationNumber;
}
}, delay, signal);
timeoutLoop.catch(common.mustCall(() => {
assert.ok(totalIterations >= 3, `iterations was ${totalIterations} < 3`);
}));
}
}
{
// Check that the timing is correct
let pre = false;
let post = false;
setTimeout(1).then(() => pre = true);
const iterable = setInterval(2);
const iterator = iterable[Symbol.asyncIterator]();
iterator.next().then(common.mustCall(() => {
assert.ok(pre, 'interval ran too early');
assert.ok(!post, 'interval ran too late');
return iterator.next();
})).then(common.mustCall(() => {
assert.ok(post, 'second interval ran too early');
return iterator.return();
}));
setTimeout(3).then(() => post = true);
}