
Job queues are essential components in software architecture that are designed to manage and process tasks asynchronously. A job queue allows applications to defer work that doesn’t need to be completed immediately. Instead of handling tasks synchronously and potentially blocking the main application flow, tasks are placed in a queue and processed asynchronously by worker processes. Common applications of job queues include processing large batches of data, sending emails or notifications, generating reports, and handling long-running computations. They can also decouple different parts of a system, allowing each component to operate independently and communicate through queued tasks.
This article will cover how to create a lightweight and performant job queue, along with a dead letter queue. We’ll explain how to implement this using the Go programming language and NATS, a high-performance messaging system designed for cloud-native applications, IoT messaging, and microservices architectures.
At its core, a job queue is a data structure that stores tasks to be executed asynchronously. Jobs are placed into the queue by producers (parts of the application that generate work) and pulled from the queue by consumers (worker processes that perform the actual work). This decoupling of job production and consumption allows for flexible scaling, as the number of workers can be adjusted based on the workload to ensure efficient resource utilization. Job queues often come with additional features like persistence to prevent data loss in case of system failures, retry mechanisms for failed jobs, and monitoring capabilities to track queue health and performance.
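To make the producer and consumer roles concrete, here is a minimal in-memory sketch of a job queue as a data structure. The `JobQueue` type and its methods are hypothetical names chosen for illustration; the sketch has no persistence, retries, or monitoring, which is where a system like NATS JetStream comes in.

```go
package main

import (
	"fmt"
	"sync"
)

// JobQueue is a hypothetical, minimal thread-safe FIFO queue of job payloads.
type JobQueue struct {
	mu   sync.Mutex
	jobs []string
}

// Enqueue is what a producer calls: it records the job and returns immediately.
func (q *JobQueue) Enqueue(job string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.jobs = append(q.jobs, job)
}

// Dequeue is what a worker calls: it removes and returns the oldest job.
// ok is false when the queue is empty.
func (q *JobQueue) Dequeue() (job string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.jobs) == 0 {
		return "", false
	}
	job = q.jobs[0]
	q.jobs = q.jobs[1:]
	return job, true
}

func main() {
	q := &JobQueue{}

	// Producer side: defer the work by enqueueing it.
	for _, job := range []string{"resize-image", "send-email", "build-report"} {
		q.Enqueue(job)
	}

	// Worker side: drain the queue in arrival order.
	for {
		job, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Println("processing", job)
	}
}
```

Because producers and workers touch only the queue, either side can be scaled or replaced without the other noticing.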
Consider a web application. When a user uploads a file, the application can quickly acknowledge the upload and offload the file processing (like resizing images or extracting metadata) to a job queue. For sending notifications, such as emails or SMS messages, the main application only needs to enqueue the message request, leaving the actual sending to be handled by a background worker. In an e-commerce application, the order processing system can place new orders into a job queue while a separate inventory service retrieves and processes these orders independently. This decoupling ensures that the order placement and inventory update processes do not directly depend on each other and can be scaled independently. The failure of one service also doesn’t affect the other.
NATS is a messaging system used for real-time data streaming and communication between distributed systems. Core NATS offers publish-subscribe, request-reply, and queue-group messaging patterns with at-most-once delivery. NATS JetStream extends it with persistent storage, message replay, and streaming. JetStream is built into nats-server itself; it’s a feature that you can enable or disable according to your requirements. Persisting streams increases the resilience and reliability of the system.
To create our job queue and dead letter queue, we’ll build a sample application with three components: a publisher and two stream consumers (one that processes jobs from the queue and one that handles the dead letter queue).

We will develop these components using the NATS Go client. NATS clients are used to connect to and communicate with the NATS server. The ones in the nats-io GitHub organization are official clients maintained by NATS authors.
Note that the NATS Go client is JetStream-enabled, so we don’t need a different library for JetStream-specific operations.
In this article, we’ll use Synadia Cloud, which offers a globally accessible, fully managed NATS.io platform that works across multiple clouds and includes a user-friendly admin portal and API.
To follow along, make sure you have a recent version of the Go programming language installed for your operating system.
You’ll also need to install natscli to communicate with the NATS server. Follow the steps in the documentation to install natscli for your operating system.
Before you begin, sign up for Synadia Cloud.
After you sign up, select “NGS (NATS Global System)”. A default account should already exist when you log in.

Inside the account, navigate to Users. You should have a “CLI” user created in advance. You’ll use this user for the sample application in this guide.

To download the user credentials, click the user details and choose Get Connected > Download Credentials to save the .creds file to your machine.

Save the Synadia Cloud connection configuration as a context. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where you saved the .creds file:
nats context save --select "NGS-Default-CLI" --server "tls://connect.ngs.global" --creds ABSOLUTE_PATH_TO_CREDS_FILE
Once the configuration is successfully saved, you should see this output:
NATS Configuration Context "NGS-Default-CLI"

  Server URLs: tls://connect.ngs.global
  Credentials: /Users/demo/Desktop/NGS-Default-CLI.creds (OK)
  Path: /Users/demo/.config/nats/context/NGS-Default-CLI.json
  Connection: OK
To confirm that you are connected to Synadia Cloud, execute the command below to get your account details:
nats account info
There are multiple ways to create a stream. We can use the Synadia Cloud console, NATS client SDKs, or natscli.
Let’s create one using the stream subcommand of natscli:
nats stream add --subjects="orders.*" --storage=file --replicas=1 --retention=work --discard=new --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=256MB --max-age=-1 --max-msg-size=1MIB --dupe-window=2m0s --deny-delete --no-deny-purge --allow-direct orders_stream
JetStream offers a lot of flexibility through configuration. Let’s walk through the options used here:
- --subjects is the list of subjects that the stream captures messages from. The * wildcard matches exactly one token, so orders.* covers subjects such as orders.new but not orders.new.eu.
- --storage specifies the persistence mechanism for messages. file means that messages are persisted on disk.
- --replicas specifies the number of replicas in clustered mode. A value of 1 means there is no replication.
- --retention sets how messages are retained. The work retention policy keeps a message until it has been delivered to a consumer that explicitly acknowledges it, and then removes it.
- --discard determines what happens when a stream reaches its size or message limits. new means new messages are rejected once a limit is reached.
- --max-msgs defines the number of messages to retain in the stream. A value of -1 means there is no limit.
- --max-msgs-per-subject defines the number of messages to retain per unique subject. A value of -1 means there is no limit.
- --max-bytes is the maximum combined size of all messages in the stream. Here, the limit is 256 megabytes.
- --max-age defines the maximum age of messages stored in the stream. A value of -1 means messages do not expire based on age.
- --max-msg-size is the maximum size of any single message the stream accepts. The Synadia Cloud account limit defaults to a 1.0 MiB message size, which is what we used here.
- --dupe-window defines the window for detecting duplicate messages. Here, the window is 2 minutes, so a message that repeats a message ID (the Nats-Msg-Id header) within that time frame is treated as a duplicate.
- --deny-delete prevents individual messages from being deleted via the API.
- --deny-purge prevents the stream or a subject from being purged via the API. The command passes --no-deny-purge, so purging remains allowed.
- --allow-direct allows direct access to stream data via the direct get API.
Finally, we include the name of the stream (orders_stream).
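The single-token behavior of the * wildcard is easy to get wrong, so here is a small sketch of NATS-style subject matching. The `subjectMatches` helper is a hypothetical name written for this illustration, not part of the NATS client; it assumes well-formed subjects with no empty tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least
			// one subject token left to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject has fewer tokens than the pattern
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", pattern and subject must have the same token count.
	return len(p) == len(s)
}

func main() {
	for _, subject := range []string{"orders.new", "orders.new.eu", "orders"} {
		fmt.Printf("orders.* matches %-14s: %v\n", subject, subjectMatches("orders.*", subject))
	}
	fmt.Println("orders.> matches orders.new.eu :", subjectMatches("orders.>", "orders.new.eu"))
}
```

If the stream should also capture deeper subjects such as orders.new.eu, the pattern orders.> would be the one to use instead of orders.*.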
A dead letter queue (DLQ) is a special type of stream that is used to temporarily store messages that failed to deliver. A DLQ is often used to troubleshoot any issues that might cause message delivery to fail.
Create the orders_unprocessed stream that will function as a dead letter queue:
nats stream add --subjects='$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.*' --storage=file --replicas=1 --retention=work --discard=new --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=256MB --max-age=-1 --max-msg-size=-1 --dupe-window=2m0s --deny-delete --no-deny-purge --allow-direct orders_unprocessed
Now that we’ve created both streams, we can create our application.
Create a Go project:
mkdir nats-demo
cd nats-demo
go mod init <YOUR_GITHUB_USERNAME>/nats-demo
Install the required dependencies:
go get github.com/nats-io/nats.go
go get github.com/nats-io/nats.go/jetstream
Create a file at publisher/publisher.go, then start by importing the necessary modules:
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)
In the main function, get the credentials file path from the environment variable:
func main() {
	credentialsFile := os.Getenv("NATS_CREDENTIALS_FILE_PATH")
	subjectName := os.Args[1] // subject to publish to, for example orders.new

}
Use the nats.Connect function to connect to the Synadia Cloud NATS server. A NATS connection will attempt to remain always connected, with built-in reconnection logic in case of network failures or server restarts:
nc, err := nats.Connect("connect.ngs.global",
	nats.UserCredentials(credentialsFile),
	nats.Name(fmt.Sprintf("publisher_%s", subjectName)))
if err != nil {
	log.Fatal(err)
}

defer nc.Close()
Create an instance of jetstream.JetStream with jetstream.New:
js, err := jetstream.New(nc)
if err != nil {
	log.Fatal(err)
}
Use the Publish function on the JetStream instance in a loop to send twenty messages to the subject:
for i := 0; i < 20; i++ {
	// Give each publish its own 5-second deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	_, err := js.Publish(ctx, subjectName, []byte(fmt.Sprintf("order-%d", i)))
	// Release the context right away; a defer inside the loop would
	// pile up until main returns.
	cancel()
	if err != nil {
		log.Println("publish failed:", err)
	} else {
		log.Println("published order no.", i)
	}
	time.Sleep(1 * time.Second)
}
Start the publisher to send data to the orders.new subject. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where you saved the .creds file:
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run publisher/publisher.go orders.new
The application will send twenty messages to the subject before terminating. The application logs should look something like this:
//...
published order no. 14
published order no. 15
published order no. 16
published order no. 17
published order no. 18
published order no. 19
Every message sent to a subject configured on the stream is automatically available in the stream. A JetStream consumer client can consume a subset of the messages stored in a stream. On the server side, NATS keeps track of which messages were delivered and acknowledged by clients.
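As a rough mental model only (this is not how nats-server is implemented), the bookkeeping for a stream with work retention can be pictured as a map of stored messages that shrinks when a message is acknowledged. The `workQueueStream` type and its methods below are hypothetical names for this sketch.

```go
package main

import "fmt"

// storedMsg is one message held by the toy stream.
type storedMsg struct {
	data      string
	delivered int // number of delivery attempts so far
}

// workQueueStream is a hypothetical in-memory model, not a NATS type.
type workQueueStream struct {
	lastSeq uint64
	msgs    map[uint64]*storedMsg
}

func newWorkQueueStream() *workQueueStream {
	return &workQueueStream{msgs: make(map[uint64]*storedMsg)}
}

// Publish stores the payload and returns its stream sequence number.
func (s *workQueueStream) Publish(data string) uint64 {
	s.lastSeq++
	s.msgs[s.lastSeq] = &storedMsg{data: data}
	return s.lastSeq
}

// Deliver returns the payload for seq and counts the delivery attempt.
func (s *workQueueStream) Deliver(seq uint64) (data string, attempt int, ok bool) {
	m, found := s.msgs[seq]
	if !found {
		return "", 0, false
	}
	m.delivered++
	return m.data, m.delivered, true
}

// Ack removes the message: under work retention an acknowledged
// message no longer needs to be kept.
func (s *workQueueStream) Ack(seq uint64) bool {
	if _, found := s.msgs[seq]; !found {
		return false
	}
	delete(s.msgs, seq)
	return true
}

// Len reports how many messages are still stored (unacknowledged).
func (s *workQueueStream) Len() int { return len(s.msgs) }

func main() {
	s := newWorkQueueStream()
	first := s.Publish("order-0")
	s.Publish("order-1")

	data, attempt, _ := s.Deliver(first)
	fmt.Println("delivered", data, "attempt", attempt)
	s.Ack(first)
	fmt.Println("messages still stored:", s.Len())
}
```

The point of the model is the asymmetry: delivering a message changes only its attempt counter, while acknowledging it is what removes it from the stream.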
Create consumer/consumer.go and import the modules:
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)
As before, fetch the credentials file from the environment variable in the main function:
func main() {
	credentialsFile := os.Getenv("NATS_CREDENTIALS_FILE_PATH")
}
The consumer receives two arguments: the name of the stream to consume messages from and a subject filter that restricts which subjects the consumer reads:
streamName := os.Args[1]
subjectFilter := os.Args[2]
As before, connect to the NATS server and create a JetStream instance:
nc, err := nats.Connect("connect.ngs.global",
	nats.UserCredentials(credentialsFile),
	nats.Name(fmt.Sprintf("consumer_%s", streamName)))
if err != nil {
	log.Fatal(err)
}

defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
	log.Fatal(err)
}
After connecting to the Synadia Cloud NATS server and creating the JetStream instance, use the CreateOrUpdateConsumer function to create a jetstream.Consumer instance.
Take note of the configuration, specifically MaxDeliver, BackOff, and FilterSubject:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jetstream.ConsumerConfig{
	Name:        "processor",
	Durable:     "processor",
	Description: "Orders processor",
	BackOff: []time.Duration{
		5 * time.Second,
	},
	MaxDeliver:    2,
	FilterSubject: subjectFilter,
	AckPolicy:     jetstream.AckExplicitPolicy,
})
if err != nil {
	log.Fatal(err)
}
Invoke the Consume function on the jetstream.Consumer instance with a callback. Messages from the stream will be continuously received and processed with the provided callback logic:
c, err := consumer.Consume(func(msg jetstream.Msg) {
	meta, err := msg.Metadata()
	if err != nil {
		log.Printf("Error getting metadata: %s\n", err)
		return
	}

	// Simulate a processing failure roughly half the time. Returning
	// without an ack causes the server to redeliver the message.
	if rand.Intn(10) < 5 {
		log.Println("error processing", string(msg.Data()))

		if meta.NumDelivered == 2 {
			log.Println(string(msg.Data()), "will be processed via DLQ")
		}
		return
	}

	log.Println("received", string(msg.Data()), "from stream sequence #", meta.Sequence.Stream)
	time.Sleep(10 * time.Millisecond)

	msg.Ack()
})
if err != nil {
	log.Fatal(err)
}

defer c.Stop()

// Block until the process receives SIGINT or SIGTERM.
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
The processing logic here is simplified on purpose; it only logs the message payload along with the stream sequence number. Take note of the if rand.Intn(10) < 5 section, which deliberately leaves the message unacknowledged so that it will be redelivered. Because MaxDeliver is set to 2 in the consumer configuration, each message is delivered at most twice (the original delivery plus one redelivery after the 5-second backoff). After that, NATS stops redelivering it.
We’ll explore this further in the next section, where we’ll implement a dead letter queue to make sure unprocessed messages are not lost.
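The delivery limit can be pictured with a small simulation. This is only a sketch of the counting logic under the assumption of a fixed positive limit: `deliverWithLimit` and `outcome` are hypothetical names, the backoff delay is left out, and no NATS calls are involved.

```go
package main

import "fmt"

// outcome records what happened to one message.
type outcome struct {
	Attempts   int  // how many times the message was delivered
	Acked      bool // true if the handler acknowledged it
	DeadLetter bool // true if delivery attempts ran out without an ack
}

// deliverWithLimit calls handle up to maxDeliver times and stops at the
// first acknowledgement (handle returning true).
func deliverWithLimit(maxDeliver int, handle func(attempt int) bool) outcome {
	for attempt := 1; attempt <= maxDeliver; attempt++ {
		if handle(attempt) {
			return outcome{Attempts: attempt, Acked: true}
		}
	}
	// Every attempt was used up without an ack: this is the point at
	// which a dead letter queue would take over.
	return outcome{Attempts: maxDeliver, DeadLetter: true}
}

func main() {
	// A handler that fails on the first attempt and succeeds on the second.
	fmt.Printf("%+v\n", deliverWithLimit(2, func(attempt int) bool { return attempt == 2 }))

	// A handler that never succeeds is dead-lettered after two attempts.
	fmt.Printf("%+v\n", deliverWithLimit(2, func(int) bool { return false }))
}
```

With a limit of 2, a message gets exactly one retry, which matches the "will be processed via DLQ" log line that the consumer prints on its second failed attempt.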
Start the consumer application to process data from the stream. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run consumer/consumer.go orders_stream "orders.*"
The application logs should look something like this:
received order-0 from stream sequence # 448
error processing order-1
error processing order-2
received order-3 from stream sequence # 449
error processing order-4
//...
received order-19 from stream sequence # 465
error processing order-0
order-0 will be processed via DLQ
received order-1 from stream sequence # 447
error processing order-2
order-2 will be processed via DLQ
//...
A few messages (orders) are received and processed successfully, as the logs show: received order-0 from stream sequence # 448. The messages that fail to be processed (more on this in the upcoming sections) also appear in the logs, such as error processing order-2.
Keep the consumer running, as we’ll use it again later.
If a message reaches its maximum number of delivery attempts (as per the MaxDeliver configuration), an advisory message is published on the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER> subject. The advisory message payload contains a stream_seq field that contains the sequence number of the message in the stream, which can be used to get the message from the stream and reprocess it. This forms the foundation of a DLQ implementation. We’ve already created the orders_unprocessed stream, which reads from the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.* subject and acts as the DLQ.
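Since the advisory subject itself encodes the stream and consumer names, a handler can recover them without decoding the payload. The sketch below assumes the subject layout described above; `parseMaxDeliveriesSubject` is a hypothetical helper written for illustration, not a function of the NATS client.

```go
package main

import (
	"fmt"
	"strings"
)

// Prefix shared by every max-deliveries advisory subject.
const maxDeliveriesPrefix = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES."

// parseMaxDeliveriesSubject extracts <STREAM> and <CONSUMER> from a
// max-deliveries advisory subject. ok is false for any other subject.
func parseMaxDeliveriesSubject(subject string) (stream, consumer string, ok bool) {
	if !strings.HasPrefix(subject, maxDeliveriesPrefix) {
		return "", "", false
	}
	rest := strings.TrimPrefix(subject, maxDeliveriesPrefix)

	// What remains must be exactly two non-empty tokens.
	parts := strings.Split(rest, ".")
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", false
	}
	return parts[0], parts[1], true
}

func main() {
	subject := maxDeliveriesPrefix + "orders_stream.processor"
	stream, consumer, ok := parseMaxDeliveriesSubject(subject)
	fmt.Println(stream, consumer, ok)
}
```

This is also why the DLQ stream can subscribe with the pattern ending in orders_stream.*: the final token is the consumer name, so the wildcard covers advisories from every consumer on orders_stream.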
Create dlq/consumer.go and add the starting code:
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	credentialsFile := os.Getenv("NATS_CREDENTIALS_FILE_PATH")
}
The program receives two arguments: the name of the DLQ stream and the name of the original stream:
dlqStreamName := os.Args[1]
originalStreamName := os.Args[2]
As before, connect to the NATS server:
nc, err := nats.Connect("connect.ngs.global",
	nats.UserCredentials(credentialsFile),
	nats.Name(fmt.Sprintf("dlq_%s", originalStreamName)))
if err != nil {
	log.Fatal(err)
}

defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
	log.Fatal(err)
}
Create a consumer for the DLQ:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

consumer, err := js.CreateOrUpdateConsumer(ctx, dlqStreamName, jetstream.ConsumerConfig{
	Name:        "unprocessed_order_handler",
	Durable:     "unprocessed_order_handler",
	Description: "handle unprocessed jobs",
	BackOff: []time.Duration{
		5 * time.Second,
	},
	MaxDeliver: 2,
	AckPolicy:  jetstream.AckExplicitPolicy,
})
if err != nil {
	log.Fatal(err)
}
Get a reference to the original stream:
originalStream, err := js.Stream(ctx, originalStreamName)
if err != nil {
	log.Fatal(err)
}
The DLQ handler implementation is similar to the consumer component. The application consumes the advisories published on the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.* subject and, for each one:
- Parses the JSON payload of the advisory
- Calls the GetMsg function on the jetstream.Stream (of orders_stream) to get the message payload
- Acknowledges the advisory (Ack())
type UnprocessedOrderEvent struct {
	Stream    string `json:"stream"`
	Consumer  string `json:"consumer"`
	StreamSeq uint64 `json:"stream_seq"`
}

c, err := consumer.Consume(func(msg jetstream.Msg) {
	var event UnprocessedOrderEvent
	if err := json.Unmarshal(msg.Data(), &event); err != nil {
		log.Printf("error decoding advisory: %s\n", err)
		return
	}

	// Use a fresh context for each lookup. The 30-second setup context
	// above will have expired by the time later advisories arrive.
	getCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	order, err := originalStream.GetMsg(getCtx, event.StreamSeq)
	if err != nil {
		log.Printf("error fetching message %d: %s\n", event.StreamSeq, err)
		return
	}

	log.Println("reprocessing order", string(order.Data))
	msg.Ack()
})
if err != nil {
	log.Fatal(err)
}

defer c.Stop()

// Block until the process receives SIGINT or SIGTERM.
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
Start the dead letter queue processor to process data from the stream. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where you saved the .creds file:
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run dlq/consumer.go orders_unprocessed orders_stream
The application logs should look something like this:
reprocessing order order-0
reprocessing order order-2
reprocessing order order-4
reprocessing order order-10
reprocessing order order-18
You can run multiple instances of the consumer application to balance the load among them. Load balancing comes out of the box with NATS. When several instances attach to the same consumer (here, every instance uses the durable consumer named processor), NATS automatically distributes the messages among them, so no single instance is overwhelmed while others sit idle. Since NATS handles this internally, developers don’t need to set up or maintain external load balancers or adjust message routing logic, which reduces complexity and the chance of misconfiguration and makes horizontal scaling straightforward.
We should already have the consumer running. Start a second instance of the consumer application. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run consumer/consumer.go orders_stream "orders.*"
Now, run the producer application again. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run publisher/publisher.go orders.new
Follow the application logs of both instances to verify that each of them is processing different messages from the orders_stream stream. This will be evident from the order (for example, order-1 might be processed by the first instance and order-2 by the second one), as well as from the stream sequence number.
The dead letter queue processor should continue to work the same way and process the failed messages.
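The effect of running two instances can be imitated locally with goroutines pulling from one shared channel. This is an analogy rather than NATS code: `distribute` is a hypothetical helper, and the channel stands in for the shared consumer.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute hands every job to exactly one of the workers and reports
// how many jobs each worker handled and how often each job was handled.
func distribute(jobs []string, workers int) (perWorker []int, timesHandled map[string]int) {
	if workers < 1 {
		workers = 1 // at least one worker is needed to drain the queue
	}
	queue := make(chan string)
	perWorker = make([]int, workers)
	timesHandled = make(map[string]int)

	var mu sync.Mutex
	var wg sync.WaitGroup
	for id := 0; id < workers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Whichever worker is free receives the next job.
			for job := range queue {
				mu.Lock()
				perWorker[id]++
				timesHandled[job]++
				mu.Unlock()
			}
		}(id)
	}

	for _, job := range jobs {
		queue <- job
	}
	close(queue)
	wg.Wait()
	return perWorker, timesHandled
}

func main() {
	jobs := []string{"order-0", "order-1", "order-2", "order-3"}
	perWorker, timesHandled := distribute(jobs, 2)
	fmt.Println("jobs per worker:", perWorker) // the split varies from run to run
	fmt.Println("deliveries per job:", timesHandled)
}
```

The split between workers changes from run to run, but each job is handled exactly once, which is the property to look for when comparing the logs of the two consumer instances.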
You can find the complete code on GitHub.
In this article, we learned about the basics of job queues and how they enable asynchronous processing. Using NATS JetStream, we implemented a job processing application along with dead letter queue functionality in Go.
Thanks to a fully managed solution like Synadia Cloud, we didn’t have to manage NATS-related infrastructure. We were able to easily set up the stream with persistence, fault tolerance, and message replay capabilities using natscli and connect Go application components (producer, consumers, and DLQ processor) to Synadia Cloud using the JetStream-compliant NATS Go client.