All posts

Building a Job Queue With NATS.io and Go

Jakkie Koekemoer
Oct 1, 2024
Building a Job Queue With NATS.io and Go

Building a Job Queue with NATS.io and Go

Job queues are essential components in software architecture that are designed to manage and process tasks asynchronously. A job queue allows applications to defer work that doesn't need to be completed immediately. Instead of handling tasks synchronously and potentially blocking the main application flow, tasks are placed in a queue and processed asynchronously by worker processes. Common applications of job queues include processing large batches of data, sending emails or notifications, generating reports, and handling long-running computations. They can also decouple different parts of a system, allowing each component to operate independently and communicate through queued tasks.

This article will cover how to create a lightweight and performant job queue, along with a dead letter queue. We'll explain how to implement this using the Go programming language and NATS, a high-performance messaging system designed for cloud-native applications, IoT messaging, and microservices architectures.

What Is a Job Queue?

At its core, a job queue is a data structure that stores tasks to be executed asynchronously. Jobs are placed into the queue by producers (parts of the application that generate work) and pulled from the queue by consumers (worker processes that perform the actual work). This decoupling of job production and consumption allows for flexible scaling, as the number of workers can be adjusted based on the workload to ensure efficient resource utilization. Job queues often come with additional features like persistence to prevent data loss in case of system failures, retry mechanisms for failed jobs, and monitoring capabilities to track queue health and performance.

Consider a web application. When a user uploads a file, the application can quickly acknowledge the upload and offload the file processing (like resizing images or extracting metadata) to a job queue. For sending notifications, such as emails or SMS messages, the main application only needs to enqueue the message request, leaving the actual sending to be handled by a background worker. In an e-commerce application, the order processing system can place new orders into a job queue while a separate inventory service retrieves and processes these orders independently. This decoupling ensures that the order placement and inventory update processes do not directly depend on each other and can be scaled independently. The failure of one service also doesn't affect the other.

Job Queue Implementation with Go and NATS

NATS is used for real-time data streaming and communication between distributed systems. The core component in NATS offers reliable publish-subscribe, request-reply, and queuing messaging patterns. NATS JetStream is an extension of the NATS messaging system that provides advanced features like persistent storage, message replay, and streaming. JetStream is built into nats-server itself—it's a feature that you can enable or disable according to your requirements. It enables persistence mechanisms for streams, increasing system resilience and reliability and providing additional features such as message replay.

Solution Overview

To create our job queue and dead letter queue, we'll build a sample application with three components, consisting of a publisher and two stream consumers:

  • Publisher: Sends data to a NATS subject
  • Consumer (worker): Receives and processes data from the NATS stream; we can run multiple instances to load balance the processing among them
  • Dead letter queue processor: Handles failed messages that could not be processed by the consumer

Architecture diagram

We will develop these components using the NATS Go client. NATS clients are used to connect to and communicate with the NATS server. The ones in the nats-io GitHub organization are official clients maintained by NATS authors.

Note that the NATS Go client is JetStream-enabled, so we don't need a different library for JetStream-specific operations.

In this article, we'll use Synadia Cloud, which offers a globally accessible, fully managed NATS.io platform that works across multiple clouds and includes a user-friendly admin portal and API.

Prerequisites

To follow along, make sure you have a recent version of the Go programming language installed for your operating system.

You'll also need to install natscli to communicate with the NATS server. Follow the steps in the documentation to install natscli for your operating system.

Setting Up Synadia Cloud

Before you begin, sign up for Synadia Cloud.

After you sign up, select "NGS (NATS Global System)". There should be a default account created when you log in:

Default account

Inside the account, navigate to Users. You should have a "CLI" user created in advance. You'll use this user for the sample application in this guide.

Users list

To download the user credentials, click the user details and choose Get Connected > Download Credentials to save the .creds file to your machine.

User details page

Save the Synadia Cloud connection configuration as context. Replace ENTER_PATH_TO_CREDS_FILE with the path where you saved the .creds file:

