Using Amazon Kinesis

You are currently viewing the AWS SDK for Mobile documentation, which is a collection of low-level libraries. Use the Amplify libraries for all new app development.

The two classes AWSKinesisRecorder and AWSFirehoseRecorder allow you to interface with Amazon Kinesis and Amazon Kinesis Firehose to stream analytics data for real-time processing.

What is Amazon Kinesis?

Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. Amazon Kinesis can collect and process hundreds of terabytes of data per hour from hundreds of thousands of sources, so you can write applications that process information in real time. With Amazon Kinesis applications, you can build real-time dashboards, capture exceptions and generate alerts, drive recommendations, and make other real-time business or operational decisions. You can also send data to other services such as Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and Amazon Redshift.

The Amazon Kinesis AWSKinesisRecorder client lets you store PutRecord requests on disk and then send them all at once. This is useful because many mobile applications that use Amazon Kinesis will create multiple PutRecord requests per second. Sending an individual request for each PutRecord action could adversely impact battery life. Moreover, the requests could be lost if the device goes offline. Thus, using the high-level Amazon Kinesis client for batching can preserve both battery life and data.
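To make the saving concrete, the following sketch counts how many network requests a batching client would need compared with sending each record individually. It does not use the SDK: makeBatches is a hypothetical helper, and the batch size of 500 is an assumption taken from the PutRecords per-request record limit, not necessarily the size AWSKinesisRecorder uses internally.

```swift
import Foundation

// Hypothetical helper (not part of the SDK): split pending records into
// batches of at most `batchSize` records each.
func makeBatches(_ records: [Data], batchSize: Int) -> [[Data]] {
    precondition(batchSize > 0, "batchSize must be positive")
    var batches: [[Data]] = []
    var start = 0
    while start < records.count {
        let end = min(start + batchSize, records.count)
        batches.append(Array(records[start..<end]))
        start = end
    }
    return batches
}

// 1,200 small records, for example ten minutes of events at two per second.
let pending = (0..<1200).map { Data("event-\($0)".utf8) }

// Assumed batch size: 500 records per request.
let batches = makeBatches(pending, batchSize: 500)

print("Individual requests: \(pending.count)")  // prints "Individual requests: 1200"
print("Batched requests: \(batches.count)")     // prints "Batched requests: 3"
```

Under these assumptions, the radio wakes up three times instead of 1,200 times, which is where the battery saving comes from.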

What is Amazon Kinesis Firehose?

Amazon Kinesis Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. With Firehose, you do not need to write any applications or manage any resources. You configure your data producers to send data to Firehose and it automatically delivers the data to the destination that you specified.

The Amazon Kinesis Firehose AWSFirehoseRecorder client lets you store PutRecord requests on disk and then send them in batches using the Kinesis Data Firehose PutRecordBatch operation.

For more information about Amazon Kinesis Firehose, see Amazon Kinesis Firehose.

Integrating Amazon Kinesis and Amazon Kinesis Firehose

Add the following to your Podfile:

pod 'AWSKinesis'

Import the module for each service you use. For this example, you need the following import.

import AWSKinesis

To use Amazon Kinesis in an application, you must set the correct permissions. The following IAM policy allows the user to submit records to a specific Amazon Kinesis stream, which is identified by ARN.

{
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kinesis:PutRecords",
            "Resource": "arn:aws:kinesis:us-west-2:111122223333:stream/mystream"
        }
    ]
}

The following IAM policy allows the user to submit records to a specific Amazon Kinesis Firehose stream.

{
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "firehose:PutRecordBatch",
            "Resource": "arn:aws:firehose:us-west-2:111122223333:deliverystream/mystream"
        }
    ]
}

Apply these policies to the roles assigned to your Amazon Cognito identity pool, replacing the Resource value with the ARN of your own Amazon Kinesis or Amazon Kinesis Firehose stream. You can apply policies in the IAM console. To learn more about IAM policies, see Using IAM.

To learn more about Amazon Kinesis-specific policies, see Controlling Access to Amazon Kinesis Resources with IAM.

To learn more about Amazon Kinesis Firehose policies, see Controlling Access with Amazon Kinesis Firehose.

Working with the API

Once you have credentials, you can use AWSKinesisRecorder with Amazon Kinesis. The following snippet returns a shared instance of the Amazon Kinesis service client:

let kinesisRecorder = AWSKinesisRecorder.default()

You can use AWSFirehoseRecorder with Amazon Kinesis Firehose. The following snippet returns a shared instance of the Amazon Kinesis Firehose service client:

let firehoseRecorder = AWSFirehoseRecorder.default()

Configure Kinesis:

You can configure AWSKinesisRecorder or AWSFirehoseRecorder through their properties:

kinesisRecorder.diskAgeLimit = TimeInterval(30 * 24 * 60 * 60) // 30 days
kinesisRecorder.diskByteLimit = UInt(10 * 1024 * 1024) // 10 MB
kinesisRecorder.notificationByteThreshold = UInt(5 * 1024 * 1024) // 5 MB

The diskAgeLimit property sets the expiration for cached requests. A request older than the limit is discarded. The default is no age limit. The diskByteLimit property sets the maximum size of the disk cache in bytes. If the cache exceeds this limit, older requests are discarded. The default value is 5 MB. Setting the value to 0 means that there's no practical limit. The notificationByteThreshold property sets the cache size, in bytes, beyond which the recorder posts a notification that the threshold has been reached. The default value is 0, meaning that by default no notification is posted.
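The interaction between the two limits can be modeled in a few lines of plain Swift. This is only an illustrative sketch of the behavior described above, not the recorder's actual implementation: CachedRequest and prune are hypothetical names, and treating an age limit of 0 as "no limit" is an assumption.

```swift
import Foundation

// Hypothetical stand-in for one request cached on disk.
struct CachedRequest {
    let id: Int
    let savedAt: TimeInterval  // when the request was saved, in seconds
    let byteCount: Int         // size of the request on disk
}

// Illustrative pruning: first drop expired requests, then drop the oldest
// remaining requests until the cache fits within the byte limit.
// A limit of 0 is treated as "no limit" (an assumption for ageLimit).
func prune(_ requests: [CachedRequest],
           now: TimeInterval,
           ageLimit: TimeInterval,
           byteLimit: Int) -> [CachedRequest] {
    var kept = requests.sorted { $0.savedAt < $1.savedAt }  // oldest first
    if ageLimit > 0 {
        kept.removeAll { now - $0.savedAt > ageLimit }
    }
    if byteLimit > 0 {
        var total = kept.reduce(0) { $0 + $1.byteCount }
        while total > byteLimit, let oldest = kept.first {
            total -= oldest.byteCount
            kept.removeFirst()
        }
    }
    return kept
}

let cache = [
    CachedRequest(id: 1, savedAt: 0, byteCount: 400),
    CachedRequest(id: 2, savedAt: 50, byteCount: 400),
    CachedRequest(id: 3, savedAt: 90, byteCount: 400),
]

// Request 1 is older than 60 seconds and expires; request 2 is then dropped
// because the remaining 800 bytes exceed the 500-byte limit.
let kept = prune(cache, now: 100, ageLimit: 60, byteLimit: 500)
print(kept.map { $0.id })  // prints "[3]"
```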

To see how much local storage is being used for Amazon Kinesis PutRecord requests, check the diskBytesUsed property.

With AWSKinesisRecorder created and configured, you can use saveRecord() to save records to local storage.

let yourData = "Test_data".data(using: .utf8)
kinesisRecorder.saveRecord(
    yourData,
    streamName: "YourStreamName"
)

In the preceding example, you create a Data object and save it locally. Replace YourStreamName with the name of your Kinesis stream. You can create new streams in the Amazon Kinesis console.

Here is a similar snippet for Amazon Kinesis Firehose:

let yourData = "Test_data".data(using: .utf8)
firehoseRecorder.saveRecord(
    yourData,
    streamName: "YourStreamName"
)

To submit all the records stored on the device, call submitAllRecords.

kinesisRecorder.submitAllRecords()
firehoseRecorder.submitAllRecords()

submitAllRecords sends all locally saved requests to the Amazon Kinesis service. Requests that are successfully sent will be deleted from the device. Requests that fail because the device is offline will be kept and submitted later. Invalid requests are deleted.
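The retention rules above can be summarized as a small decision function. The sketch below is a plain-Swift model of those rules and does not call the SDK; SubmitOutcome and shouldKeep are hypothetical names introduced for illustration.

```swift
import Foundation

// Hypothetical outcomes for one locally saved request after a submit attempt.
enum SubmitOutcome {
    case sent     // accepted by the service
    case offline  // not delivered because the device was offline
    case invalid  // rejected as an invalid request
}

// Returns true when the request should stay on disk for a later attempt.
func shouldKeep(after outcome: SubmitOutcome) -> Bool {
    switch outcome {
    case .sent, .invalid:
        return false  // deleted from the device
    case .offline:
        return true   // kept and submitted later
    }
}

let attempts: [(name: String, outcome: SubmitOutcome)] = [
    (name: "record-1", outcome: .sent),
    (name: "record-2", outcome: .offline),
    (name: "record-3", outcome: .invalid),
]

// Only the request that failed while offline remains in local storage.
let remaining = attempts.filter { shouldKeep(after: $0.outcome) }.map { $0.name }
print(remaining)  // prints "["record-2"]"
```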

Both saveRecord and submitAllRecords are asynchronous operations, so you should ensure that saveRecord is complete before you invoke submitAllRecords. The following code sample shows the methods used correctly together.

// Collect the task returned by each saveRecord call.
var tasks = Array<AWSTask<AnyObject>>()
for i in 0...100 {
    tasks.append(
        kinesisRecorder!.saveRecord(
            String(format: "TestString-%02d", i).data(using: .utf8),
            streamName: "YourStreamName"
        )!
    )
}

// Submit only after every save has completed successfully.
AWSTask(forCompletionOfAllTasks: tasks).continueOnSuccessWith(block: { (task: AWSTask<AnyObject>) -> AWSTask<AnyObject>? in
    return kinesisRecorder?.submitAllRecords()
}).continueWith(block: { (task: AWSTask<AnyObject>) -> Any? in
    // Report any error from the save or submit steps.
    if let error = task.error {
        print("Error: \(error)")
    }
    return nil
})

To learn more about working with Amazon Kinesis, see the Amazon Kinesis Developer Resources.

To learn more about the Amazon Kinesis classes, see the class reference for AWSKinesisRecorder.

To learn more about the Amazon Kinesis Firehose classes, see the class reference for AWSFirehoseRecorder.