Page updated May 1, 2024

Amazon Kinesis Data Streams

With AWS Lambda, you can seamlessly integrate various event sources, such as Amazon Kinesis, Amazon SQS, and others, to trigger Lambda functions in response to real-time events. This feature enables you to build responsive, event-driven applications that react to changes in data or system state without the need for polling services.

In this guide, you will configure a Lambda function with a Kinesis data stream as an event source. Lambda invokes the function automatically whenever new data is published to the stream, whether you're processing streaming data, reacting to application events, or automating workflows.

First, install the AWS Lambda Powertools Logger, which provides structured logging capabilities for your Lambda function, and the @types/aws-lambda package, which provides the type definitions used to type the handler.

Terminal
npm add @aws-lambda-powertools/logger @types/aws-lambda

Second, create a new directory and a resource file, amplify/functions/kinesis-function/resource.ts. Then, define the function with defineFunction:

amplify/functions/kinesis-function/resource.ts
import { defineFunction } from "@aws-amplify/backend";
export const myKinesisFunction = defineFunction({
  name: "kinesis-function",
});

Third, create the corresponding handler file, amplify/functions/kinesis-function/handler.ts, with the following contents:

amplify/functions/kinesis-function/handler.ts
import type {
  KinesisStreamBatchResponse,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "node:buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler",
});
export const handler: KinesisStreamHandler = async (
  event,
  context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /*
        When processing stream data, if any item fails, returning the failed item's position immediately
        prompts Lambda to retry from this item forward, ensuring continuous processing without skipping data.
      */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};
async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  const data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); // Placeholder for an async process
  return data;
}
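
To check the decoding step without deploying, you can build a record by hand and decode it locally. The sketch below is illustrative only: SampleRecord, makeSampleRecord, and decodeRecordData are hypothetical names that are not part of the handler above, and the record carries only the fields the handler reads. It assumes the payload is UTF-8 text, as the handler does.

```typescript
import { Buffer } from "node:buffer";

// Hypothetical minimal record shape: only the fields the handler reads.
interface SampleRecord {
  eventID: string;
  kinesis: { sequenceNumber: string; data: string };
}

// Build a record whose data is base64-encoded, which is how the
// handler above expects to receive it.
function makeSampleRecord(sequenceNumber: string, payload: string): SampleRecord {
  return {
    eventID: `sample-shard:${sequenceNumber}`, // made-up identifier for illustration
    kinesis: {
      sequenceNumber,
      data: Buffer.from(payload, "utf-8").toString("base64"),
    },
  };
}

// Same decoding step as getRecordDataAsync, minus the async placeholder.
function decodeRecordData(record: SampleRecord): string {
  return Buffer.from(record.kinesis.data, "base64").toString("utf-8");
}

const sample = makeSampleRecord("1", JSON.stringify({ orderId: 42 }));
console.log(decodeRecordData(sample)); // prints {"orderId":42}
```

If your producers write binary or compressed data instead of text, the final toString("utf-8") step would need to be replaced with the matching decoder.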

Lastly, create the Kinesis stream and add it as an event source in the amplify/backend.ts file:

amplify/backend.ts
import { defineBackend } from "@aws-amplify/backend";
import { Stream } from "aws-cdk-lib/aws-kinesis";
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
import { KinesisEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { myKinesisFunction } from "./functions/kinesis-function/resource";
const backend = defineBackend({
  auth,
  data,
  myKinesisFunction,
});
const kinesisStack = backend.createStack("kinesis-stack");
const kinesisStream = new Stream(kinesisStack, "KinesisStream", {
  streamName: "myKinesisStream",
  shardCount: 1,
});
const eventSource = new KinesisEventSource(kinesisStream, {
  startingPosition: StartingPosition.LATEST,
  reportBatchItemFailures: true,
});
backend.myKinesisFunction.resources.lambda.addEventSource(eventSource);
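
With reportBatchItemFailures enabled, the sequence number returned by the handler acts as a checkpoint: records before it count as processed, and the failed record and everything after it are delivered again. The following sketch is a simplified model of that behavior, not Lambda's implementation. BatchResponse and recordsToRetry are hypothetical names, and treating an identifier that is missing from the batch as a full-batch failure is an assumption of this sketch.

```typescript
// Shape of the partial batch response returned by the handler.
interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Given the sequence numbers of a batch (in delivery order) and the handler's
// response, return the sequence numbers that would be delivered again.
function recordsToRetry(batch: string[], response: BatchResponse): string[] {
  if (response.batchItemFailures.length === 0) {
    return []; // an empty list means the whole batch succeeded
  }
  const failed = new Set(response.batchItemFailures.map((f) => f.itemIdentifier));
  // The earliest reported record in the batch becomes the checkpoint.
  const checkpoint = batch.findIndex((seq) => failed.has(seq));
  // Assumption: an identifier not found in the batch retries everything.
  return checkpoint === -1 ? [...batch] : batch.slice(checkpoint);
}

const batch = ["100", "101", "102", "103"];
const partial = { batchItemFailures: [{ itemIdentifier: "102" }] };
console.log(JSON.stringify(recordsToRetry(batch, partial))); // prints ["102","103"]
console.log(JSON.stringify(recordsToRetry(batch, { batchItemFailures: [] }))); // prints []
```

This is why the handler returns as soon as one record fails: reporting the first failed sequence number is enough for the retry to resume at that record.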