Kinesis Data Streams client
AmplifyKinesisClient is a standalone client for streaming data to Amazon Kinesis Data Streams. It provides:
- Local persistence for offline support
- Automatic retry for failed records
- Automatic batching (up to 500 records or 10 MB per request)
- Interval-based automatic flushing (default: every 30 seconds)
- Enable/disable toggle that silently drops new records while preserving cached ones
Getting started
Installation
Add the dependency to your module's build.gradle.kts:
    dependencies {
        implementation("com.amplifyframework:aws-kinesis:LATEST_VERSION")
    }

Initialize the client
    import com.amplifyframework.kinesis.AmplifyKinesisClient

    val kinesis = AmplifyKinesisClient(
        context = applicationContext,
        region = "us-east-1",
        credentialsProvider = credentialsProvider
    )

Configuration options
You can customize the client behavior by passing an options object:
| Option | Default | Description |
|---|---|---|
| cacheMaxBytes | 5 MB | Maximum size of the local record cache in bytes. |
| maxRetries | 5 | Maximum retry attempts per record before it is discarded. |
| flushStrategy | FlushStrategy.Interval(30.seconds) | Automatic flush interval. Use FlushStrategy.None for manual-only flushing. |
| configureClient | null | Escape hatch to customize the underlying AWS SDK KinesisClient. |
    import com.amplifyframework.kinesis.AmplifyKinesisClient
    import com.amplifyframework.kinesis.AmplifyKinesisClientOptions
    import com.amplifyframework.recordcache.FlushStrategy
    import kotlin.time.Duration.Companion.seconds

    val kinesis = AmplifyKinesisClient(
        context = applicationContext,
        region = "us-east-1",
        credentialsProvider = credentialsProvider,
        options = AmplifyKinesisClientOptions {
            cacheMaxBytes = 10L * 1024 * 1024 // 10 MB
            maxRetries = 3
            flushStrategy = FlushStrategy.Interval(60.seconds)
            configureClient {
                retryStrategy {
                    maxAttempts = 10
                }
            }
        }
    )

To disable automatic flushing:
    options = AmplifyKinesisClientOptions {
        flushStrategy = FlushStrategy.None
    }

Usage
Record data
Use record() to persist data to the local cache. Records are sent to Kinesis during the next flush cycle (automatic or manual).
    val result = kinesis.record(
        data = "Hello Kinesis".toByteArray(),
        partitionKey = "partition-1",
        streamName = "my-stream"
    )
    when (result) {
        is Result.Success -> { /* recorded successfully */ }
        is Result.Failure -> { /* handle error */ }
    }

Records submitted while the client is disabled are silently dropped.
Flush records
The client automatically flushes cached records at the configured interval (default: 30 seconds). You can also trigger a manual flush:
    when (val result = kinesis.flush()) {
        is Result.Success -> println("Flushed ${result.data.recordsFlushed} records")
        is Result.Failure -> println("Flush error: ${result.error}")
    }

Each flush sends at most one batch per stream (up to 500 records or 10 MB). Remaining records are picked up in subsequent flush cycles. If a flush is already in progress, the call returns immediately with flushInProgress: true.
Manual flushes work even when the client is disabled, allowing you to drain cached records without re-enabling collection.
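The one-batch-per-stream rule can be pictured with a small standalone model. This is only a sketch under assumptions: CachedRecord, nextBatch, and planFlush are hypothetical names for illustration, not part of the AmplifyKinesisClient API, and the library's actual selection logic may differ.

```kotlin
// Hypothetical model of a cached record; not a type from the library.
data class CachedRecord(val id: Long, val streamName: String, val sizeBytes: Int)

const val MAX_RECORDS_PER_BATCH = 500
const val MAX_BYTES_PER_BATCH = 10L * 1024 * 1024 // 10 MB

// Takes records in cache order until adding one more would exceed either limit.
fun nextBatch(pending: List<CachedRecord>): List<CachedRecord> {
    val batch = mutableListOf<CachedRecord>()
    var totalBytes = 0L
    for (record in pending) {
        if (batch.size == MAX_RECORDS_PER_BATCH) break
        if (totalBytes + record.sizeBytes > MAX_BYTES_PER_BATCH) break
        batch += record
        totalBytes += record.sizeBytes
    }
    return batch
}

// One batch per stream per flush; whatever is left waits for the next cycle.
fun planFlush(cache: List<CachedRecord>): Map<String, List<CachedRecord>> =
    cache.groupBy { it.streamName }.mapValues { (_, records) -> nextBatch(records) }

fun main() {
    // 600 small records per stream: each stream contributes one capped batch.
    val cache = (1L..1200L).map {
        CachedRecord(it, if (it % 2 == 0L) "stream-a" else "stream-b", 1024)
    }
    planFlush(cache).forEach { (stream, batch) ->
        println("$stream: ${batch.size} of 600 cached records in this flush")
    }
}
```

In this model a stream with 600 cached records needs two flush cycles to drain, which is why a large backlog may take several intervals (or several manual flush() calls) to clear.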
Clear cache
Delete all cached records from local storage:
    kinesis.clearCache()

Enable and disable
You can toggle record collection and automatic flushing at runtime. When disabled, new records are silently dropped but already-cached records remain in storage.
    kinesis.disable()
    // Records are dropped, auto-flush paused

    kinesis.enable()
    // Collection and auto-flush resume

Advanced
Escape hatch
Access the underlying AWS SDK KinesisClient for operations not covered by this client's API:
    val sdkClient = kinesis.kinesisClient
    // Use sdkClient for direct Kinesis API calls

Error handling
All operations surface errors through a sealed exception hierarchy:
| Error type | Description |
|---|---|
| AmplifyKinesisValidationException | Record input validation failed (oversized record, invalid partition key). |
| AmplifyKinesisLimitExceededException | Local cache is full. Call flush() or clearCache() to free space. |
| AmplifyKinesisStorageException | Local database error. |
| AmplifyKinesisUnknownException | Unexpected or uncategorized error. |
Operations return Result<T, AmplifyKinesisException>:
    when (val result = kinesis.record(...)) {
        is Result.Success -> { /* success */ }
        is Result.Failure -> when (result.error) {
            is AmplifyKinesisValidationException -> { /* invalid input */ }
            is AmplifyKinesisLimitExceededException -> { /* cache full */ }
            is AmplifyKinesisStorageException -> { /* database error */ }
            is AmplifyKinesisUnknownException -> { /* unexpected error */ }
        }
    }

Retry behavior
- All PutRecords error codes (ProvisionedThroughputExceededException, InternalFailure) are treated as retryable.
- Each failed record's retry count is incremented after each attempt.
- Records exceeding maxRetries (default: 5) are permanently deleted from the cache.
- SDK-level Kinesis errors are logged and skipped per-stream, so other streams can still flush.
- Non-SDK errors (network failures, storage errors) abort the flush entirely.
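The retry-count rules above can be sketched as a small standalone model. PendingRecord and afterFailedAttempt are hypothetical names, and the reading that a record is dropped once its count goes above maxRetries (rather than when it reaches it) is an assumption drawn from the word "exceeding"; the library may count differently.

```kotlin
// Hypothetical model; the type and function names are not from the library.
data class PendingRecord(val id: Long, val retryCount: Int = 0)

// Applies one failed send attempt to a record.
// Returns the record with its retry count incremented, or null when the
// count now exceeds maxRetries and the record should be deleted from the cache.
fun afterFailedAttempt(record: PendingRecord, maxRetries: Int = 5): PendingRecord? {
    val updated = record.copy(retryCount = record.retryCount + 1)
    return if (updated.retryCount > maxRetries) null else updated
}

fun main() {
    // Simulate a record that fails on every flush until it is discarded.
    var record: PendingRecord? = PendingRecord(id = 1)
    var attempts = 0
    while (record != null) {
        attempts++
        record = afterFailedAttempt(record, maxRetries = 5)
    }
    println("Record discarded after $attempts failed attempts")
}
```

Under this reading, maxRetries = 5 allows the initial attempt plus five retries before the record is removed, so lowering maxRetries trades delivery persistence for a smaller cache footprint.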
Kinesis service limits
The client enforces these limits before sending to the service:
| Limit | Value |
|---|---|
| Max records per PutRecords request | 500 |
| Max single record size | 10 MB |
| Max total payload per PutRecords request | 10 MB |
| Max partition key length | 256 characters |
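If you want to reject obviously invalid input before calling record(), a pre-check against the per-record limits in the table might look like the sketch below. validationError is a hypothetical helper, not a library function. It assumes the size limit applies to the data payload alone and approximates "characters" with String.length (UTF-16 code units); the client's own validation may measure both differently.

```kotlin
const val MAX_RECORD_BYTES = 10 * 1024 * 1024 // 10 MB, from the limits table
const val MAX_PARTITION_KEY_CHARS = 256       // from the limits table

// Hypothetical pre-check: returns a description of the first violated limit,
// or null when the input is within both per-record limits.
fun validationError(data: ByteArray, partitionKey: String): String? = when {
    partitionKey.length > MAX_PARTITION_KEY_CHARS ->
        "partition key has ${partitionKey.length} characters (max $MAX_PARTITION_KEY_CHARS)"
    data.size > MAX_RECORD_BYTES ->
        "record is ${data.size} bytes (max $MAX_RECORD_BYTES)"
    else -> null
}

fun main() {
    val ok = validationError("Hello Kinesis".toByteArray(), "partition-1")
    val longKey = validationError("Hello Kinesis".toByteArray(), "k".repeat(257))
    val tooBig = validationError(ByteArray(MAX_RECORD_BYTES + 1), "partition-1")
    println("valid input      -> $ok")
    println("oversized key    -> $longKey")
    println("oversized record -> $tooBig")
}
```

A check like this only saves a round trip through the client; record() still reports the authoritative result through AmplifyKinesisValidationException.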