Terminal window
nats context save --select "NGS-Default-CLI" --server "tls://connect.ngs.global" --creds ABSOLUTE_PATH_TO_CREDS_FILE

Once the configuration is successfully saved, you should see this output:

Terminal window
1
NATS Configuration Context "NGS-Default-CLI"
2
3
Server URLs: tls://connect.ngs.global
4
Credentials: /Users/demo/Desktop/NGS-Default-CLI.creds (OK)
5
Path: /Users/demo/.config/nats/context/NGS-Default-CLI.json
6
Connection: OK

To confirm that you are connected to Synadia Cloud, execute the command below to get your account details:

Terminal window
nats account info

Creating a NATS JetStream

There are multiple ways to create a stream. We can use the Synadia Cloud console, NATS client SDKs, or natscli.

Let's create one using the stream subcommand of natscli:

Terminal window
nats stream add --subjects="orders.*" --storage=file --replicas=1 --retention=work --discard=new --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=256MB --max-age=-1 --max-msg-size=1MIB --dupe-window=2m0s --deny-delete --no-deny-purge --allow-direct orders_stream

JetStream allows for a lot of flexibility via configurations. Let's walk through the ones used here:

  • --subjects is a list of subjects that the stream will consume from. The wildcard orders.* means the stream will include any subject that starts with orders..
  • --storage specifies the persistence mechanism for messages. file means that the messages will be persisted on disk.
  • --replicas specifies the number of replicas in clustered mode. A value of 1 means there is no replication.
  • --retention sets how messages are retained. The work retention policy keeps the messages until it is delivered to one consumer that explicitly acknowledges receipt, and then it's cleared.
  • --discard determines what happens when a stream reaches its limits of size or messages. new means new messages will be discarded if limits are reached.
  • --max-msgs defines the number of messages to retain in the store for this stream. A value of -1 means there is no limit on the number of messages.
  • --max-msgs-per-subject specifies the number of messages to retain in the store for this stream per unique subject. A value of -1 means there is no limit on the number of messages per subject.
  • --max-bytes is the maximum combined size of all messages in a stream. Here, the limit is 256 megabytes.
  • --max-age defines the oldest messages that can be stored in the stream. A value of -1 means messages do not expire based on age.
  • --max-msg-size is the maximum size any single message can be to be accepted by the stream. The Synadia Cloud account limit defaults to 1.0MiB message size, which is what we used here.
  • --dupe-window defines the window for identifying duplicate messages. Here, a window of 2 minutes is set, meaning duplicates within this time frame will be detected if the message was published using a unique MsgId.
  • --deny-delete is used to enable/disable message deletion via the API.
  • --deny-purge disallows the entire stream and subject to be purged via the API.
  • --allow-direct allows direct access to stream data via the direct get API.

Finally, we include the name of the stream (orders_stream).

Creating a Dead Letter Queue

A dead letter queue (DLQ) is a special type of stream that is used to temporarily store messages that failed to deliver. A DLQ is often used to troubleshoot any issues that might cause message delivery to fail.

Create the orders_unprocessed JetStream that will function as a dead letter queue:

Terminal window
nats stream add --subjects='$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.*' --storage=file --replicas=1 --retention=work --discard=new --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=256MB --max-age=-1 --max-msg-size=-1 --dupe-window=2m0s --deny-delete --no-deny-purge --allow-direct orders_unprocessed

Writing the JetStream Publisher

Now that we've created the stream, we can create our application.

Create a Go project:

Terminal window
mkdir nats-demo
cd nats-demo
go mod init <YOUR_GITHUB_USERNAME>/nats-demo

Install the required dependencies:

Terminal window
go get github.com/nats-io/nats.go
go get github.com/nats-io/nats.go/jetstream

Create a file at publisher/publisher.go, then start by importing the necessary modules:

Terminal window
1
package main
2
3
import (
4
"context"
5
"fmt"
6
"log"
7
"os"
8
"time"
9
10
"github.com/nats-io/nats.go"
11
"github.com/nats-io/nats.go/jetstream"
12
)

