Auto-Scaling Kinesis Data Streams Applications on Kubernetes
Want to learn how to auto-scale your Kinesis Data Streams consumer applications on Kubernetes so you can save on costs and improve resource efficiency? This blog offers a step-by-step guide on how to do just that.
By leveraging Kubernetes for auto-scaling Kinesis consumer applications, you can benefit from its built-in features such as the Horizontal Pod Autoscaler.
What Are Amazon Kinesis and Kinesis Data Streams?
Amazon Kinesis is a platform for real-time data processing, ingestion, and analysis. Kinesis Data Streams is a serverless streaming data service (part of the Kinesis streaming data platform, along with Kinesis Data Firehose, Kinesis Video Streams, and Kinesis Data Analytics).
Kinesis Data Streams can scale elastically and continuously adapt to changes in data ingestion rates and stream consumption rates. It can be used to build real-time data analytics applications, real-time dashboards, and real-time data pipelines.
Let’s start off with an overview of some of the key concepts of Kinesis Data Streams.
Kinesis Data Streams: High-Level Architecture
- A Kinesis data stream is a set of shards. Each shard has a sequence of data records.
- The producers continually push data to Kinesis Data Streams, and the consumers process the data in real-time.
- A partition key is used to group data by shard within a stream.
- Kinesis Data Streams segregates the data records belonging to a stream into multiple shards.
- It uses the partition key that is associated with each data record to determine which shard a given data record belongs to.
- Consumers get records from Amazon Kinesis Data Streams, process them, and store their results in Amazon DynamoDB, Amazon Redshift, Amazon S3, etc.
- These consumers are also known as Amazon Kinesis Data Streams applications.
- One way to develop custom consumer applications that process data from Kinesis data streams is to use the Kinesis Client Library (KCL).
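To see the partition key to shard mapping in action, you can put a test record and look at the ShardId in the response. A quick illustration, assuming a stream named my-stream already exists (the stream name and partition key here are placeholders):
# put a single record; the response shows which shard the partition key was hashed to
aws kinesis put-record \
  --stream-name my-stream \
  --partition-key user-123 \
  --data $(echo -n '{"hello":"world"}' | base64)
# the response includes the shard and sequence number, for example:
# { "ShardId": "shardId-000000000001", "SequenceNumber": "49625..." }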
How Do Kinesis Consumer Applications Scale Horizontally?
The Kinesis Client Library ensures there is a record processor running for every shard and processing data from that shard. KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing and scalability. It connects to the data stream, enumerates the shards within the data stream, and uses leases to coordinate shard associations with its consumer applications.
A record processor is instantiated for every shard it manages. KCL pulls data records from the data stream, pushes the records to the corresponding record processor, and checkpoints processed records. More importantly, it balances shard-worker associations (leases) when the worker instance count changes or when the data stream is re-sharded (shards are split or merged). This means that you can scale your Kinesis Data Streams application simply by adding more instances, since KCL will automatically balance the shards across the instances.
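You can see this lease mechanism for yourself: KCL stores its leases in a DynamoDB control table named after the consumer application (the control table for the application used later in this post is named kinesis-keda-demo), and each item records which worker currently owns which shard. A quick way to peek at it once the application is running (attribute names are the standard KCL ones):
# each item in KCL's control table maps a shard (leaseKey) to a worker (leaseOwner)
aws dynamodb scan \
  --table-name kinesis-keda-demo \
  --projection-expression "leaseKey, leaseOwner"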
But you still need a way to scale your applications when the load increases. Of course, you could do it manually or build a custom solution to get this done.
This is where Kubernetes Event-driven Autoscaling (KEDA) can help. KEDA is a Kubernetes-based event-driven autoscaling component that can monitor event sources like Kinesis and scale the underlying Deployments (and Pods) based on the number of events needing to be processed.
To witness auto-scaling in action, you will work with a Java application that uses the Kinesis Client Library (KCL) 2.x to consume data from a Kinesis data stream. It will be deployed to a Kubernetes cluster on Amazon EKS and will be scaled automatically using KEDA. The application includes an implementation of the ShardRecordProcessor that processes data from the Kinesis stream and persists it to a DynamoDB table. We will use the AWS CLI to produce data to the Kinesis stream and observe the scaling of the application.
Before we dive in, here is a quick overview of KEDA.
What Is KEDA?
KEDA is an open-source CNCF project that's built on top of native Kubernetes primitives such as the Horizontal Pod Autoscaler and can be added to any Kubernetes cluster. Here is a high-level overview of its key components (you can refer to the KEDA documentation for a deep dive):
1. The keda-operator-metrics-apiserver component in KEDA acts as a Kubernetes metrics server that exposes metrics for the Horizontal Pod Autoscaler.
2. A KEDA Scaler integrates with an external system (such as Redis) to fetch these metrics (e.g., the length of a List) and drive auto-scaling of any container in Kubernetes based on the number of events needing to be processed.
3. The role of the keda-operator component is to activate and deactivate Deployments; i.e., scale to and from zero.
You will see the Kinesis Stream KEDA scaler in action; it scales based on the shard count of a Kinesis data stream.
Now let's move on to the practical part of this post.
Prerequisites
In addition to an AWS account, you will need to have the AWS CLI, kubectl, Docker, Java 11, and Maven installed.
Set Up an EKS Cluster, Create a DynamoDB Table and a Kinesis Data Stream
There are a variety of ways in which you can create an Amazon EKS cluster. I prefer using the eksctl CLI because of the convenience it offers. Creating an EKS cluster using eksctl can be as easy as this:
eksctl create cluster --name <cluster-name> --region <region>
For details, refer to the Getting started with Amazon EKS – eksctl documentation.
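For example, using the cluster name and region that the rest of this walkthrough assumes (the node count is an arbitrary choice; size it for your workload):
# create a cluster with a small managed node group; adjust naming and sizing as needed
eksctl create cluster \
  --name demo-eks-cluster \
  --region us-east-1 \
  --nodes 2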
Create a DynamoDB table to persist application data. You can use the AWS CLI to create a table with the following command:
aws dynamodb create-table \
--table-name users \
--attribute-definitions AttributeName=email,AttributeType=S \
--key-schema AttributeName=email,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
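Table creation is quick but asynchronous; if you are scripting these steps, you can wait until the table is ready before moving on:
# block until the users table is ACTIVE
aws dynamodb wait table-exists --table-name users
aws dynamodb describe-table --table-name users --query "Table.TableStatus" --output text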
Create a Kinesis stream with two shards using the AWS CLI:
aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2
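You can confirm the stream is ready (and see its open shard count) with the same DescribeStreamSummary API that the KEDA scaler uses later in this post:
# the stream should report an ACTIVE status and two open shards
aws kinesis describe-stream-summary --stream-name kinesis-keda-demo \
  --query "StreamDescriptionSummary.[StreamStatus,OpenShardCount]"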
Clone this GitHub repository and change to the right directory:
git clone https://github.com/abhirockzz/kinesis-keda-autoscaling
cd kinesis-keda-autoscaling
Ok, let's get started!
Set Up and Configure KEDA on EKS
For the purposes of this tutorial, you will use YAML files to deploy KEDA. But you could also use Helm charts.
Install KEDA:
# update version 2.8.2 if required
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
Verify the installation:
# check Custom Resource Definitions
kubectl get crd
# check KEDA Deployments
kubectl get deployment -n keda
# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda
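Optionally, you can also check that KEDA has registered itself as the provider of the Kubernetes external metrics API, which is how its metrics reach the Horizontal Pod Autoscaler (the APIService name below is the one KEDA registers by default; treat it as an assumption if your installation differs):
# the external metrics API should be backed by KEDA's metrics apiserver
kubectl get apiservice v1beta1.external.metrics.k8s.io
# once ScaledObjects exist, KEDA also manages an HPA per ScaledObject
kubectl get hpa -A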
Configure IAM Roles
The KEDA operator as well as the Kinesis consumer application need to invoke AWS APIs. Since both will run as Deployments in EKS, we will use IAM Roles for Service Accounts (IRSA) to provide the necessary permissions.
In this particular scenario:
- The KEDA operator needs to be able to get the shard count for a Kinesis stream: it does so by using the DescribeStreamSummary API.
- The application (the KCL library, to be specific) needs to interact with Kinesis and DynamoDB: it needs a bunch of IAM permissions to do so.
Configure IRSA for KEDA Operator
Set your AWS Account ID and OIDC Identity provider as environment variables:
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
#update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1
OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
Create a JSON file with Trusted Entities for the role:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
"${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
}
}
}
]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust_keda.json
Now, create the IAM role and attach the policy (take a look at the policy_kinesis_keda.json file for details):
export ROLE_NAME=keda-operator-kinesis-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"
aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-kinesis-policy
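For reference, the policy only needs to let the scaler read stream metadata. A minimal sketch of what policy_kinesis_keda.json might contain (this is an illustration, not the repository file; the action matches the DescribeStreamSummary API mentioned earlier, and the Resource is left open here for brevity):
# a sketch of policy_kinesis_keda.json; check the file in the cloned repository for the authoritative version
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kinesis:DescribeStreamSummary",
            "Resource": "*"
        }
    ]
}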
Associate the IAM role and Service Account:
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/keda-operator -n keda
You will need to restart the KEDA operator Deployment for this to take effect:
kubectl rollout restart deployment.apps/keda-operator -n keda
# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"
# expected output
AWS_STS_REGIONAL_ENDPOINTS: regional
AWS_DEFAULT_REGION: us-east-1
AWS_REGION: us-east-1
AWS_ROLE_ARN: arn:aws:iam:::role/keda-operator-kinesis-role
AWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
Configure IRSA for the KCL Consumer Application
Start by creating a Kubernetes Service Account:
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
...
EOF
Next, create an IAM role and policy for the consumer application and associate the role with this Service Account, following the same IRSA flow you used for the KEDA operator (refer to the cloned repository for the policy file and manifests). Then build and deploy the KCL consumer application as the kcl-consumer Deployment. Once it is running, use the AWS CLI to send records to the Kinesis stream:
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user3", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user4", "city":"seattle"}' | base64)
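You can watch the consumer picking these records up by tailing its logs (the app=kcl-consumer label used here is the same one used for monitoring later in this post):
# confirm the consumer Pod is running and follow its logs while records arrive
kubectl get pods -l=app=kcl-consumer
kubectl logs -f $(kubectl get pod -l=app=kcl-consumer -o jsonpath='{.items[0].metadata.name}')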
The KCL application persists each record to a target DynamoDB table (which is named users in this case). You can check the table to verify the records:
aws dynamodb scan --table-name users
Notice the value of the processed_by attribute? It's the same as the name of the KCL consumer Pod. This will make it easier for us to verify the end-to-end autoscaling process.
Create the KEDA Scaler for Kinesis
Here is the ScaledObject definition. Notice that it's targeting the kcl-consumer Deployment (the one we just created) and that shardCount is set to 1:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
...
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64)
Verify the value of the processed_by attribute. Since we have scaled out to two Pods, the value should be different for each record, as each Pod will process a subset of the records from the Kinesis stream.
Increase Kinesis Stream Capacity
Let's scale out the number of shards from two to three and continue to monitor the auto-scaling of the KCL application:
aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING
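Re-sharding takes a little while; you can poll the stream summary until it reports an ACTIVE status and three open shards:
# wait for the re-sharding operation to finish
aws kinesis describe-stream-summary --stream-name kinesis-keda-demo \
  --query "StreamDescriptionSummary.[StreamStatus,OpenShardCount]"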
Once Kinesis re-sharding is complete, the KEDA scaler will spring into action and scale out the KCL application to three Pods:
kubectl get pods -l=app=kcl-consumer -w
Just like before, confirm that the Kinesis shard lease has been updated in the kinesis-keda-demo control table in DynamoDB: check the leaseOwner attribute.
Continue to send more data to the Kinesis stream. As expected, the Pods will share the record processing, and this will be reflected in the processed_by attribute in the users table:
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user9", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user10", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user11", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user12", "city":"seattle"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user14", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user15", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user16", "city":"seattle"}' | base64)
Scale Down
So far, we have only scaled in one direction. What happens when we reduce the shard capacity of the Kinesis stream? Try this out for yourself: reduce the shard count from three to two and see what happens to the KCL application.
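If you want a starting point, the same update-shard-count command works in the other direction, and you can watch the Pods while KEDA scales the Deployment back in:
# merge shards back down to two and watch the consumer scale in
aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 2 --scaling-type UNIFORM_SCALING
kubectl get pods -l=app=kcl-consumer -w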
Once you have verified the end-to-end solution, you should clean up the resources to avoid incurring any additional charges.
Delete Resources
Delete the EKS cluster, Kinesis stream, and DynamoDB table.
eksctl delete cluster --name keda-kinesis-demo
aws kinesis delete-stream --stream-name kinesis-keda-demo
aws dynamodb delete-table --table-name users
Conclusion
In this post, you learned how to use KEDA to auto-scale a KCL application that consumes data from a Kinesis stream.
You can configure the KEDA scaler as per your application requirements. For example, you can set shardCount to 3 and have one Pod for every three shards in your Kinesis stream. However, if you want to maintain a one-to-one mapping, you can set shardCount to 1 and KCL will take care of distributed coordination and lease assignment, thereby ensuring that each Pod has one instance of the record processor. This is an effective approach that allows you to scale out your Kinesis stream processing pipeline to meet the demands of your applications.