Page updated Sep 9, 2024

DynamoDB Streams

With AWS Lambda, you can seamlessly integrate various event sources, such as Amazon DynamoDB, Amazon SQS, and others, to trigger Lambda functions in response to real-time events. This feature enables you to build responsive, event-driven applications that react to changes in data or system state without the need for polling services.

In this guide, you'll configure a Lambda function with an Amazon DynamoDB stream as its event source. Lambda invokes the function automatically whenever an item is added to, updated in, or deleted from the table, so you can build real-time applications that react to changes in your data. The example uses a Todo table created by a data model on the GraphQL API.
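For orientation, here is a rough sketch of what one stream record carries and how its DynamoDB-typed attributes can be turned into a plain object. The types are minimal local stand-ins, not the full `DynamoDBRecord` type from `@types/aws-lambda`. The Todo fields (`id`, `content`, `isDone`) and the helper names `fromAttribute`, `fromImage`, and `describeChange` are assumptions made for illustration. Whether `NewImage` and `OldImage` are present depends on the stream view type configured for the table.

```typescript
// Minimal local types covering only the attribute kinds used in this sketch.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { L: AttributeValue[] }
  | { M: { [key: string]: AttributeValue } };

type Image = { [key: string]: AttributeValue };

interface StreamRecord {
  eventName?: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb?: { NewImage?: Image; OldImage?: Image };
}

// Convert a single DynamoDB attribute value into a plain JavaScript value.
// Note: Number() can lose precision for very large numeric attributes.
function fromAttribute(value: AttributeValue): unknown {
  if ("S" in value) return value.S;
  if ("N" in value) return Number(value.N);
  if ("BOOL" in value) return value.BOOL;
  if ("NULL" in value) return null;
  if ("L" in value) return value.L.map(fromAttribute);
  return fromImage(value.M);
}

// Convert a whole item image into a plain object.
function fromImage(image: Image): { [key: string]: unknown } {
  const item: { [key: string]: unknown } = {};
  for (const key of Object.keys(image)) {
    item[key] = fromAttribute(image[key]);
  }
  return item;
}

// Summarize a change: REMOVE events are described by the old image,
// INSERT and MODIFY events by the new image.
function describeChange(record: StreamRecord): string {
  const image =
    record.eventName === "REMOVE"
      ? record.dynamodb?.OldImage
      : record.dynamodb?.NewImage;
  const item = image ? fromImage(image) : {};
  return `${record.eventName ?? "UNKNOWN"} ${JSON.stringify(item)}`;
}

// Hypothetical record for a newly created Todo item.
const sample: StreamRecord = {
  eventName: "INSERT",
  dynamodb: {
    NewImage: {
      id: { S: "1" },
      content: { S: "Buy milk" },
      isDone: { BOOL: false },
    },
  },
};

console.log(describeChange(sample));
// prints: INSERT {"id":"1","content":"Buy milk","isDone":false}
```

In a real function, the `unmarshall` helper from `@aws-sdk/util-dynamodb` covers the remaining attribute types (sets, binary) and is usually a better choice than a hand-written converter.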

To get started, install the AWS Lambda Powertools Logger, which provides structured logging capabilities for your Lambda function, and the @types/aws-lambda package, which provides the type definitions used to type the handler.

Terminal
npm add --save-dev @aws-lambda-powertools/logger @types/aws-lambda

Second, create a new directory and a resource file, amplify/functions/dynamoDB-function/resource.ts. Then, define the function with defineFunction:

amplify/functions/dynamoDB-function/resource.ts
import { defineFunction } from "@aws-amplify/backend";

export const myDynamoDBFunction = defineFunction({
  name: "dynamoDB-function",
});

Third, create the corresponding handler file, amplify/functions/dynamoDB-function/handler.ts, with the following contents:

amplify/functions/dynamoDB-function/handler.ts
import type { DynamoDBStreamHandler } from "aws-lambda";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "dynamodb-stream-handler",
});

export const handler: DynamoDBStreamHandler = async (event) => {
  for (const record of event.Records) {
    logger.info(`Processing record: ${record.eventID}`);
    logger.info(`Event Type: ${record.eventName}`);

    if (record.eventName === "INSERT") {
      // business logic to process new records
      logger.info(`New Image: ${JSON.stringify(record.dynamodb?.NewImage)}`);
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);

  return {
    batchItemFailures: [],
  };
};
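The handler above always returns an empty `batchItemFailures` list. As a minimal sketch of how that field can report a failed record instead, the example below returns the sequence number of the first record that fails, so that Lambda retries from that record onward rather than reprocessing the whole batch. The types are local stand-ins for the `@types/aws-lambda` shapes, and `processRecord` and `handleBatch` are hypothetical names. Lambda only honors this response when partial batch responses are enabled on the event source mapping (the `reportBatchItemFailures` option in the CDK).

```typescript
// Local stand-ins for the stream record and batch response shapes.
interface StreamRecord {
  eventID?: string;
  eventName?: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb?: { SequenceNumber?: string; NewImage?: { [key: string]: unknown } };
}

interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Hypothetical business logic: an INSERT without a new image is treated as an error.
async function processRecord(record: StreamRecord): Promise<void> {
  if (record.eventName === "INSERT" && !record.dynamodb?.NewImage) {
    throw new Error(`Record ${record.eventID} has no NewImage`);
  }
}

async function handleBatch(records: StreamRecord[]): Promise<BatchResponse> {
  for (const record of records) {
    try {
      await processRecord(record);
    } catch (error) {
      const sequenceNumber = record.dynamodb?.SequenceNumber;
      // Without a sequence number the record cannot be reported individually,
      // so rethrow and let the whole batch fail.
      if (!sequenceNumber) throw error;
      // Stream records are ordered, so stop at the first failure and
      // report it; processing resumes from this record on retry.
      return { batchItemFailures: [{ itemIdentifier: sequenceNumber }] };
    }
  }
  // Every record succeeded.
  return { batchItemFailures: [] };
}
```

Stopping at the first failure keeps processing in stream order; records after the failed one are delivered again on the retry, so the business logic should tolerate seeing a record more than once.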

Lastly, configure the Todo table's DynamoDB stream as the event source for the function in the amplify/backend.ts file:

amplify/backend.ts
import { defineBackend } from "@aws-amplify/backend";
import { Stack } from "aws-cdk-lib";
import { Policy, PolicyStatement, Effect } from "aws-cdk-lib/aws-iam";
import { StartingPosition, EventSourceMapping } from "aws-cdk-lib/aws-lambda";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { myDynamoDBFunction } from "./functions/dynamoDB-function/resource";

const backend = defineBackend({
  auth,
  data,
  myDynamoDBFunction,
});

// The DynamoDB table that backs the Todo model
const todoTable = backend.data.resources.tables["Todo"];

// Allow the function to read from DynamoDB streams
const policy = new Policy(
  Stack.of(todoTable),
  "MyDynamoDBFunctionStreamingPolicy",
  {
    statements: [
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: [
          "dynamodb:DescribeStream",
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
          "dynamodb:ListStreams",
        ],
        // "*" matches every stream in the account; narrow it if your
        // security requirements call for tighter scoping
        resources: ["*"],
      }),
    ],
  }
);
backend.myDynamoDBFunction.resources.lambda.role?.attachInlinePolicy(policy);

// Connect the table's stream to the function
const mapping = new EventSourceMapping(
  Stack.of(todoTable),
  "MyDynamoDBFunctionTodoEventStreamMapping",
  {
    target: backend.myDynamoDBFunction.resources.lambda,
    eventSourceArn: todoTable.tableStreamArn,
    // Read only records written after the mapping is created
    startingPosition: StartingPosition.LATEST,
  }
);

// Create the mapping only after the policy is in place
mapping.node.addDependency(policy);
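The mapping above uses default batching and delivers every change to the function. As a hedged sketch of how it could be tuned, the helper below creates a mapping that forwards only INSERT events and enables partial batch responses. The helper name `addInsertOnlyMapping`, the construct ID, and all numeric values are assumptions chosen for illustration. It would be used in place of the mapping above, not in addition to it.

```typescript
import { Duration } from "aws-cdk-lib";
import {
  EventSourceMapping,
  FilterCriteria,
  FilterRule,
  StartingPosition,
  type IFunction,
} from "aws-cdk-lib/aws-lambda";
import type { Construct } from "constructs";

// Hypothetical helper: maps a DynamoDB stream to a function, INSERT events only.
export function addInsertOnlyMapping(
  scope: Construct,
  target: IFunction,
  streamArn: string
): EventSourceMapping {
  return new EventSourceMapping(scope, "InsertOnlyTodoStreamMapping", {
    target,
    eventSourceArn: streamArn,
    startingPosition: StartingPosition.LATEST,
    // Illustrative values: up to 10 records per invocation,
    // waiting at most 5 seconds to fill a batch
    batchSize: 10,
    maxBatchingWindow: Duration.seconds(5),
    // Give up on a failing batch after 3 retries
    retryAttempts: 3,
    // Let the handler report failed records through batchItemFailures
    reportBatchItemFailures: true,
    // Invoke the function only for newly inserted items
    filters: [
      FilterCriteria.filter({ eventName: FilterRule.isEqual("INSERT") }),
    ],
  });
}
```

With a filter in place, the `record.eventName === "INSERT"` check in the handler becomes a safeguard rather than the only gate, and the function is not invoked for updates or deletions at all.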