In the main function, get the credentials file path from the environment variable:

Terminal window
1
func main() {
2
credentialsFile := os.Getenv("NATS_CREDENTIALS_FILE_PATH")
3
subjectName := os.Args[1]
4
5
}

Use the nats.Connect function to connect with the Synadia Cloud nats server. A NATS connection will attempt to remain always connected, with built-in reconnection logic in case of network failures or server restarts:

Terminal window
1
nc, err := nats.Connect("connect.ngs.global",
2
nats.UserCredentials(credentialsFile),
3
nats.Name(fmt.Sprintf("publisher_%s", subjectName)))
4
if err != nil {
5
log.Fatal(err)
6
}
7
8
defer nc.Close()

Create an instance of jetstream.JetStream with jetstream.New:

Terminal window
1
js, err := jetstream.New(nc)
2
if err != nil {
3
log.Fatal(err)
4
}

Use the Publish function on the JetStream instance in a loop to send twenty messages to the subject:

Terminal window
1
for i := 0; i < 20; i++ {
2
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
3
defer cancel()
4
_, err := js.Publish(ctx, subjectName, []byte(fmt.Sprintf("order-%d", i)))
5
if err != nil {
6
log.Println(err)
7
}
8
log.Println("published order no.", i)
9
time.Sleep(1 * time.Second)
10
}

Start the publisher to send data to the orders.new subject. Replace ENTER_PATH_TO_CREDS_FILE with the path where we saved the .creds file:

Terminal window
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run publisher/publisher.go orders.new

The application will send twenty messages to the subject before terminating. The application logs should look something like this:

Terminal window
1
//...
2
published order no. 14
3
published order no. 15
4
published order no. 16
5
published order no. 17
6
published order no. 18
7
published order no. 19

Writing the JetStream Consumer

Every message that is sent to the subject(s) configured with the stream will be automatically available in the stream. A JetStream consumer client can consume a subset of messages stored in a stream. On the server side, NATS will keep track of which messages were delivered and acknowledged by clients.

Create consumer/consumer.go and import the modules:

Terminal window
1
package main
2
3
import (
4
"context"
5
"fmt"
6
"log"
7
"math/rand"
8
"os"
9
"os/signal"
10
"syscall"
11
"time"
12
13
"github.com/nats-io/nats.go"
14
"github.com/nats-io/nats.go/jetstream"
15
)

As before, fetch the credentials file from the environment variable in the main function:

Terminal window
1
func main() {
2
credentialsFile := os.Getenv("NATS_CREDENTIALS_FILE_PATH")
3
}

The consumer receives two arguments: the name of the stream to consume the messages from and a subject filter that filters the subjects the consumer can read from:

Terminal window
1
streamName := os.Args[1]
2
subjectFilter := os.Args[2]

As before, connect to the NATS server and create a JetStream:

Terminal window
1
nc, err := nats.Connect("connect.ngs.global",
2
nats.UserCredentials(credentialsFile),
3
nats.Name(fmt.Sprintf("consumer_%s", streamName)))
4
if err != nil {
5
log.Fatal(err)
6
}
7
8
defer nc.Close()
9
10
js, err := jetstream.New(nc)
11
if err != nil {
12
log.Fatal(err)
13
}

After connecting to the Synadia Cloud nats server and creating the JetStream instance, use the CreateOrUpdateConsumer function to create a jetstream.Consumer instance.

Take note of the configuration, specifically MaxDeliver, BackOff, and FilterSubject:

Terminal window
1
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
2
defer cancel()
3
4
consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jetstream.ConsumerConfig{
5
6
Name: "processor",
7
Durable: "processor",
8
Description: "Orders processor",
9
BackOff: []time.Duration{
10
5 * time.Second,
11
},
12
MaxDeliver: 2,
13
FilterSubject: subjectFilter,
14
AckPolicy: jetstream.AckExplicitPolicy,
15
})
16
17
if err != nil {
18
log.Fatal(err)
19
}

