RethinkConn is back — the biggest NATS event of the year returns June 4. Save your (virtual) spot.
All posts

How to Connect NATS JetStream to RabbitMQ with Redpanda Connect

How to Connect NATS JetStream to RabbitMQ with Redpanda Connect

You can connect RabbitMQ to NATS JetStream by running RabbitMQ as the message source, using Redpanda Connect as the bridge, and publishing processed messages into a JetStream stream. This tutorial walks through a local RabbitMQ setup, a Synadia Cloud NATS account with JetStream enabled, and a Redpanda Connect pipeline that consumes RabbitMQ messages, validates them, and writes them to NATS JetStream.

The example implements a one-way flow:

1
RabbitMQ exchange and queue -> Redpanda Connect -> NATS JetStream

Use this pattern when you want RabbitMQ to handle queueing and routing while NATS JetStream stores and serves event streams for downstream processing.

What are you building?

In this tutorial, you will:

  1. Create a NATS JetStream stream for analytics events.
  2. Run RabbitMQ locally with Docker.
  3. Create a RabbitMQ topic exchange, durable queue, and binding.
  4. Configure Redpanda Connect to consume from RabbitMQ and publish to NATS JetStream.
  5. Publish sample messages into RabbitMQ.
  6. Validate that the messages arrive in JetStream.

Redpanda Connect connects NATS JetStream and RabbitMQ

In this setup, RabbitMQ acts as the source where messages are produced. Redpanda Connect consumes messages from RabbitMQ, applies processing such as schema validation, and sends the processed messages to NATS JetStream as the sink.

When should you connect RabbitMQ and NATS JetStream?

RabbitMQ and NATS JetStream are both messaging technologies, but they are often used for different parts of an architecture.

Use this integration when you need to:

  • Keep RabbitMQ for exchange-based routing, queues, and existing AMQP producers.
  • Persist or stream selected events through NATS JetStream.
  • Bridge systems during a migration or modernization project.
  • Move routed RabbitMQ events into a high-throughput event stream for downstream consumers.
  • Add validation or filtering between RabbitMQ and NATS using Redpanda Connect.

NATS JetStream provides message persistence, at-least-once delivery, and multiple consumption modes. RabbitMQ provides a rich broker model with queues, exchanges, routing keys, and plugin support. Redpanda Connect lets you build a pipeline that connects the two without writing a custom bridge service.

What do you need before you start?

You will need the following:

  • Docker: You will use Docker to run RabbitMQ and Redpanda Connect locally.
  • NATS server with JetStream enabled: This tutorial uses Synadia Cloud to simplify setup. Sign up for a free account, create or access a NATS system, and download your user credentials for later.
  • JSON schema file: The Redpanda Connect configuration references a JSON schema file. Replace the example schema path with the actual path to your schema.

How do you create a NATS JetStream stream?

The first step is to set up a NATS server with JetStream enabled. If you run your own NATS server, follow the official NATS setup documentation. To focus on the integration, this tutorial uses Synadia Cloud.

Synadia Cloud sign-up

In Synadia Cloud, open the JetStream tab and create a stream. When creating a JetStream stream, specify a stream name, an optional description, and one or more subjects that the stream will store messages for.

For this tutorial, create a stream with:

  • Stream name: analytics_stream
  • Subject pattern: analytics.events.*

Subjects in NATS are like topics or channels that messages are published to and subscribed from, such as analytics.events.login or user.notifications. NATS also supports wildcard subjects, which lets streams and subscriptions match hierarchical subject patterns.

Messages published to matching subjects such as analytics.events.login or analytics.events.purchase will be captured by the stream.

Setting up a stream in NATS JetStream

How do you run RabbitMQ locally?

Run RabbitMQ with the management UI enabled:

Terminal window
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

This command pins rabbitmq:3.13-management; newer -management image tags (including the 4.x line) work the same way for the steps in this tutorial.

To verify that RabbitMQ is running, open http://localhost:15672/#/ in your browser and log in with:

  • Username: guest
  • Password: guest

Note on the guest user: By default, RabbitMQ’s built-in guest user can only authenticate over a loopback (localhost) connection. Logging in to the management UI at localhost:15672 works because that request comes from your machine. However, when Redpanda Connect runs in a separate container and connects through host.docker.internal, RabbitMQ sees a non-loopback connection and may reject guest with an ACCESS_REFUSED error. If that happens, create a dedicated RabbitMQ user (management UI → Admin tab → Add a user, then grant it permissions on the / virtual host) and use those credentials in connect.yaml, or start the RabbitMQ container with -e RABBITMQ_DEFAULT_USER=<user> -e RABBITMQ_DEFAULT_PASS=<password> to create a user that is not loopback-restricted.

How do you create the RabbitMQ queue, exchange, and binding?

With RabbitMQ running, create the queue and exchange that will handle the message flow.

RabbitMQ uses:

  • Queues to store and distribute messages to consumers.
  • Exchanges to route messages to queues based on bindings and routing keys.

This tutorial uses the RabbitMQ management UI, but you can also create these resources through the RabbitMQ API or a client library.

