Subscribe to real-time events
Subscribe to mutations for creating real-time clients.
Setup subscription with callbacks
Creating a subscription returns a Stream object. This Stream keeps producing events until either the subscription encounters an error or you cancel the subscription. To limit the number of events that are emitted, you can use the Stream's helper functions such as take. The subscription is canceled once the specified number of events has been received:
Stream<GraphQLResponse<Todo>> subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API
      .subscribe(
        subscriptionRequest,
        onEstablished: () => safePrint('Subscription established'),
      )
      // Listens to only 5 elements
      .take(5)
      .handleError(
        (Object error) {
          safePrint('Error in subscription stream: $error');
        },
      );
  return operation;
}
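The effect of take can be tried in isolation, without a backend. The following is a minimal sketch that uses only dart:async; the StreamController stands in for the stream returned by Amplify.API.subscribe, and firstEvents is a hypothetical helper name, not part of Amplify.

```dart
import 'dart:async';

/// Hypothetical helper: collects at most [limit] events from [source].
Future<List<T>> firstEvents<T>(Stream<T> source, int limit) {
  // take(limit) ends the returned stream after `limit` data events and
  // stops listening to the source stream.
  return source.take(limit).toList();
}

Future<void> main() async {
  // Stand-in for a subscription stream (assumption: a real stream would
  // come from Amplify.API.subscribe).
  final controller = StreamController<int>();
  for (var i = 1; i <= 10; i++) {
    controller.add(i); // buffered until something listens
  }

  final events = await firstEvents(controller.stream, 5);
  print(events); // [1, 2, 3, 4, 5]
}
```

Only the first five buffered events are delivered; the remaining five are never observed because the stream has already ended.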
Alternatively, you can call Stream.listen to create a StreamSubscription object, which can be canceled programmatically.
// Be sure to import this
import 'dart:async';
...
StreamSubscription<GraphQLResponse<Todo>>? subscription;
void subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API.subscribe(
    subscriptionRequest,
    onEstablished: () => safePrint('Subscription established'),
  );
  subscription = operation.listen(
    (event) {
      safePrint('Subscription event data received: ${event.data}');
    },
    onError: (Object e) => safePrint('Error in subscription stream: $e'),
  );
}

void unsubscribe() {
  subscription?.cancel();
  subscription = null;
}
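To see what cancel does to event delivery, here is a small self-contained sketch using only dart:async. The StreamController is again a stand-in for the Amplify subscription stream, and the event strings are made up for illustration.

```dart
import 'dart:async';

/// Listens to [source], cancels after the first delivery window, and
/// returns the events that arrived before cancellation.
Future<List<String>> listenThenCancel(StreamController<String> source) async {
  final received = <String>[];
  final StreamSubscription<String> subscription =
      source.stream.listen(received.add);

  source.add('first');
  // Yield to the event loop so the pending event is delivered.
  await Future<void>.delayed(Duration.zero);

  await subscription.cancel();

  // Added after cancel: no listener remains, so this event is not delivered.
  source.add('second');
  await Future<void>.delayed(Duration.zero);

  return received;
}

Future<void> main() async {
  final received = await listenThenCancel(StreamController<String>());
  print(received); // [first]
}
```

Keeping the StreamSubscription in a nullable field, as in the example above, lets the app cancel it from anywhere, for instance when a widget is disposed.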
In addition to an onCreate subscription, you can also call .onUpdate() or .onDelete().
final onUpdateSubscriptionRequest = ModelSubscriptions.onUpdate(Todo.classType);
// or
final onDeleteSubscriptionRequest = ModelSubscriptions.onDelete(Todo.classType);
Subscription connection status
Now that your application is set up and using subscriptions, you may want to know when the subscription is closed, or show your users when the subscription isn't healthy. You can monitor the subscription status for changes via Amplify.Hub:
Amplify.Hub.listen(
  HubChannel.Api,
  (ApiHubEvent event) {
    if (event is SubscriptionHubEvent) {
      safePrint(event.status);
    }
  },
);
SubscriptionStatus
- connected - Connected and working with no issues
- connecting - Attempting to connect (both initial connection and reconnection)
- pendingDisconnect - Connection has no active subscriptions and is shutting down
- disconnected - Connection has no active subscriptions and is disconnected
- failed - Connection had a failure and has been disconnected
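One way to reflect these statuses to users is to map each one to a short label. The sketch below is illustrative only: DemoStatus is a local stand-in that mirrors the values listed above so the code runs without the Amplify packages, and statusLabel and its wording are hypothetical.

```dart
// Local stand-in for the statuses listed above (assumption: in an app you
// would switch on the status carried by SubscriptionHubEvent instead).
enum DemoStatus {
  connected,
  connecting,
  pendingDisconnect,
  disconnected,
  failed,
}

/// Hypothetical helper: text an app might show for each status.
String statusLabel(DemoStatus status) {
  switch (status) {
    case DemoStatus.connected:
      return 'Live';
    case DemoStatus.connecting:
      return 'Connecting';
    case DemoStatus.pendingDisconnect:
    case DemoStatus.disconnected:
      return 'Offline';
    case DemoStatus.failed:
      return 'Connection failed';
  }
}

void main() {
  for (final status in DemoStatus.values) {
    print('${status.name}: ${statusLabel(status)}');
  }
}
```

Because the switch covers every enum value, adding a new status to the enum produces an analyzer error until a label is chosen for it.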
Automated Reconnection
Under the hood, the plugin attempts to maintain a healthy WebSocket connection through network changes. For example, if a device’s connection changes from Wi-Fi to a 5G network, the plugin will attempt to reconnect using the new network.
Likewise, when disconnected from the internet unexpectedly, the subscription will attempt to reconnect using an exponential backoff retry strategy. By default, the plugin makes 8 recovery attempts over about 50 seconds. If it cannot establish a connection, the WebSocket is closed. You can customize this strategy through RetryOptions when configuring the API plugin.
Future<void> _configureAmplify() async {
  final apiPlugin = AmplifyAPI(
    options: APIPluginOptions(
      modelProvider: ModelProvider.instance,
      // Optional config
      subscriptionOptions: const GraphQLSubscriptionOptions(
        retryOptions: RetryOptions(maxAttempts: 10),
      ),
    ),
  );
  await Amplify.addPlugin(apiPlugin);

  try {
    await Amplify.configure(outputs);
  } on AmplifyAlreadyConfiguredException {
    safePrint(
        "Tried to reconfigure Amplify; this can occur when your app restarts on Android.");
  }
}
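To get a feel for how an exponential backoff spaces out retries, the sketch below computes a schedule in which the wait doubles after each attempt up to a cap. It is a generic illustration: backoffSchedule, its parameters, and the sample values are hypothetical, and the plugin's actual delays, cap, and any randomization are not specified here.

```dart
import 'dart:math' as math;

/// Hypothetical helper: the delay before each retry when the wait doubles
/// every time, starting at [base] and never exceeding [cap].
List<Duration> backoffSchedule({
  required int retries,
  required Duration base,
  required Duration cap,
}) {
  return List.generate(retries, (i) {
    // base * 2^i, expressed in milliseconds.
    final ms = base.inMilliseconds * (1 << i);
    return Duration(milliseconds: math.min(ms, cap.inMilliseconds));
  });
}

void main() {
  // Illustrative values only, not the plugin's defaults.
  final schedule = backoffSchedule(
    retries: 5,
    base: const Duration(seconds: 1),
    cap: const Duration(seconds: 10),
  );
  print(schedule.map((d) => d.inSeconds).toList()); // [1, 2, 4, 8, 10]
}
```

Raising maxAttempts, as in the configuration above, lengthens the total time spent retrying before the connection is given up on.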
import 'package:amplify_flutter/amplify_flutter.dart';
import 'package:amplify_api/amplify_api.dart';
import './models/ModelProvider.dart'; // <--- Update import to reflect your project
import 'dart:async';
// ...
List<Todo?> allTodos = [];
SubscriptionStatus prevSubscriptionStatus = SubscriptionStatus.disconnected;
StreamSubscription<GraphQLResponse<Todo>>? subscription;
/// ...
// Init listeners
Amplify.Hub.listen(
  HubChannel.Api,
  (ApiHubEvent event) {
    if (event is SubscriptionHubEvent) {
      if (prevSubscriptionStatus == SubscriptionStatus.connecting &&
          event.status == SubscriptionStatus.connected) {
        getTodos(); // refetch todos
      }
      prevSubscriptionStatus = event.status;
    }
  },
);
subscribe();
/// ...
Future<void> getTodos() async {
  try {
    final request = ModelQueries.list(Todo.classType);
    final response = await Amplify.API.query(request: request).response;

    final todos = response.data?.items ?? [];
    if (response.errors.isNotEmpty) {
      safePrint('errors: ${response.errors}');
    }

    setState(() {
      allTodos = todos;
    });
  } on ApiException catch (e) {
    safePrint('Query failed: $e');
    return;
  }
}
void subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API.subscribe(
    subscriptionRequest,
    onEstablished: () => safePrint('Subscription established'),
  );
  subscription = operation.listen(
    (event) {
      setState(() {
        allTodos.add(event.data);
      });
    },
    onError: (Object e) => safePrint('Error in subscription stream: $e'),
  );
}
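The refetch decision in the Hub listener above depends only on the previous and current status, so it can be pulled out and checked on its own. The following sketch does that with a local stand-in enum; DemoStatus and ReconnectDetector are hypothetical names used so the code runs without the Amplify packages.

```dart
// Local stand-in mirroring the subscription statuses (assumption: a real app
// would use the status carried by SubscriptionHubEvent).
enum DemoStatus {
  connected,
  connecting,
  pendingDisconnect,
  disconnected,
  failed,
}

/// Remembers the previous status and reports when a connection attempt
/// has just succeeded.
class ReconnectDetector {
  DemoStatus _previous = DemoStatus.disconnected;

  /// Returns true when the status moves from connecting to connected.
  bool onStatus(DemoStatus next) {
    final reconnected =
        _previous == DemoStatus.connecting && next == DemoStatus.connected;
    _previous = next;
    return reconnected;
  }
}

void main() {
  final detector = ReconnectDetector();
  const sequence = [
    DemoStatus.connecting,
    DemoStatus.connected,
    DemoStatus.connecting,
    DemoStatus.connected,
    DemoStatus.disconnected,
  ];
  final refetches = sequence.where(detector.onStatus).length;
  print(refetches); // 2
}
```

Note that this check also fires on the very first connection, not only on reconnection, which matches the listener above: the initial connecting to connected transition triggers the first fetch of the list.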