跪拜 Guibai
← Back to the summary

NSQ: The Go Message Queue That Replaces Kafka's Complexity With Three Binaries

Today I want to talk about an open-source project that is both veteran and practical — NSQ. If you do backend development, especially if you work a lot with the Go language, this name should not be unfamiliar. Even if you haven't used it, NSQ is definitely worth knowing about in the message queue technology selection.

This article will detail NSQ's core concepts, pros and cons, comparisons with other message queues, and quickly set up an environment via Docker, finally providing complete Go code examples.

Without further ado, let's get started!

1. Why Pay Attention to NSQ?

Have you ever encountered a situation where, in an instant, your business system hits a traffic peak, the database is instantly overwhelmed, and requests just time out? At this point, you need a message queue to shave peaks and fill valleys.

NSQ is a real-time distributed messaging platform open-sourced by Bitly. Written in Go, it was officially open-sourced in 2013, initially used to support the high throughput demands of Bitly's short-link service, and later gradually adopted by companies like Docker and Stripe.

NSQ's design goal is to handle billions of messages per day at scale, featuring a distributed and decentralized topology that has no single point of failure, fault tolerance, high availability, and guarantees reliable message delivery.

2. NSQ's Three Core Components

Before we get hands-on, we need to get to know the three key roles in the NSQ family. Think of NSQ as a modern logistics center:

3. Core Concepts: Topic & Channel

Before writing code, understanding NSQ's soul design — Topic and Channel — is crucial:

A major benefit of this design: different channels are isolated from each other. Even if consumers in one channel process slowly, it does not affect the normal consumption of other channels.

4. Analysis of NSQ's Pros and Cons

4.1 Core Advantages

According to official documentation and community feedback, NSQ has the following significant advantages:

Advantage Description
Decentralized Architecture No single point of failure (SPOF), supports distributed topology
Horizontal Scaling No central broker, seamlessly add more nodes to the cluster
Low Latency Uses push mode, message real-timeness is very good
Simple Deployment Compiled binary has no runtime dependencies, all parameters specified via command line
Built-in Admin Interface nsqadmin provides intuitive Web monitoring and management
Data Format Agnostic Messages can be JSON, MsgPack, Protocol Buffers, or any format
Multi-language Support Official Go and Python libraries, community provides various clients
Hybrid Memory+Disk Storage Messages exceeding the memory high-water mark are transparently saved to disk

4.2 Limitations

Of course, NSQ is not a panacea; it has some design trade-offs to be aware of:

Limitation Description
Messages Not Persisted by Default Primarily an in-memory messaging platform, but can be configured for persistence
At-Least-Once Delivery Messages may be delivered multiple times, requiring consumers to handle idempotency
No Ordering Guarantees Messages may be out of order due to requeue, memory and disk storage, etc.
No Message Replication No built-in replication mechanism; node failure may cause data loss
No Dead Letter Queue No built-in dead letter handling mechanism for failed messages
Messages Not Replayable Messages are deleted after consumption acknowledgment, cannot be re-consumed like Kafka

5. Simple Comparison with Other Message Queues

To give a clearer picture of NSQ's positioning, here's a simple comparison with several mainstream message queues:

Feature NSQ Kafka RabbitMQ NATS
Language Go Scala/Java Erlang Go
Message Model Topic-Channel Topic-Partition Exchange-Queue Subject
Push/Pull Push Pull Push Push
Persistence Memory+Disk Full Disk Memory/Disk Memory/JetStream
Ordering Guarantee Not Supported Ordered within Partition Guaranteed within Queue Ordered per Connection
Delivery Guarantee At-Least-Once At-Least-Once/Exactly-Once At-Least-Once At-Most-Once/At-Least-Once
Delayed Messages Supported (in-memory priority queue, up to 2 hours) Not Supported Supported (requires plugin) Not Supported
Dead Letter Queue Not Supported None (managed via offset) Supported Supported
Admin Interface Built-in nsqadmin Requires third-party tools Built-in Built-in
Use Cases Real-time push, microservice decoupling Log collection, big data stream processing Enterprise applications, complex routing Cloud-native, high-performance real-time communication

From the table above, NSQ's strengths lie in being simple, low-latency, and easy to deploy, suitable for small to medium-scale scenarios requiring high real-timeness without strict message ordering.