How do you create the RabbitMQ queue?

In the RabbitMQ management UI:

  1. Open the Queues and Streams tab.
  2. Add a new queue named analytics_events_queue.
  3. Set durability to Durable.

Creating and configuring a RabbitMQ queue

How do you create the RabbitMQ exchange?

Next:

  1. Open the Exchanges tab.
  2. Add a new exchange named analytics_events_exchange.
  3. Set the exchange type to topic.
  4. Set durability to Durable.

Creating and configuring a RabbitMQ exchange

How do you bind the exchange to the queue?

Create a binding so messages published to the topic exchange are routed into the queue.

Use this routing key:

1
analytics.events.#

This binding routes any message with a RabbitMQ routing key that starts with analytics.events. to analytics_events_queue.

Note on wildcard semantics: RabbitMQ and NATS use different wildcard rules. In a RabbitMQ topic binding, # matches zero or more words, so analytics.events.# also matches deeper keys such as analytics.events.user.login. The NATS stream subject analytics.events.* uses *, which matches exactly one token. The sample events in this tutorial use a single token after analytics.events. (login, purchase, logout), so both patterns line up. If you introduce routing keys with additional tokens, widen the NATS stream subject to analytics.events.> (NATS > matches one or more trailing tokens) so JetStream still captures them.

To add the binding:

  1. Open the Queues and Streams tab.
  2. Select analytics_events_queue.
  3. Bind it to analytics_events_exchange.
  4. Use analytics.events.# as the routing key.

Binding the RabbitMQ queue to an exchange

How do you configure Redpanda Connect for RabbitMQ and NATS JetStream?

Create a file named connect.yaml. This configuration tells Redpanda Connect to read from the RabbitMQ queue, validate messages against a JSON schema, and publish valid messages to NATS JetStream.

1
input:
2
label: analytics_queue
3
amqp_0_9:
4
urls:
5
- 'amqp://guest:guest@host.docker.internal:5672/'
6
queue: analytics_events_queue
7
pipeline:
8
threads: 1
9
processors:
10
- json_schema:
11
schema_path: "file://path_to_schema.json"
12
- catch:
13
- log:
14
level: ERROR
15
message: "Schema validation failed due to: ${!error()}"
16
- mapping: 'root = deleted()'
17
output:
18
label: analytics_stream
19
nats_jetstream:
20
urls:
21
- 'tls://connect.ngs.global'
22
subject: '${! metadata("amqp_routing_key") }'
23
headers:
24
Content-Type: application/json
25
auth:
26
user_jwt: >-
27
<NATS USER JWT>
28
user_nkey_seed: <USER NKEY SEED>

Replace the placeholder values before running the pipeline:

  • Replace file://path_to_schema.json with the path to your JSON schema file.
  • Replace <NATS USER JWT> with the user JWT from your NATS credentials file.
  • Replace <USER NKEY SEED> with the user NKey seed from your NATS credentials file.

Why the output subject is not a wildcard: A NATS publisher must publish to a concrete subject. Subjects that contain wildcard tokens (* or >) are valid only for subscriptions and for stream or consumer subject filters — the NATS server rejects an attempt to publish to a wildcard subject. The configuration above sets the output subject to ${! metadata("amqp_routing_key") }, an interpolation that publishes each message to the subject matching its RabbitMQ routing key (for example, analytics.events.login). Those concrete subjects are all captured by the analytics.events.* stream you created earlier. The metadata key amqp_routing_key is added by the amqp_0_9 input; confirm the exact name against the input documentation for your Redpanda Connect version.

If you prefer not to derive the subject from the routing key, publish every event to a single fixed subject instead — for example subject: analytics.events.ingest. That subject still matches the analytics.events.* stream, but all events then share one subject rather than mapping each event type to its own subject.

What does the Redpanda Connect input do?

The input section is labeled analytics_queue. It uses the AMQP 0-9 protocol to connect to RabbitMQ at host.docker.internal:5672 with the default local credentials. It reads messages from analytics_events_queue.

What does the Redpanda Connect pipeline do?

The pipeline section runs with one thread and includes two processors:

  1. json_schema validates incoming messages against the configured JSON schema.
  2. catch handles validation errors by logging the error and deleting invalid messages with a mapping operation.

This step matters because it prevents malformed events from being written into JetStream.

What does the Redpanda Connect output do?

The output section is labeled analytics_stream. It configures NATS JetStream as the output, sets a JSON content header, and authenticates with the NATS user credentials downloaded from Synadia Cloud. The subject field is the NATS subject each message is published to. It must resolve to a concrete subject (no wildcards); here it is derived from the message’s RabbitMQ routing key so each event type maps to its own subject under analytics.events..

How do you run Redpanda Connect?

Run Redpanda Connect from the directory where you saved connect.yaml:

Terminal window
docker run --rm -it -v $(pwd)/connect.yaml:/connect.yaml docker.redpanda.com/redpandadata/connect run

When Redpanda Connect starts, it begins consuming messages from the RabbitMQ queue and publishing them to NATS JetStream. You should see logs similar to the following:

