From e43694e1cdd2c29decd0d45cca5ff78d6b0e2506 Mon Sep 17 00:00:00 2001 From: David Guida Date: Thu, 8 Apr 2021 18:52:18 -0400 Subject: [PATCH] refactored to use Threading.Channel instead of ConcurrentQueue (#21582) * refactored to use Threading.Channel instead of ConcurrentQueue * addressed review comments * minor fix * addressing PR comments --- .../3.x/BackgroundTasksSample/Program.cs | 6 +- .../Services/BackgroundTaskQueue.cs | 36 +++++--- .../Services/MonitorLoop.cs | 83 +++++++++---------- .../BackgroundTasksSample/appsettings.json | 1 + 4 files changed, 69 insertions(+), 57 deletions(-) diff --git a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Program.cs b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Program.cs index 1f9103496d..caaebc2c5d 100644 --- a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Program.cs +++ b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Program.cs @@ -15,7 +15,11 @@ namespace BackgroundTasksSample #region snippet3 services.AddSingleton(); services.AddHostedService(); - services.AddSingleton(); + services.AddSingleton(ctx => { + if (!int.TryParse(hostContext.Configuration["QueueCapacity"], out var queueCapacity)) + queueCapacity = 100; + return new BackgroundTaskQueue(queueCapacity); + }); #endregion #region snippet1 diff --git a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/BackgroundTaskQueue.cs b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/BackgroundTaskQueue.cs index 7d0992414f..d51f7dca12 100644 --- a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/BackgroundTaskQueue.cs +++ b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/BackgroundTaskQueue.cs @@ -1,6 +1,6 @@ using System; -using System.Collections.Concurrent; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; namespace BackgroundTasksSample.Services @@ -8,35 +8,45 @@ namespace BackgroundTasksSample.Services #region snippet1 public interface IBackgroundTaskQueue { - void QueueBackgroundWorkItem(Func workItem); + ValueTask QueueBackgroundWorkItemAsync(Func workItem); - Task> DequeueAsync( + ValueTask> DequeueAsync( CancellationToken cancellationToken); } public class BackgroundTaskQueue : IBackgroundTaskQueue { - private ConcurrentQueue> _workItems = - new ConcurrentQueue>(); - private SemaphoreSlim _signal = new SemaphoreSlim(0); + private readonly Channel> _queue; - public void QueueBackgroundWorkItem( - Func workItem) + public BackgroundTaskQueue(int capacity) + { + // Capacity should be set based on the expected application load and + // number of concurrent threads accessing the queue. + // BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task, + // which completes only when space became available. This leads to backpressure, + // in case too many publishers/calls start accumulating. + var options = new BoundedChannelOptions(capacity) + { + FullMode = BoundedChannelFullMode.Wait + }; + _queue = Channel.CreateBounded>(options); + } + + public async ValueTask QueueBackgroundWorkItemAsync( + Func workItem) { if (workItem == null) { throw new ArgumentNullException(nameof(workItem)); } - _workItems.Enqueue(workItem); - _signal.Release(); + await _queue.Writer.WriteAsync(workItem); } - public async Task> DequeueAsync( + public async ValueTask> DequeueAsync( CancellationToken cancellationToken) { - await _signal.WaitAsync(cancellationToken); - _workItems.TryDequeue(out var workItem); + var workItem = await _queue.Reader.ReadAsync(cancellationToken); return workItem; } diff --git a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/MonitorLoop.cs b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/MonitorLoop.cs index eac5efb5fe..dc2e658d45 100644 --- a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/MonitorLoop.cs +++ b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/Services/MonitorLoop.cs @@ -24,13 +24,13 @@ namespace BackgroundTasksSample.Services public void StartMonitorLoop() { - _logger.LogInformation("Monitor Loop is starting."); + _logger.LogInformation("MonitorAsync Loop is starting."); // Run a console user input loop in a background thread - Task.Run(() => Monitor()); + Task.Run(async () => await MonitorAsync()); } - public void Monitor() + private async ValueTask MonitorAsync() { while (!_cancellationToken.IsCancellationRequested) { @@ -39,49 +39,46 @@ namespace BackgroundTasksSample.Services if (keyStroke.Key == ConsoleKey.W) { // Enqueue a background work item - _taskQueue.QueueBackgroundWorkItem(async token => - { - // Simulate three 5-second tasks to complete - // for each enqueued work item - - int delayLoop = 0; - var guid = Guid.NewGuid().ToString(); - - _logger.LogInformation( - "Queued Background Task {Guid} is starting.", guid); - - while (!token.IsCancellationRequested && delayLoop < 3) - { - try - { - await Task.Delay(TimeSpan.FromSeconds(5), token); - } - catch (OperationCanceledException) - { - // Prevent throwing if the Delay is cancelled - } - - delayLoop++; - - _logger.LogInformation( - "Queued Background Task {Guid} is running. " + - "{DelayLoop}/3", guid, delayLoop); - } - - if (delayLoop == 3) - { - _logger.LogInformation( - "Queued Background Task {Guid} is complete.", guid); - } - else - { - _logger.LogInformation( - "Queued Background Task {Guid} was cancelled.", guid); - } - }); + await _taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItem); } } } + + private async ValueTask BuildWorkItem(CancellationToken token) + { + // Simulate three 5-second tasks to complete + // for each enqueued work item + + int delayLoop = 0; + var guid = Guid.NewGuid().ToString(); + + _logger.LogInformation("Queued Background Task {Guid} is starting.", guid); + + while (!token.IsCancellationRequested && delayLoop < 3) + { + try + { + await Task.Delay(TimeSpan.FromSeconds(5), token); + } + catch (OperationCanceledException) + { + // Prevent throwing if the Delay is cancelled + } + + delayLoop++; + + _logger.LogInformation("Queued Background Task {Guid} is running. " + "{DelayLoop}/3", guid, delayLoop); + } + + if (delayLoop == 3) + { + _logger.LogInformation("Queued Background Task {Guid} is complete.", guid); + } + else + { + _logger.LogInformation("Queued Background Task {Guid} was cancelled.", guid); + } + } } #endregion } diff --git a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/appsettings.json b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/appsettings.json index 8983e0fc1c..37fb9aecd5 100644 --- a/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/appsettings.json +++ b/aspnetcore/fundamentals/host/hosted-services/samples/3.x/BackgroundTasksSample/appsettings.json @@ -1,4 +1,5 @@ { + "QueueCapacity": 10, "Logging": { "LogLevel": { "Default": "Information",