Page updated Nov 8, 2023

Using RxJava with Amplify

Amplify also provides a set of APIs that expose Reactive Extensions, a cross-platform library for asynchronous and event-based programs.

To use it, you'll interact with the RxAmplify facade instead of the default Amplify facade.

import com.amplifyframework.rx.RxAmplify;
// ...

Post post = Post.builder()
    .title("My First Post")
    .build();

RxAmplify.DataStore.save(post)
    .subscribe(
        () -> Log.i("RxAmplifyDemo", "Saved a post"),
        failure -> Log.e("RxAmplifyDemo", "Save failed", failure)
    );

Compared to the traditional callback API, this doesn't make a big difference when used for a single method call.

However, it greatly improves readability when chaining asynchronous calls. Moreover, you can use standard RxJava operators to compose other complex functionality into readable chunks.

Let's revisit your nested example where you saved Post, Editor, and PostEditor. With Amplify's RxJava interface you can merge these operations together.

Completable.mergeArray(
    RxAmplify.DataStore.save(post),
    RxAmplify.DataStore.save(editor)
).andThen(
    RxAmplify.DataStore.save(postEditor)
).subscribe(
    () -> Log.i("RxAmplifyDemo", "Post, Editor, and PostEditor saved"),
    failure -> Log.e("RxAmplifyDemo", "One or more items not saved", failure)
);

Compared to nesting these dependent calls in callbacks, this provides a much more readable pattern.
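To make the ordering in that chain concrete, here is a minimal plain-JDK sketch of the same shape: two independent saves that may finish in either order, followed by a dependent save that starts only after both have succeeded. It uses CompletableFuture purely as a stand-in so that it runs without Amplify or RxJava on the classpath. SaveOrderSketch, save, and saveAll are hypothetical names and are not part of Amplify.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class SaveOrderSketch {
    // Hypothetical stand-in for an asynchronous save: records the name when it runs.
    static CompletableFuture<Void> save(String name, List<String> log) {
        return CompletableFuture.runAsync(() -> log.add(name));
    }

    // Same shape as mergeArray(post, editor).andThen(postEditor).
    static List<String> saveAll() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture
            .allOf(save("post", log), save("editor", log))    // independent, any order
            .thenCompose(ignored -> save("postEditor", log))  // starts after both finish
            .join();                                          // block only for the demo
        return log;
    }

    public static void main(String[] args) {
        // "postEditor" is always last; the first two entries may swap.
        System.out.println(saveAll());
    }
}
```

The join model has to exist before either side of the relationship is linked, which is why the dependent save sits in the second stage rather than in the merged group.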

Installation

Amplify's RxJava support ships in an optional module, rxbindings. To start using the Rx APIs, add it as a dependency in your application's Gradle file.

Under Gradle Scripts, open build.gradle (Module :app).

Add the following line in dependencies:

dependencies {
    // Add the below line in `dependencies`
    implementation 'com.amplifyframework:rxbindings:ANDROID_VERSION'
}

Usage

Amplify tries to map the behavior of the callback-based APIs to well-known Rx primitives in an intuitive way. Functions whose callbacks emit a single value (or an error) return an Rx Single. Functions whose callbacks emit no particular value return an Rx Completable. Functions whose callbacks emit a stream of values return an Observable.
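One way to picture this mapping is as an adapter wrapped around each pair of callbacks. The sketch below shows the idea with JDK types only, so it runs without RxJava: a CompletableFuture<String> plays the role of a Single, and a CompletableFuture<Void> plays the role of a Completable. The callback functions fetchTitle and deleteAll are hypothetical, and this is not a description of how Amplify implements its bindings internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackMappingSketch {
    // Hypothetical callback-style API that emits one value or one error.
    static void fetchTitle(boolean fail, Consumer<String> onResult, Consumer<Exception> onError) {
        if (fail) {
            onError.accept(new IllegalStateException("fetch failed"));
        } else {
            onResult.accept("My First Post");
        }
    }

    // Hypothetical callback-style API that emits no value, only completion or an error.
    static void deleteAll(Runnable onComplete, Consumer<Exception> onError) {
        onComplete.run();
    }

    // Single-like adapter: the future carries one value or an error.
    static CompletableFuture<String> fetchTitleAsync(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetchTitle(fail, future::complete, future::completeExceptionally);
        return future;
    }

    // Completable-like adapter: the future carries completion or an error, no value.
    static CompletableFuture<Void> deleteAllAsync() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        deleteAll(() -> future.complete(null), future::completeExceptionally);
        return future;
    }
}
```

A callback that fires repeatedly would map to a stream type in the same spirit; that case is left out to keep the sketch short.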

Special cases

Some APIs return an operation which can be cancelled. Examples include subscribing to an API or uploading or downloading objects from Storage.
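As a rough illustration of what a cancellable operation means for the caller, the following plain-Java sketch models an operation handle that stops delivering events once it has been cancelled. CancelableOperationSketch and its emit and cancel methods are hypothetical and only model the concept; check the Amplify API reference for the real operation types and how they are cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class CancelableOperationSketch {
    private final AtomicBoolean canceled = new AtomicBoolean(false);
    private final Consumer<String> onData;

    CancelableOperationSketch(Consumer<String> onData) {
        this.onData = onData;
    }

    // Called by a (hypothetical) transport whenever new data arrives.
    void emit(String data) {
        if (!canceled.get()) {
            onData.accept(data);
        }
    }

    // After cancel(), later events are dropped instead of delivered.
    void cancel() {
        canceled.set(true);
    }

    // Delivers one event, cancels, then tries to deliver another.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        CancelableOperationSketch operation = new CancelableOperationSketch(received::add);
        operation.emit("first");
        operation.cancel();
        operation.emit("second"); // dropped because the operation was cancelled
        return received;
    }
}
```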

API subscriptions

The API category's subscribe() method exposes two Observables: one for subscription data, and one for connection state. You can access these Observables using observeConnectionState() and observeSubscriptionData() on the returned operation:

RxSubscriptionOperation<? extends GraphQLResponse<?>> subscription =
    RxAmplify.API.subscribe(request);

subscription
    .observeConnectionState()
    .subscribe(
        connectionStateEvent -> Log.i("RxAmplifyDemo", String.valueOf(connectionStateEvent))
    );

subscription
    .observeSubscriptionData()
    .subscribe(
        data -> Log.i("RxAmplifyDemo", "Data on subscription = " + data),
        failure -> Log.e("RxAmplifyDemo", "Subscription failed", failure),
        () -> Log.i("RxAmplifyDemo", "Subscription completed")
    );

Storage upload & download operations

The Storage category's downloadFile() and uploadFile() work largely the same way. Both return an operation containing a Single and an Observable. The Single delivers the result of the transfer, and the Observable reports download or upload progress.

// Download
RxProgressAwareSingleOperation<StorageDownloadFileResult> download =
    RxAmplify.Storage.downloadFile(remoteKey, localFile);

download
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Download progress = " + progress.toString())
    );

download
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Download finished! " + result.getFile().getPath()),
        failure -> Log.e("RxAmplifyDemo", "Download failed", failure)
    );

// Upload
RxProgressAwareSingleOperation<StorageUploadFileResult> upload =
    RxAmplify.Storage.uploadFile(remoteKey, localFile);

upload
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Upload progress = " + progress.toString())
    );

upload
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Upload finished! " + result.getKey()),
        failure -> Log.e("RxAmplifyDemo", "Upload failed", failure)
    );
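The split between a progress channel and a result channel can be sketched with JDK types alone. In the example below, a hypothetical transfer function reports the completed fraction through a callback after each chunk and delivers the final byte count through a CompletableFuture. It runs synchronously and touches no files; ProgressAwareSketch and transfer are illustrative names, not Amplify APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ProgressAwareSketch {
    // Hypothetical transfer: reports progress per chunk, then completes with the byte total.
    static CompletableFuture<Long> transfer(long totalBytes, long chunkBytes,
                                            Consumer<Double> onProgress) {
        if (chunkBytes <= 0) {
            throw new IllegalArgumentException("chunkBytes must be positive");
        }
        CompletableFuture<Long> result = new CompletableFuture<>();
        long sent = 0;
        while (sent < totalBytes) {
            sent = Math.min(totalBytes, sent + chunkBytes);
            onProgress.accept((double) sent / totalBytes); // progress channel
        }
        result.complete(sent);                             // result channel
        return result;
    }

    // Collects every progress fraction reported for a 100-byte transfer in 25-byte chunks.
    static List<Double> demo() {
        List<Double> fractions = new ArrayList<>();
        transfer(100, 25, fractions::add).join();
        return fractions; // [0.25, 0.5, 0.75, 1.0]
    }
}
```

Keeping the two channels separate lets a progress bar subscribe to one and the completion handler to the other, without either needing to filter out the other's events.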