Streaming analytics data
The Amazon Kinesis analytics provider allows you to send analytics data to an Kinesis stream for real-time processing.
Setup Kinesis stream
The following is an example utilizing the AWS Cloud Development Kit (AWS CDK) to create the Analytics resource powered by Amazon Kinesis.
import { auth } from "./auth/resource";import { data } from "./data/resource";import { Policy, PolicyStatement } from "aws-cdk-lib/aws-iam";import { Stream } from "aws-cdk-lib/aws-kinesis";import { Stack } from "aws-cdk-lib/core";
const backend = defineBackend({ auth, data, // additional resources });
// create a new stack for the Kinesis streamconst kinesisStack = backend.createStack("kinesis-stack");
// create a new Kinesis stream with one shardconst kinesisStream = new Stream(kinesisStack, "KinesisStream", { streamName: "myKinesisStream", shardCount: 1,});
// create a new policy to allow PutRecords to the Kinesis streamconst kinesisPolicy = new Policy(kinesisStack, "KinesisPolicy", { statements: [ new PolicyStatement({ actions: ["kinesis:PutRecords"], resources: [kinesisStream.streamArn], }), ],});
// apply the policy to the authenticated and unauthenticated rolesbackend.auth.resources.authenticatedUserIamRole.attachInlinePolicy(kinesisPolicy);backend.auth.resources.unauthenticatedUserIamRole.attachInlinePolicy(kinesisPolicy);
Installation and Configuration
If you did not use the CLI, ensure you have setup IAM permissions for kinesis:PutRecords
.
Example IAM policy for Amazon Kinesis:
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": "kinesis:PutRecords", "Resource": "arn:aws:kinesis:<your-aws-region>:<your-aws-account-id>:stream/<your-stream-name>" // replace the template fields }]}
For more information visit the Amazon Kinesis Developer Documentation.
Configure Kinesis:
// Configure the plugin after adding it to the Analytics moduleimport { Amplify } from 'aws-amplify';
Amplify.configure({ ...Amplify.getConfig(), Analytics: { Kinesis: { // REQUIRED - Amazon Kinesis service region region: 'us-east-1',
// OPTIONAL - The buffer size for events in number of items. bufferSize: 1000,
// OPTIONAL - The number of events to be deleted from the buffer when flushed. flushSize: 100,
// OPTIONAL - The interval in milliseconds to perform a buffer check and flush if necessary. flushInterval: 5000, // 5s
// OPTIONAL - The limit for failed recording retries. resendLimit: 5 } }});
Stream data
You can send a data to a Kinesis stream with the standard record()
method:
import { record } from 'aws-amplify/analytics/kinesis';
record({ data: { // The data blob to put into the record }, partitionKey: 'myPartitionKey', streamName: 'myKinesisStream'});
Flush events
The recorded events are saved in a buffer and sent to the remote server periodically (You can tune it with the flushInterval
option). If needed, you have the option to manually clear all the events from the buffer by using the 'flushEvents' API.
import { flushEvents } from 'aws-amplify/analytics/kinesis';
flushEvents();