refactored to use Threading.Channel instead of ConcurrentQueue (#21582)

* refactored to use Threading.Channel instead of ConcurrentQueue

* addressed review comments

* minor fix

* addressing PR comments
pull/22012/head
David Guida 2021-04-08 18:52:18 -04:00 committed by GitHub
parent 8ee4827844
commit e43694e1cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 57 deletions

View File

@ -15,7 +15,11 @@ namespace BackgroundTasksSample
#region snippet3
services.AddSingleton<MonitorLoop>();
services.AddHostedService<QueuedHostedService>();
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddSingleton<IBackgroundTaskQueue>(ctx => {
if (!int.TryParse(hostContext.Configuration["QueueCapacity"], out var queueCapacity))
queueCapacity = 100;
return new BackgroundTaskQueue(queueCapacity);
});
#endregion
#region snippet1

View File

@ -1,6 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace BackgroundTasksSample.Services
@ -8,35 +8,45 @@ namespace BackgroundTasksSample.Services
#region snippet1
public interface IBackgroundTaskQueue
{
void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);
Task<Func<CancellationToken, Task>> DequeueAsync(
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
new ConcurrentQueue<Func<CancellationToken, Task>>();
private SemaphoreSlim _signal = new SemaphoreSlim(0);
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;
public void QueueBackgroundWorkItem(
Func<CancellationToken, Task> workItem)
public BackgroundTaskQueue(int capacity)
{
// Capacity should be set based on the expected application load and
// number of concurrent threads accessing the queue.
// BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
// which completes only when space became available. This leads to backpressure,
// in case too many publishers/calls start accumulating.
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
await _queue.Writer.WriteAsync(workItem);
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
var workItem = await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}

View File

@ -24,13 +24,13 @@ namespace BackgroundTasksSample.Services
public void StartMonitorLoop()
{
_logger.LogInformation("Monitor Loop is starting.");
_logger.LogInformation("MonitorAsync Loop is starting.");
// Run a console user input loop in a background thread
Task.Run(() => Monitor());
Task.Run(async () => await MonitorAsync());
}
public void Monitor()
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
@ -39,49 +39,46 @@ namespace BackgroundTasksSample.Services
if (keyStroke.Key == ConsoleKey.W)
{
// Enqueue a background work item
_taskQueue.QueueBackgroundWorkItem(async token =>
{
// Simulate three 5-second tasks to complete
// for each enqueued work item
int delayLoop = 0;
var guid = Guid.NewGuid().ToString();
_logger.LogInformation(
"Queued Background Task {Guid} is starting.", guid);
while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}
delayLoop++;
_logger.LogInformation(
"Queued Background Task {Guid} is running. " +
"{DelayLoop}/3", guid, delayLoop);
}
if (delayLoop == 3)
{
_logger.LogInformation(
"Queued Background Task {Guid} is complete.", guid);
}
else
{
_logger.LogInformation(
"Queued Background Task {Guid} was cancelled.", guid);
}
});
await _taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItem);
}
}
}
private async ValueTask BuildWorkItem(CancellationToken token)
{
// Simulate three 5-second tasks to complete
// for each enqueued work item
int delayLoop = 0;
var guid = Guid.NewGuid().ToString();
_logger.LogInformation("Queued Background Task {Guid} is starting.", guid);
while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}
delayLoop++;
_logger.LogInformation("Queued Background Task {Guid} is running. " + "{DelayLoop}/3", guid, delayLoop);
}
if (delayLoop == 3)
{
_logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
}
else
{
_logger.LogInformation("Queued Background Task {Guid} was cancelled.", guid);
}
}
}
#endregion
}

View File

@ -1,4 +1,5 @@
{
"QueueCapacity": 10,
"Logging": {
"LogLevel": {
"Default": "Information",