Using RxJava with Amplify
Amplify also provides a set of APIs that expose Reactive Extensions, a cross-platform library for asynchronous and event-based programs.
To use it, you'll interact with the RxAmplify facade instead of the default Amplify facade.
import com.amplifyframework.rx.RxAmplify;
// ...
Post post = Post.builder()
    .title("My First Post")
    .build();

RxAmplify.DataStore.save(post)
    .subscribe(
        () -> Log.i("RxAmplifyDemo", "Saved a post"),
        failure -> Log.e("RxAmplifyDemo", "Save failed", failure)
    );
Compared to the traditional callback API, this doesn't make a big difference when used for a single method call.
However, it greatly improves readability when chaining asynchronous calls. Moreover, you can use standard RxJava operators to compose other complex functionality into readable chunks.
Let's revisit your nested example where you saved Post, Editor, and PostEditor. With Amplify's RxJava interface you can merge these operations together.
Completable.mergeArray(
    RxAmplify.DataStore.save(post),
    RxAmplify.DataStore.save(editor)
).andThen(
    RxAmplify.DataStore.save(postEditor)
).subscribe(
    () -> Log.i("RxAmplifyDemo", "Post, Editor, and PostEditor saved"),
    failure -> Log.e("RxAmplifyDemo", "One or more items not saved", failure)
);
Compared to nesting these dependent calls in callbacks, this provides a much more readable pattern.
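To make the ordering in the merged example concrete, here is a minimal sketch of the same shape using only the JDK: two independent saves run first, and a dependent save starts once both have finished. It is an analogy, not Amplify or RxJava code. The class MergeThenSketch and its save() helper are hypothetical, and CompletableFuture stands in for Completable, with allOf() playing roughly the role of mergeArray() and thenCompose() the role of andThen().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// JDK-only analogy; none of these names come from Amplify or RxJava.
final class MergeThenSketch {
    // Hypothetical stand-in for an asynchronous save: records the name when it runs.
    static CompletableFuture<Void> save(String name, List<String> log) {
        return CompletableFuture.runAsync(() -> log.add(name));
    }

    // Saves "post" and "editor" independently, then "postEditor" once both have finished.
    static List<String> saveAll() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture.allOf(save("post", log), save("editor", log)) // roughly mergeArray(...)
            .thenCompose(ignored -> save("postEditor", log))             // roughly andThen(...)
            .join();                                                      // block only for this demo
        return log;
    }

    public static void main(String[] args) {
        // "post" and "editor" may appear in either order; "postEditor" is always last.
        System.out.println(saveAll());
    }
}
```

The dependent save is created inside the continuation, so it cannot start before both of the earlier saves have completed.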
Install RxJava support
Amplify's RxJava support is included in an optional module, rxbindings. To start using the Rx APIs, add the following dependency to your application's Gradle file:
Under Gradle Scripts, open build.gradle (Module :app).
Add the following line in dependencies:
dependencies {
    // Add the below line in `dependencies`
    implementation 'com.amplifyframework:rxbindings:ANDROID_V1_VERSION'
}
Using Rx primitives
Amplify tries to map the behavior of the callback-based APIs to well-known Rx primitives in an intuitive way. Functions whose callbacks emit a single value (or error) return an Rx Single. Functions whose callbacks emit no particular value return an Rx Completable. Functions whose callbacks emit a stream of values return an Observable.
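The idea behind the first mapping can be sketched with the JDK alone: a pair of result and error callbacks that fire exactly once collapses into one single-value asynchronous type. This is an illustration only, not how Amplify implements its bindings. The loadTitle() function and the class name are hypothetical, and CompletableFuture stands in for the role that Single plays in RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// JDK-only illustration; none of these names come from Amplify or RxJava.
final class CallbackToFutureSketch {
    // Hypothetical callback-style API: reports exactly one result or one error.
    static void loadTitle(boolean succeed, Consumer<String> onResult, Consumer<Exception> onError) {
        if (succeed) {
            onResult.accept("My First Post");
        } else {
            onError.accept(new IllegalStateException("load failed"));
        }
    }

    // Wraps the callback pair in one single-value type (the role Single plays in RxJava).
    static CompletableFuture<String> loadTitleAsFuture(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        loadTitle(succeed, future::complete, future::completeExceptionally);
        return future;
    }

    public static void main(String[] args) {
        System.out.println(loadTitleAsFuture(true).join());                       // prints "My First Post"
        System.out.println(loadTitleAsFuture(false).isCompletedExceptionally());  // prints "true"
    }
}
```

A callback that carries no value maps the same way onto a completion-only type, and a callback that fires repeatedly maps onto a stream type.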
Special cases with RxJava
Some APIs return an operation which can be cancelled. Examples include subscribing to an API or uploading or downloading objects from Storage.
API subscriptions with RxJava
The API category's subscribe() method exposes two Observables: one for subscription data, and one for connection state. You can access these Observables using observeConnectionState() and observeSubscriptionData() on the returned operation:
RxSubscriptionOperation<? extends GraphQLResponse<?>> subscription =
    RxAmplify.API.subscribe(request);

subscription
    .observeConnectionState()
    .subscribe(
        connectionStateEvent -> Log.i("RxAmplifyDemo", String.valueOf(connectionStateEvent))
    );

subscription
    .observeSubscriptionData()
    .subscribe(
        data -> Log.i("RxAmplifyDemo", "Data on subscription = " + data),
        failure -> Log.e("RxAmplifyDemo", "Subscription failed", failure),
        () -> Log.i("RxAmplifyDemo", "Subscription completed")
    );
Storage upload and download operations with RxJava
The Storage category's downloadFile() and uploadFile() work largely the same way. Both return an operation containing a Single and an Observable. The Single can be used to obtain the result of the download or upload, and the Observable can be used to monitor its progress.
// Download
RxProgressAwareSingleOperation<StorageDownloadFileResult> download =
    RxAmplify.Storage.downloadFile(remoteKey, localFile);

download
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Download progress = " + progress.toString())
    );

download
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Download finished! " + result.getFile().getPath()),
        failure -> Log.e("RxAmplifyDemo", "Download failed", failure)
    );

// Upload
RxProgressAwareSingleOperation<StorageUploadFileResult> upload =
    RxAmplify.Storage.uploadFile(remoteKey, localFile);

upload
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Upload progress = " + progress.toString())
    );

upload
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Upload finished! " + result.getKey()),
        failure -> Log.e("RxAmplifyDemo", "Upload failed", failure)
    );