Invoke the Consume function on the jetstream.Consumer instance with a callback. Messages from the stream will be continuously received and processed with the provided callback logic:

Terminal window
1
c, err := consumer.Consume(func(msg jetstream.Msg) {
2
3
meta, err := msg.Metadata()
4
5
if err != nil {
6
log.Printf("Error getting metadata: %s\n", err)
7
return
8
}
9
10
11
if rand.Intn(10) < 5 {
12
13
log.Println("error processing", string(msg.Data()))
14
15
if meta.NumDelivered == 2 {
16
log.Println(string(msg.Data()), "will be processed via DLQ")
17
}
18
return
19
}
20
21
log.Println("received", string(msg.Data()), "from stream sequence #", meta.Sequence.Stream)
22
time.Sleep(10 * time.Millisecond)
23
24
msg.Ack()
25
})
26
27
if err != nil {
28
log.Fatal(err)
29
}
30
31
defer c.Stop()
32
33
sig := make(chan os.Signal, 1)
34
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
35
<-sig

The processing logic here is simplified on purpose; it simply logs the message payload along with the message sequence number. Take note of the if rand.Intn(10) < 5 section, which purposely creates a situation where the message is not acknowledged (hence it will be redelivered). In this case, redelivery will only happen twice (as per the consumer configuration), after which it will be handled by NATS.

We'll explore this further in the next section, where we'll implement a dead letter queue to make sure unprocessed messages are not lost.

Start the consumer application to process data from the stream. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:

Terminal window
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run consumer/consumer.go orders_stream "orders.*"

The application logs should look something like this:

Terminal window
1
received order-0 from stream sequence # 448
2
error processing order-1
3
error processing order-2
4
received order-3 from stream sequence # 449
5
error processing order-4
6
//...
7
received order-19 from stream sequence # 465
8
error processing order-0
9
order-0 will be processed via DLQ
10
received order-1 from stream sequence # 447
11
error processing order-2
12
order-2 will be processed via DLQ
13
//...

A few messages (orders) are received and processed successfully, as evident in the logs: received order-0 from stream sequence # 448. The messages that fail to be processed (more on this in the upcoming sections) are also highlighted in the logs, such as error processing order-2.

Keep the consumer running, as we'll use it again later.

Implementing a Dead Letter Queue Using JetStream

If a message reaches its maximum number of delivery attempts (as per the MaxDeliver configuration), an advisory message is published on the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER> subject. The advisory message payload contains a stream_seq field that contains the sequence number of the message in the stream, which can be used to get the message from the stream and reprocess it. This forms the foundation of a DLQ implementation. We've already created the orders_unprocessed stream, which reads from the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.* subject and acts as the DLQ.

Create dlq/consumer.go and add the starting code:

Terminal window
1
package main
2
3
import (
4
"context"
5
"encoding/json"
6
"fmt"
7
"log"
8
"os"
9
"os/signal"
10
"syscall"
11
"time"
12
13
"github.com/nats-io/nats.go"
14
"github.com/nats-io/nats.go/jetstream"
15
)
16
17
func main() {
18
credentialsFile := os.Getenv("NATS_CREDENTIALS_FILE_PATH")
19
}

The program will receive two arguments, consisting of the name of the DLQ stream and the name of the original stream:

Terminal window
1
dlqStreamName := os.Args[1]
2
originalStreamName := os.Args[2]

As before, connect to the NATS server:

Terminal window
1
nc, err := nats.Connect("connect.ngs.global",
2
nats.UserCredentials(credentialsFile),
3
nats.Name(fmt.Sprintf("dlq_%s", originalStreamName)))
4
if err != nil {
5
log.Fatal(err)
6
}
7
8
defer nc.Close()
9
10
js, err := jetstream.New(nc)
11
if err != nil {
12
log.Fatal(err)
13
}

Create a consumer for the DLQ:

