# Amazon Data Firehose client

`AmplifyFirehoseClient` is a standalone client for streaming data to Amazon Data Firehose delivery streams. It provides:
- Local persistence for offline support
- Automatic retry for failed records
- Automatic batching (up to 500 records or 4 MB per request)
- Interval-based automatic flushing (default: every 30 seconds)
- Enable/disable toggle that silently drops new records while preserving cached ones
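The batching rule above (at most 500 records or 4 MB per request) can be sketched as a greedy grouping. This is an illustrative model only, not the client's implementation; `splitIntoBatches` and its record-size input are hypothetical:

```dart
// Illustrative sketch of the batching rule: group records greedily until
// adding the next one would exceed 500 records or 4 MB, then start a new
// batch. Not part of the client's public API.
const maxBatchRecords = 500;
const maxBatchBytes = 4 * 1024 * 1024;

List<List<int>> splitIntoBatches(List<int> recordSizes) {
  final batches = <List<int>>[];
  var current = <int>[];
  var currentBytes = 0;
  for (final size in recordSizes) {
    final full = current.length == maxBatchRecords ||
        (current.isNotEmpty && currentBytes + size > maxBatchBytes);
    if (full) {
      batches.add(current);
      current = <int>[];
      currentBytes = 0;
    }
    current.add(size);
    currentBytes += size;
  }
  if (current.isNotEmpty) batches.add(current);
  return batches;
}
```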
## Getting started

### Installation
Add the dependency to your `pubspec.yaml`:

```yaml
dependencies:
  amplify_firehose: ^2.11.0
```

### Initialize the client

```dart
import 'package:amplify_firehose/amplify_firehose.dart';

final firehose = await AmplifyFirehoseClient.create(
  region: 'us-east-1',
  credentialsProvider: credentialsProvider,
);
```

The Flutter client automatically resolves the local storage path using `path_provider`. On web, it uses IndexedDB with an in-memory fallback.
## Configuration options
You can customize the client behavior by passing an options object:
| Option | Default | Description |
|---|---|---|
| `cacheMaxBytes` | 5 MB | Maximum size of the local record cache in bytes. |
| `maxRetries` | 5 | Maximum retry attempts per record before it is discarded. |
| `flushStrategy` | `FlushInterval(interval: Duration(seconds: 30))` | Automatic flush interval. Use `FlushNone()` for manual-only flushing. |
```dart
import 'package:amplify_firehose/amplify_firehose.dart';

final firehose = await AmplifyFirehoseClient.create(
  region: 'us-east-1',
  credentialsProvider: credentialsProvider,
  options: const AmplifyFirehoseClientOptions(
    cacheMaxBytes: 10 * 1024 * 1024, // 10 MB
    maxRetries: 5,
    flushStrategy: FlushInterval(
      interval: Duration(seconds: 30),
    ),
  ),
);
```

To disable automatic flushing:

```dart
options: const AmplifyFirehoseClientOptions(
  flushStrategy: FlushNone(),
)
```

## Usage
### Record data
Use `record()` to persist data to the local cache. Records are sent to Firehose during the next flush cycle (automatic or manual).
```dart
import 'dart:convert';
import 'dart:typed_data';

final result = await firehose.record(
  data: Uint8List.fromList(utf8.encode('Hello Firehose')),
  streamName: 'my-delivery-stream',
);
switch (result) {
  case Ok():
    // recorded successfully
  case Error(:final error):
    // handle error
}
```

Records submitted while the client is disabled are silently dropped.
### Flush records
The client automatically flushes cached records at the configured interval (default: 30 seconds). You can also trigger a manual flush:
```dart
switch (await firehose.flush()) {
  case Ok(:final value):
    print('Flushed ${value.recordsFlushed} records');
  case Error(:final error):
    print('Flush error: $error');
}
```

Each flush sends at most one batch per stream (up to 500 records or 4 MB); remaining records are picked up in subsequent flush cycles. If a flush is already in progress, the call returns immediately with `flushInProgress: true`.
Manual flushes work even when the client is disabled, allowing you to drain cached records without re-enabling collection.
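Because each flush sends at most one batch per stream, draining a large cache may take several cycles. A hedged sketch of a drain loop, assuming the flush result exposes `recordsFlushed` as shown above (`drainCache` is an illustrative helper, not part of the client's API):

```dart
import 'package:amplify_firehose/amplify_firehose.dart';

// Repeatedly flush until a cycle sends nothing, meaning the cache is empty.
// Works whether the client is enabled or disabled, since manual flushes
// run in both states.
Future<void> drainCache(AmplifyFirehoseClient firehose) async {
  while (true) {
    switch (await firehose.flush()) {
      case Ok(:final value):
        if (value.recordsFlushed == 0) return; // cache is empty
      case Error(:final error):
        throw error; // surface the first failure to the caller
    }
  }
}
```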
### Clear cache
Delete all cached records from local storage:
```dart
final result = await firehose.clearCache();
```

### Enable and disable
You can toggle record collection and automatic flushing at runtime. When disabled, new records are silently dropped but already-cached records remain in storage.
```dart
firehose.disable();
// Records are dropped, auto-flush paused

firehose.enable();
// Collection and auto-flush resume
```

### Close the client
When you're done with the client, close it to release resources:
```dart
await firehose.close();
```

After closing, all operations return an error. Create a new client instance if needed.
## Advanced

### Escape hatch
Access the underlying AWS SDK `FirehoseClient` for operations not covered by this client's API:

```dart
final sdkClient = firehose.firehoseClient;
// Use sdkClient for direct Firehose API calls
```

### Error handling
All operations surface errors through a sealed exception hierarchy:
| Error type | Description |
|---|---|
| `FirehoseValidationException` | Record input validation failed (oversized record). |
| `FirehoseLimitExceededException` | Local cache is full. Call `flush()` or `clearCache()` to free space. |
| `FirehoseStorageException` | Local database error. |
| `FirehoseClientClosedException` | Operation attempted on a closed client. |
| `FirehoseUnknownException` | Unexpected or uncategorized error. |
Operations return a `Result<T>` that can be pattern-matched:
```dart
final result = await firehose.record(
  data: payload,
  streamName: 'stream',
);
switch (result) {
  case Ok():
    // success
  case Error(:final error):
    switch (error) {
      case FirehoseValidationException():
        print('Validation error: ${error.message}');
      case FirehoseLimitExceededException():
        print('Cache full');
      case FirehoseStorageException():
        print('Storage error: ${error.message}');
      case FirehoseClientClosedException():
        print('Client is closed');
      case FirehoseUnknownException():
        print('Unknown error: ${error.message}');
    }
}
```

### Retry behavior
- All `PutRecordBatch` error codes (`ServiceUnavailableException`, `InternalFailure`) are treated as retryable.
- Each failed record's retry count is incremented after each attempt.
- Records exceeding `maxRetries` (default: 5) are permanently deleted from the cache.
- SDK-level Firehose errors are logged and skipped per stream, so other streams can still flush.
- Non-SDK errors (network failures, storage errors) abort the flush entirely.
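The per-record bookkeeping above can be modeled as a small sketch. `CachedRecord` and `afterFailedAttempt` are hypothetical names for illustration, not the client's internals:

```dart
// Illustrative model: after a failed batch, each failed record's retry
// count is incremented, and records past maxRetries are dropped from the
// cache rather than retried again.
const maxRetries = 5;

class CachedRecord {
  CachedRecord(this.id);
  final String id;
  int retryCount = 0;
}

/// Increments each failed record's retry count and returns only the
/// records that should remain cached for another attempt.
List<CachedRecord> afterFailedAttempt(List<CachedRecord> failed) {
  for (final record in failed) {
    record.retryCount += 1;
  }
  return failed.where((r) => r.retryCount <= maxRetries).toList();
}
```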
## Firehose service limits
The client enforces these limits before sending to the service:
| Limit | Value |
|---|---|
| Max records per `PutRecordBatch` request | 500 |
| Max single record size | 1,000 KiB |
| Max total payload per `PutRecordBatch` request | 4 MB |
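The single-record limit can also be checked before calling `record()`. This is a hedged client-side sketch mirroring the table above; `validateRecord` is a hypothetical helper, not part of the client's API:

```dart
import 'dart:typed_data';

// Mirror of the 1,000 KiB single-record limit from the table above.
const maxRecordBytes = 1000 * 1024;

/// Returns null when the record fits the limit, otherwise a message
/// describing the violation.
String? validateRecord(Uint8List data) {
  if (data.length > maxRecordBytes) {
    return 'record is ${data.length} bytes; limit is $maxRecordBytes';
  }
  return null;
}
```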