Terminal window
INFO Running main config from file found in a default path @service=benthos benthos_version=v4.31.0 path=connect.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=benthos
INFO Launching a Redpanda Connect instance, use CTRL+C to close @service=benthos
INFO Input type amqp_0_9 is now active @service=benthos label=analytics_queue path=root.input
INFO Output type nats_jetstream is now active @service=benthos label=analytics_stream path=root.output

How do you publish test messages into RabbitMQ?

Use the RabbitMQ management UI to publish sample analytics events.

  1. Open the Exchanges tab.
  2. Select analytics_events_exchange.
  3. Find the Publish message section.
  4. Publish each message with the routing key and payload shown below.

Login event

  • Routing key: analytics.events.login
  • Payload:
1
{
2
"user_id": 1234,
3
"timestamp": "2024-06-01T10:30:00Z",
4
"event_type": "login"
5
}

Purchase event

  • Routing key: analytics.events.purchase
  • Payload:
1
{
2
"user_id": 1234,
3
"timestamp": "2024-06-01T11:15:00Z",
4
"event_type": "purchase",
5
"product_id": "ABC123",
6
"amount": 99.99
7
}

Logout event

  • Routing key: analytics.events.logout
  • Payload:
1
{
2
"user_id": 1234,
3
"timestamp": "2024-06-01T12:00:00Z",
4
"event_type": "logout"
5
}

Click Publish message for each event. RabbitMQ routes each message through analytics_events_exchange into analytics_events_queue, where Redpanda Connect consumes it.

How do you validate that messages reached NATS JetStream?

To confirm that the messages were published to JetStream:

  1. Open Synadia Cloud.
  2. Go to the JetStream dashboard.
  3. Open the stream you created earlier.
  4. Check the stream overview for the three analytics messages.

Synadia Cloud's JetStream dashboard

If the messages appear in the stream, the RabbitMQ to NATS JetStream integration is working.

Troubleshooting RabbitMQ to NATS JetStream connections

Why is Redpanda Connect unable to reach RabbitMQ?

Check the AMQP URL in connect.yaml:

1
urls:
2
- 'amqp://guest:guest@host.docker.internal:5672/'

This address must be reachable from the Redpanda Connect container. host.docker.internal resolves to the host on Docker Desktop (macOS and Windows). On Linux, it is not defined by default — either add --add-host=host.docker.internal:host-gateway to the docker run command for Redpanda Connect, or update the URL to a RabbitMQ address that the container can reach (such as a shared Docker network or the host’s LAN address).

If the container reaches RabbitMQ but the connection is refused with an ACCESS_REFUSED authentication error, the cause is usually the guest user’s loopback restriction — see the note about the guest user in the RabbitMQ setup section above.

Why are no messages appearing in the RabbitMQ queue?

Verify that:

  • The exchange is named analytics_events_exchange.
  • The queue is named analytics_events_queue.
  • The queue is bound to the exchange.
  • The binding key is analytics.events.#.
  • Published messages use routing keys such as analytics.events.login.

Why are messages not appearing in JetStream?

Check the following:

  • Redpanda Connect is running and both input and output components are active.
  • The NATS user JWT and NKey seed are correct.
  • The JetStream stream exists.
  • The output subject resolves to a concrete subject (no * or > wildcards), and the stream’s subject filter (analytics.events.*) covers the subjects the output publishes to, such as analytics.events.login.
  • The messages pass the configured JSON schema validation.

Why are validation failures logged?

The json_schema processor validates each message against your schema. If a message does not match the schema, the catch processor logs the validation error and deletes the invalid message from the pipeline before it reaches JetStream.

FAQ: Connecting RabbitMQ and NATS JetStream

Is this integration bidirectional?

No. This tutorial shows a one-way pipeline from RabbitMQ to NATS JetStream. A reverse or bidirectional integration would require a separate configuration.

Do I need Synadia Cloud to use this pattern?

No. Synadia Cloud simplifies NATS and JetStream setup for this tutorial. You can also run and manage your own NATS server with JetStream enabled.

What role does Redpanda Connect play?

Redpanda Connect is the bridge. It consumes messages from RabbitMQ using AMQP 0-9, optionally processes or validates them, and writes the resulting messages to NATS JetStream.

What is the main reason to use RabbitMQ and NATS JetStream together?

Use RabbitMQ when you need exchange-based routing and queueing, and use NATS JetStream when you want persisted event streams for downstream consumers. Redpanda Connect lets you connect those roles in a pipeline.

Conclusion

Combining RabbitMQ routing with NATS JetStream persistence lets you support existing AMQP workflows while making selected events available through NATS. If you want to simplify the deployment and management of NATS JetStream, check out Synadia Cloud.

By combining RabbitMQ routing with NATS JetStream persistence and streaming, you can build messaging architectures that support existing AMQP workflows while making selected events available through NATS. If you are interested in simplifying the deployment and management of NATS JetStream, check out Synadia Cloud.


Want help from the NATS experts? Meet with our architects to get help tailored to your use case and environment.

Related posts

All posts
Get the NATS Newsletter

News and content from across the community


Cancel