nsqjs

The official Node.js client for the nsq client protocol. This implementation aims to be fully compliant and to maintain feature parity with the official Go (go-nsq) and Python (pynsq) clients.

Usage

new Reader(topic, channel, options)

The topic and channel arguments are strings and must be specified. The options argument is optional. Below are the parameters that can be specified in the options object.

Reader events are:

Reader.MESSAGE and Reader.DISCARD both produce Message objects. Reader.NSQD_CONNECTED and Reader.NSQD_CLOSED events both provide the host and port of the nsqd to which the event pertains.
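
As a rough sketch of how these events might be wired up, the helper below attaches logging handlers to a reader. The name `logReaderEvents` and its `log` parameter are hypothetical and not part of nsqjs. It also assumes that the host and port arrive as two positional arguments; only the event constants are taken from the description above.

```javascript
// Hypothetical helper, not part of nsqjs: attaches logging handlers for the
// connection and discard events described above.
// `reader` is any emitter with an `on` method; `Reader` supplies the event
// name constants (normally `require('nsqjs').Reader`).
function logReaderEvents (reader, Reader, log = console.log) {
  // Assumption: host and port are passed as two positional arguments.
  reader.on(Reader.NSQD_CONNECTED, (host, port) => {
    log(`connected to nsqd at ${host}:${port}`)
  })
  reader.on(Reader.NSQD_CLOSED, (host, port) => {
    log(`connection to nsqd at ${host}:${port} closed`)
  })
  // DISCARD produces a Message object, like MESSAGE.
  reader.on(Reader.DISCARD, msg => {
    log(`discarded message ${msg.id}`)
  })
  return reader
}
```

With a real reader this would be called as `logReaderEvents(reader, nsq.Reader)` before `reader.connect()`.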

These methods are available on a Reader object:

Message

The following properties and methods are available on Message objects produced by a Reader instance.

new Writer(nsqdHost, nsqdPort, options)

Allows messages to be sent to an nsqd.

Available Writer options:

Writer events are:

These methods are available on a Writer object:

Simple example

Start nsqd and nsqlookupd

# nsqlookupd listens on 4161 for HTTP requests and 4160 for TCP requests
$ nsqlookupd &
$ nsqd --lookupd-tcp-address=127.0.0.1:4160 &

const nsq = require('nsqjs')

const reader = new nsq.Reader('sample_topic', 'test_channel', {
  lookupdHTTPAddresses: '127.0.0.1:4161'
})

reader.connect()

reader.on('message', msg => {
  console.log('Received message [%s]: %s', msg.id, msg.body.toString())
  msg.finish()
})

Publish a message to nsqd to be consumed by the sample client:

$ curl -d "it really tied the room together" 'http://localhost:4151/pub?topic=sample_topic'

Example with message timeouts

This script simulates a message that takes longer to process than the default message timeout. To ensure that the message doesn't time out while it is being processed, the script touches it periodically to keep it alive.

const nsq = require('nsqjs')

const reader = new nsq.Reader('sample_topic', 'test_channel', {
  lookupdHTTPAddresses: '127.0.0.1:4161'
})

reader.connect()

reader.on('message', msg => {
  console.log('Received message [%s]', msg.id)

  const touch = () => {
    if (!msg.hasResponded) {
      console.log('Touch [%s]', msg.id)
      msg.touch()

      // Touch the message again a second before the next timeout.
      setTimeout(touch, msg.timeUntilTimeout() - 1000)
    }
  }

  const finish = () => {
    console.log('Finished message [%s]: %s', msg.id, msg.body.toString())
    msg.finish()
  }

  console.log('Message timeout is %f secs.', msg.timeUntilTimeout() / 1000)
  setTimeout(touch, msg.timeUntilTimeout() - 1000)

  // Finish the message after 2 timeout periods and 1 second.
  setTimeout(finish, msg.timeUntilTimeout() * 2 + 1000)
})
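
The touch loop above could be factored into a reusable helper. The following is only a sketch: `keepAlive` and its `marginMs` parameter are hypothetical names, not part of nsqjs, and the helper relies solely on the `hasResponded`, `touch()` and `timeUntilTimeout()` members used in the script above.

```javascript
// Hypothetical helper, not part of nsqjs: keeps touching `msg` shortly
// before each timeout until the message has been responded to.
// Returns a function that cancels the pending touch.
function keepAlive (msg, marginMs = 1000) {
  let timer = null

  const schedule = () => {
    // Wait until `marginMs` before the timeout, but never a negative delay.
    const delay = Math.max(0, msg.timeUntilTimeout() - marginMs)
    timer = setTimeout(() => {
      // Stop once finish() or requeue() has been called on the message.
      if (msg.hasResponded) return
      msg.touch()
      schedule()
    }, delay)
  }

  schedule()
  return () => clearTimeout(timer)
}
```

Inside a `message` handler, `const stop = keepAlive(msg)` would start the loop, and calling `stop()` after `msg.finish()` would cancel any pending timer.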

Enable nsqjs debugging

nsqjs uses the debug module to log debug output.

To see all nsqjs events:

$ DEBUG=nsqjs:* node my_nsqjs_script.js

To see all reader events:

$ DEBUG=nsqjs:reader:* node my_nsqjs_script.js

To see a specific reader’s events:

$ DEBUG=nsqjs:reader:<topic>/<channel>:* node my_nsqjs_script.js

Replace <topic> and <channel> with the reader's topic and channel names.
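
For the reader used in the examples in this document, the pattern can be built as follows. This is a small illustrative sketch; the function name `readerDebugPattern` is hypothetical.

```javascript
// Hypothetical helper: builds the DEBUG pattern for one reader's events,
// following the nsqjs:reader:<topic>/<channel>:* form shown above.
function readerDebugPattern (topic, channel) {
  return `nsqjs:reader:${topic}/${channel}:*`
}

console.log(readerDebugPattern('sample_topic', 'test_channel'))
// prints "nsqjs:reader:sample_topic/test_channel:*"
```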

To see all writer events:

$ DEBUG=nsqjs:writer:* node my_nsqjs_script.js

A Writer Example

The writer publishes a single message, a deferred message, and a list of messages. It then publishes a final message with a callback that closes the connection.

const nsq = require('nsqjs')

const w = new nsq.Writer('127.0.0.1', 4150)

w.connect()

w.on('ready', () => {
  w.publish('sample_topic', 'it really tied the room together')
  w.deferPublish('sample_topic', ['This message gonna arrive 1 sec later.'], 1000)
  w.publish('sample_topic', [
    'Uh, excuse me. Mark it zero. Next frame.',
    'Smokey, this is not \'Nam. This is bowling. There are rules.'
  ])
  w.publish('sample_topic', 'Wu?', err => {
    if (err) { return console.error(err.message) }
    console.log('Message sent successfully')
    w.close()
  })
})

w.on('closed', () => {
  console.log('Writer closed')
})
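
When several publishes need to be sequenced, the callback form shown above can be wrapped in a Promise. This is a minimal sketch: `publishAsync` is a hypothetical helper, not part of nsqjs, and it assumes only the `publish(topic, msgs, callback)` form used in the example, where the callback receives an error on failure.

```javascript
// Hypothetical wrapper, not part of nsqjs: turns the callback form of
// writer.publish into a Promise.
function publishAsync (writer, topic, msgs) {
  return new Promise((resolve, reject) => {
    writer.publish(topic, msgs, err => {
      // Reject on a publish error, resolve once the publish succeeds.
      if (err) return reject(err)
      resolve()
    })
  })
}
```

Inside the `ready` handler this could be used as `publishAsync(w, 'sample_topic', 'Wu?').then(() => w.close())`.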

Changes