What is a Kinesis Data Stream?

  • Kinesis Data Streams is used to collect large streams of data records in real time. You can create data-processing applications, known as Kinesis Data Streams applications, which read data from a data stream as data records.
  • Amazon Kinesis Data Streams provides two APIs for putting data into a stream: PutRecord and PutRecords. PutRecord writes a single data record per API call, while PutRecords writes multiple data records in a single call (see the sketch below).
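As a rough illustration of the difference, here is a minimal boto3 sketch; the stream name, region, and payloads below are placeholders, not values from this post.

import json
import boto3

kinesis = boto3.client("kinesis", region_name="ap-south-1")  # assumed region

# PutRecord: one record per API call
kinesis.put_record(
    StreamName="your_stream_name",
    Data=json.dumps({"event": "single"}),
    PartitionKey="key-1",
)

# PutRecords: up to 500 records in a single API call
kinesis.put_records(
    StreamName="your_stream_name",
    Records=[
        {"Data": json.dumps({"event": i}), "PartitionKey": f"key-{i}"}
        for i in range(5)
    ],
)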

What is Kinesis Data Firehose?

  • Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, and Splunk.

 

What is AWS Lambda?

  • AWS Lambda is an event-driven, serverless computing platform provided by Amazon as part of Amazon Web Services. In AWS Lambda, your code is executed in response to events in AWS services, such as objects being added to or deleted from an S3 bucket, or HTTP requests coming from Amazon API Gateway (a minimal handler is sketched below).
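As a minimal sketch, a handler triggered by S3 object-created events might look like the following; the event shape is the standard S3 notification format, nothing specific to this post.

# minimal sketch of a Lambda handler triggered by S3 object-created events
def lambda_handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        print(f"New object uploaded: s3://{bucket}/{key}")
    return {"statusCode": 200}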

What is an IAM Role?

  • IAM roles are entities you can create and assign specific permissions to for performing actions in AWS. When a trusted identity assumes an IAM role, it receives only the permissions scoped by that role (see the sketch below).
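For illustration only, a trusted identity can assume a role through STS and receive temporary credentials scoped to that role. The role ARN and session name below are placeholders.

import boto3

sts = boto3.client("sts")

# assume a role; the returned credentials carry only that role's permissions
assumed = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/demo-kinesis-role",  # placeholder ARN
    RoleSessionName="demo-session",
)
creds = assumed["Credentials"]

# use the temporary credentials for a client scoped to the role's permissions
kinesis = boto3.client(
    "kinesis",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
    region_name="ap-south-1",
)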

What is S3?

  • S3 is an object storage service that offers scalability, data availability, security, and performance. You can use Amazon S3 to store and retrieve data at any time, from anywhere (see the sketch below).
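A quick boto3 sketch of storing and retrieving an object; the bucket and key names are placeholders.

import boto3

s3 = boto3.client("s3")

# store an object (placeholder bucket and key)
s3.put_object(Bucket="demokinesis-bucket", Key="hello.txt", Body=b"hello from S3")

# retrieve it later, from anywhere with access to the bucket
obj = s3.get_object(Bucket="demokinesis-bucket", Key="hello.txt")
print(obj["Body"].read().decode("utf-8"))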

Now let's look at how this works in practice. To work with real-time streaming data, we need a few things:

  • A role
  • A Kinesis data stream
  • A Lambda function to write data to the stream
  • A Lambda function to read data from the stream
  • A bucket where you can store records

 

To create a role, go to the IAM console, choose Roles, and then Create role.

  • Select the trusted entity type and use case (for example, Lambda), then click Next.

  • Search for and attach the AmazonKinesisFullAccess and AmazonKinesisFirehoseFullAccess policies, then click Next again.

  • Give your role a name, review the details, and click Create role.

  • Your new role now appears in the list of roles (a scripted equivalent is sketched below).
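If you prefer to script these console steps, a rough boto3 equivalent looks like this; the role name is a placeholder, and the trust policy assumes Lambda will be the service assuming the role.

import json
import boto3

iam = boto3.client("iam")

# trust policy letting Lambda assume the role (placeholder role name)
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="demo-kinesis-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# attach the same managed policies chosen in the console
for policy_arn in [
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess",
]:
    iam.attach_role_policy(RoleName="demo-kinesis-role", PolicyArn=policy_arn)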

Next, go to the Amazon Kinesis console to create your data stream.

  • Give your Kinesis stream a name, then click Create data stream; the stream is created immediately (a scripted equivalent is sketched below).
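The same step scripted with boto3; the stream name and shard count below are assumptions.

import boto3

kinesis = boto3.client("kinesis", region_name="ap-south-1")

# create a stream with a single shard (placeholder name; size the shard count to your throughput)
kinesis.create_stream(StreamName="your_stream_name", ShardCount=1)

# wait until the stream becomes ACTIVE before writing to it
kinesis.get_waiter("stream_exists").wait(StreamName="your_stream_name")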

To create a Lambda function, go to the Lambda console.

  • Choose Author from scratch, give your function a name, and select Python as the runtime (other languages such as Node.js, Go, Ruby, Java, and C#/.NET are also available).
  • Under Permissions, expand Change default execution role and select Use an existing role; the IAM role you created earlier grants the function its permissions.
  • Click Create function, and your function is ready to use (a scripted equivalent is sketched below).
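A rough boto3 equivalent of these console steps; the role ARN, runtime version, and zip path are placeholders.

import boto3

lambda_client = boto3.client("lambda", region_name="ap-south-1")

# deployment package containing write-kstream.py (placeholder path)
with open("write-kstream.zip", "rb") as f:
    zipped_code = f.read()

lambda_client.create_function(
    FunctionName="write-kstream",
    Runtime="python3.9",                                          # assumed runtime version
    Role="arn:aws:iam::123456789012:role/demo-kinesis-role",      # placeholder role ARN
    Handler="write-kstream.lambda_handler",                       # module.function inside the zip
    Code={"ZipFile": zipped_code},
)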

After the function is created, you are redirected to its configuration page.

  • Now write the code that lets Lambda write data to the Kinesis data stream.
  • Rename the file to write-kstream.py (and update the function's handler setting to match).
  • Replace the default sample code with the code given below.

First, the writing part, write-kstream.py:


import boto3
from datetime import datetime
import calendar
import random
import time
import json

stream_name = "your_stream_name"
k_client = boto3.client("kinesis", region_name="ap-south-1")

def lambda_handler(event, context):
    for i in range(10):
        property_value = random.randint(0, 100)
        property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
        the_data = "testString" + str(property_value)

        # write the data to the stream
        put_to_stream(the_data, property_value, property_timestamp)

        # wait for 1 second
        time.sleep(1)


def put_to_stream(the_data, property_value, property_timestamp):
    payload = {
        "prop": str(property_value),
        "timestamp": str(property_timestamp),
        "the_data": the_data,
    }
    # print (payload)
    put_response = k_client.put_record(
        StreamName=stream_name, Data=json.dumps(payload), PartitionKey=the_data
    )

    

After successfully running that code, open your Kinesis data stream in the console and check its monitoring metrics to see the data flowing through the stream.

Now create another Lambda function that reads data from the Kinesis data stream.

  • You can follow the same process as above.
  • After your function is created, replace its code with the snippet below.

import boto3
import time

def lambda_handler(event, context):
    stream_name = "your_stream_name"

    kinesis_client = boto3.client("kinesis", region_name="ap-south-1")

    # look up the first shard of the stream
    response = kinesis_client.describe_stream(StreamName=stream_name)
    my_shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]

    # start reading from the tip of the shard (only records written after this point)
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=stream_name, ShardId=my_shard_id, ShardIteratorType="LATEST"
    )
    my_shard_iterator = shard_iterator["ShardIterator"]
    record_response = kinesis_client.get_records(
        ShardIterator=my_shard_iterator, Limit=100
    )

    # keep polling the shard until the function times out or the iterator expires
    while "NextShardIterator" in record_response:
        record_response = kinesis_client.get_records(
            ShardIterator=record_response["NextShardIterator"], Limit=2
        )
        if record_response["Records"]:
            print(record_response)
            time.sleep(1)

After running the reader function, you can see the retrieved records in its logs and confirm in the stream's monitoring metrics that data is being read.

Why do we need a bucket?

  • Using the Lambda functions, we can write data into the Kinesis data stream and read data back out of it.
  • Now we need to send that data to a bucket for storage, so the stored data can be accessed at any time.
  • To do that, create an S3 bucket by going to the S3 console and clicking Create bucket.

Give your bucket a unique name, choose a region, scroll to the bottom, and click Create bucket.

Your demokinesis bucket is now created successfully.
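A hedged boto3 equivalent of these console steps; the bucket name below is a placeholder and must be globally unique, and the LocationConstraint is required for regions other than us-east-1.

import boto3

s3 = boto3.client("s3", region_name="ap-south-1")

# bucket names must be globally unique; this one is a placeholder
s3.create_bucket(
    Bucket="demokinesis-bucket-example",
    CreateBucketConfiguration={"LocationConstraint": "ap-south-1"},
)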

Why do we need a delivery stream?

  • Kinesis Data Firehose works through delivery streams. A producer, such as a web server, sends data to the delivery stream; you can also configure your Kinesis Data Firehose delivery stream to automatically read data from an existing Kinesis data stream and load it into your destination.

To create a delivery stream, go to the Kinesis console, choose Delivery streams, then set the source to your Kinesis data stream and the destination to your S3 bucket.
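Scripted roughly with boto3, this step might look like the following; all ARNs below are placeholders, and the role must allow Firehose to read from the stream and write to the bucket.

import boto3

firehose = boto3.client("firehose", region_name="ap-south-1")

# placeholder ARNs; the role must allow Firehose to read the stream and write to the bucket
firehose.create_delivery_stream(
    DeliveryStreamName="demo-delivery-stream",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:ap-south-1:123456789012:stream/your_stream_name",
        "RoleARN": "arn:aws:iam::123456789012:role/demo-firehose-role",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/demo-firehose-role",
        "BucketARN": "arn:aws:s3:::demokinesis-bucket-example",
    },
)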

We can now test our delivery stream by putting records directly into the Kinesis data stream.

In the delivery stream's monitoring tab, you can check how much data is read from the Kinesis data stream and delivered to S3.

After the data is transferred to its destination, open the selected bucket; a subfolder containing the delivered records is generated automatically.
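To confirm this from code, you can list the delivered objects with boto3; the bucket name is a placeholder, and by default Firehose writes objects under a date-based prefix.

import boto3

s3 = boto3.client("s3")

# Firehose delivers objects under a date-based prefix such as YYYY/MM/DD/HH/ by default
response = s3.list_objects_v2(Bucket="demokinesis-bucket-example")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])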

Conclusion:

In this post, we demonstrated how real-time streaming events flow through Amazon Kinesis Data Streams and Kinesis Data Firehose, and how the data is delivered to its destination path in S3.