Terminal window
1
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
2
defer cancel()
3
4
consumer, err := js.CreateOrUpdateConsumer(ctx, dlqStreamName, jetstream.ConsumerConfig{
5
6
Name: "unprocessed_order_handler",
7
Durable: "unprocessed_order_handler",
8
Description: "handle unprocessed jobs",
9
BackOff: []time.Duration{
10
5 * time.Second,
11
},
12
MaxDeliver: 2,
13
AckPolicy: jetstream.AckExplicitPolicy,
14
})
15
16
if err != nil {
17
log.Fatal(err)
18
}

Get a reference to the original stream:

Terminal window
1
originalStream, err := js.Stream(ctx, originalStreamName)

The DLQ handler implementation is similar to the consumer component. The application subscribes to the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.* subject and:

  • Extracts the stream sequence from the event JSON payload
  • Uses the GetMsg function on the jetstream.Stream (of orders_stream) to get the message payload
  • Processes the message (in this case, logs it to standard out) and acknowledges it (Ack())
Terminal window
1
type UnprocessedOrderEvent struct {
2
Stream string `json:"stream"`
3
Consumer string `json:"consumer"`
4
StreamSeq int `json:"stream_seq"`
5
}
6
7
c, err := consumer.Consume(func(msg jetstream.Msg) {
8
var event UnprocessedOrderEvent
9
10
err = json.Unmarshal(msg.Data(), &event)
11
12
if err != nil {
13
log.Fatal(err)
14
}
15
16
order, err := originalStream.GetMsg(ctx, uint64(event.StreamSeq))
17
18
if err != nil {
19
log.Fatal(err)
20
}
21
22
log.Println("reprocessing order", string(order.Data))
23
msg.Ack()
24
25
})
26
27
if err != nil {
28
log.Fatal(err)
29
}
30
31
defer c.Stop()
32
33
sig := make(chan os.Signal, 1)
34
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
35
<-sig

Start the dead letter queue processor to process data from the stream. Replace ABSOLUTEt_PATH_TO_CREDS_FILE with the path where you saved the .creds file:

Terminal window
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run dlq/consumer.go orders_unprocessed orders_stream

The application logs should look something like this:

Terminal window
1
reprocessing order order-0
2
reprocessing order order-2
3
reprocessing order order-4
4
reprocessing order order-10
5
reprocessing order order-18

Load Balanced Processing

You can run multiple instances of the consumer application to balance the load among them. Load balancing comes out of the box with NATS. When multiple consumers listen to the same subject, NATS automatically distributes the messages across these subscribers. This balances the load, ensuring that no single consumer is overwhelmed with too much work while others are idle. Since NATS handles load balancing internally, developers don’t need to set up or maintain external load balancers. This reduces complexity and the chances of misconfiguration. It also allows for horizontal scaling without needing to configure external load balancers or adjust message routing logic.

We should already have the consumer running. Start a second instance of the consumer application. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:

Terminal window
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run consumer/consumer.go orders_stream "orders.*"

Now, run the producer application again. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:

Terminal window
export NATS_CREDENTIALS_FILE_PATH=ABSOLUTE_PATH_TO_CREDS_FILE
go run publisher/publisher.go orders.new

Follow the application logs of both instances to verify that each of them is processing a different message from the orders_stream stream. This will be evident from the order (for example, order-1 might be processed by the first instance and order-2 might be processed by the second one), as well as the stream sequence number.

The dead letter queue processor should continue to work the same way and process the failed messages.

You can find the complete code on Github.

Conclusion

In this article, we learned about the basics of job queues and how they enable asynchronous processing. Using NATS JetStream, we implemented a job processing application along with dead letter queue functionality in Go.

Thanks to a fully managed solution like Synadia Cloud, we didn't have to manage NATS-related infrastructure. We were able to easily set up the stream with persistence, fault tolerance, and message replay capabilities using natscli and connect Go application components (producer, consumers, and DLQ processor) to Synadia Cloud using the JetStream-compliant NATS Go client.