6. One-Click NSQ Environment Setup via Docker

6.1 Preparation

Ensure Docker and Docker Compose are installed on your machine.

6.2 Write docker-compose.yml

We will start three services on the same machine to simulate a small cluster:

version: '3'
services:
  # Service discovery and coordination center
  nsqlookupd:
    image: nsqio/nsq:latest  # Use official image
    container_name: nsqlookupd
    command: /nsqlookupd      # Start lookupd service
    ports:
      - "4160:4160"           # tcp port, used by nsqd
      - "4161:4161"           # http port, used by admin and client queries
    networks:
      - nsq-network

  # Message core daemon
  nsqd:
    image: nsqio/nsq:latest
    container_name: nsqd
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160  # Tell nsqd where to register
    ports:
      - "4150:4150"           # tcp port, for sending/receiving messages
      - "4151:4151"           # http port, can send messages directly via API
    depends_on:
      - nsqlookupd            # Ensure lookupd starts first
    networks:
      - nsq-network

  # Web admin interface
  nsqadmin:
    image: nsqio/nsq:latest
    container_name: nsqadmin
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 # Connect to lookupd's http port
    ports:
      - "4171:4171"           # Port accessed via browser
    depends_on:
      - nsqlookupd
    networks:
      - nsq-network

networks:
  nsq-network:
    driver: bridge

6.3 Start and Verify

# Execute in the directory containing docker-compose.yml
docker-compose up -d

# Check container status
docker ps

Seeing all three containers with status Up indicates successful startup. Now open a browser and visit http://localhost:4171, you should see the beautiful NSQAdmin interface.

7. Hands-On: Complete Go Code Example

7.1 Preparation

First, install the Go client library:

go get -u github.com/nsqio/go-nsq

7.2 Producer Code

Below is a complete producer example that reads messages from standard input and sends them to NSQ:

// producer/main.go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"

	"github.com/nsqio/go-nsq"
)

// NSQ Producer Demo

var producer *nsq.Producer

// Initialize producer
func initProducer(addr string) (err error) {
	config := nsq.NewConfig()
	// Some parameters can be configured
	// config.MaxAttempts = 5
	
	producer, err = nsq.NewProducer(addr, config)
	if err != nil {
		fmt.Printf("create producer failed, err:%v\n", err)
		return err
	}
	
	// Check if connection is healthy
	err = producer.Ping()
	if err != nil {
		fmt.Printf("producer ping failed, err:%v\n", err)
		return err
	}
	
	return nil
}

func main() {
	// nsqd's TCP address
	nsqdAddr := "127.0.0.1:4150"
	err := initProducer(nsqdAddr)
	if err != nil {
		fmt.Printf("init producer failed, err:%v\n", err)
		return
	}
	
	fmt.Println("producer started, please input messages (input 'Q' to quit):")
	
	reader := bufio.NewReader(os.Stdin) // Read from standard input
	for {
		data, err := reader.ReadString('\n')
		if err != nil {
			fmt.Printf("read string from stdin failed, err:%v\n", err)
			continue
		}
		data = strings.TrimSpace(data)
		if strings.ToUpper(data) == "Q" { // Input Q to quit
			break
		}
		
		// Publish data to 'topic_demo'
		topicName := "topic_demo"
		err = producer.Publish(topicName, []byte(data))
		if err != nil {
			fmt.Printf("publish msg to nsq failed, err:%v\n", err)
			continue
		}
		
		fmt.Printf("published message: %s\n", data)
	}
	
	// Stop the producer
	producer.Stop()
	fmt.Println("producer stopped")
}

7.3 Consumer Code

Below is a complete consumer example that receives and processes messages from NSQ:

// consumer/main.go
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

// NSQ Consumer Demo

// MyHandler is a consumer type that needs to implement the HandleMessage interface
type MyHandler struct {
	name string
}

// HandleMessage processes messages
// This function is called when a message is pushed to this consumer
func (h *MyHandler) HandleMessage(message *nsq.Message) error {
	// Message content is in message.Body, in []byte format
	// message.ID is the unique ID of the message
	// message.Attempts is the number of retry attempts for the message
	
	log.Printf("[%s] received message: %s (id: %s, attempts: %d)", 
		h.name, string(message.Body), message.ID, message.Attempts)
	
	// Simulate business processing time
	time.Sleep(100 * time.Millisecond)
	
	// Returning nil indicates the message was processed successfully, NSQ will mark it as complete
	// If an error is returned, NSQ will retry based on configuration
	return nil
}

