AspNetCore.Docs/aspnetcore/signalr/streaming.md

219 lines
11 KiB
Markdown
Raw Normal View History

---
title: Use streaming in ASP.NET Core SignalR
2019-01-24 04:36:13 +08:00
author: bradygaster
description: Learn how to stream data between the client and the server.
monikerRange: '>= aspnetcore-3.1'
2019-01-24 04:36:13 +08:00
ms.author: bradyg
ms.custom: mvc, devx-track-js
ms.date: 10/29/2020
uid: signalr/streaming
---
# Use streaming in ASP.NET Core SignalR
By [Brennan Conroy](https://github.com/BrennanConroy)
:::moniker range=">= aspnetcore-3.1"
ASP.NET Core SignalR supports streaming from client to server and from server to client. This is useful for scenarios where fragments of data arrive over time. When streaming, each fragment is sent to the client or server as soon as it becomes available, rather than waiting for all of the data to become available.
[View or download sample code](https://github.com/dotnet/AspNetCore.Docs/tree/live/aspnetcore/signalr/streaming/samples/) ([how to download](xref:index#how-to-download-a-sample))
## Set up a hub for streaming
2019-06-06 23:37:19 +08:00
A hub method automatically becomes a streaming hub method when it returns <xref:System.Collections.Generic.IAsyncEnumerable`1>, <xref:System.Threading.Channels.ChannelReader%601>, `Task<IAsyncEnumerable<T>>`, or `Task<ChannelReader<T>>`.
### Server-to-client streaming
Streaming hub methods can return `IAsyncEnumerable<T>` in addition to `ChannelReader<T>`. The simplest way to return `IAsyncEnumerable<T>` is by making the hub method an async iterator method as the following sample demonstrates. Hub async iterator methods can accept a `CancellationToken` parameter that's triggered when the client unsubscribes from the stream. Async iterator methods avoid problems common with Channels, such as not returning the `ChannelReader` early enough or exiting the method without completing the <xref:System.Threading.Channels.ChannelWriter`1>.
[!INCLUDE[](~/includes/csharp-8-required.md)]
[!code-csharp[Streaming hub async iterator method](streaming/samples/3.0/Hubs/AsyncEnumerableHub.cs?name=snippet_AsyncIterator)]
The following sample shows the basics of streaming data to the client using Channels. Whenever an object is written to the <xref:System.Threading.Channels.ChannelWriter%601>, the object is immediately sent to the client. At the end, the `ChannelWriter` is completed to tell the client the stream is closed.
> [!NOTE]
> Write to the `ChannelWriter<T>` on a background thread and return the `ChannelReader` as soon as possible. Other hub invocations are blocked until a `ChannelReader` is returned.
>
> Wrap logic in a [`try ... catch` statement](/dotnet/csharp/language-reference/keywords/try-catch). Complete the `Channel` in a [`finally` block](/dotnet/csharp/language-reference/keywords/try-catch-finally). If you want to flow an error, capture it inside the `catch` block and write it in the `finally` block.
[!code-csharp[Streaming hub method](streaming/samples/3.0/Hubs/StreamHub.cs?name=snippet1)]
Server-to-client streaming hub methods can accept a `CancellationToken` parameter that's triggered when the client unsubscribes from the stream. Use this token to stop the server operation and release any resources if the client disconnects before the end of the stream.
### Client-to-server streaming
2019-06-06 23:37:19 +08:00
A hub method automatically becomes a client-to-server streaming hub method when it accepts one or more objects of type <xref:System.Threading.Channels.ChannelReader%601> or <xref:System.Collections.Generic.IAsyncEnumerable%601>. The following sample shows the basics of reading streaming data sent from the client. Whenever the client writes to the <xref:System.Threading.Channels.ChannelWriter%601>, the data is written into the `ChannelReader` on the server from which the hub method is reading.
[!code-csharp[Streaming upload hub method](streaming/samples/3.0/Hubs/StreamHub.cs?name=snippet2)]
2019-06-06 23:37:19 +08:00
An <xref:System.Collections.Generic.IAsyncEnumerable%601> version of the method follows.
[!INCLUDE[](~/includes/csharp-8-required.md)]
```csharp
What's new in 3.0 (#14250) * What's new in 3.0 * wrok * wrok * work * work * work * Add Razor components bit * Nits * Fix a link * work * work * work * Edit pass on what's new in 3.0 doc (#14355) * Add additional coverage * added core 3.0 signalr (#14369) * added core 3.0 signalr * PR of Bradys PR (#14371) * Update aspnetcore-3.0.md * copy halter suggestion over * PR feedback * edits * minor tweaks * work * react to anurse * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * react to feedback * Update aspnetcore-3.0.md (#14403) * Update aspnetcore-3.0.md * Update aspnetcore-3.0.md * nothing new * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Daniel Roth <daroth@microsoft.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Daniel Roth <daroth@microsoft.com> * react to feedback * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Daniel Roth <daroth@microsoft.com> * react to feedback * Add option for RCL and note default is for RC dev * Blazor!!!!!!!! WOOT! * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Stephen Halter <halter73@gmail.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Stephen Halter <halter73@gmail.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * Update aspnetcore/release-notes/aspnetcore-3.0.md Co-Authored-By: Andrew Stanton-Nurse <andrew@stanton-nurse.com> * react to feedback * react to feedback * react to feedback * Add ANCM updates (#14425) * Add ANCM updates * Update aspnetcore-3.0.md * react to feedback * work * work * Apply suggestions from code review Co-Authored-By: Daniel Roth <daroth@microsoft.com> * react to dan roth suggestions * Updates + add a blurb on enhanced endpoint routing
2019-09-23 11:51:14 +08:00
public async Task UploadStream(IAsyncEnumerable<string> stream)
2019-06-06 23:37:19 +08:00
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
```
## .NET client
### Server-to-client streaming
2019-06-06 23:37:19 +08:00
The `StreamAsync` and `StreamAsChannelAsync` methods on `HubConnection` are used to invoke server-to-client streaming methods. Pass the hub method name and arguments defined in the hub method to `StreamAsync` or `StreamAsChannelAsync`. The generic parameter on `StreamAsync<T>` and `StreamAsChannelAsync<T>` specifies the type of objects returned by the streaming method. An object of type `IAsyncEnumerable<T>` or `ChannelReader<T>` is returned from the stream invocation and represents the stream on the client.
A `StreamAsync` example that returns `IAsyncEnumerable<int>`:
```csharp
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
2019-06-06 23:37:19 +08:00
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
```
A corresponding `StreamAsChannelAsync` example that returns `ChannelReader<int>`:
```csharp
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
```
In the previous code:
2019-06-06 23:37:19 +08:00
* The `StreamAsChannelAsync` method on `HubConnection` is used to invoke a server-to-client streaming method. Pass the hub method name and arguments defined in the hub method to `StreamAsChannelAsync`.
* The generic parameter on `StreamAsChannelAsync<T>` specifies the type of objects returned by the streaming method.
* A `ChannelReader<T>` is returned from the stream invocation and represents the stream on the client.
### Client-to-server streaming
2019-06-06 23:37:19 +08:00
There are two ways to invoke a client-to-server streaming hub method from the .NET client. You can either pass in an `IAsyncEnumerable<T>` or a `ChannelReader` as an argument to `SendAsync`, `InvokeAsync`, or `StreamAsChannelAsync`, depending on the hub method invoked.
Whenever data is written to the `IAsyncEnumerable` or `ChannelWriter` object, the hub method on the server receives a new item with the data from the client.
If using an `IAsyncEnumerable` object, the stream ends after the method returning stream items exits.
[!INCLUDE[](~/includes/csharp-8-required.md)]
```csharp
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
2019-06-06 23:37:19 +08:00
await connection.SendAsync("UploadStream", clientStreamData());
```
2019-06-06 23:37:19 +08:00
Or if you're using a `ChannelWriter`, you complete the channel with `channel.Writer.Complete()`:
```csharp
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
```
## JavaScript client
### Server-to-client streaming
JavaScript clients call server-to-client streaming methods on hubs with `connection.stream`. The `stream` method accepts two arguments:
* The name of the hub method. In the following example, the hub method name is `Counter`.
* Arguments defined in the hub method. In the following example, the arguments are a count for the number of stream items to receive and the delay between stream items.
`connection.stream` returns an `IStreamResult`, which contains a `subscribe` method. Pass an `IStreamSubscriber` to `subscribe` and set the `next`, `error`, and `complete` callbacks to receive notifications from the `stream` invocation.
[!code-javascript[Streaming javascript](streaming/samples/2.2/wwwroot/js/stream.js?range=19-36)]
To end the stream from the client, call the `dispose` method on the `ISubscription` that's returned from the `subscribe` method. Calling this method causes cancellation of the `CancellationToken` parameter of the Hub method, if you provided one.
### Client-to-server streaming
JavaScript clients call client-to-server streaming methods on hubs by passing in a `Subject` as an argument to `send`, `invoke`, or `stream`, depending on the hub method invoked. The `Subject` is a class that looks like a `Subject`. For example in RxJS, you can use the [Subject](https://rxjs-dev.firebaseapp.com/api/index/class/Subject) class from that library.
[!code-javascript[Upload javascript](streaming/samples/3.0/wwwroot/js/stream.js?range=41-51)]
Calling `subject.next(item)` with an item writes the item to the stream, and the hub method receives the item on the server.
To end the stream, call `subject.complete()`.
## Java client
### Server-to-client streaming
The SignalR Java client uses the `stream` method to invoke streaming methods. `stream` accepts three or more arguments:
* The expected type of the stream items.
* The name of the hub method.
* Arguments defined in the hub method.
```java
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
```
The `stream` method on `HubConnection` returns an Observable of the stream item type. The Observable type's `subscribe` method is where `onNext`, `onError` and `onCompleted` handlers are defined.
### Client-to-server streaming
The SignalR Java client can call client-to-server streaming methods on hubs by passing in an [Observable](https://rxjs-dev.firebaseapp.com/api/index/class/Observable) as an argument to `send`, `invoke`, or `stream`, depending on the hub method invoked.
```java
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
```
Calling `stream.onNext(item)` with an item writes the item to the stream, and the hub method receives the item on the server.
To end the stream, call `stream.onComplete()`.
:::moniker-end
## Additional resources
* [Hubs](xref:signalr/hubs)
* [.NET client](xref:signalr/dotnet-client)
* [JavaScript client](xref:signalr/javascript-client)
* [Publish to Azure](xref:signalr/publish-to-azure-web-app)