Integrating NATS and JetStream: Modernizing Our Internal Communication

High-level NATS architecture

Introduction

Brokee was built on a microservice architecture from day one, because the initial focus of our skills assessments was Kubernetes; we later expanded to other technologies. As new services were added, however, we sometimes took shortcuts with design decisions. Over the years, this resulted in a spaghetti architecture in which many services were interconnected, and it became harder and harder to reason about dependencies and to decide which functionality belonged in which service.

In this post, we describe how we improved our system's communication by integrating the NATS messaging system and its JetStream functionality. We cover the challenges we faced, the lessons we learned, and how we simplified our setup to make it more efficient. This integration has laid the foundation for a more scalable and resilient infrastructure, enabling us to adapt and innovate as our platform grows.

Why Change?

Our previous architecture relied heavily on a synchronous request-response model. While this served us well initially, it began to show limitations as our platform grew:

  • Scalability issues: Increasing traffic caused bottlenecks in our services.

  • Lack of flexibility: Adding new features required significant changes to the existing communication flow.

  • Reduced reliability: Single points of failure in the system led to occasional downtime.

Even though we use backoff and retry strategies in our APIs, requests can still fail if the server is unreachable, unable to handle them, or overwhelmed by too many requests. We needed a more robust, asynchronous system that could scale effortlessly. That’s when we turned to NATS and JetStream, which offered persistence.

Old architecture: tightly coupled services using synchronous request-response communication.

What Are NATS and JetStream?

NATS is a lightweight, high-performance messaging system that supports pub/sub communication. JetStream extends NATS by adding durable message storage and stream processing capabilities, making it well suited to modern, distributed systems. NATS also provides client SDKs for a variety of programming languages, which makes it easy to integrate into services written in different languages.

With NATS and JetStream, we could:

  • Decouple services: Allow services to communicate without direct dependencies.

  • Enable persistence: Use JetStream’s durable subscriptions to ensure no messages are lost.

  • Simplify scaling: Seamlessly handle spikes in traffic without major architectural changes.

New architecture: decoupled services with asynchronous pub/sub communication via NATS.

The Integration Process

Here’s how we integrated NATS into our platform:

1. Setting Up NATS

We deployed NATS using Helm. Helm made the installation and configuration straightforward, allowing us to define resources and dependencies in a consistent, repeatable way.

To ensure reliability and scalability, we run three NATS server instances as a cluster. Clustering lets us handle increased traffic, while JetStream relies on the Raft consensus algorithm to replicate data across the instances and provide fault tolerance.

For storage, we used persistent volumes, ensuring durability. NATS also offers the option to use memory-based storage. However, to optimize memory usage and prevent overload on our nodes, we decided to switch to persistent volume storage.

Additionally, we made the deployment more resilient by ensuring NATS instances were safely scheduled on separate nodes to avoid single points of failure and ensure high availability. We opted for the NATS headless service type as NATS clients need to be able to talk to server instances directly without load balancing.

config:
  jetstream:
    enabled: true
    fileStore:
      enabled: true
      pvc:
        enabled: true
        size: 10Gi
        storageClassName: premium-rwo-retain
  cluster:
    enabled: true
    replicas: 3

statefulSet:
  merge:
    spec:
      template:
        metadata:
          annotations:
            cluster-autoscaler.kubernetes.io/safe-to-evict: "false"

podTemplate:
  topologySpreadConstraints:
    kubernetes.io/hostname:
      maxSkew: 1
      whenUnsatisfiable: "DoNotSchedule"

2. Migrating to Pub/Sub

Our first step was replacing direct request-response calls with pub/sub communication. For example:

  • Before: Common Service would send an HTTP request directly to Auth Service and await a response.

  • After: Common Service publishes a message to the subject auth.users.roles.assign, which is then processed asynchronously by the Auth Service that subscribes to this subject.

We incorporated the Request-Reply pattern, which NATS makes simple and efficient using its core pub/sub mechanism. In this pattern, a request is published on a subject with a unique "inbox" reply subject. Responders send their replies to the inbox, enabling real-time responses. This approach is particularly useful for scenarios requiring immediate feedback.

To distribute the workload across multiple instances, the Auth Service subscribes as part of a queue group, so each message is delivered to one randomly chosen instance in the group. Scaling responders is then a matter of adding instances to the group, and NATS supports reliability features such as "drain before exiting", which lets an instance process its pending messages before it shuts down.

In the Go example below, we prepare a payload and publish it as a request to the subject mentioned above from the Common Service using NATS. This demonstrates how the Request-Reply pattern sends data to a subject and awaits a response.


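// Imports used: encoding/json, fmt, time, github.com/nats-io/nats.go,
// and our internal models package.

// NATSRequestAssignCompanyRoleForUser asks the Auth Service to assign a role
// to a user and waits up to timeout seconds for the reply.
func NATSRequestAssignCompanyRoleForUser(
    nc *nats.Conn,
    userID string,
    roleID string,
    timeout int,
) error {
    // subject -> 'auth.users.roles.assign'
    subject := models.Nats.Subjects.UsersRoleAssign

    payload := models.RoleAssignmentPayload{
        UserID:  userID,
        RoleIDs: []string{roleID},
    }

    payloadBytes, err := json.Marshal(payload)
    if err != nil {
        return fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Request publishes the payload with a unique reply inbox and blocks
    // until a responder replies or the timeout expires.
    msg, err := nc.Request(subject, payloadBytes, time.Duration(timeout)*time.Second)
    if err != nil {
        return fmt.Errorf("failed to send NATS request: %w", err)
    }

    // The Auth Service replies with a JSON object that carries a "success" flag.
    var response map[string]interface{}
    if err := json.Unmarshal(msg.Data, &response); err != nil {
        return fmt.Errorf("failed to unmarshal response: %w", err)
    }

    success, ok := response["success"].(bool)
    if !ok || !success {
        return fmt.Errorf("role assignment failed, response: %v", response)
    }

    return nil
}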

In the next example, the Auth Service sets up a subscriber with a queue group that listens on the same subject. The queue group ensures load balancing among subscribers, while the handler processes the requests with the relevant business logic and sends responses back to the requester.


// SubscribeToRoleAssignQueue registers handler as a member of the queue group,
// so each request is delivered to only one Auth Service instance.
func SubscribeToRoleAssignQueue(
    nc *nats.Conn, handler func(msg *nats.Msg),
) error {
    _, err := nc.QueueSubscribe(
        models.Nats.Subjects.UsersRoleAssign,
        models.Nats.Queues.UserRolesAssign,
        handler,
    )
    return err
}
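
The queue-group behaviour described above can be modelled without a NATS server. The following is only a toy, in-memory sketch in Python: ToyBroker, make_instance, the instance names, and the queue group name "auth-role-assign" are hypothetical and are not part of the NATS client API. It shows that each request is served by exactly one member of the group and that the requester gets that member's reply.

```python
import random
from typing import Callable, Dict, List

Handler = Callable[[bytes], bytes]


class ToyBroker:
    """In-memory stand-in for request-reply over queue groups (not the NATS client API)."""

    def __init__(self, seed: int = 0) -> None:
        # subject -> queue group name -> handlers registered by group members
        self._groups: Dict[str, Dict[str, List[Handler]]] = {}
        self._rng = random.Random(seed)

    def queue_subscribe(self, subject: str, queue: str, handler: Handler) -> None:
        self._groups.setdefault(subject, {}).setdefault(queue, []).append(handler)

    def request(self, subject: str, payload: bytes) -> bytes:
        """Hand the request to one random member of each group; return the first reply."""
        groups = self._groups.get(subject)
        if not groups:
            raise LookupError(f"no responders on {subject}")
        replies = [self._rng.choice(members)(payload) for members in groups.values()]
        return replies[0]


handled_by: List[str] = []


def make_instance(name: str) -> Handler:
    def handle(payload: bytes) -> bytes:
        handled_by.append(name)  # record which instance served the request
        return b'{"success": true}'
    return handle


broker = ToyBroker()
for name in ("auth-1", "auth-2", "auth-3"):
    # "auth-role-assign" is a hypothetical queue group name
    broker.queue_subscribe("auth.users.roles.assign", "auth-role-assign", make_instance(name))

replies = [broker.request("auth.users.roles.assign", b"{}") for _ in range(30)]
```

Thirty requests produce thirty handler invocations in total, spread over the three instances, which is the load-balancing property the queue group is used for here.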

In a typical pub/sub setup, if a service fails or is unavailable, there’s no automatic way to repeat the message, and it can fail silently. To address this, we turned to JetStream, which provides message persistence and reliable delivery. With JetStream, even if a service goes down, messages can be reprocessed once the service is back online, ensuring no data is lost and improving overall system reliability.

3. Implementing JetStream

JetStream added persistence to our messaging:

  • Streams: We defined streams to capture messages, grouping related data for efficient processing. For example, a stream that captures the stack.delete subject stores all stack-destruction messages, so they are retained and available to subscribers even during downtime.

    In the example below, we define a JetStream stream named STACKS for managing testing-stack operations. It captures a single subject, stack.delete, but multiple subjects can be specified. The stream has a 1 GiB storage limit (maxBytes) and uses file storage with three replicas for fault tolerance. The retention policy is set to workqueue: messages are retained until they are processed, and once a message is acknowledged, it is deleted from the stream. The servers field points to the NATS server instances to connect to.


apiVersion: jetstream.nats.io/v1beta2
kind: Stream
metadata:
  name: stacks
spec:
  name: STACKS
  description: "Manage stack operations"
  subjects: ["stack.delete"]
  maxBytes: 1073741824
  storage: file
  replicas: 3
  retention: workqueue
  servers:
    - "nats://nats-headless.nats-system:4222"

  • Durable Subscriptions: Services could subscribe to streams and resume from where they left off, ensuring no data loss.

    To provide flexibility and control over streams and consumers (a consumer is a component that subscribes to a stream and processes the messages stored in it), we manage their configuration through a manifest chart using NACK, the JetStream controller for Kubernetes. This minimizes the need to edit and rebuild code.

    In the code, only minimal edits are required for specifying the subject, consumer, and queue group names. This approach ensures the configuration of streams and consumers is easily adjustable.

    Additionally, we use push-based consumers, where messages are delivered to subscribers as soon as they are placed in the stream. For durable queue consumers, the consumer (durable) name and the deliver group name must be the same for subscriptions to work as expected.

  • Backoff and Acknowledgments: We use backoff in the consumer configuration to control the delays between redelivery attempts. Alternatively, we set ackWait and maxDeliver to define how long to wait for an acknowledgment and how many times a message will be delivered.

    In some places, we use backoff, while in others, we use ackWait with maxDeliver. Use either backoff or ackWait, but not both together. For multiple retries, backoff is preferred. For fewer retries, set ackWait to the execution time of your handler plus a 20-30% buffer, so the handler is not cut short and messages are not left unacknowledged.

    We acknowledge messages manually once the handler has finished. We also acknowledge messages that fail validation because of invalid data, since redelivering them would not help. This avoids unnecessary retries.

    The next configuration sets up a JetStream consumer named stack-delete for deleting infrastructure stacks. It subscribes to the stack.delete subject (via filterSubject), the same subject the stream captures, and uses the durable name STACK_DELETE, so message delivery resumes from where it left off.


apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
  name: stack-delete
spec:
  ackPolicy: explicit
  ackWait: 20m
  deliverGroup: STACK_DELETE
  deliverSubject: deliver.stack.delete
  deliverPolicy: all
  description: Delete stack resources
  durableName: STACK_DELETE
  filterSubject: stack.delete
  maxAckPending: 1000
  maxDeliver: 5
  replayPolicy: instant
  servers:
    - "nats://nats-headless.nats-system:4222"
  streamName: STACKS

An example of using backoff instead of ackWait: here we list the desired retry intervals rather than setting ackWait. The number of backoff intervals must be less than the maxDeliver value, or the consumer will fail during creation/update. If maxDeliver allows more attempts than there are intervals, the remaining attempts reuse the last backoff interval.

...
spec:
  ackPolicy: explicit
  backoff:
    - 1m
    - 5m
    - 10m
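
To make the retry timing concrete, here is a small Python sketch that expands a backoff list into the wait before each redelivery. It is only an illustration: the function name redelivery_delays is hypothetical, and it assumes that the first delivery is immediate, that each redelivery waits for the next backoff entry, and that the last entry is reused once the list runs out. The maxDeliver value of 5 is also an assumption, since the snippet above does not show it.

```python
from typing import List


def redelivery_delays(backoff_minutes: List[int], max_deliver: int) -> List[int]:
    """Return the wait (in minutes) before each redelivery attempt.

    Assumes the first delivery is immediate, that redelivery N waits for the
    N-th backoff entry, and that the last entry is reused when the list is
    shorter than the number of redeliveries.
    """
    if not backoff_minutes:
        raise ValueError("backoff list must not be empty")
    if len(backoff_minutes) >= max_deliver:
        # Conservative rule taken from the text: fewer intervals than maxDeliver.
        raise ValueError("need fewer backoff intervals than maxDeliver")
    redeliveries = max_deliver - 1  # the first attempt is not a redelivery
    last = len(backoff_minutes) - 1
    return [backoff_minutes[min(i, last)] for i in range(redeliveries)]


# Backoff of 1m, 5m, 10m with an assumed maxDeliver of 5.
delays = redelivery_delays([1, 5, 10], max_deliver=5)
total_minutes = sum(delays)
```

Under these assumptions, the fourth redelivery reuses the 10-minute interval, and an unacknowledged message is attempted for roughly 26 minutes after its first delivery.
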

Key settings include:

  • ackPolicy: Explicit acknowledgment ensures messages are redelivered if not acknowledged.

  • ackWait: Set to 20 minutes to accommodate infrastructure destruction that can take up to 10-15 minutes in some cases.

  • deliverGroup & deliverSubject: Enables queue group-based delivery to STACK_DELETE, ensuring load balancing among subscribers.

  • maxAckPending: Limits unacknowledged messages to 1,000.

  • maxDeliver: Allows up to 5 delivery attempts per message, retrying every 20 minutes. If the message is not acknowledged after 5 attempts, it will remain in the stream.

  • replayPolicy: Instant replay delivers messages as quickly as possible.

  • servers: The consumer connects to the STACKS stream on specified NATS servers for processing messages.
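
The ackWait rule of thumb mentioned earlier (handler execution time plus a 20-30% buffer) is simple arithmetic. The Python sketch below is illustrative only: the helper names are hypothetical, and rounding up to whole minutes is an assumption rather than something the configuration requires.

```python
import math


def ack_wait_minutes(handler_minutes: float, buffer_ratio: float = 0.25) -> int:
    """Handler execution time plus a safety buffer, rounded up to whole minutes."""
    if handler_minutes <= 0 or buffer_ratio < 0:
        raise ValueError("handler time must be positive and buffer non-negative")
    return math.ceil(handler_minutes * (1 + buffer_ratio))


def worst_case_window_minutes(ack_wait: int, max_deliver: int) -> int:
    """Time until the last attempt times out if no attempt is ever acknowledged."""
    return ack_wait * max_deliver


# A stack deletion that takes up to 15 minutes, with a 30% buffer.
ack_wait = ack_wait_minutes(15, buffer_ratio=0.3)
window = worst_case_window_minutes(ack_wait, max_deliver=5)
```

With a 15-minute handler and a 30% buffer, this yields 20 minutes, which is consistent with the ackWait value in the consumer above. With maxDeliver set to 5, a message that is never acknowledged is retried over a window of about 100 minutes.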

Next, we send a message to the stack.delete subject to request the deletion of a stack (the following example is written in Python). The process is straightforward: we create a message with the necessary information (userhash and test_id), and then publish it to the NATS server. Once the message is sent, we close the connection and return a response indicating whether the operation was successful or not.


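import json
from typing import Any, Dict

from nats.aio.client import Client as NATS


async def delete_infra_stack(
    userhash: str,
    test_id: str,
) -> Dict[str, Any]:
    # NATSConfig is our internal settings object (server URL and subject names).
    nc = NATS()
    try:
        await nc.connect(servers=[NATSConfig.server_url])

        message = {"candidateId": userhash, "testId": test_id}

        # The STACKS stream captures everything published to stack.delete.
        await nc.publish(
            subject=NATSConfig.sub_stack_delete,
            payload=json.dumps(message).encode("utf-8"),
        )

        response = {
            "success": True,
            "message": f"Published {NATSConfig.sub_stack_delete} for {userhash}-{test_id}",
        }
    except Exception as e:
        response = {
            "success": False,
            "message": str(e),
        }
    finally:
        # Close the connection even if publishing failed.
        if nc.is_connected:
            await nc.close()

    return response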

In the next code snippet, written in Go (we use multiple languages for our backend code), the consumer subscribes to the stack.delete subject using the STACK_DELETE durable name. This allows the consumer to handle stack deletion requests while keeping the message persistence and retry logic configured in JetStream. As you may notice, subscribing is pretty straightforward because we manage the consumer configuration through the chart, which simplifies setup and allows easy adjustments without complex code changes.


func SubscribeToJSDestroyStack(js nats.JetStreamContext, svc Service) error {
    subject := Nats.Subjects.StackDelete
    durableName := Nats.DurableName.StackDelete

    // The queue group and the durable consumer share the same name (STACK_DELETE).
    _, err := js.QueueSubscribe(subject, durableName, func(msg *nats.Msg) {
        handleDeleteStack(msg, svc)
    }, nats.Durable(durableName), nats.ManualAck())
    if err != nil {
        return fmt.Errorf("error subscribing to %s: %w", subject, err)
    }

    return nil
}

func handleDeleteStack(msg *nats.Msg, svc Service) {
    var req deleteStackRequest
    if err := json.Unmarshal(msg.Data, &req); err != nil {
        // ack on bad request data: redelivery cannot fix an invalid payload
        msg.Ack()
        return
    }

    if _, err := svc.DeleteStack(context.Background(), req.TestId, req.CandidateId, msg); err == nil {
        // ack on success
        msg.Ack()
    }
    // On failure the message stays unacknowledged, so JetStream redelivers it
    // according to the consumer's ackWait or backoff settings.
}
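
To illustrate how work-queue retention and manual acknowledgment interact, here is a toy, in-memory model in Python. It is a sketch under stated assumptions, not the JetStream API: ToyWorkQueue, make_handler, and delete_stack are hypothetical names, and delivery timing (ackWait, backoff) is left out entirely. Messages published while no consumer is running are kept, acknowledged messages are removed, and unacknowledged ones are redelivered until maxDeliver is reached.

```python
import json
from collections import deque
from typing import Callable, Deque, Dict, List, Tuple


class ToyWorkQueue:
    """In-memory model of a work-queue stream with explicit acks (not the NATS API)."""

    def __init__(self, max_deliver: int) -> None:
        self.max_deliver = max_deliver
        self._pending: Deque[Tuple[bytes, int]] = deque()  # (payload, deliveries so far)
        self.stalled: List[bytes] = []  # still stored, but no longer redelivered

    def publish(self, payload: bytes) -> None:
        self._pending.append((payload, 0))  # stored even if no consumer is running

    def deliver_once(self, handler: Callable[[bytes], bool]) -> None:
        """Deliver every pending message once; the handler returns True to ack."""
        for _ in range(len(self._pending)):
            payload, deliveries = self._pending.popleft()
            deliveries += 1
            if handler(payload):
                continue  # acked: the message is removed from the stream
            if deliveries >= self.max_deliver:
                self.stalled.append(payload)
            else:
                self._pending.append((payload, deliveries))

    def __len__(self) -> int:
        return len(self._pending)


def make_handler(delete_stack: Callable[[str, str], None]) -> Callable[[bytes], bool]:
    def handle(payload: bytes) -> bool:
        try:
            request = json.loads(payload)
            test_id, candidate_id = request["testId"], request["candidateId"]
        except (ValueError, KeyError, TypeError):
            return True  # ack bad data: a retry cannot fix it
        try:
            delete_stack(test_id, candidate_id)
        except Exception:
            return False  # leave unacked so the message is redelivered
        return True  # ack on success
    return handle


deleted: List[str] = []
failures_left: Dict[str, int] = {"t2": 2}  # "t2" fails twice before succeeding


def delete_stack(test_id: str, candidate_id: str) -> None:
    if failures_left.get(test_id, 0) > 0:
        failures_left[test_id] -= 1
        raise RuntimeError("stack deletion failed")
    deleted.append(test_id)


queue = ToyWorkQueue(max_deliver=5)
queue.publish(b'{"testId": "t1", "candidateId": "c1"}')
queue.publish(b"not json")
queue.publish(b'{"testId": "t2", "candidateId": "c2"}')

handler = make_handler(delete_stack)
remaining = []
for _ in range(3):
    queue.deliver_once(handler)
    remaining.append(len(queue))
```

After the first pass only the failing message is left in the queue. It is redelivered on the following passes and removed once the handler finally succeeds, while the invalid payload is acknowledged immediately and never retried.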

4. Testing and Optimisation

We rigorously tested the system under load to ensure reliability and fine-tuned the configurations for optimal performance. Through this process, we identified the ideal settings for our message flow, ensuring efficient redelivery and minimal retries.

Challenges and Lessons Learned

Integrating NATS into our system posed several challenges, each of which provided valuable lessons in how to leverage NATS' features more effectively:

  1. Request/Reply and Durable Subscriptions:

    Initially, we thought the request/reply pattern would work well for durable subscriptions, as it seemed like a good way to ensure that every request would be retried in case of failure. However, we quickly realized that request/reply is more suited for real-time, immediate communication rather than long-term durability.

    For durability, JetStream turned out to be the better option, as it stores messages persistently and redelivers them until they are successfully processed or the delivery limit is reached. However, with our work-queue setup, JetStream delivers each message to a single designated consumer (the one configured to handle it), rather than broadcasting it to all subscribers.

  2. Consumer and Queue Group Names:

    We learned that for durable consumers to function properly, the consumer name and the queue group must be the same. If they don't match, the consumer won't subscribe to the stream, leading to issues in message delivery and distribution.

    This realization came after some trial and error. We tried subscribing to durable subscriptions but encountered errors. To understand what went wrong, we dug into the source code of the SDK and discovered the importance of matching the consumer name and queue group. Surprisingly, we didn’t find this mentioned clearly in the documentation, or perhaps we missed it.

  3. Backoff vs. AckWait:

    At first, we experimented with using both backoff and ackWait together, thinking it would allow us to fine-tune the retry behavior. We expected ackWait to control the waiting period for message acknowledgment, and backoff to then manage retries with delays.

    We first applied changes to the settings through Helm, and there were no errors, so we thought the changes were successfully applied. However, during testing, we noticed that the behavior wasn't as expected. When we checked the settings from the NATS-Box pod in Kubernetes, we found that the changes hadn’t taken effect. We then tried to edit the configurations directly in NATS-Box but encountered an error stating that the settings were not editable. This led to further investigation, after which we realized that only one of ackWait or backoff should be used.

  4. Manual Acknowledgment:

    One of the key lessons was the importance of manual acknowledgment. During our tests, we encountered situations where, even though the handler failed for some subscriptions, the message was still automatically acknowledged.

    For instance, when an internal server error occurred, the message was considered acknowledged even though it wasn’t fully processed. We initially assumed that the acknowledgment would happen automatically if the message was successfully handled, similar to how HTTP requests typically behave.

    However, when we moved to manual acknowledgment and controlled the timing ourselves, it worked perfectly. This change prevented false positives and ensured that messages weren’t prematurely acknowledged, even when an error or timeout occurred.

  5. Testing with NATS-Box:

    NATS-Box (available as part of the NATS deployment) became an invaluable tool for us in testing and creating configurations. It allowed us to experiment and understand the impact of different settings on system behavior, helping us refine our approach to ensure optimal performance in real-world scenarios.

    As we mentioned earlier, it helped us uncover small misunderstandings and nuances that weren't immediately obvious, giving us a deeper insight into how our configurations were being applied.
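
Several of these lessons can be turned into a quick pre-deployment check. The Python sketch below is illustrative only: lint_consumer_spec is a hypothetical helper, it works on a plain dictionary that mirrors the spec section of a Consumer manifest, and the rules it encodes are the ones from our experience described above rather than an official validation.

```python
from typing import Any, Dict, List


def lint_consumer_spec(spec: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in a consumer spec (empty if none)."""
    problems: List[str] = []

    # Lesson 2: the durable name and the deliver (queue) group should match.
    group = spec.get("deliverGroup")
    if group is not None and spec.get("durableName") != group:
        problems.append("durableName and deliverGroup differ")

    # Lesson 3: use either ackWait or backoff, not both.
    if "ackWait" in spec and spec.get("backoff"):
        problems.append("ackWait and backoff are both set")

    # Lesson 4: manual acknowledgment in handlers expects an explicit ack policy.
    if spec.get("ackPolicy") != "explicit":
        problems.append("ackPolicy is not explicit")

    return problems


good_spec = {
    "ackPolicy": "explicit",
    "ackWait": "20m",
    "deliverGroup": "STACK_DELETE",
    "durableName": "STACK_DELETE",
    "maxDeliver": 5,
}
bad_spec = {
    "ackPolicy": "explicit",
    "ackWait": "20m",
    "backoff": ["1m", "5m", "10m"],
    "deliverGroup": "STACK_WORKERS",  # hypothetical mismatching group name
    "durableName": "STACK_DELETE",
}

good_problems = lint_consumer_spec(good_spec)
bad_problems = lint_consumer_spec(bad_spec)
```

A check like this could run in CI before the chart is applied, which would have surfaced the ackWait/backoff conflict earlier than our manual inspection in NATS-Box did.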

Conclusion

In conclusion, integrating NATS into our system proved to be a fast and efficient solution for our messaging needs. It wasn't without its challenges, but through testing and exploration, we were able to fine-tune the configurations to fit our needs. While we started with a simple setup, we may expand the use of NATS beyond internal communication to incorporate more features like monitoring and dead-letter queues. Additionally, we are considering replacing more of our internal architecture communication with NATS' pub/sub, and even potentially using NATS for external communication, replacing some of our REST APIs.

Based on our experience, using NATS with JetStream for durable messaging has proven to be a solid solution for ensuring reliable communication in our system. If you're looking to improve your system’s communication and explore event-driven architecture, we recommend considering NATS as a scalable and dependable choice, particularly for internal communication needs.
