NATS Weekly #42

Week of August 29 - September 4, 2022

🗞 Announcements, writings, and projects

A short list of announcements, blog posts, project updates, and other news.

⚡ Releases

Official releases from NATS repos and others in the ecosystem.

🎬 Media

Audio or video recordings about or referencing NATS.

💬 Discussions

GitHub Discussions from various NATS repositories.

🧑‍🎓 Examples

New or updated examples on NATS by Example.

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

What are the options to defer message redeliveries for a consumer?

When a consumer is created, there are three ack policies to choose from: AckNonePolicy, AckExplicitPolicy, and AckAllPolicy (using the Go names for this example). No acknowledgement is generally not desirable unless your application doesn't want redeliveries even if errors occur during message processing. Explicit ack is the most common and provides control over each message. "Ack all" is a special case where batches of messages are handled and an ack on a later message implicitly acks all prior messages that were not previously acked. For this question, we will assume the "ack explicit" policy, but the answer applies to "ack all" as well.

By default, when a message is received by a subscription bound to a consumer (push or pull), the client has the AckWait duration to respond to the server, indicating whether to proceed to the next message (ack), redeliver the message (nak), or explicitly skip the message (term).

The "ack wait" time is 30 seconds by default, but can be set explicitly on the consumer configuration. This means that if the client doesn't provide any decision within this period of time, the server will redeliver the message (a "max deliver" limit can also be set; by default it is unlimited).

Assuming another delivery attempt is allowed, once that "ack wait" is reached, the server will immediately redeliver the message to any available subscriptions.

However, what if you wanted the server to redeliver after some backoff period? A set of BackOff durations can be defined for a consumer where each backoff corresponds to a delivery attempt.

nats.ConsumerConfig{
	// Other settings...
	BackOff: []time.Duration{
		500 * time.Millisecond,
		time.Second,
		2 * time.Second,
	},
}

For the first redelivery of a message, the server will wait 500ms before it redelivers, then one second for the next redelivery, and so on. If the maximum number of delivery attempts exceeds the number of backoffs, the last backoff will be used for all remaining delivery attempts.
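The selection rule described above can be sketched in a few lines of Go. This is only an illustration of the rule, not server code: `redeliveryDelay` and `exampleBackoffs` are hypothetical names, and the list simply mirrors the BackOff values from the config above.

```go
package main

import (
	"fmt"
	"time"
)

// exampleBackoffs mirrors the BackOff list from the consumer config above.
var exampleBackoffs = []time.Duration{
	500 * time.Millisecond,
	time.Second,
	2 * time.Second,
}

// redeliveryDelay returns the wait before the n-th redelivery (1-based).
// Once n runs past the end of the list, the last backoff is reused.
// Hypothetical helper for illustration; it is not part of nats.go.
func redeliveryDelay(backoffs []time.Duration, n int) time.Duration {
	if len(backoffs) == 0 || n < 1 {
		return 0
	}
	if n > len(backoffs) {
		n = len(backoffs)
	}
	return backoffs[n-1]
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Printf("redelivery %d after %v\n", n, redeliveryDelay(exampleBackoffs, n))
	}
}
```

Running this prints delays of 500ms, 1s, and 2s for the first three redeliveries, and 2s for every redelivery after that.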

If you have a more dynamic use case where your application wants to explicitly nak and provide a backoff, the NakWithDelay(...) method on a message can be used. It tells the server "negative ack and redeliver after this time delay", such as time.Second.
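As one possible way to choose that delay dynamically, the sketch below doubles the delay with each delivery attempt and caps it. The doubling policy and the `nakDelay` helper are assumptions made for illustration; the application is free to compute the delay however it likes.

```go
package main

import (
	"fmt"
	"time"
)

// nakDelay returns base for the first delivery and doubles it for each
// further delivery, never exceeding max.
// Hypothetical helper; the doubling policy is an assumption, not a NATS rule.
func nakDelay(numDelivered uint64, base, max time.Duration) time.Duration {
	if base <= 0 {
		return 0
	}
	d := base
	for i := uint64(1); i < numDelivered && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

func main() {
	// With nats.go, the result would be passed along these lines:
	//   meta, _ := msg.Metadata()
	//   msg.NakWithDelay(nakDelay(meta.NumDelivered, time.Second, 30*time.Second))
	for n := uint64(1); n <= 7; n++ {
		fmt.Printf("delivery %d: nak with delay %v\n", n, nakDelay(n, time.Second, 30*time.Second))
	}
}
```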

What are good practices for setting the Nats-Msg-Id header?

The Nats-Msg-Id header can be set on a message to be appended to a stream. This is used for de-duplication of messages within a set duration window (default two minutes).

js.AddStream(&nats.StreamConfig{
	Name:     "EVENTS",
	Subjects: []string{"events.>"},
})

msg := nats.NewMsg("events.order_placed")
msg.Header.Set(nats.MsgIdHdr, "...")
// set msg.Data, etc.
_, err := js.PublishMsg(msg)

When could duplication occur? In general, there are two situations.

  • A network error occurs on publish and your application needs to retry the publish (more on this in a moment).

  • Your application may be attempting a dual write or using the outbox pattern (with a database supporting transactions) for publishing database changes to NATS.

In either case, a publish can be sent by the application (via a client library) and a network issue can occur before the client receives an acknowledgement. In other words, the server received the message and sent back an ack, but the client never got it. The client is left asking: "Did the server actually receive the message?"

Since that question can't be answered, it is safest to resend the message. As long as the Nats-Msg-Id header is set to the same value as the first time, the server will see that this message was already received, ignore it as a duplicate, and ack back the first message's information (such as the sequence number).

In practice, if a message must never fail to reach a stream, you should always retry the publish and rely on the Nats-Msg-Id header to discard duplicates.
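To make the retry behavior concrete, here is a small self-contained simulation. It does not talk to a NATS server: `fakeStream`, `errAckLost`, and `publishWithRetry` are hypothetical stand-ins that only model the behavior described above, where a lost ack is followed by a resend with the same ID.

```go
package main

import (
	"errors"
	"fmt"
)

// errAckLost stands in for a network error that swallows the server's ack.
var errAckLost = errors.New("ack lost")

// fakeStream is an in-memory stand-in for a stream with de-duplication.
// It is NOT the NATS server; it only models the behavior described above.
type fakeStream struct {
	seqByID  map[string]uint64 // message ID -> sequence of the stored message
	lastSeq  uint64
	dropAcks int // number of upcoming acks to "lose"
}

// publish stores the message unless its ID was already seen, then acks with
// the sequence of the stored message. The ack may be lost on the way back.
func (s *fakeStream) publish(id string) (seq uint64, duplicate bool, err error) {
	seq, duplicate = s.seqByID[id]
	if !duplicate {
		s.lastSeq++
		seq = s.lastSeq
		s.seqByID[id] = seq
	}
	if s.dropAcks > 0 {
		s.dropAcks--
		return 0, false, errAckLost
	}
	return seq, duplicate, nil
}

// publishWithRetry resends with the same ID until an ack arrives or the
// attempts run out.
func publishWithRetry(s *fakeStream, id string, attempts int) (uint64, bool, error) {
	err := errors.New("no publish attempts made")
	for i := 0; i < attempts; i++ {
		var seq uint64
		var dup bool
		seq, dup, err = s.publish(id)
		if err == nil {
			return seq, dup, nil
		}
	}
	return 0, false, err
}

func main() {
	s := &fakeStream{seqByID: map[string]uint64{}, dropAcks: 1}
	seq, dup, err := publishWithRetry(s, "order-1001", 3)
	fmt.Println(seq, dup, err, len(s.seqByID)) // prints "1 true <nil> 1"
}
```

The first attempt stores the message but loses the ack. The retry is recognized as a duplicate and acked with the original sequence, so exactly one message ends up stored.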

For completeness, it is important to call out that this is not a general deduplication strategy, since it only applies within the deduplication window defined on the stream configuration. In other words, don't expect the server to magically deduplicate messages if you restart a batch process hours or days later. That case would need to be handled on the consumption side when processing messages (and it could rely on the same header using a local implementation of deduplication).
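A local implementation on the consumption side could look roughly like the sketch below. The `onceByID` type is hypothetical, and it keeps its record in memory only; a real deployment would likely need a durable store so the record survives restarts.

```go
package main

import "fmt"

// onceByID skips messages whose ID was already processed successfully.
// Hypothetical type for illustration; the map lives in memory only.
type onceByID struct {
	done map[string]bool
}

// handle runs process unless id is already marked done, and reports whether
// process ran. A failed run is not marked, so a redelivery can try again.
func (o *onceByID) handle(id string, process func() error) (bool, error) {
	if o.done[id] {
		return false, nil
	}
	if err := process(); err != nil {
		return true, err
	}
	o.done[id] = true
	return true, nil
}

func main() {
	o := &onceByID{done: map[string]bool{}}
	// With nats.go, the ID could be read via msg.Header.Get(nats.MsgIdHdr).
	for _, id := range []string{"order-1", "order-2", "order-1"} {
		ran, err := o.handle(id, func() error { return nil })
		fmt.Println(id, ran, err)
	}
}
```

The second "order-1" is skipped because its ID was already recorded, no matter how much time has passed since the first one.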

Is a timeout error expected when receiving messages?

Yes! This has been a common question and applies to subscriptions bound to push and pull consumers. Using Go as an example, the timeout is observed when using a synchronous subscription.

// Push consumer with a synchronous subscription.
sub1, _ := js.SubscribeSync("events.>")
msg, err := sub1.NextMsg(time.Second)
// err could be nats.ErrTimeout

// Pull consumer.
sub2, _ := js.PullSubscribe("events.>", "processor")
msgs, err := sub2.Fetch(1)
// err could be nats.ErrTimeout

In both cases, a timeout is normal since it simply means that no more messages are currently available in the stream. In practice, these calls are wrapped in a loop to consume messages, and timeouts are generally ignored (unless they mean something to your application).
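The loop could be shaped roughly like this. To keep the sketch self-contained, `errTimeout` stands in for nats.ErrTimeout and `fetchFunc` stands in for a call such as Fetch or NextMsg; both names, along with `consume`, are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout stands in for nats.ErrTimeout in this self-contained sketch.
var errTimeout = errors.New("timeout")

// fetchFunc stands in for a call such as sub.Fetch(n) or sub.NextMsg(timeout).
type fetchFunc func() ([]string, error)

// consume calls fetch up to rounds times. A timeout is treated as "nothing
// available right now" and skipped; any other error stops the loop.
func consume(fetch fetchFunc, rounds int, handle func(string)) error {
	for i := 0; i < rounds; i++ {
		msgs, err := fetch()
		if errors.Is(err, errTimeout) {
			continue // no messages yet; try again on the next round
		}
		if err != nil {
			return err
		}
		for _, m := range msgs {
			handle(m)
		}
	}
	return nil
}

func main() {
	calls := 0
	// Scripted fetch: messages, then a timeout, then more messages.
	fetch := func() ([]string, error) {
		calls++
		switch calls {
		case 1:
			return []string{"a"}, nil
		case 2:
			return nil, errTimeout
		default:
			return []string{"b", "c"}, nil
		}
	}
	err := consume(fetch, 3, func(m string) { fmt.Println("handled", m) })
	fmt.Println("err:", err)
}
```

A real consumer would typically loop until shutdown rather than for a fixed number of rounds; the bound here only keeps the example finite.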