You can connect Amazon Kinesis Data Streams to NATS JetStream by using Redpanda Connect as the bridge: Kinesis is configured as the input, NATS JetStream is configured as the output, and an optional Redpanda Connect pipeline maps or transforms messages before publishing them to a JetStream subject.
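In this tutorial, you’ll build a vehicle telemetry pipeline that:
- Reads vehicle telemetry records from a Kinesis data stream named vehicle_data.
- Transforms each record with a Redpanda Connect mapping.
- Publishes the result to a JetStream stream named vehicle_data for the subject vehicle.data.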
Use this pattern when you want to combine Amazon Kinesis ingestion with NATS-based application messaging or JetStream persistence. For example, a transportation company might ingest high-volume vehicle telemetry through Kinesis and then publish processed events into NATS JetStream for downstream services, analytics, or operational workflows.
Redpanda Connect is a straightforward way to integrate the two systems because it provides ready-to-use components for both Kinesis input and NATS JetStream output, configured declaratively in YAML.
Short answer: Kinesis is the source of vehicle events, Redpanda Connect reads and transforms those events, and NATS JetStream stores them on the vehicle.data subject.
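To complete the tutorial, you’ll need the following, all of which are used in the steps below:
- An AWS account with an access key ID and secret access key that can use Kinesis and DynamoDB
- Docker
- The NATS CLI
- Git
- Python with pip and a virtual environment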
First, create the Kinesis data stream that Redpanda Connect will consume from. In your AWS account, create a new data stream and use vehicle_data as the stream name.
Next, run a local NATS server with JetStream enabled. This NATS server will receive messages from Redpanda Connect.
Open a terminal and run:
    docker network create demo-network && \
    docker run --name nats --rm -d \
      --network demo-network \
      -p 4222:4222 -p 8222:8222 \
      nats --http_port 8222 --jetstream

Then open http://localhost:8222 in your browser. Click the JetStream link to view JetStream information. On a freshly started server with no streams, the JetStream usage statistics look similar to this:

    {
      "streams": 0,
      "consumers": 0,
      "messages": 0,
      "bytes": 0
    }

At this point, streams, consumers, messages, and bytes are all 0 because you have not created a JetStream stream yet. These counters are server-wide totals: streams increases once you create a stream, and messages and bytes increase once a stream starts receiving messages.
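The same counters can also be read from a script instead of the browser. The following is a minimal sketch, assuming the monitoring port 8222 published by the docker command above and the NATS server's /jsz monitoring endpoint. The function names summarize_jsz and fetch_jsz are illustrative and not part of any NATS library.

```python
import json
from urllib.request import urlopen

# Assumption: monitoring is exposed on port 8222 (as in the docker run command)
# and the JetStream statistics are served at /jsz.
JSZ_URL = "http://localhost:8222/jsz"

COUNTERS = ("streams", "consumers", "messages", "bytes")


def summarize_jsz(payload: dict) -> dict:
    """Keep only the four server-wide counters; missing keys count as 0."""
    return {name: int(payload.get(name, 0)) for name in COUNTERS}


def fetch_jsz(url: str = JSZ_URL, timeout: float = 5.0) -> dict:
    """Fetch and summarize the JetStream statistics from a running server."""
    with urlopen(url, timeout=timeout) as response:
        return summarize_jsz(json.load(response))


# Offline example using the values shown for a freshly started server.
fresh = summarize_jsz({"streams": 0, "consumers": 0, "messages": 0, "bytes": 0})
print(fresh)
```

With the NATS container running, calling fetch_jsz() should return the live counters, which makes it easy to confirm later that the streams value changes after you create the stream.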
Create a JetStream stream named vehicle_data that stores messages published to the vehicle.data subject.
Run:
    nats stream add vehicle_data \
      --subjects "vehicle.data" \
      --storage file \
      --retention limits \
      --max-msgs 10000 \
      --replicas 1 \
      --discard old \
      --max-msgs-per-subject=-1 \
      --max-bytes=-1 \
      --max-age=-1 \
      --max-msg-size=-1 \
      --dupe-window=2m0s \
      --no-allow-rollup \
      --no-deny-delete \
      --no-deny-purge

Note: The command includes stream configuration options such as replicas, retention, storage, limits, and discard policy. For a demo, the values shown here are sufficient. You can also run nats stream add vehicle_data without flags and let the NATS CLI prompt you for each value.
The output should be similar to the following:
    Stream vehicle_data was created

    Information for Stream vehicle_data created 2024-08-13 00:20:59

      Subjects: vehicle.data
      Replicas: 1
      Storage: File

    Options:

      Retention: Limits
      Acknowledgments: true
      Discard Policy: Old
      Duplicate Window: 2m0s
      Direct Get: true
      Allows Msg Delete: true
      Allows Purge: true
      Allows Rollups: false

    Limits:

      Maximum Messages: 10,000
      Maximum Per Subject: unlimited
      Maximum Bytes: unlimited
      Maximum Age: unlimited
      Maximum Message Size: unlimited
      Maximum Consumers: unlimited

    State:

      Messages: 0
      Bytes: 0 B
      First Sequence: 0
      Last Sequence: 0
      Active Consumers: 0

Return to the JetStream monitoring page at http://localhost:8222. The stream count should now be 1.
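Redpanda Connect uses an input for the source and an output for the sink, with an optional pipeline of processors in between. In this tutorial:
- The input is aws_kinesis, which reads from the vehicle_data Kinesis stream.
- The pipeline runs a mapping processor that reshapes each record.
- The output is nats_jetstream, which publishes to the vehicle.data subject.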
Create a file named connect.yaml in your preferred working directory:
    input:
      aws_kinesis:
        streams: ["vehicle_data"]
        dynamodb:
          table: "vehicle_data"
          create: true
        region: "[YOUR_AWS_KINESIS_REGION]"
        credentials:
          id: "[YOUR_AWS_ACCESS_KEY_ID]"
          secret: "[YOUR_AWS_CLIENT_SECRET]"

    pipeline:
      processors:
        - mapping: |
            root.vehicle_id = this.vehicle_id
            root.timestamp = this.timestamp
            root.speed = this.speed * 1.60934
            root.fuel_level = this.fuel_level
            root.latitude = this.location.latitude
            root.longitude = this.location.longitude

    output:
      nats_jetstream:
        urls: [nats://nats:4222]
        subject: vehicle.data
        headers:
          Content-Type: application/json

Replace the placeholder values with your own AWS details:
- [YOUR_AWS_KINESIS_REGION]: the AWS Region where you created the Kinesis stream.
- [YOUR_AWS_ACCESS_KEY_ID]: your AWS access key ID.
- [YOUR_AWS_CLIENT_SECRET]: your AWS secret access key.

For more information on AWS credentials, see Manage access keys for IAM users.
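A forgotten placeholder is an easy mistake to make here. As a small optional check, the sketch below scans configuration text for any remaining [YOUR_...] tokens. The helper name find_placeholders and the sample values are hypothetical and exist only for illustration.

```python
import re
from typing import List

# Placeholders in this tutorial look like [YOUR_AWS_ACCESS_KEY_ID].
PLACEHOLDER = re.compile(r"\[YOUR_[A-Z0-9_]+\]")


def find_placeholders(config_text: str) -> List[str]:
    """Return the distinct placeholders still present, in order of first appearance."""
    seen: List[str] = []
    for match in PLACEHOLDER.findall(config_text):
        if match not in seen:
            seen.append(match)
    return seen


# Hypothetical fragment where only the access key ID has been filled in.
sample = '''
region: "[YOUR_AWS_KINESIS_REGION]"
credentials:
  id: "example-key-id"
  secret: "[YOUR_AWS_CLIENT_SECRET]"
'''
print(find_placeholders(sample))
```

To check the real file, pass the contents of connect.yaml to find_placeholders; an empty list means every placeholder has been replaced.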
The Kinesis input configuration includes a dynamodb section:
    dynamodb:
      table: "vehicle_data"
      create: true

As described in the Redpanda Connect Kinesis input documentation, this table is used for storing and accessing shard and consumer data. With create: true, Redpanda Connect creates the DynamoDB table automatically.
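The pipeline section maps the incoming vehicle JSON into the shape that will be written to JetStream. It copies vehicle_id, timestamp, and fuel_level unchanged, multiplies speed by 1.60934 (the number of kilometers in a mile, so a speed reported in miles per hour becomes kilometers per hour), and flattens location.latitude and location.longitude into top-level latitude and longitude fields.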
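To make the effect of the mapping concrete, here is a rough Python equivalent of what it does to a single record. This is only an illustration: Redpanda Connect runs the mapping itself, the function name transform is made up, and the input record is hypothetical, with its nested location shape inferred from the mapping.

```python
# Conversion factor taken from the mapping in connect.yaml.
SPEED_FACTOR = 1.60934


def transform(record: dict) -> dict:
    """Mirror the mapping: copy fields, scale speed, flatten location."""
    return {
        "vehicle_id": record["vehicle_id"],
        "timestamp": record["timestamp"],
        "speed": record["speed"] * SPEED_FACTOR,
        "fuel_level": record["fuel_level"],
        "latitude": record["location"]["latitude"],
        "longitude": record["location"]["longitude"],
    }


# Hypothetical input record; the nested "location" object is an assumption
# based on the fields the mapping reads.
incoming = {
    "vehicle_id": "vehicle_1",
    "timestamp": "2024-08-13T20:24:53.636805Z",
    "speed": 50.0,
    "fuel_level": 21.14,
    "location": {"latitude": 7.966871, "longitude": -64.717011},
}
print(transform(incoming))
```

The output is a flat object with six keys and no location field. This sketch raises KeyError when a field is missing; how Redpanda Connect handles a missing field may differ.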
From the same directory as connect.yaml, run Redpanda Connect in a container on the same Docker network as NATS:
    docker run --rm -it --name=redpanda-connect \
      --network demo-network \
      -v $(pwd)/connect.yaml:/connect.yaml \
      docker.redpanda.com/redpandadata/connect \
      run

You should see output similar to this:

    INFO Running main config from file found in a default path @service=benthos benthos_version=4.30.1 path=connect.yaml
    INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=benthos
    INFO Launching a Redpanda Connect instance, use CTRL+C to close @service=benthos
    INFO Output type nats_jetstream is now active @service=benthos label="" path=root.output
    INFO Input type aws_kinesis is now active @service=benthos label="" path=root.input

Leave this terminal window open. Redpanda Connect must keep running to continue consuming from Kinesis and publishing to JetStream.
To verify the pipeline, run the sample vehicle data producer application.
Clone the demo repository:
    git clone https://github.com/SystemCraftsman/nats-redpanda-connect-kinesis-demo.git

Navigate into the producer app directory:

    cd nats-redpanda-connect-kinesis-demo/apps/VehicleDataProducer

Activate your Python virtual environment, then install the dependencies:

    pip install -r requirements.txt

This installs boto3, which the producer application uses to interact with AWS services such as Kinesis.
Before running the app, export your AWS configuration:
    export AWS_DEFAULT_REGION='[YOUR_AWS_KINESIS_REGION]'
    export AWS_ACCESS_KEY_ID='[YOUR_AWS_ACCESS_KEY_ID]'
    export AWS_SECRET_ACCESS_KEY='[YOUR_AWS_CLIENT_SECRET]'

Then run the producer:

    python app.py

You should see output similar to this:

    Sent data for vehicle_1: {'ShardId': 'shardId-000000000002', 'SequenceNumber': '49654821927667342525694265153029849795926049811702218786', 'ResponseMetadata': {'RequestId': 'e49b68e4-0cee-c87d-bbae-b630230dc8da', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e49b68e4-0cee-c87d-bbae-b630230dc8da', 'x-amz-id-2': 'NO+p4Q7NEHzixBxZLMWgRKx54EqpNVCbZR3+j2I1Uv9+Z5k/HPYiO642nAfucAWcZ5wEdnzp88OJI4hP/Nmu4lmJD9vLOCOm', 'date': 'Tue, 13 Aug 2024 22:22:12 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
    Sent data for vehicle_2: {'ShardId': 'shardId-000000000002', 'SequenceNumber': '49654821927667342525694265153143488822969824954124599330', 'ResponseMetadata': {'RequestId': 'f333697a-3d03-3eb5-ac06-b7af12e03e12', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f333697a-3d03-3eb5-ac06-b7af12e03e12', 'x-amz-id-2': 'ujmF9zuGemZD2HszAOUSRw0IFrRT+sQPLNXYZjIYNz8muVS6xhmqa0f1BDShaW/bk1HLwnrQTZxPM9D3h7oOmbAKtJOUJY/A', 'date': 'Tue, 13 Aug 2024 22:22:13 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
    Sent data for vehicle_3: {'ShardId': 'shardId-000000000000', 'SequenceNumber': '49654821927979552958473693877257970718944511224426004482', 'ResponseMetadata': {'RequestId': 'd98518fd-c046-d9f4-86b0-c628efa5d953', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd98518fd-c046-d9f4-86b0-c628efa5d953', 'x-amz-id-2': 'NrFz4jgNRb+7XJtsGCngOAMI3aMueo+7868GcYjS6ZQriFIIWta+5pOBsJxNLXpQwqLFKCPO1+zp0YxP9B65DQ49Y0Bl2XIq', 'date': 'Tue, 13 Aug 2024 22:22:13 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
    ...output omitted...

Use the NATS CLI to view messages in the vehicle_data stream:

    nats stream view vehicle_data

You should see messages published on the vehicle.data subject with the Content-Type: application/json header and the transformed payload:
    [152913] Subject: vehicle.data Received: 2024-08-13T23:24:53+03:00

      Content-Type: application/json

    {"fuel_level":24.24,"latitude":-76.594167,"longitude":106.770849,"speed":76.2183424,"timestamp":"2024-08-13T20:24:52.426587Z","vehicle_id":"vehicle_3"}

    [152914] Subject: vehicle.data Received: 2024-08-13T23:24:54+03:00

      Content-Type: application/json

    {"fuel_level":21.14,"latitude":7.966871,"longitude":-64.717011,"speed":52.4483906,"timestamp":"2024-08-13T20:24:53.636805Z","vehicle_id":"vehicle_1"}

    [152915] Subject: vehicle.data Received: 2024-08-13T23:24:54+03:00

      Content-Type: application/json

    {"fuel_level":26.42,"latitude":-75.722179,"longitude":55.893106,"speed":100.2136018,"timestamp":"2024-08-13T20:24:53.842285Z","vehicle_id":"vehicle_2"}

    ...output omitted...

    ? Next Page? (Y/n)

This confirms that Redpanda Connect consumed records from Kinesis, applied the mapping and transformation, and published the processed messages to NATS JetStream.
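If you would rather check a payload mechanically than by eye, the sketch below tests whether a message body has the flat set of fields that the mapping produces. The function name is_transformed is illustrative, and the sample payload is one of the messages shown in the output above.

```python
import json

# The six top-level fields written by the mapping in connect.yaml.
EXPECTED_KEYS = {"vehicle_id", "timestamp", "speed", "fuel_level", "latitude", "longitude"}


def is_transformed(payload: str) -> bool:
    """True when the JSON payload has exactly the flat fields the mapping produces."""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return False
    return isinstance(data, dict) and set(data) == EXPECTED_KEYS


# Payload copied from the sample output for vehicle_1.
sample = (
    '{"fuel_level":21.14,"latitude":7.966871,"longitude":-64.717011,'
    '"speed":52.4483906,"timestamp":"2024-08-13T20:24:53.636805Z",'
    '"vehicle_id":"vehicle_1"}'
)
print(is_transformed(sample))
```

A payload that still carries a nested location object, or that is not valid JSON, returns False, which would suggest the record bypassed the mapping or was malformed at the source.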
If you run python app.py before setting all of the required AWS environment variables, you may see an error similar to this:

    botocore.exceptions.PartialCredentialsError: Partial credentials found in env, missing: AWS_SECRET_ACCESS_KEY

Export the required variables and run the app again:

    export AWS_DEFAULT_REGION='[YOUR_AWS_KINESIS_REGION]'
    export AWS_ACCESS_KEY_ID='[YOUR_AWS_ACCESS_KEY_ID]'
    export AWS_SECRET_ACCESS_KEY='[YOUR_AWS_CLIENT_SECRET]'
    python app.py

If the JetStream monitoring endpoint still shows streams: 0, the JetStream stream has not been created. Re-run the nats stream add vehicle_data command and confirm that the CLI reports Stream vehicle_data was created.
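For the partial-credentials error described above, a quick pre-flight check can show which variables are missing before the producer starts. This is a minimal sketch that only looks at the three environment variables exported in this tutorial; the helper name missing_aws_vars is hypothetical.

```python
import os
from typing import List, Mapping

# The three variables exported in this tutorial.
REQUIRED_VARS = ("AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_vars(environ: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


# Example with only the access key ID set, the kind of situation that
# leads to a partial-credentials error.
print(missing_aws_vars({"AWS_ACCESS_KEY_ID": "example-id"}))
```

Calling missing_aws_vars() with no argument checks the current shell environment. Note that boto3 can also load credentials from other sources, such as a shared credentials file, so this check only covers the environment-variable setup used in this tutorial.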
What if no messages appear in nats stream view vehicle_data? Check the pipeline in this order:
1. The producer is sending records to the Kinesis data stream named vehicle_data.
2. The Redpanda Connect logs show aws_kinesis and nats_jetstream as active.
3. The NATS URL in connect.yaml is nats://nats:4222 when Redpanda Connect and NATS are on the same Docker network.
4. The JetStream stream is bound to the vehicle.data subject.

You connected Amazon Kinesis Data Streams to NATS JetStream using Redpanda Connect. Kinesis acted as the source, Redpanda Connect consumed and transformed the vehicle telemetry records, and JetStream persisted the processed messages on the vehicle.data subject.
To skip managing your own NATS infrastructure, Synadia Cloud gives you a fully managed NATS service with JetStream ready to go. Sign up for free and swap the local Docker server in this tutorial for a Synadia Cloud endpoint.


