Kinesis Streaming Data Ingestion into Amazon Redshift

INTRODUCTION

Rajas Walavalkar
Jul 21, 2023

Streaming data is widely used across domains and has a variety of applications in our day-to-day software and businesses. One of the major challenges we face while handling streaming data concerns its volume and velocity: volume is typically measured as the number of records per second being ingested, while velocity is how fast we can consume that data to generate insights. Amazon Kinesis Data Streams is a widely used streaming service managed by AWS, and Amazon Redshift is the enterprise-wide data warehousing solution that AWS offers.

In this blog, we are going to see how we can combine the ingestion power of Amazon Kinesis with the analytical power of Amazon Redshift to get insights from streaming data in real time.

Pre-requisites

  1. An AWS account with access to create IAM roles and policies
  2. An already-deployed Amazon Redshift cluster

Let's start with our hands-on lab to create a streaming analytics solution on AWS.

Step 1: Create and Deploy Kinesis Data Streams

For this, open up the AWS Console, search for Kinesis in the search bar, and go to the Kinesis console. Then click on the Create data stream button, which opens up a form to fill in the details of the Kinesis Data Stream. Provide the following details:

  • Data stream name: Kinesis-DataStreams-Source
  • Data stream capacity: Select the Provisioned option and keep the provisioned shard count at 1

Click on Create data stream at the end of the form, and your Kinesis Data Stream will be created within a few minutes. If you prefer to script this step, see the boto3 sketch below.

Successful creation of the Kinesis Data Stream
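The same stream can also be created programmatically instead of through the console. A minimal boto3 sketch, assuming default credentials and the us-east-1 region used later in this post:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Create the provisioned stream with a single shard, matching the console settings above
kinesis.create_stream(
    StreamName='Kinesis-DataStreams-Source',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Block until the stream becomes ACTIVE before ingesting any data
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='Kinesis-DataStreams-Source')
print('Stream is ACTIVE')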

Step 2: Create an IAM role and Policy for Amazon Redshift

Open up the IAM console in your AWS account, go to the Roles section in the left-hand navigation panel, click on the Create role button in the top right corner, and follow the steps below to create a role:

  • On the Select trusted entity page, keep the Trusted entity type as AWS service. In the Use cases for other AWS services dropdown, select Redshift, choose the Redshift — Customizable option, and click the Next button
  • On the Add permissions page, click on the Create policy button. This will open a new tab in the same window, where we will define our IAM policy document. Here, click on the JSON button in the top right corner and paste in the IAM policy JSON document below as it is.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "*"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}
  • Click the Next button, provide the name KinesisDataStreams-ReadAccess-Policy for the policy, and then click the Create policy button
  • Now come back to the previous Create role tab and hit the refresh button beside the Create policy button. Search for the name of the policy we created above, select it, and click the Next button
  • On this page, provide the name KinesisStream-RedshiftAccess-Policy for the IAM role and click the Create role button at the bottom. Since we picked Redshift — Customizable as the trusted entity, the role's trust policy should look like the document shown after this list.
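For reference, the console generates the trust policy for you when you select Redshift — Customizable; it should look like this standard document, which allows the Redshift service to assume the role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": { "Service": "redshift.amazonaws.com" },
            "Action": "sts:AssumeRole"
        }
    ]
}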

You can search for the IAM role created above in the console search bar; it should show up with all the configurations we have made.

Successful Creation of IAM Role and IAM Policy For Redshift

Step 3: Attach the above IAM role to Amazon Redshift cluster

Open the Amazon Redshift console in your AWS account and select the Redshift cluster you will be using to perform analytics on the streaming data. Once you click on the Redshift cluster:

  • Click on the Actions button in the top right corner and select the Manage IAM roles option from the dropdown
  • On this page, select the IAM role we created in the previous step (KinesisStream-RedshiftAccess-Policy), click the Associate IAM role button, and then click the Save changes button
  • The status of the Redshift cluster will change to Modifying, and it will take some time to return to Available. Once the cluster is available, you can connect to it using the Query data option. The same association can also be scripted; see the sketch below.
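For repeatable environments, the role association can also be done through the API. A minimal boto3 sketch, where the cluster identifier and account ID are placeholders you would replace with your own values:

import boto3

redshift = boto3.client('redshift', region_name='us-east-1')

# '<your-cluster-identifier>' is a placeholder; use your own cluster's identifier
redshift.modify_cluster_iam_roles(
    ClusterIdentifier='<your-cluster-identifier>',
    AddIamRoles=['arn:aws:iam::<account-id>:role/KinesisStream-RedshiftAccess-Policy']
)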

Step 4: Create a Schema and Materialized View on Redshift with the Kinesis Data Stream as Source

Once you are in the Redshift query editor, run the following query to create an external schema that maps to your Kinesis data streams. Once you run this query, you will see the Kinesis Data Stream we created in Step 1 within the schema's table folder.

CREATE EXTERNAL SCHEMA kinesis_schema
FROM KINESIS
IAM_ROLE 'arn:aws:iam::<account-id>:role/KinesisStream-RedshiftAccess-Policy';
Successful creation of Kinesis Schema within Redshift

Similarly, let's now create a materialized view that exposes the Kinesis Data Stream as a table. Copy the following query and run it as it is; a variant that extracts typed columns is sketched after it.

CREATE MATERIALIZED VIEW machine_sensors_streams AS
SELECT
    approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(from_varbyte(kinesis_data, 'utf-8')) AS payload
FROM "dev"."kinesis_schema"."<Kinesis-data-stream-name>";

Step 5: Ingest Data into Kinesis Streams using a Python Script

Now, let's test the entire solution by ingesting data into the Kinesis Data Stream using an automated Python script. For this, open a CloudShell session within your AWS account, then save and upload the following Python script into the CloudShell environment.

import json
import random
import datetime as dt

import boto3

client = boto3.client('kinesis', region_name='us-east-1')

machines = ['Body Makers', 'Palletizers', 'Cupping System', 'Decorator']

date = dt.datetime.now()

for x in range(1, 6):
    # Build a random sensor reading for one of the machines
    record = {
        'datetime': str(date),
        'sensor_id': random.randint(101, 104),
        'machine_name': random.choice(machines),
        'vibration': x * random.randint(1, 4),
        'temperature': x * random.randint(1, 3),
        'pressure': x * random.randint(4, 7),
    }
    data = json.dumps(record)
    date += dt.timedelta(minutes=2)  # space the readings two minutes apart
    print(data)

    # A random partition key spreads records across shards
    partition_key = str(random.randint(10, 100))
    client.put_record(
        StreamName='<Kinesis-data-stream-name>',
        Data=data,
        PartitionKey=partition_key,
    )

print("Ingestion Done")

Once the file is uploaded, you can run the Python script using the following command. This will ingest records into the Kinesis Data Stream.

python3 <python-script-name>.py

We can verify the data ingested into the Kinesis Data Stream by checking the monitoring graphs on the Kinesis console, or by reading a few records back from the stream, as in the sketch below.
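A minimal boto3 sketch that reads the oldest records from the first shard to confirm the producer worked; the stream name placeholder matches the producer script:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

stream = '<Kinesis-data-stream-name>'  # same placeholder as in the producer script

# Grab the first shard and read from the oldest available record onwards
shard_id = kinesis.list_shards(StreamName=stream)['Shards'][0]['ShardId']
iterator = kinesis.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON'
)['ShardIterator']

for record in kinesis.get_records(ShardIterator=iterator, Limit=5)['Records']:
    print(record['Data'].decode('utf-8'))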

Step 6: Query the Streaming Data using the Materialized View

Before we query the Kinesis stream data, we have to refresh the materialized view using the following command:

REFRESH MATERIALIZED VIEW <VIEW_NAME>;
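Manual refresh pulls the latest stream data on demand. If you want the view to stay current without manual refreshes, streaming materialized views can also be created with auto refresh enabled; a minimal sketch, reusing the view definition from Step 4:

CREATE MATERIALIZED VIEW machine_sensors_streams
AUTO REFRESH YES
AS
SELECT
    approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(from_varbyte(kinesis_data, 'utf-8')) AS payload
FROM "dev"."kinesis_schema"."<Kinesis-data-stream-name>";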

Once the materialized view is refreshed, we can run the following query and see the output coming from the Kinesis Data Stream:

SELECT * FROM <VIEW_NAME>;

You can see the output of the above query in the following image, which shows how the data ingested into the Kinesis stream can be queried by defining a materialized view and then used for further analytical operations as per the business use case. Since payload is a SUPER column, individual attributes can also be navigated directly in SQL, as in the sketch below.
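A small illustrative aggregation using SUPER dot notation with inline casts, assuming the view and fields created earlier:

SELECT
    payload.machine_name::VARCHAR AS machine_name,
    AVG(payload.temperature::INT) AS avg_temperature,
    MAX(payload.vibration::INT) AS max_vibration
FROM machine_sensors_streams
GROUP BY 1;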

CONCLUSION:

This blog gives you a good start in understanding how we can combine multiple services into a complete solution architecture for streaming data pipelines. We saw how easily, and in very little time, we can set up a streaming data source, get it into a warehouse, and perform real-time analytics. The beauty of this feature is that it works with other streaming sources as well, such as Apache Kafka and Amazon MSK. Notice also that there is no need to write loading scripts to ingest data into the warehouse to make real-time data available for analytics. We can take this architecture and turn it into a production-scale solution by adding proper monitoring and by placing BI tools as consumers of the materialized views, reflecting real-time data on reporting dashboards.

REFERENCES:

  1. Redshift Streaming Ingestion — https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion.html
  2. KDS to Redshift — https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion-getting-started.html
  3. Create Kinesis Data Streams — https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl-create-stream.html
  4. Kinesis Boto3 Producer — https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html
  5. Refresh Materialized view — https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-refresh-sql-command.html


Rajas Walavalkar

Technical Architect - Data & AWS Ambassador at Quantiphi Analytics. Worked on ETL, Data Warehouses, Big Data (AWS Glue, Spark), BI & Dashboarding (D&A).