Page updated Feb 19, 2024

Subscribe to real-time events

Subscribe to mutations for creating real-time clients.

Set up a subscription with callbacks

When you create a subscription, a Stream object is returned to you. This Stream continues producing events until either the subscription encounters an error or you cancel the subscription. If you need to limit the amount of data that is emitted, you can use the Stream's helper functions such as take. In that case, the subscription is canceled once the specified number of events has been received:

Stream<GraphQLResponse<Todo>> subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API
      .subscribe(
        subscriptionRequest,
        onEstablished: () => safePrint('Subscription established'),
      )
      // Listens to only 5 elements
      .take(5)
      .handleError(
        (Object error) {
          safePrint('Error in subscription stream: $error');
        },
      );
  return operation;
}
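To see how take ends a stream, the following is a minimal sketch in plain Dart that does not use Amplify at all. The counterEvents and firstEvents functions are hypothetical stand-ins for a subscription stream; only Stream.take, Stream.toList, StreamController, and Timer come from the Dart SDK.

```dart
import 'dart:async';

/// Hypothetical stand-in for a subscription stream (not an Amplify API):
/// emits 0, 1, 2, ... every [interval] until the listener cancels.
Stream<int> counterEvents(
  Duration interval, {
  void Function()? onCancel,
}) {
  late final StreamController<int> controller;
  Timer? timer;
  var next = 0;
  controller = StreamController<int>(
    // Start producing only once someone listens.
    onListen: () {
      timer = Timer.periodic(interval, (_) => controller.add(next++));
    },
    // Stop producing when the listener goes away.
    onCancel: () {
      timer?.cancel();
      onCancel?.call();
    },
  );
  return controller.stream;
}

/// Collects the first [count] events. After the last one, `take` closes the
/// resulting stream and cancels its subscription to the source.
Future<List<int>> firstEvents(int count, {void Function()? onCancel}) {
  return counterEvents(const Duration(milliseconds: 10), onCancel: onCancel)
      .take(count)
      .toList();
}

Future<void> main() async {
  var sourceCancelled = false;
  final events = await firstEvents(5, onCancel: () => sourceCancelled = true);
  print(events); // [0, 1, 2, 3, 4]
  print('source cancelled: $sourceCancelled');
}
```

The source here never closes on its own, so the list only completes because take stops listening after the fifth event.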

Alternatively, you can call Stream.listen to create a StreamSubscription object which can be programmatically canceled.

// Be sure to import this
import 'dart:async';

// ...

StreamSubscription<GraphQLResponse<Todo>>? subscription;

void subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API.subscribe(
    subscriptionRequest,
    onEstablished: () => safePrint('Subscription established'),
  );
  subscription = operation.listen(
    (event) {
      safePrint('Subscription event data received: ${event.data}');
    },
    onError: (Object e) => safePrint('Error in subscription stream: $e'),
  );
}

void unsubscribe() {
  subscription?.cancel();
  subscription = null;
}

In addition to an onCreate subscription, you can also call .onUpdate() or .onDelete().

final onUpdateSubscriptionRequest = ModelSubscriptions.onUpdate(Todo.classType);
// or
final onDeleteSubscriptionRequest = ModelSubscriptions.onDelete(Todo.classType);

Subscription connection status

Now that you have set up the application and are using subscriptions, you may want to know when the subscription is closed, or tell your users when the subscription isn't healthy. You can monitor the subscription status for changes via Amplify.Hub:

Amplify.Hub.listen(
  HubChannel.Api,
  (ApiHubEvent event) {
    if (event is SubscriptionHubEvent) {
      safePrint(event.status);
    }
  },
);

SubscriptionStatus

  • connected - Connected and working with no issues
  • connecting - Attempting to connect (both initial connection and reconnection)
  • pendingDisconnect - Connection has no active subscriptions and is shutting down
  • disconnected - Connection has no active subscriptions and is disconnected
  • failed - Connection had a failure and has been disconnected
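One way to use these statuses is to translate them into text for a connection banner. The sketch below is plain Dart and makes several assumptions: Status is a local stand-in enum that only mirrors the names listed above (in an app you would use the plugin's own SubscriptionStatus), and isHealthy, bannerFor, and the message strings are hypothetical.

```dart
/// Local stand-in that mirrors the status names listed above, so this sketch
/// runs without the Amplify packages. Not the plugin's own type.
enum Status { connected, connecting, pendingDisconnect, disconnected, failed }

/// Hypothetical helper: only `connected` counts as healthy.
bool isHealthy(Status status) => status == Status.connected;

/// Hypothetical user-facing messages, one per status.
const Map<Status, String> _messages = {
  Status.connected: 'Live updates are on',
  Status.connecting: 'Connecting...',
  Status.pendingDisconnect: 'Closing connection',
  Status.disconnected: 'Live updates are off',
  Status.failed: 'Connection lost',
};

/// Returns the banner text for [status].
String bannerFor(Status status) => _messages[status] ?? 'Unknown status';

void main() {
  for (final status in Status.values) {
    print('${status.name}: ${bannerFor(status)} (healthy: ${isHealthy(status)})');
  }
}
```

Keeping the mapping in one place means the Hub listener only has to pass the received status along.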

Automated Reconnection

Under the hood, the plugin attempts to maintain a healthy WebSocket connection through network changes. For example, if a device's connection changes from Wi-Fi to a 5G network, the plugin will attempt to reconnect using the new network.

Likewise, when disconnected from the internet unexpectedly, the subscription will attempt to reconnect using an exponential retry/backoff strategy. By default, the plugin makes 8 recovery attempts over about 50 seconds. If it cannot make a successful connection, the WebSocket is closed. You can customize this strategy through RetryOptions when configuring the API plugin.
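To get a feel for how an exponential backoff schedule adds up, here is a minimal sketch in plain Dart. It is not the plugin's implementation: the backoffDelays and totalWait functions, the 400 ms first delay, and the 30 second cap are all assumptions chosen for illustration, and a real strategy typically also adds random jitter.

```dart
/// Delays waited between attempts, doubling each time and capped at
/// [maxDelay]. All default values here are illustrative assumptions.
List<Duration> backoffDelays({
  int maxAttempts = 8,
  Duration firstDelay = const Duration(milliseconds: 400),
  Duration maxDelay = const Duration(seconds: 30),
}) {
  final delays = <Duration>[];
  var delay = firstDelay;
  // One wait sits between each pair of attempts: maxAttempts - 1 waits.
  for (var i = 0; i < maxAttempts - 1; i++) {
    delays.add(delay < maxDelay ? delay : maxDelay);
    delay *= 2;
  }
  return delays;
}

/// Sums the waits to get the total time spent backing off.
Duration totalWait(List<Duration> delays) =>
    delays.fold(Duration.zero, (sum, d) => sum + d);

void main() {
  final defaults = backoffDelays();
  print(defaults.map((d) => d.inMilliseconds).toList());
  print(totalWait(defaults).inMilliseconds); // 50800

  // Raising maxAttempts adds waits at the cap once doubling exceeds it.
  print(totalWait(backoffDelays(maxAttempts: 10)).inMilliseconds); // 110800
}
```

Under these assumed values, 8 attempts spend roughly 51 seconds waiting, and each extra attempt beyond that adds a full capped delay.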

Future<void> _configureAmplify() async {
  final apiPlugin = AmplifyAPI(
    modelProvider: ModelProvider.instance,
    // Optional config
    subscriptionOptions: const GraphQLSubscriptionOptions(
      retryOptions: RetryOptions(maxAttempts: 10),
    ),
  );
  await Amplify.addPlugin(apiPlugin);

  try {
    await Amplify.configure(amplifyconfig);
  } on AmplifyAlreadyConfiguredException {
    safePrint(
        "Tried to reconfigure Amplify; this can occur when your app restarts on Android.");
  }
}

Important: While offline, your application will miss messages and will not automatically catch up when reconnection happens. Depending on your use case, you may want to take action to catch up when your app comes back online. The following example solves this problem by retrieving all data on reconnection.

import 'package:amplify_flutter/amplify_flutter.dart';
import 'package:amplify_api/amplify_api.dart';
import './models/ModelProvider.dart'; // <--- Update import to reflect your project
import 'dart:async';

// ...

List<Todo?> allTodos = [];
SubscriptionStatus prevSubscriptionStatus = SubscriptionStatus.disconnected;
StreamSubscription<GraphQLResponse<Todo>>? subscription;

/// ...

// Init listeners
Amplify.Hub.listen(
  HubChannel.Api,
  (ApiHubEvent event) {
    if (event is SubscriptionHubEvent) {
      // A connecting -> connected transition means the connection was (re)established
      if (prevSubscriptionStatus == SubscriptionStatus.connecting &&
          event.status == SubscriptionStatus.connected) {
        getTodos(); // refetch todos
      }
      prevSubscriptionStatus = event.status;
    }
  },
);

subscribe();

/// ...

Future<void> getTodos() async {
  try {
    final request = ModelQueries.list(Todo.classType);
    final response = await Amplify.API.query(request: request).response;

    final todos = response.data?.items ?? [];
    if (response.errors.isNotEmpty) {
      safePrint('errors: ${response.errors}');
    }

    setState(() {
      allTodos = todos;
    });
  } on ApiException catch (e) {
    safePrint('Query failed: $e');
    return;
  }
}

void subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API.subscribe(
    subscriptionRequest,
    onEstablished: () => safePrint('Subscription established'),
  );
  subscription = operation.listen(
    (event) {
      setState(() {
        allTodos.add(event.data);
      });
    },
    onError: (Object e) => safePrint('Error in subscription stream: $e'),
  );
}