Using RxJava with Amplify

Amplify Android v1 is deprecated as of June 1st, 2024. No new features or bug fixes will be added. Dependencies may become outdated and potentially introduce compatibility issues.

Please use the latest version (v2) of Amplify Library for Android to get started. Refer to the upgrade guide for instructions on upgrading your application to the latest version.

Amplify libraries should be used for all new cloud-connected applications. If you are currently using the AWS Mobile SDK for Android, you can access the documentation here.

Amplify also provides a set of APIs that expose Reactive Extensions, a cross-platform library for asynchronous and event-based programs.

To use these APIs, interact with the RxAmplify facade instead of the default Amplify facade.

import com.amplifyframework.rx.RxAmplify;
// ...
Post post = Post.builder()
    .title("My First Post")
    .build();
RxAmplify.DataStore.save(post)
    .subscribe(
        () -> Log.i("RxAmplifyDemo", "Saved a post"),
        failure -> Log.e("RxAmplifyDemo", "Save failed", failure)
    );

Compared to the traditional callback API, this doesn't make a big difference when used for a single method call.

However, it greatly improves readability when chaining asynchronous calls. Moreover, you can use standard RxJava operators to compose other complex functionality into readable chunks.

Let's revisit your nested example where you saved Post, Editor, and PostEditor. With Amplify's RxJava interface, you can merge the Post and Editor saves into one operation and then save the PostEditor once both have completed.

Completable.mergeArray(
    RxAmplify.DataStore.save(post),
    RxAmplify.DataStore.save(editor)
).andThen(
    RxAmplify.DataStore.save(postEditor)
).subscribe(
    () -> Log.i("RxAmplifyDemo", "Post, Editor, and PostEditor saved"),
    failure -> Log.e("RxAmplifyDemo", "One or more items not saved", failure)
);

Compared to nesting these dependent calls in callbacks, this provides a much more readable pattern.
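For contrast, the nested-callback shape that the Rx chain replaces can be sketched with plain JDK types. This is a minimal illustration, not Amplify code: the save() function, its parameters, and the string items are hypothetical stand-ins for a callback-style save API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a callback-style save API; NOT Amplify's DataStore. */
final class NestedSaveDemo {
    // Hypothetical callback-style save: records the item, then reports success or failure.
    static void save(String item, List<String> log,
                     Runnable onSuccess, Consumer<Exception> onFailure) {
        if (item == null) {
            onFailure.accept(new IllegalArgumentException("item must not be null"));
            return;
        }
        log.add("saved " + item);
        onSuccess.run();
    }

    // Each dependent save has to start inside the previous save's success callback,
    // so every extra step adds another level of nesting.
    static List<String> saveAllNested() {
        List<String> log = new ArrayList<>();
        save("post", log,
            () -> save("editor", log,
                () -> save("postEditor", log,
                    () -> log.add("all saved"),
                    failure -> log.add("postEditor not saved")),
                failure -> log.add("editor not saved")),
            failure -> log.add("post not saved"));
        return log;
    }

    public static void main(String[] args) {
        saveAllNested().forEach(System.out::println);
    }
}
```

With three saves the nesting is already three levels deep and each level needs its own failure handler, whereas the Rx version above handles all failures in one place.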

Install RxJava support

Amplify's RxJava support is included in an optional module, rxbindings. To start using the Rx APIs, add it as a dependency of your app module.

Under Gradle Scripts, open build.gradle (Module :app).

Add the following line to the dependencies block:

dependencies {
    // Add the below line in `dependencies`
    implementation 'com.amplifyframework:rxbindings:ANDROID_V1_VERSION'
}

Using Rx primitives

Amplify tries to map the behavior of its callback-based APIs to well-known Rx primitives in an intuitive way. Functions whose callbacks emit a single value (or an error) return an Rx Single. Functions whose callbacks emit no particular value return an Rx Completable. Functions whose callbacks emit a stream of values return an Observable.
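The three callback shapes described above can be sketched with plain JDK functional interfaces. This is an illustration only: fetchTitle(), deleteDraft(), and watchProgress() are hypothetical functions, not Amplify APIs, and the comments name the Rx type that corresponds to each shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical callback-style functions; none of these are Amplify APIs. */
final class CallbackShapesDemo {
    // Shape 1: exactly one value, or an error. The Rx counterpart is a Single.
    static void fetchTitle(Consumer<String> onResult, Consumer<Exception> onError) {
        onResult.accept("My First Post");
    }

    // Shape 2: success carries no value. The Rx counterpart is a Completable.
    static void deleteDraft(Runnable onComplete, Consumer<Exception> onError) {
        onComplete.run();
    }

    // Shape 3: many values over time, then completion. The Rx counterpart is an Observable.
    static void watchProgress(Consumer<Integer> onNext, Consumer<Exception> onError,
                              Runnable onComplete) {
        for (int percent = 0; percent <= 100; percent += 50) {
            onNext.accept(percent);
        }
        onComplete.run();
    }

    // Calls each function once and records the order in which callbacks fire.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Consumer<Exception> onError = error -> events.add("error: " + error.getMessage());
        fetchTitle(title -> events.add("result: " + title), onError);
        deleteDraft(() -> events.add("completed"), onError);
        watchProgress(percent -> events.add("next: " + percent), onError,
            () -> events.add("stream completed"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```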

Special cases with RxJava

Some APIs return an operation that can be cancelled. Examples include API subscriptions and Storage uploads and downloads.

API subscriptions with RxJava

The API category's subscribe() method returns an operation that exposes two Observables: one for connection state and one for subscription data. You can access these Observables using observeConnectionState() and observeSubscriptionData() on the returned operation:

RxSubscriptionOperation<? extends GraphQLResponse<?>> subscription =
    RxAmplify.API.subscribe(request);
subscription
    .observeConnectionState()
    .subscribe(
        connectionStateEvent -> Log.i("RxAmplifyDemo", String.valueOf(connectionStateEvent))
    );
subscription
    .observeSubscriptionData()
    .subscribe(
        data -> Log.i("RxAmplifyDemo", "Data on subscription = " + data),
        failure -> Log.e("RxAmplifyDemo", "Subscription failed", failure),
        () -> Log.i("RxAmplifyDemo", "Subscription completed")
    );

Storage upload and download operations with RxJava

The Storage category's downloadFile() and uploadFile() work largely the same way. Both return an operation containing a Single and an Observable. Use the Single to obtain the result of the transfer, and the Observable to monitor its progress.

// Download
RxProgressAwareSingleOperation<StorageDownloadFileResult> download =
    RxAmplify.Storage.downloadFile(remoteKey, localFile);
download
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Download progress = " + progress.toString())
    );
download
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Download finished! " + result.getFile().getPath()),
        failure -> Log.e("RxAmplifyDemo", "Download failed", failure)
    );
// Upload
RxProgressAwareSingleOperation<StorageUploadFileResult> upload =
    RxAmplify.Storage.uploadFile(remoteKey, localFile);
upload
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Upload progress = " + progress.toString())
    );
upload
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Upload finished! " + result.getKey()),
        failure -> Log.e("RxAmplifyDemo", "Upload failed", failure)
    );