
Page updated Apr 29, 2024

Observe in real time

Amplify Android v1 is deprecated as of June 1st, 2024. No new features or bug fixes will be added. Dependencies may become outdated and potentially introduce compatibility issues.

Please use the latest version (v2) of Amplify Library for Android to get started. Refer to the upgrade guide for instructions on upgrading your application to the latest version.

Amplify libraries should be used for all new cloud connected applications. If you are currently using the AWS Mobile SDK for Android, you can access the documentation here.

Observe model mutations in real-time

You can subscribe to changes on your models. The subscription reacts to any update of data in the underlying Storage Engine. If you are synchronizing with the cloud, these updates can come from GraphQL subscriptions as well as from queries or mutations that run against the backing AppSync API.

Java

    Amplify.DataStore.observe(Post.class,
        cancelable -> Log.i("MyAmplifyApp", "Observation began."),
        postChanged -> {
            // Each change event wraps the affected model instance.
            Post post = postChanged.item();
            Log.i("MyAmplifyApp", "Post: " + post);
        },
        failure -> Log.e("MyAmplifyApp", "Observation failed.", failure),
        () -> Log.i("MyAmplifyApp", "Observation complete.")
    );

Kotlin - Callbacks

    Amplify.DataStore.observe(Post::class.java,
        { Log.i("MyAmplifyApp", "Observation began") },
        {
            val post = it.item()
            Log.i("MyAmplifyApp", "Post: $post")
        },
        { Log.e("MyAmplifyApp", "Observation failed", it) },
        { Log.i("MyAmplifyApp", "Observation complete") }
    )

Kotlin - Coroutines

    Amplify.DataStore.observe(Post::class)
        .onStart { Log.i("MyAmplifyApp", "Observation began") }
        .catch { Log.e("MyAmplifyApp", "Observation failed", it) }
        .onCompletion { Log.i("MyAmplifyApp", "Observation complete") }
        .collect { Log.i("MyAmplifyApp", "Post: ${it.item()}") }

RxJava

    RxAmplify.DataStore.observe(Post.class)
        .subscribe(
            // The emitted value is a change event; item() returns the Post.
            postChanged -> Log.i("MyAmplifyApp", "Post: " + postChanged.item()),
            failure -> Log.e("MyAmplifyApp", "Observation failed.", failure),
            () -> Log.i("MyAmplifyApp", "Observation complete.")
        );
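
A common use of these change events is to keep an in-memory view of the data up to date. The following is a minimal, library-free sketch of that idea. It assumes each change carries a kind (create, update or delete) in addition to the item. The types `ChangeType`, `ItemChange` and `Post` below are hypothetical stand-ins written for this illustration, not Amplify classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: ChangeType, ItemChange and Post are hypothetical
// stand-ins, not types from the Amplify library.
final class ChangeFolding {
    enum ChangeType { CREATE, UPDATE, DELETE }

    static final class Post {
        final String id;
        final String title;

        Post(String id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    static final class ItemChange {
        final ChangeType type;
        final Post item;

        ItemChange(ChangeType type, Post item) {
            this.type = type;
            this.item = item;
        }
    }

    // Applies a single change to a view of the data keyed by model id.
    static void apply(Map<String, Post> view, ItemChange change) {
        switch (change.type) {
            case CREATE:
            case UPDATE:
                view.put(change.item.id, change.item);
                break;
            case DELETE:
                view.remove(change.item.id);
                break;
        }
    }

    // Replays a sequence of changes, in order, onto an empty view.
    static Map<String, Post> replay(ItemChange... changes) {
        Map<String, Post> view = new LinkedHashMap<>();
        for (ItemChange change : changes) {
            apply(view, change);
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Post> view = replay(
            new ItemChange(ChangeType.CREATE, new Post("1", "first draft")),
            new ItemChange(ChangeType.CREATE, new Post("2", "second post")),
            new ItemChange(ChangeType.UPDATE, new Post("1", "first post")),
            new ItemChange(ChangeType.DELETE, new Post("2", "second post")));
        for (Post post : view.values()) {
            System.out.println(post.id + ": " + post.title);
        }
    }
}
```

In an app, the body of `apply` would sit inside the change callback passed to `observe`, with the view typically owned by whatever component renders the list.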

Observe query results in real-time

observeQuery(...) returns an initial data set, similar to query(...), and also automatically subscribes to subsequent changes to the query.

The first snapshot returned from observeQuery will contain the same results as calling query directly on your Model - a collection of all the locally-available records. Subsequent snapshots will be emitted as updates to the dataset are received in the local store.

While data is syncing from the cloud, snapshots will contain all of the items synced so far and an isSynced status of false. When the sync process is complete, a snapshot will be emitted with all the records in the local store and an isSynced status of true.
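
The snapshot sequence described above can be modelled with a small, library-free sketch. It is only an illustration of the semantics: `Snapshot` and `simulate` are hypothetical names, and the assumption that one snapshot is emitted per synced batch is a simplification rather than a statement about how the library batches updates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: Snapshot is a hypothetical stand-in for a query
// snapshot, not the Amplify DataStoreQuerySnapshot class.
final class SnapshotSketch {
    static final class Snapshot {
        final List<String> items;
        final boolean isSynced;

        Snapshot(List<String> items, boolean isSynced) {
            // Copy the list so later store updates do not alter this snapshot.
            this.items = Collections.unmodifiableList(new ArrayList<>(items));
            this.isSynced = isSynced;
        }
    }

    // Models a sync in progress: the first snapshot holds the locally
    // available records, each synced batch produces another snapshot with
    // isSynced = false, and a final snapshot reports isSynced = true.
    static List<Snapshot> simulate(List<String> local, List<List<String>> cloudBatches) {
        List<Snapshot> emitted = new ArrayList<>();
        List<String> store = new ArrayList<>(local);
        emitted.add(new Snapshot(store, false));
        for (List<String> batch : cloudBatches) {
            store.addAll(batch);
            emitted.add(new Snapshot(store, false));
        }
        emitted.add(new Snapshot(store, true));
        return emitted;
    }

    public static void main(String[] args) {
        List<Snapshot> snapshots = simulate(
            Arrays.asList("a"),
            Arrays.asList(Arrays.asList("b", "c"), Arrays.asList("d")));
        for (Snapshot snapshot : snapshots) {
            System.out.println(snapshot.items.size() + " records, isSynced=" + snapshot.isSynced);
        }
    }
}
```

A UI built on this pattern could render every snapshot as it arrives and use the `isSynced` flag only to show or hide a loading indicator.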

In addition to typical real-time use cases, observeQuery can be used on app launch to show your customers an initial data set from the local store while new data is being synced from the cloud.

observeQuery also accepts the same predicates and sorting options as query.

Java

    String tag = "ObserveQuery";
    Consumer<DataStoreQuerySnapshot<Post>> onQuerySnapshot = value -> {
        Log.d(tag, "success on snapshot");
        Log.d(tag, "number of records: " + value.getItems().size());
        Log.d(tag, "sync status: " + value.getIsSynced());
    };
    Consumer<Cancelable> observationStarted = value -> {
        Log.d(tag, "success on cancelable");
    };
    Consumer<DataStoreException> onObservationError = value -> {
        // Pass the exception to the logger so its stack trace is recorded.
        Log.e(tag, "error on snapshot", value);
    };
    Action onObservationComplete = () -> {
        Log.d(tag, "complete");
    };
    // Match posts whose title begins with "post" and whose rating exceeds 10.
    QueryPredicate predicate =
        Post.TITLE.beginsWith("post").and(Post.RATING.gt(10));
    // Sort the matching posts by rating, lowest first.
    QuerySortBy querySortBy = new QuerySortBy("post", "rating", QuerySortOrder.ASCENDING);
    List<QuerySortBy> sortByList = new ArrayList<>();
    sortByList.add(querySortBy);
    ObserveQueryOptions options = new ObserveQueryOptions(predicate, sortByList);
    Amplify.DataStore.<Post>observeQuery(
        Post.class,
        options,
        observationStarted,
        onQuerySnapshot,
        onObservationError,
        onObservationComplete
    );

Kotlin - Callbacks

    val tag = "ObserveQuery"
    val onQuerySnapshot: Consumer<DataStoreQuerySnapshot<Post>> =
        Consumer<DataStoreQuerySnapshot<Post>> { value: DataStoreQuerySnapshot<Post> ->
            Log.d(tag, "success on snapshot")
            Log.d(tag, "number of records: " + value.items.size)
            Log.d(tag, "sync status: " + value.isSynced)
        }
    val observationStarted =
        Consumer { _: Cancelable ->
            Log.d(tag, "success on cancelable")
        }
    val onObservationError =
        Consumer { value: DataStoreException ->
            Log.d(tag, "error on snapshot $value")
        }
    val onObservationComplete = Action {
        Log.d(tag, "complete")
    }
    val predicate: QueryPredicate =
        Post.TITLE.beginsWith("post").and(Post.RATING.gt(10))
    val querySortBy = QuerySortBy("post", "rating", QuerySortOrder.ASCENDING)
    val options = ObserveQueryOptions(predicate, listOf(querySortBy))
    Amplify.DataStore.observeQuery(
        Post::class.java,
        options,
        observationStarted,
        onQuerySnapshot,
        onObservationError,
        onObservationComplete
    )

Kotlin - Coroutines

    val querySortBy = QuerySortBy("post", "rating", QuerySortOrder.ASCENDING)
    Amplify.DataStore.observeQuery(
        Post::class,
        ObserveQueryOptions(
            Post.TITLE.beginsWith("post").and(Post.RATING.gt(10)),
            listOf(querySortBy)
        )
    )
        .onStart { Log.i("MyAmplifyApp", "Observation began") }
        .catch { Log.e("MyAmplifyApp", "Observation failed", it) }
        .onCompletion { Log.i("MyAmplifyApp", "Observation complete") }
        .collect { value -> Log.i("MyAmplifyApp", "Posts: ${value.items.size}") }

RxJava

    QueryPredicate predicate =
        Post.TITLE.beginsWith("post")
            .and(Post.RATING.gt(10));
    QuerySortBy querySortBy = new QuerySortBy("post", "rating", QuerySortOrder.ASCENDING);
    List<QuerySortBy> sortByList = new ArrayList<>();
    sortByList.add(querySortBy);
    ObserveQueryOptions options = new ObserveQueryOptions(predicate, sortByList);
    RxAmplify.DataStore.observeQuery(Post.class, options).subscribe(
        snapshot -> {
            Log.d("mySampleApp", "success on snapshot");
            Log.d("mySampleApp", "number of records: " + snapshot.getItems().size());
            Log.d("mySampleApp", "sync status: " + snapshot.getIsSynced());
        },
        // Handle errors explicitly so a failed observation is not silently dropped.
        failure -> Log.e("mySampleApp", "Observation failed.", failure)
    );
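
To make the effect of the predicate and sort order in these examples concrete, here is a plain-Java sketch of the same selection applied to an in-memory list. It assumes that `beginsWith` is a case-sensitive prefix match and that `gt` is a strict greater-than comparison. The `Post` class and `select` method are hypothetical and written only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustration only: this Post class is a hypothetical stand-in,
// not the generated Amplify model.
final class PredicateSketch {
    static final class Post {
        final String title;
        final int rating;

        Post(String title, int rating) {
            this.title = title;
            this.rating = rating;
        }
    }

    // Keeps posts whose title starts with "post" and whose rating is
    // greater than 10, then orders them by rating, lowest first.
    static List<Post> select(List<Post> posts) {
        List<Post> selected = new ArrayList<>();
        for (Post post : posts) {
            if (post.title.startsWith("post") && post.rating > 10) {
                selected.add(post);
            }
        }
        selected.sort(Comparator.comparingInt((Post post) -> post.rating));
        return selected;
    }

    public static void main(String[] args) {
        List<Post> selected = select(Arrays.asList(
            new Post("post one", 12),
            new Post("post two", 8),
            new Post("draft", 20),
            new Post("post three", 11)));
        for (Post post : selected) {
            System.out.println(post.title + " (" + post.rating + ")");
        }
    }
}
```

With the sample data in `main`, "post two" is dropped for its rating and "draft" for its title, leaving "post three" followed by "post one".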