func main() {
	// 1. Configure consumer
	config := nsq.NewConfig()
	// Set maximum inflight messages
	config.MaxInFlight = 100
	// Set retry attempts
	config.MaxAttempts = 5
	
	// 2. Create consumer instance
	// Parameters: topic name, channel name, config
	// Note: Even if the channel doesn't exist, it will be automatically created upon subscription
	topicName := "topic_demo"
	channelName := "channel_demo"
	consumer, err := nsq.NewConsumer(topicName, channelName, config)
	if err != nil {
		log.Fatal(err)
	}
	
	// 3. Add our custom handler
	handler := &MyHandler{name: "Worker-1"}
	consumer.AddHandler(handler)
	
	// Multiple handlers can also be added (not recommended)
	// consumer.AddHandler(&MyHandler{name: "Worker-2"})
	
	// 4. Set log level
	consumer.SetLoggerLevel(nsq.LogLevelInfo)
	
	// 5. Connect to nsqlookupd (recommended way, can auto-discover all nsqd producers)
	// Here we connect to the nsqlookupd address we started with Docker
	lookupdAddr := "127.0.0.1:4161"
	err = consumer.ConnectToNSQLookupd(lookupdAddr)
	if err != nil {
		log.Fatal(err)
	}
	
	// Can also connect directly to nsqd (not recommended for production)
	// err = consumer.ConnectToNSQD("127.0.0.1:4150")
	// if err != nil {
	//     log.Fatal(err)
	// }
	
	fmt.Println("consumer started, waiting for messages...")
	
	// 6. Listen for exit signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	
	// 7. Graceful stop
	fmt.Println("stopping consumer...")
	consumer.Stop()
	
	// Wait for consumer to fully stop
	<-consumer.StopChan
	fmt.Println("consumer stopped")
}

7.4 Run Test

  1. Start the consumer:
cd consumer
go run main.go

You will see the output: "consumer started, waiting for messages..."

  1. Start the producer (open a new terminal):
cd producer
go run main.go
  1. Input messages in the producer terminal:
hello NSQ!
published message: hello NSQ!

this is my first message
published message: this is my first message

Q
producer stopped
  1. Observe the consumer terminal:
2025/03/09 15:30:45 [Worker-1] received message: hello NSQ! (id: 7fd8e2a1b3c4d5e6, attempts: 1)
2025/03/09 15:30:45 [Worker-1] received message: this is my first message (id: 8fe9f3b2c4d5e6f7, attempts: 1)

7.5 Send Messages via HTTP Interface

A very convenient feature of NSQ is its support for an HTTP interface, allowing messages to be sent without a client library:

# Send a message to topic_demo
curl -d 'Hello from HTTP!' 'http://127.0.0.1:4151/pub?topic=topic_demo'

Observe the consumer terminal; you should see this message received.

8. Visual Monitoring

Refresh http://localhost:4171 in your browser, and you will see:

9. Production Environment Recommendations

If you plan to use NSQ in a production environment, here are a few suggestions for reference:

  1. Message Persistence Configuration: If you don't want to lose messages, you can set --mem-queue-size=0, so all messages will be saved to disk.
  2. Deploy Multiple nsqlookupd: Although nsqlookupd nodes do not coordinate with each other, deploying multiple instances improves the availability of the discovery service.
  3. Consumer Idempotency: Since NSQ guarantees "at-least-once" delivery, consumers must handle idempotency.
  4. Clean Up Unused topics/channels Promptly: Once created, topics and channels persist indefinitely. Clear invalid ones promptly via the admin console or code to avoid resource waste.
  5. Monitoring and Alerting: Monitor the Depth metric in real-time via nsqadmin and set up backlog alerts.
Comments

Top 1 from juejin.cn, machine-translated. The original thread is authoritative.

ckshop

Look at NSQ's updates over the past decade or so. Even though it's been continuously updated, the number of features added is shockingly zero.