Page updated Apr 29, 2024

Observe in real time

Observe model mutations in real time

You can subscribe to changes on your Models. The subscription reacts dynamically to data updates in the underlying Storage Engine. These updates can result from GraphQL Subscriptions, as well as from Queries or Mutations that run against the backing AppSync API if you are synchronizing with the cloud.

NOTE: AWS AppSync has an adjustable limit of 100 subscriptions per connection. DataStore automatically subscribes to create, update, and delete mutations for all models.

This means that GraphQL APIs with DataStore enabled are limited to 33 models and DynamoDB tables.

(3 subscriptions * 33 models = 99 subscriptions per connection).

However, you can request a service limit increase from AWS AppSync to meet the real-time requirements of your application.
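
The arithmetic behind the 33-model figure can be sketched in a few lines of plain Swift. The function name `maxModels(subscriptionLimit:subscriptionsPerModel:)` is a hypothetical helper for illustration and is not part of Amplify.

```swift
/// Largest number of models that fit under a per-connection subscription limit,
/// assuming each model needs one subscription per mutation type
/// (create, update, delete). Hypothetical helper, not an Amplify API.
func maxModels(subscriptionLimit: Int, subscriptionsPerModel: Int = 3) -> Int {
    // Integer division rounds down: a model only fits if all three
    // of its subscriptions fit under the limit.
    subscriptionLimit / subscriptionsPerModel
}

let models = maxModels(subscriptionLimit: 100)
print("\(models) models use \(models * 3) of 100 subscriptions")
// prints "33 models use 99 of 100 subscriptions"
```

If the limit were raised, the same calculation would give the new model ceiling for that limit.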

import Amplify

// Async/Await variant
// You must hold a reference to your subscription.
var postsSubscription: AmplifyAsyncThrowingSequence<MutationEvent>?

// Then in the body of your code, subscribe to the subscription
func subscribeToPosts() async {
    let postsSubscription = Amplify.DataStore.observe(Post.self)
    self.postsSubscription = postsSubscription
    do {
        for try await changes in postsSubscription {
            // handle incoming changes
            print("Subscription received mutation: \(changes)")
        }
    } catch {
        print("Subscription received error: \(error)")
    }
}

// Then, when you're finished observing, cancel the subscription
func unsubscribeFromPosts() {
    postsSubscription?.cancel()
}

import Amplify
import Combine

// Combine variant
// In your type declaration, declare a cancellable to hold onto the subscription
var postsSubscription: AnyCancellable?

// Then in the body of your code, subscribe to the publisher
func subscribeToPosts() {
    postsSubscription = Amplify.Publisher.create(Amplify.DataStore.observe(Post.self))
        .sink {
            if case let .failure(error) = $0 {
                print("Subscription received error - \(error)")
            }
        } receiveValue: { changes in
            // handle incoming changes
            print("Subscription received mutation: \(changes)")
        }
}

// Then, when you're finished observing, cancel the subscription
func unsubscribeFromPosts() {
    postsSubscription?.cancel()
}
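
As a rough illustration of what "handle incoming changes" might involve, the sketch below branches on the kind of mutation. To stay runnable without the Amplify library, it uses `StandInMutationEvent`, a hypothetical stand-in type. It is assumed here to mirror the `modelName` and `mutationType` string fields of a mutation event, so check the real type before relying on those names. `MutationKind` and `describe(_:)` are likewise illustrative names only.

```swift
// The three mutation types DataStore subscribes to for each model.
enum MutationKind: String {
    case create, update, delete
}

// Hypothetical stand-in for a mutation event (assumption: the real event
// exposes the model name and the mutation type as strings).
struct StandInMutationEvent {
    let modelName: String
    let mutationType: String
}

// Turns an event into a short description, ignoring unknown mutation types.
func describe(_ event: StandInMutationEvent) -> String {
    guard let kind = MutationKind(rawValue: event.mutationType) else {
        return "\(event.modelName): unknown mutation type '\(event.mutationType)'"
    }
    switch kind {
    case .create: return "\(event.modelName) created"
    case .update: return "\(event.modelName) updated"
    case .delete: return "\(event.modelName) deleted"
    }
}

print(describe(StandInMutationEvent(modelName: "Post", mutationType: "update")))
// prints "Post updated"
```

In an app, the body of the `for try await` loop or the `receiveValue` closure would be the natural place for this kind of branching.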

DataStore.clear() and DataStore.stop() will stop the DataStore sync engine and keep any subscriptions connected. There will not be any additional subscription events received by the subscriber until DataStore is started (DataStore.start()) or the sync engine is re-initiated upon performing a DataStore operation (query/save/delete).

The Combine-based variant is built on top of the Combine framework; therefore, it is only available on iOS 13 or higher.

The Amplify.Publisher.create API returns a standard AnyPublisher.
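
Because the result is a standard AnyPublisher, ordinary Combine operators and cancellation should compose with it. The sketch below shows this with a `PassthroughSubject` of strings as a stand-in source, so it runs without Amplify. The stand-in never fails, whereas a DataStore publisher can complete with an error, so a real pipeline would also handle completion as in the example above.

```swift
import Combine

// Stand-in source of values (assumption: in an app this would be the
// AnyPublisher returned by Amplify.Publisher.create(...)).
let subject = PassthroughSubject<String, Never>()
let publisher: AnyPublisher<String, Never> = subject.eraseToAnyPublisher()

var received: [String] = []

// Standard Combine operators work on any AnyPublisher.
let cancellable = publisher
    .filter { $0 != "delete" }      // drop values we are not interested in
    .map { "handled \($0)" }        // transform the remaining values
    .sink { received.append($0) }

subject.send("create")
subject.send("delete")              // filtered out
cancellable.cancel()
subject.send("update")              // not delivered: subscription was cancelled

print(received)
// prints ["handled create"]
```

Holding on to the `AnyCancellable` matters: once it is cancelled or deallocated, no further values are delivered.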

Observe query results in real time

observeQuery(...) returns an initial data set, similar to query(...), and also automatically subscribes to subsequent changes to the query.

The first snapshot returned from observeQuery will contain the same results as calling query directly on your Model: a collection of all the locally available records. Subsequent snapshots will be emitted as updates to the dataset are received in the local store.

While data is syncing from the cloud, snapshots will contain all of the items synced so far and an isSynced status of false. When the sync process is complete, a snapshot will be emitted with all the records in the local store and an isSynced status of true.

In addition to typical real-time use cases, observeQuery can be used on app launch to show your customers an initial data set from the local store while new data is being synced from the cloud.
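
One way to use the isSynced status on app launch is to map each snapshot to a simple view state, showing the items received so far until the sync completes. The sketch below is a minimal illustration under stated assumptions: `StandInSnapshot` is a hypothetical stand-in carrying only the `items` and `isSynced` fields of a query snapshot, and `ListState` and `listState(for:)` are illustrative names that do not come from Amplify.

```swift
// Hypothetical stand-in for the two snapshot fields used here.
struct StandInSnapshot<Item> {
    let items: [Item]
    let isSynced: Bool
}

// What a list screen might display for a given snapshot.
enum ListState: Equatable {
    case syncing(itemsSoFar: Int)   // partial data, sync still in progress
    case synced(itemCount: Int)     // local store is fully synced
}

// Maps a snapshot to the state the UI should show.
func listState<Item>(for snapshot: StandInSnapshot<Item>) -> ListState {
    snapshot.isSynced
        ? .synced(itemCount: snapshot.items.count)
        : .syncing(itemsSoFar: snapshot.items.count)
}

// Simulated sequence: one partial snapshot, then the final synced one.
let snapshots = [
    StandInSnapshot(items: ["first post"], isSynced: false),
    StandInSnapshot(items: ["first post", "second post", "third post"], isSynced: true)
]
for snapshot in snapshots {
    print(listState(for: snapshot))
}
```

With a real subscription, the same mapping could run inside the loop that receives each snapshot.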

observeQuery also accepts the same predicates and sorting options as query.

import Amplify

// Async/Await variant
// You must hold a reference to your subscription.
var postsSubscription: AmplifyAsyncThrowingSequence<DataStoreQuerySnapshot<Post>>?

// Subscribe
func subscribeToPosts() async {
    let post = Post.keys
    let postsSubscription = Amplify.DataStore.observeQuery(
        for: Post.self,
        where: post.title.beginsWith("post") && post.rating > 10.0,
        sort: .ascending(post.rating)
    )
    // hold onto your subscription
    self.postsSubscription = postsSubscription
    do {
        // observe new snapshots
        for try await querySnapshot in postsSubscription {
            print("[Snapshot] item count: \(querySnapshot.items.count), isSynced: \(querySnapshot.isSynced)")
        }
    } catch {
        print("Error: \(error)")
    }
}

// When you're finished observing, cancel the subscription
func unsubscribeFromPosts() {
    postsSubscription?.cancel()
}

import Amplify
import Combine

// Combine variant
// In your type declaration, declare a cancellable to hold onto the subscription
var postsSubscription: AnyCancellable?

// Subscribe
func subscribeToPosts() {
    let post = Post.keys
    self.postsSubscription = Amplify.Publisher.create(
        Amplify.DataStore.observeQuery(
            for: Post.self,
            where: post.title.beginsWith("post") && post.rating > 10.0,
            sort: .ascending(post.rating)
        )
    )
    .sink {
        if case .failure(let error) = $0 {
            print("Error \(error)")
        }
    } receiveValue: { querySnapshot in
        print("[Snapshot] item count: \(querySnapshot.items.count), isSynced: \(querySnapshot.isSynced)")
    }
}

// Then, when you're finished observing, cancel the subscription
func unsubscribeFromPosts() {
    postsSubscription?.cancel()
}