Name:
interface
Value:
Amplify has re-imagined the way frontend developers build fullstack applications. Develop and deploy without the hassle.

Amazon Data Firehose client

AmplifyFirehoseClient is a standalone client for streaming data to Amazon Data Firehose delivery streams. It provides:

  • Local persistence for offline support
  • Automatic retry for failed records
  • Automatic batching (up to 500 records or 4 MB per request)
  • Interval-based automatic flushing (default: every 30 seconds)
  • Enable/disable toggle that silently drops new records while preserving cached ones

This is a standalone client, separate from the Amplify Analytics category plugin. It communicates directly with the Firehose API using PutRecordBatch.

Before using this client, ensure your backend is configured with the required IAM permissions. See Set up Amazon Data Firehose.

Getting started

Installation

Add the dependency to your pubspec.yaml:

dependencies:
amplify_firehose: ^2.11.0

Initialize the client

import 'package:amplify_firehose/amplify_firehose.dart';
final firehose = await AmplifyFirehoseClient.create(
region: 'us-east-1',
credentialsProvider: credentialsProvider,
);

The Flutter client automatically resolves the local storage path using path_provider. On web, it uses IndexedDB with an in-memory fallback.

Configuration options

You can customize the client behavior by passing an options object:

OptionDefaultDescription
cacheMaxBytes5 MBMaximum size of the local record cache in bytes.
maxRetries5Maximum retry attempts per record before it is discarded.
flushStrategyFlushInterval(interval: Duration(seconds: 30))Automatic flush interval. Use FlushNone() for manual-only flushing.
import 'package:amplify_firehose/amplify_firehose.dart';
final firehose = await AmplifyFirehoseClient.create(
region: 'us-east-1',
credentialsProvider: credentialsProvider,
options: const AmplifyFirehoseClientOptions(
cacheMaxBytes: 10 * 1024 * 1024, // 10 MB
maxRetries: 5,
flushStrategy: FlushInterval(
interval: Duration(seconds: 30),
),
),
);

To disable automatic flushing:

options: const AmplifyFirehoseClientOptions(
flushStrategy: FlushNone(),
)

Usage

Record data

Use record() to persist data to the local cache. Records are sent to Firehose during the next flush cycle (automatic or manual).

import 'dart:convert';
import 'dart:typed_data';
final result = await firehose.record(
data: Uint8List.fromList(utf8.encode('Hello Firehose')),
streamName: 'my-delivery-stream',
);
switch (result) {
case Ok():
// recorded successfully
case Error(:final error):
// handle error
}

Records submitted while the client is disabled are silently dropped.

Flush records

The client automatically flushes cached records at the configured interval (default: 30 seconds). You can also trigger a manual flush:

switch (await firehose.flush()) {
case Ok(:final value):
print('Flushed ${value.recordsFlushed} records');
case Error(:final error):
print('Flush error: $error');
}

Each flush sends at most one batch per stream (up to 500 records or 4 MB). Remaining records are picked up in subsequent flush cycles. If a flush is already in progress, the call returns immediately with flushInProgress: true.

Manual flushes work even when the client is disabled, allowing you to drain cached records without re-enabling collection.

Clear cache

Delete all cached records from local storage:

final result = await firehose.clearCache();

Enable and disable

You can toggle record collection and automatic flushing at runtime. When disabled, new records are silently dropped but already-cached records remain in storage.

firehose.disable();
// Records are dropped, auto-flush paused
firehose.enable();
// Collection and auto-flush resume

Close the client

When you're done with the client, close it to release resources:

await firehose.close();

After closing, all operations return an error. Create a new client instance if needed.

Advanced

Escape hatch

Access the underlying AWS SDK FirehoseClient for operations not covered by this client's API:

final sdkClient = firehose.firehoseClient;
// Use sdkClient for direct Firehose API calls

Error handling

All operations surface errors through a sealed exception hierarchy:

Error typeDescription
FirehoseValidationExceptionRecord input validation failed (oversized record).
FirehoseLimitExceededExceptionLocal cache is full. Call flush() or clearCache() to free space.
FirehoseStorageExceptionLocal database error.
FirehoseClientClosedExceptionOperation attempted on a closed client.
FirehoseUnknownExceptionUnexpected or uncategorized error.

Operations return Result<T> which can be pattern-matched:

final result = await firehose.record(
data: payload,
streamName: 'stream',
);
switch (result) {
case Ok():
// success
case Error(:final error):
switch (error) {
case FirehoseValidationException():
print('Validation error: ${error.message}');
case FirehoseLimitExceededException():
print('Cache full');
case FirehoseStorageException():
print('Storage error: ${error.message}');
case FirehoseClientClosedException():
print('Client is closed');
case FirehoseUnknownException():
print('Unknown error: ${error.message}');
}
}

Retry behavior

  • All PutRecordBatch error codes (ServiceUnavailableException, InternalFailure) are treated as retryable.
  • Each failed record's retry count is incremented after each attempt.
  • Records exceeding maxRetries (default: 5) are permanently deleted from the cache.
  • SDK-level Firehose errors are logged and skipped per-stream, so other streams can still flush.
  • Non-SDK errors (network failures, storage errors) abort the flush entirely.

Firehose service limits

The client enforces these limits before sending to the service:

LimitValue
Max records per PutRecordBatch request500
Max single record size1,000 KiB
Max total payload per PutRecordBatch request4 MB