Kinesis Data Streams client

AmplifyKinesisClient is a standalone client for streaming data to Amazon Kinesis Data Streams. It provides:

  • Local persistence for offline support
  • Automatic retry for failed records
  • Automatic batching (up to 500 records or 10 MB per request)
  • Interval-based automatic flushing (default: every 30 seconds)
  • Enable/disable toggle that silently drops new records while preserving cached ones

This is a standalone client, separate from the Amplify Analytics category plugin. It communicates directly with the Kinesis Data Streams API using PutRecords.

Before using this client, ensure your backend is configured with the required IAM permissions. See Set up Kinesis Data Streams.

Getting started

Installation

Add AmplifyKinesisClient to your project using Swift Package Manager. In Xcode, go to File > Add Package Dependencies and enter the repository URL for the Amplify Swift SDK.

Initialize the client

import AmplifyKinesisClient

// credentialsProvider is your AWS credentials provider, created elsewhere in your app.
let kinesis = try AmplifyKinesisClient(
    region: "us-east-1",
    credentialsProvider: credentialsProvider
)

Configuration options

You can customize the client behavior by passing an options object:

Option          | Default       | Description
cacheMaxBytes   | 5 MB          | Maximum size of the local record cache in bytes.
maxRetries      | 5             | Maximum retry attempts per record before it is discarded.
flushStrategy   | .interval(30) | Automatic flush interval in seconds. Use .none for manual-only flushing.
configureClient | nil           | Closure to customize the underlying KinesisClientConfiguration.

let kinesis = try AmplifyKinesisClient(
    region: "us-east-1",
    credentialsProvider: credentialsProvider,
    options: .init(
        cacheMaxBytes: 10 * 1_024 * 1_024, // 10 MB
        maxRetries: 3,
        flushStrategy: .interval(60),
        configureClient: { config in
            // Customize the underlying KinesisClientConfiguration
        }
    )
)

To disable automatic flushing:

options: .init(flushStrategy: .none)

Usage

Record data

Use record() to persist data to the local cache. Records are sent to Kinesis during the next flush cycle (automatic or manual).

let result = try await kinesis.record(
    data: Data("Hello Kinesis".utf8), // UTF-8 bytes, no force unwrap needed
    partitionKey: "partition-1",
    streamName: "my-stream"
)

Records submitted while the client is disabled are silently dropped.
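
Because record() takes raw Data, structured events need to be serialized first. The following is a minimal sketch of one way to do that with Foundation's JSONEncoder. The ClickEvent type and the encodePayload helper are hypothetical names used only for illustration; they are not part of AmplifyKinesisClient.

```swift
import Foundation

// Hypothetical event type, used only for illustration.
struct ClickEvent: Codable, Equatable {
    let userId: String
    let button: String
    let timestamp: Date
}

/// Encodes an event as JSON bytes suitable for the `data:` argument of record().
func encodePayload(_ event: ClickEvent) throws -> Data {
    let encoder = JSONEncoder()
    encoder.dateEncodingStrategy = .iso8601   // human-readable timestamps
    encoder.outputFormatting = [.sortedKeys]  // stable key order across runs
    return try encoder.encode(event)
}

let event = ClickEvent(
    userId: "user-42",
    button: "checkout",
    timestamp: Date(timeIntervalSince1970: 0)
)
let payload = try encodePayload(event)
```

The resulting payload can be passed as the data argument of record(). Using a field such as userId as the partition key is one possible choice, not a requirement of the client.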

Flush records

The client automatically flushes cached records at the configured interval (default: 30 seconds). You can also trigger a manual flush:

let flushResult = try await kinesis.flush()
print("Flushed \(flushResult.recordsFlushed) records")

Each flush sends at most one batch per stream (up to 500 records or 10 MB). Remaining records are picked up in subsequent flush cycles. If a flush is already in progress, the call returns immediately with flushInProgress: true.

Manual flushes work even when the client is disabled, allowing you to drain cached records without re-enabling collection.
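
To make the one-batch-per-flush rule concrete, here is a rough sketch of how a batch could be selected from the cached records of a single stream. It is an illustration under stated assumptions, not the client's implementation: CachedRecord and nextBatch are hypothetical names, and counting the partition key toward the byte total is an assumption.

```swift
import Foundation

// Hypothetical stand-in for a cached record; the client's storage type is not documented here.
struct CachedRecord {
    let partitionKey: String
    let data: Data
}

let maxRecordsPerBatch = 500
let maxBytesPerBatch = 10 * 1_024 * 1_024 // 10 MB

/// Returns the longest prefix of `records` that fits in a single batch.
/// Records left over stay cached for a later flush cycle.
func nextBatch(from records: [CachedRecord]) -> [CachedRecord] {
    var batch: [CachedRecord] = []
    var totalBytes = 0
    for record in records {
        // Assumption: both the payload and the partition key count toward the size limit.
        let size = record.data.count + record.partitionKey.utf8.count
        if batch.count == maxRecordsPerBatch || totalBytes + size > maxBytesPerBatch {
            break
        }
        batch.append(record)
        totalBytes += size
    }
    return batch
}
```

With 600 small cached records, a sketch like this would send 500 in the first flush and leave 100 for the next one.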

Clear cache

Delete all cached records from local storage:

let cleared = try await kinesis.clearCache()

Enable and disable

You can toggle record collection and automatic flushing at runtime. When disabled, new records are silently dropped but already-cached records remain in storage.

await kinesis.disable()
// Records are dropped, auto-flush paused
await kinesis.enable()
// Collection and auto-flush resume
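
The toggle semantics can be summarized with a toy in-memory model. This is only a sketch of the behavior described above (new records dropped while disabled, cached records kept, manual flush still allowed). ToggleModel is a hypothetical type and does not reflect how the client is implemented.

```swift
// Toy model of the enable/disable semantics; not the client's implementation.
final class ToggleModel {
    private(set) var isEnabled = true
    private(set) var cached: [String] = []

    /// Caches the item only while collection is enabled; otherwise drops it silently.
    func record(_ item: String) {
        guard isEnabled else { return }
        cached.append(item)
    }

    func disable() { isEnabled = false }
    func enable() { isEnabled = true }

    /// Drains the cache regardless of the enabled state and returns the number of items sent.
    func flush() -> Int {
        let count = cached.count
        cached.removeAll()
        return count
    }
}
```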

Advanced

Escape hatch

Access the underlying AWS SDK KinesisClient for operations not covered by this client's API:

let sdkClient = kinesis.getKinesisClient()
// Use sdkClient for direct Kinesis API calls

Error handling

All operations report failures as cases of the KinesisError enum:

Error type                      | Description
KinesisError.validation         | Record input validation failed (oversized record, invalid partition key).
KinesisError.cacheLimitExceeded | Local cache is full. Call flush() or clearCache() to free space.
KinesisError.cache              | Local database error.
KinesisError.unknown            | Unexpected or uncategorized error.

Operations throw KinesisError:

do {
    try await kinesis.record(
        data: payload,
        partitionKey: "key",
        streamName: "stream"
    )
} catch let error as KinesisError {
    switch error {
    case .validation(let desc, _, _):
        print("Validation error: \(desc)")
    case .cacheLimitExceeded:
        print("Cache full")
    case .cache(let desc, _, _):
        print("Storage error: \(desc)")
    case .unknown(let desc, _, _):
        print("Unknown error: \(desc)")
    }
}

Retry behavior

  • All per-record error codes returned by PutRecords (ProvisionedThroughputExceededException, InternalFailure) are treated as retryable.
  • Each failed record's retry count is incremented after each attempt.
  • Records exceeding maxRetries (default: 5) are permanently deleted from the cache.
  • SDK-level Kinesis errors are logged and skipped per-stream, so other streams can still flush.
  • Non-SDK errors (network failures, storage errors) abort the flush entirely.
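
The retry bookkeeping in the list above can be sketched as follows. This is an illustrative model, not the client's code: PendingRecord and applyFlushOutcome are hypothetical names, and treating "exceeding maxRetries" as retryCount > maxRetries is an assumption about the exact boundary.

```swift
// Hypothetical cache entry that tracks how many times a record has failed.
struct PendingRecord: Equatable {
    let id: Int
    var retryCount: Int = 0
}

/// Applies the outcome of one flush attempt to the cached records.
/// Succeeded records are removed, failed records get their retry count
/// incremented, and records whose count exceeds `maxRetries` are dropped.
func applyFlushOutcome(
    cache: [PendingRecord],
    succeededIDs: Set<Int>,
    failedIDs: Set<Int>,
    maxRetries: Int = 5
) -> [PendingRecord] {
    var remaining: [PendingRecord] = []
    for var record in cache {
        if succeededIDs.contains(record.id) { continue } // delivered, remove from cache
        if failedIDs.contains(record.id) {
            record.retryCount += 1
            // Assumption: a record is discarded once its count is greater than maxRetries.
            if record.retryCount > maxRetries { continue }
        }
        remaining.append(record) // untouched or still retryable
    }
    return remaining
}
```

Records that were not part of the attempted batch pass through unchanged, which matches the description that remaining records are picked up in later flush cycles.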

Kinesis service limits

The client enforces these limits before sending to the service:

Limit                                    | Value
Max records per PutRecords request       | 500
Max single record size                   | 10 MB
Max total payload per PutRecords request | 10 MB
Max partition key length                 | 256 characters
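
If you want to reject bad input before calling record(), a pre-check along these lines may help. It is a sketch based on the limits listed above, not the client's own validation: RecordLimitViolation and checkLimits are hypothetical names, the empty-key check and the choice to count Unicode scalars for key length are assumptions.

```swift
import Foundation

// Hypothetical result type for this sketch; the client itself reports
// validation failures as KinesisError.validation.
enum RecordLimitViolation: Error, Equatable {
    case emptyPartitionKey
    case partitionKeyTooLong(characters: Int)
    case recordTooLarge(bytes: Int)
}

let maxPartitionKeyCharacters = 256
let maxRecordBytes = 10 * 1_024 * 1_024 // 10 MB

/// Returns the first limit a record violates, or nil if it is within the limits.
func checkLimits(data: Data, partitionKey: String) -> RecordLimitViolation? {
    // Assumption: an empty partition key is rejected.
    if partitionKey.isEmpty { return .emptyPartitionKey }
    // Assumption: key length is measured in Unicode scalars.
    let keyLength = partitionKey.unicodeScalars.count
    if keyLength > maxPartitionKeyCharacters {
        return .partitionKeyTooLong(characters: keyLength)
    }
    if data.count > maxRecordBytes {
        return .recordTooLarge(bytes: data.count)
    }
    return nil
}
```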