DynamoDB Streams
With AWS Lambda, you can seamlessly integrate various event sources, such as Amazon DynamoDB, Amazon SQS, and others, to trigger Lambda functions in response to real-time events. This feature enables you to build responsive, event-driven applications that react to changes in data or system state without the need for polling services.
In this guide, let's configure a Lambda function with an Amazon DynamoDB stream as an event source. The Lambda function is triggered automatically whenever an item is added to, updated in, or deleted from the table, so your application can react to data changes in real time. This example uses a Todo table created by a data model on the GraphQL API.
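To make the three kinds of change concrete, here is a minimal sketch of how a stream record might differ for each one. The `StreamRecord` type is a simplified local stand-in for the record type shipped in `@types/aws-lambda`, the `id` key and `content` attribute are hypothetical Todo fields, and the sketch assumes the stream is configured to carry both new and old item images.

```typescript
// Simplified stand-in for a DynamoDB stream record (not the full aws-lambda type).
type AttributeMap = Record<string, { S?: string; N?: string; BOOL?: boolean }>;

interface StreamRecord {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb: {
    Keys: AttributeMap;
    NewImage?: AttributeMap; // item after the change (INSERT and MODIFY)
    OldImage?: AttributeMap; // item before the change (MODIFY and REMOVE)
  };
}

// Summarize which change a record represents, based on its event name.
function describeChange(record: StreamRecord): string {
  const id = record.dynamodb.Keys.id?.S ?? "unknown";
  switch (record.eventName) {
    case "INSERT":
      return `item ${id} was added`;
    case "MODIFY":
      return `item ${id} was updated`;
    case "REMOVE":
      return `item ${id} was deleted`;
  }
}

// Hypothetical sample record for a newly created Todo item.
const sampleInsert: StreamRecord = {
  eventName: "INSERT",
  dynamodb: {
    Keys: { id: { S: "todo-1" } },
    NewImage: { id: { S: "todo-1" }, content: { S: "Buy milk" } },
  },
};

console.log(describeChange(sampleInsert));
```

Each record in a batch carries one of these event names, so a handler can branch on `eventName` to decide which image to read.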
To get started, install the AWS Lambda Powertools Logger, which provides structured logging capabilities for your Lambda function, and the @types/aws-lambda package, which provides the type definitions used to type the handler.
npm add --save-dev @aws-lambda-powertools/logger @types/aws-lambda
Second, create a new directory and a resource file, amplify/functions/dynamoDB-function/resource.ts. Then, define the function with defineFunction:
import { defineFunction } from "@aws-amplify/backend";

export const myDynamoDBFunction = defineFunction({
  name: "dynamoDB-function",
  resourceGroupName: "data",
});
Third, create the corresponding handler file, amplify/functions/dynamoDB-function/handler.ts, with the following contents:
import type { DynamoDBStreamHandler } from "aws-lambda";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "dynamodb-stream-handler",
});

export const handler: DynamoDBStreamHandler = async (event) => {
  for (const record of event.Records) {
    logger.info(`Processing record: ${record.eventID}`);
    logger.info(`Event Type: ${record.eventName}`);

    if (record.eventName === "INSERT") {
      // business logic to process new records
      logger.info(`New Image: ${JSON.stringify(record.dynamodb?.NewImage)}`);
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);

  return {
    batchItemFailures: [],
  };
};
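The NewImage that the handler logs arrives in DynamoDB's attribute-value format (for example, `{ "S": "Buy milk" }`) rather than as a plain object. The following is a minimal, hand-rolled sketch of converting such an image for a few common types only. The `AttrValue` type, the helper names, and the Todo field names are illustrative assumptions. In a real project, the `unmarshall` helper from `@aws-sdk/util-dynamodb` is likely a better fit because it covers the full set of types.

```typescript
// Simplified attribute-value type covering only a few DynamoDB types (illustrative).
type AttrValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { L: AttrValue[] }
  | { M: Record<string, AttrValue> };

// Convert one attribute value into a plain JavaScript value.
function toPlainValue(value: AttrValue): unknown {
  if ("S" in value) return value.S;
  // Numbers arrive as strings; Number() can lose precision for very large values.
  if ("N" in value) return Number(value.N);
  if ("BOOL" in value) return value.BOOL;
  if ("NULL" in value) return null;
  if ("L" in value) return value.L.map((item) => toPlainValue(item));
  return toPlainImage(value.M);
}

// Convert a whole image (attribute name -> attribute value) into a plain object.
function toPlainImage(image: Record<string, AttrValue>): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(image)) {
    result[key] = toPlainValue(value);
  }
  return result;
}

// Hypothetical NewImage for a Todo item; the field names are assumptions.
const newImage: Record<string, AttrValue> = {
  id: { S: "todo-1" },
  content: { S: "Buy milk" },
  priority: { N: "2" },
  isDone: { BOOL: false },
  tags: { L: [{ S: "home" }, { S: "shopping" }] },
};

const todo = toPlainImage(newImage);
console.log(todo);
```

Inside the handler, a conversion like this would be applied to `record.dynamodb?.NewImage` before passing the item to business logic.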
Lastly, configure the DynamoDB table's stream as an event source for the function in the amplify/backend.ts file:
import { defineBackend } from "@aws-amplify/backend";
import { Stack } from "aws-cdk-lib";
import { Policy, PolicyStatement, Effect } from "aws-cdk-lib/aws-iam";
import { StartingPosition, EventSourceMapping } from "aws-cdk-lib/aws-lambda";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { myDynamoDBFunction } from "./functions/dynamoDB-function/resource";

const backend = defineBackend({
  auth,
  data,
  myDynamoDBFunction,
});

// Look up the DynamoDB table that backs the Todo model.
const todoTable = backend.data.resources.tables["Todo"];

// Allow the function's role to read from DynamoDB streams.
const policy = new Policy(
  Stack.of(todoTable),
  "MyDynamoDBFunctionStreamingPolicy",
  {
    statements: [
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: [
          "dynamodb:DescribeStream",
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
          "dynamodb:ListStreams",
        ],
        resources: ["*"],
      }),
    ],
  }
);
backend.myDynamoDBFunction.resources.lambda.role?.attachInlinePolicy(policy);

// Connect the table's stream to the function, starting from the newest records.
const mapping = new EventSourceMapping(
  Stack.of(todoTable),
  "MyDynamoDBFunctionTodoEventStreamMapping",
  {
    target: backend.myDynamoDBFunction.resources.lambda,
    eventSourceArn: todoTable.tableStreamArn,
    startingPosition: StartingPosition.LATEST,
  }
);

// Create the mapping only after the policy exists, so the function can read the stream.
mapping.node.addDependency(policy);
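The handler shown earlier always returns an empty batchItemFailures list. As a hedged sketch of how that list could be populated, the example below stops at the first record that fails and reports its sequence number, so that processing can resume from that record rather than from the start of the batch. The `SequencedRecord` and `BatchResponse` types are simplified local stand-ins for the `aws-lambda` types, and `processRecord` is a hypothetical placeholder for business logic. Lambda acts on this response only when the event source mapping enables partial batch responses (the `reportBatchItemFailures` option of the CDK `EventSourceMapping` construct), which the mapping above does not set.

```typescript
// Simplified stand-ins for the stream record and batch response types (illustrative).
interface SequencedRecord {
  eventID: string;
  dynamodb: { SequenceNumber: string };
}

interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Hypothetical business logic; throws to simulate a record that cannot be processed.
async function processRecord(record: SequencedRecord): Promise<void> {
  if (record.eventID === "bad") {
    throw new Error(`cannot process record ${record.eventID}`);
  }
}

// Process records in order and report the first failure by its sequence number.
async function handleBatch(records: SequencedRecord[]): Promise<BatchResponse> {
  for (const record of records) {
    try {
      await processRecord(record);
    } catch {
      // Stream records are ordered, so stop here and report this record as failed.
      return {
        batchItemFailures: [{ itemIdentifier: record.dynamodb.SequenceNumber }],
      };
    }
  }
  // An empty list signals that the whole batch succeeded.
  return { batchItemFailures: [] };
}

// Hypothetical batch in which the second record fails.
const sampleBatch: SequencedRecord[] = [
  { eventID: "ok-1", dynamodb: { SequenceNumber: "100" } },
  { eventID: "bad", dynamodb: { SequenceNumber: "101" } },
  { eventID: "ok-2", dynamodb: { SequenceNumber: "102" } },
];

handleBatch(sampleBatch).then((response) => console.log(JSON.stringify(response)));
```

Returning on the first failure, rather than collecting every failed record, is a design choice that suits ordered streams: records after the failed one are retried anyway, so processing them early would only duplicate work.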