How to Connect Amazon Kinesis Data Streams to NATS JetStream

You can connect Amazon Kinesis Data Streams to NATS JetStream by using Redpanda Connect as the bridge: Kinesis is configured as the input, NATS JetStream is configured as the output, and an optional Redpanda Connect pipeline maps or transforms messages before publishing them to a JetStream subject.

In this tutorial, you’ll build a vehicle telemetry pipeline that:

  1. Creates an Amazon Kinesis data stream named vehicle_data.
  2. Runs a local NATS server with JetStream enabled.
  3. Creates a JetStream stream named vehicle_data for the subject vehicle.data.
  4. Runs Redpanda Connect to consume from Kinesis and publish to JetStream.
  5. Produces sample vehicle data and verifies that transformed messages arrive in JetStream.

Rough architecture diagram

When should you connect Kinesis to NATS JetStream?

Use this pattern when you want to combine Amazon Kinesis ingestion with NATS-based application messaging or JetStream persistence. For example, a transportation company might ingest high-volume vehicle telemetry through Kinesis and then publish processed events into NATS JetStream for downstream services, analytics, or operational workflows.

Redpanda Connect is a straightforward way to integrate the two systems because it provides ready-to-use components for both Kinesis input and NATS JetStream output, configured declaratively in YAML.

What are NATS JetStream, Kinesis, and Redpanda Connect doing in this tutorial?

Short answer: Kinesis is the source of vehicle events, Redpanda Connect reads and transforms those events, and NATS JetStream stores them on the vehicle.data subject.

  • Amazon Kinesis Data Streams collects and stores real-time streaming data.
  • NATS provides a lightweight, high-performance communication backbone for distributed systems.
  • NATS JetStream extends NATS with persistence and streaming functionality.
  • Redpanda Connect connects the source and sink, and can also map or transform message payloads in flight.

Prerequisites

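To complete the tutorial, you’ll need the following, all of which are used in the steps below:

  • An AWS account with an access key that can use Kinesis Data Streams and DynamoDB.
  • Docker, to run the NATS server and Redpanda Connect.
  • The NATS CLI (nats), to create and view the JetStream stream.
  • Git and Python with a virtual environment and pip, to run the sample producer.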

How do you create the Kinesis data stream?

First, create the Kinesis data stream that Redpanda Connect will consume from.

  1. Open the AWS console.
  2. Search for Kinesis and select the Kinesis service.

Searching for Kinesis

  3. Click Create data stream.
  4. Enter vehicle_data as the stream name.
  5. Leave the capacity mode as On-demand so the stream can automatically scale for the tutorial workload.

Creating the data stream

  6. Click Create data stream to create the stream in your selected AWS Region.
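
If you would rather script this step than click through the console, the same stream can likely be created with boto3, the AWS SDK that the producer later in this tutorial uses. The following is a minimal sketch and not part of the original demo: the helper names are hypothetical, and create_stream assumes your AWS credentials and Region are already configured in the environment.

```python
"""Sketch: create the vehicle_data Kinesis stream in on-demand mode."""


def build_create_stream_request(stream_name: str) -> dict:
    """Return the keyword arguments for the Kinesis CreateStream call."""
    return {
        "StreamName": stream_name,
        # ON_DEMAND corresponds to the "On-demand" capacity mode in the console.
        "StreamModeDetails": {"StreamMode": "ON_DEMAND"},
    }


def create_stream(stream_name: str = "vehicle_data") -> None:
    """Create the stream and wait until it exists (this calls AWS)."""
    import boto3  # imported lazily so the request builder has no dependencies

    kinesis = boto3.client("kinesis")
    kinesis.create_stream(**build_create_stream_request(stream_name))
    # The "stream_exists" waiter polls DescribeStream until the stream is ready.
    kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)


if __name__ == "__main__":
    # Printing the request is safe; call create_stream() to actually create it.
    print(build_create_stream_request("vehicle_data"))
```

Calling create_stream() once has the same effect as the console steps above, so use one approach or the other, not both.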

How do you run NATS with JetStream enabled?

Next, run a local NATS server with JetStream enabled. This NATS server will receive messages from Redpanda Connect.

Open a terminal and run:

docker network create demo-network && \
docker run --name nats --rm -d \
--network demo-network \
-p 4222:4222 -p 8222:8222 \
nats --http_port 8222 --jetstream

Then open http://localhost:8222 in your browser. Click the JetStream link to view JetStream information. On a freshly started server with no streams, the JetStream usage statistics look similar to this:

{
  "streams": 0,
  "consumers": 0,
  "messages": 0,
  "bytes": 0
}

At this point, streams, consumers, messages, and bytes are all 0 because you have not created a JetStream stream yet. These counters are server-wide totals: streams stays at 0 until a stream exists, and messages and bytes stay at 0 until a stream starts receiving messages.
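
The same counters can be read programmatically from the server's /jsz monitoring endpoint. Here is a small sketch using only the Python standard library; the function names are hypothetical, and fetch_jsz assumes the container from the previous command is running with port 8222 published on localhost.

```python
"""Sketch: read JetStream usage counters from the NATS monitoring endpoint."""
import json
from urllib.request import urlopen

COUNTER_KEYS = ("streams", "consumers", "messages", "bytes")


def summarize_jsz(body: str) -> dict:
    """Extract the four usage counters from a /jsz JSON response body."""
    data = json.loads(body)
    # Missing keys default to 0 so the summary always has the same shape.
    return {key: data.get(key, 0) for key in COUNTER_KEYS}


def fetch_jsz(base_url: str = "http://localhost:8222") -> dict:
    """Fetch /jsz from a running server and return the counter summary."""
    with urlopen(f"{base_url}/jsz", timeout=5) as resp:
        return summarize_jsz(resp.read().decode("utf-8"))


if __name__ == "__main__":
    # Parse a sample body; call fetch_jsz() against a live server instead.
    sample = '{"streams": 0, "consumers": 0, "messages": 0, "bytes": 0}'
    print(summarize_jsz(sample))
```

Running fetch_jsz() again after you create the stream in the next section is a quick way to confirm the stream count changed.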

How do you create the NATS JetStream stream?

Create a JetStream stream named vehicle_data that stores messages published to the vehicle.data subject.

Run:

nats stream add vehicle_data \
--subjects "vehicle.data" \
--storage file \
--retention limits \
--max-msgs 10000 \
--replicas 1 \
--discard old \
--max-msgs-per-subject=-1 \
--max-bytes=-1 \
--max-age=-1 \
--max-msg-size=-1 \
--dupe-window=2m0s \
--no-allow-rollup \
--no-deny-delete \
--no-deny-purge

Note: The command includes stream configuration options such as replicas, retention, storage, limits, and discard policy. For a demo, the values shown here are sufficient. You can also run nats stream add vehicle_data without flags and let the NATS CLI prompt you for each value.

The output should be similar to the following:

Stream vehicle_data was created

Information for Stream vehicle_data created 2024-08-13 00:20:59

Subjects: vehicle.data
Replicas: 1
Storage: File

Options:

Retention: Limits
Acknowledgments: true
Discard Policy: Old
Duplicate Window: 2m0s
Direct Get: true
Allows Msg Delete: true
Allows Purge: true
Allows Rollups: false

Limits:

Maximum Messages: 10,000
Maximum Per Subject: unlimited
Maximum Bytes: unlimited
Maximum Age: unlimited
Maximum Message Size: unlimited
Maximum Consumers: unlimited

State:

Messages: 0
Bytes: 0 B
First Sequence: 0
Last Sequence: 0
Active Consumers: 0

Return to the JetStream monitoring page at http://localhost:8222. The stream count should now be 1.
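
The stream was created with a limit of 10,000 messages and the old discard policy, which means that once the limit is reached, the oldest messages are dropped to make room for new ones. The following is a conceptual model of that behavior in plain Python, not how NATS implements it; the BoundedStream class is hypothetical and uses a small limit so the effect is easy to see.

```python
"""Conceptual model of a message limit with the "old" discard policy."""
from collections import deque


class BoundedStream:
    """Keeps at most max_msgs messages, dropping the oldest when full."""

    def __init__(self, max_msgs: int) -> None:
        # A deque with maxlen evicts from the left when a new item is appended.
        self._msgs = deque(maxlen=max_msgs)
        self.last_seq = 0

    def publish(self, payload: str) -> int:
        """Store a message and return its sequence number."""
        self.last_seq += 1
        self._msgs.append((self.last_seq, payload))
        return self.last_seq

    @property
    def first_seq(self) -> int:
        """Sequence of the oldest retained message (0 when empty)."""
        return self._msgs[0][0] if self._msgs else 0

    def __len__(self) -> int:
        return len(self._msgs)


if __name__ == "__main__":
    stream = BoundedStream(max_msgs=3)
    for i in range(5):
        stream.publish(f"msg-{i}")
    # Sequences 1 and 2 were discarded; sequences 3 to 5 remain.
    print(len(stream), stream.first_seq, stream.last_seq)  # 3 3 5
```

In the tutorial's configuration this matters only after the producer has sent more than 10,000 records.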

How do you configure Redpanda Connect for Kinesis to JetStream?

Redpanda Connect uses an input for the source and an output for the sink. In this tutorial:

  • The input is Amazon Kinesis Data Streams.
  • The output is NATS JetStream.
  • The pipeline maps and transforms the JSON payload before publishing to NATS.

Create a file named connect.yaml in your preferred working directory:

input:
  aws_kinesis:
    streams: ["vehicle_data"]
    dynamodb:
      table: "vehicle_data"
      create: true
    region: "[YOUR_AWS_KINESIS_REGION]"
    credentials:
      id: "[YOUR_AWS_ACCESS_KEY_ID]"
      secret: "[YOUR_AWS_CLIENT_SECRET]"

pipeline:
  processors:
    - mapping: |
        root.vehicle_id = this.vehicle_id
        root.timestamp = this.timestamp
        root.speed = this.speed * 1.60934
        root.fuel_level = this.fuel_level
        root.latitude = this.location.latitude
        root.longitude = this.location.longitude

output:
  nats_jetstream:
    urls: [nats://nats:4222]
    subject: vehicle.data
    headers:
      Content-Type: application/json

Replace the placeholder values with your own AWS details:

  • [YOUR_AWS_KINESIS_REGION]: the AWS Region where you created the Kinesis stream.
  • [YOUR_AWS_ACCESS_KEY_ID]: your AWS access key ID.
  • [YOUR_AWS_CLIENT_SECRET]: your AWS secret access key.

For more information on AWS credentials, see Manage access keys for IAM users.

Why does the Redpanda Connect config include DynamoDB?

The Kinesis input configuration includes a dynamodb section:

dynamodb:
  table: "vehicle_data"
  create: true

As described in the Redpanda Connect Kinesis input documentation, this table is used for storing and accessing shard and consumer data. With create: true, Redpanda Connect creates the DynamoDB table automatically.
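
To see why a Kinesis consumer needs somewhere to keep shard state, consider the conceptual sketch below. It is not Redpanda Connect's implementation or its DynamoDB table schema: the class and method names are hypothetical, and an in-memory dictionary stands in for the table. The idea it illustrates is checkpointing, that is, recording the last processed sequence number per shard so that a restarted consumer can resume instead of rereading the stream.

```python
"""Conceptual sketch of per-shard checkpointing (in-memory stand-in)."""
from typing import Dict, Optional, Tuple


class CheckpointStore:
    """Maps (stream, shard) to the last sequence number processed."""

    def __init__(self) -> None:
        self._table: Dict[Tuple[str, str], str] = {}

    def save(self, stream: str, shard_id: str, sequence: str) -> None:
        """Record progress after a record has been delivered downstream."""
        self._table[(stream, shard_id)] = sequence

    def resume_from(self, stream: str, shard_id: str) -> Optional[str]:
        """Return the saved sequence, or None if the shard has no checkpoint."""
        return self._table.get((stream, shard_id))


if __name__ == "__main__":
    store = CheckpointStore()
    store.save("vehicle_data", "shardId-000000000000", "100")
    print(store.resume_from("vehicle_data", "shardId-000000000000"))  # 100
    print(store.resume_from("vehicle_data", "shardId-000000000002"))  # None
```

A durable store such as a DynamoDB table plays this role across restarts, which an in-memory dictionary cannot do.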

What does the pipeline transformation do?

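The pipeline section maps the incoming vehicle JSON into the shape that will be written to JetStream. It copies vehicle_id, timestamp, and fuel_level unchanged, multiplies speed by 1.60934 (the number of kilometers in a mile, so this reads as a miles-per-hour to kilometers-per-hour conversion), and flattens location.latitude and location.longitude into top-level latitude and longitude fields.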
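
For readers more comfortable with Python than with the mapping language, here is a rough equivalent of the transformation. It is an illustration only: the function name and sample record are made up, and the sketch assumes speed arrives in miles per hour because 1.60934 is the mile-to-kilometer factor; the tutorial itself does not state the unit.

```python
"""Python equivalent of the mapping processor, for illustration only."""

SPEED_FACTOR = 1.60934  # same factor as in connect.yaml (kilometers per mile)


def transform(record: dict) -> dict:
    """Scale speed and flatten the nested location, mirroring the mapping."""
    return {
        "vehicle_id": record["vehicle_id"],
        "timestamp": record["timestamp"],
        "speed": record["speed"] * SPEED_FACTOR,
        "fuel_level": record["fuel_level"],
        # The nested location object becomes two top-level fields.
        "latitude": record["location"]["latitude"],
        "longitude": record["location"]["longitude"],
    }


if __name__ == "__main__":
    # Illustrative input record; the values are invented for this example.
    sample = {
        "vehicle_id": "vehicle_3",
        "timestamp": "2024-08-13T20:24:52.426587Z",
        "speed": 47.36,
        "fuel_level": 24.24,
        "location": {"latitude": -76.594167, "longitude": 106.770849},
    }
    print(transform(sample))
```

One difference to keep in mind: this Python version raises KeyError when a field is missing, which is not necessarily how the mapping processor handles missing fields.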

How do you run Redpanda Connect?

From the same directory as connect.yaml, run Redpanda Connect in a container on the same Docker network as NATS:

docker run --rm -it --name=redpanda-connect \
--network demo-network \
-v "$(pwd)/connect.yaml:/connect.yaml" \
docker.redpanda.com/redpandadata/connect \
run

You should see output similar to this:

INFO Running main config from file found in a default path @service=benthos benthos_version=4.30.1 path=connect.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=benthos
INFO Launching a Redpanda Connect instance, use CTRL+C to close @service=benthos
INFO Output type nats_jetstream is now active @service=benthos label="" path=root.output
INFO Input type aws_kinesis is now active @service=benthos label="" path=root.input

Leave this terminal window open. Redpanda Connect must keep running to continue consuming from Kinesis and publishing to JetStream.

How do you produce test data into Kinesis?

To verify the pipeline, run the sample vehicle data producer application.

Clone the demo repository:

git clone https://github.com/SystemCraftsman/nats-redpanda-connect-kinesis-demo.git

Navigate into the producer app directory:

cd nats-redpanda-connect-kinesis-demo/apps/VehicleDataProducer

Activate your Python virtual environment, then install the dependencies:

pip install -r requirements.txt

This installs boto3, which the producer application uses to interact with AWS services such as Kinesis.

Before running the app, export your AWS configuration:

export AWS_DEFAULT_REGION='[YOUR_AWS_KINESIS_REGION]'
export AWS_ACCESS_KEY_ID='[YOUR_AWS_ACCESS_KEY_ID]'
export AWS_SECRET_ACCESS_KEY='[YOUR_AWS_CLIENT_SECRET]'

Then run the producer:

python app.py

You should see output similar to this:

Sent data for vehicle_1: {'ShardId': 'shardId-000000000002', 'SequenceNumber': '49654821927667342525694265153029849795926049811702218786', 'ResponseMetadata': {'RequestId': 'e49b68e4-0cee-c87d-bbae-b630230dc8da', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e49b68e4-0cee-c87d-bbae-b630230dc8da', 'x-amz-id-2': 'NO+p4Q7NEHzixBxZLMWgRKx54EqpNVCbZR3+j2I1Uv9+Z5k/HPYiO642nAfucAWcZ5wEdnzp88OJI4hP/Nmu4lmJD9vLOCOm', 'date': 'Tue, 13 Aug 2024 22:22:12 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
Sent data for vehicle_2: {'ShardId': 'shardId-000000000002', 'SequenceNumber': '49654821927667342525694265153143488822969824954124599330', 'ResponseMetadata': {'RequestId': 'f333697a-3d03-3eb5-ac06-b7af12e03e12', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f333697a-3d03-3eb5-ac06-b7af12e03e12', 'x-amz-id-2': 'ujmF9zuGemZD2HszAOUSRw0IFrRT+sQPLNXYZjIYNz8muVS6xhmqa0f1BDShaW/bk1HLwnrQTZxPM9D3h7oOmbAKtJOUJY/A', 'date': 'Tue, 13 Aug 2024 22:22:13 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
Sent data for vehicle_3: {'ShardId': 'shardId-000000000000', 'SequenceNumber': '49654821927979552958473693877257970718944511224426004482', 'ResponseMetadata': {'RequestId': 'd98518fd-c046-d9f4-86b0-c628efa5d953', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd98518fd-c046-d9f4-86b0-c628efa5d953', 'x-amz-id-2': 'NrFz4jgNRb+7XJtsGCngOAMI3aMueo+7868GcYjS6ZQriFIIWta+5pOBsJxNLXpQwqLFKCPO1+zp0YxP9B65DQ49Y0Bl2XIq', 'date': 'Tue, 13 Aug 2024 22:22:13 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
...output omitted...
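
The repository's app.py is not reproduced in this tutorial. As a rough idea of what such a producer does, the sketch below builds a telemetry record in the shape the mapping expects (with a nested location object) and sends it with boto3's put_record. The value ranges, the function names, and the choice of vehicle_id as the partition key are assumptions for illustration and are not taken from the demo code.

```python
"""Hypothetical sketch of a vehicle telemetry producer for Kinesis."""
import json
import random
from datetime import datetime, timezone


def build_record(vehicle_id: str, rng: random.Random) -> dict:
    """Create one telemetry record with a nested location object."""
    return {
        "vehicle_id": vehicle_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "speed": round(rng.uniform(0, 80), 2),
        "fuel_level": round(rng.uniform(0, 100), 2),
        "location": {
            "latitude": round(rng.uniform(-90, 90), 6),
            "longitude": round(rng.uniform(-180, 180), 6),
        },
    }


def send_record(record: dict, stream_name: str = "vehicle_data") -> dict:
    """Send one record to Kinesis (calls AWS and needs credentials)."""
    import boto3  # lazy import keeps build_record usable without boto3

    kinesis = boto3.client("kinesis")
    return kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(record).encode("utf-8"),
        # Records that share a partition key are routed to the same shard.
        PartitionKey=record["vehicle_id"],
    )


if __name__ == "__main__":
    # Print one record locally; pass it to send_record() to publish it.
    print(json.dumps(build_record("vehicle_1", random.Random(42))))
```

The response returned by put_record contains the ShardId and SequenceNumber fields that appear in the output above.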

How do you verify that messages reached NATS JetStream?

Use the NATS CLI to view messages in the vehicle_data stream:

nats stream view vehicle_data

You should see messages published on the vehicle.data subject with the Content-Type: application/json header and the transformed payload:

[152913] Subject: vehicle.data Received: 2024-08-13T23:24:53+03:00

Content-Type: application/json

{"fuel_level":24.24,"latitude":-76.594167,"longitude":106.770849,"speed":76.2183424,"timestamp":"2024-08-13T20:24:52.426587Z","vehicle_id":"vehicle_3"}

[152914] Subject: vehicle.data Received: 2024-08-13T23:24:54+03:00

Content-Type: application/json

{"fuel_level":21.14,"latitude":7.966871,"longitude":-64.717011,"speed":52.4483906,"timestamp":"2024-08-13T20:24:53.636805Z","vehicle_id":"vehicle_1"}

[152915] Subject: vehicle.data Received: 2024-08-13T23:24:54+03:00

Content-Type: application/json

{"fuel_level":26.42,"latitude":-75.722179,"longitude":55.893106,"speed":100.2136018,"timestamp":"2024-08-13T20:24:53.842285Z","vehicle_id":"vehicle_2"}

...output omitted...

? Next Page? (Y/n)

This confirms that Redpanda Connect consumed records from Kinesis, applied the mapping and transformation, and published the processed messages to NATS JetStream.
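
If you want to check the payload shape mechanically rather than by eye, a small helper like the one below can be applied to each message body. It is a sketch with a hypothetical function name; it only checks that the JSON has exactly the six flattened fields produced by the mapping, and it does not connect to NATS.

```python
"""Sketch: check that a JetStream payload has the transformed shape."""
import json

EXPECTED_KEYS = {
    "vehicle_id", "timestamp", "speed", "fuel_level", "latitude", "longitude",
}


def is_transformed(payload: str) -> bool:
    """Return True if the JSON payload has exactly the flattened fields."""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return False
    return isinstance(data, dict) and set(data) == EXPECTED_KEYS


if __name__ == "__main__":
    # First message body from the output shown above.
    body = (
        '{"fuel_level":24.24,"latitude":-76.594167,"longitude":106.770849,'
        '"speed":76.2183424,"timestamp":"2024-08-13T20:24:52.426587Z",'
        '"vehicle_id":"vehicle_3"}'
    )
    print(is_transformed(body))  # True
```

A payload that still contains a nested location object would fail this check, which would suggest the mapping was not applied.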

Troubleshooting

Why does the Python producer report missing AWS credentials?

If you run python app.py with only some of the AWS environment variables set (for example, AWS_ACCESS_KEY_ID without AWS_SECRET_ACCESS_KEY), you may see an error similar to this:

botocore.exceptions.PartialCredentialsError: Partial credentials found in env, missing: AWS_SECRET_ACCESS_KEY

Export the required variables and run the app again:

export AWS_DEFAULT_REGION='[YOUR_AWS_KINESIS_REGION]'
export AWS_ACCESS_KEY_ID='[YOUR_AWS_ACCESS_KEY_ID]'
export AWS_SECRET_ACCESS_KEY='[YOUR_AWS_CLIENT_SECRET]'
python app.py
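
To catch this before the producer starts, you could run a small preflight check like the sketch below. The function name is hypothetical, and the check only covers the environment-variable approach used in this tutorial; boto3 can also read credentials from other places, such as a shared credentials file, which this sketch ignores.

```python
"""Sketch: fail fast when required AWS environment variables are missing."""
import os
from typing import List, Mapping

REQUIRED_VARS = (
    "AWS_DEFAULT_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def missing_aws_vars(env: Mapping[str, str]) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_aws_vars(os.environ)
    if missing:
        print("Missing: " + ", ".join(missing))
    else:
        print("All required AWS variables are set.")
```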

Why is the JetStream stream count still zero?

If the JetStream monitoring endpoint still shows streams: 0, the JetStream stream has not been created. Re-run the nats stream add vehicle_data command and confirm that the CLI reports Stream vehicle_data was created.

Why are no messages visible in nats stream view vehicle_data?

Check the pipeline in this order:

  1. Confirm the Kinesis stream is named vehicle_data.
  2. Confirm Redpanda Connect is still running.
  3. Confirm the Redpanda Connect logs show both aws_kinesis and nats_jetstream as active.
  4. Confirm the NATS URL in connect.yaml is nats://nats:4222 when Redpanda Connect and NATS are on the same Docker network.
  5. Confirm the JetStream stream is configured for the vehicle.data subject.
  6. Confirm the Python producer is sending records successfully to Kinesis.

What you built

You connected Amazon Kinesis Data Streams to NATS JetStream using Redpanda Connect. Kinesis acted as the source, Redpanda Connect consumed and transformed the vehicle telemetry records, and JetStream persisted the processed messages on the vehicle.data subject.

To skip managing your own NATS infrastructure, Synadia Cloud gives you a fully managed NATS service with JetStream ready to go. Sign up for free and swap the local Docker server in this tutorial for a Synadia Cloud endpoint.

