A lightweight, high-performance task queue built on Node JS, Redis and RSMQ. Includes a broker for queue management and routing, plus a worker framework for processing messages.
See the Why do I need a task queue? section for more information on using a task queue.
Overview
- Messages are stored in a Redis queue
- Messages are received by the broker which routes them to workers for processing
- Messages are hidden from the queue while being processed
- Messages are unhidden from the queue after timeout or failure
- Messages are deleted only after successful processing or after exceeding the maximum retry value
- Super-fast async task processing
- Optional throttling to avoid congestion
- Scheduled processing to defer specific messages
- Logging and robust error handling
Requirements
- Node.js version 4.7 or greater - Redis version 2.6 or greater, due to RSMQ's use of the Redis EVAL command (LUA scripts)Installation
- Install the @dadi/queue module to your project:
`npm install @dadi/queue --save`
- Ensure you have a Redis server running and accessible
- config, log and workers directories will be automatically created in the root of your project, if they don't already exist
- Amend the configuration file config/config.development.json according to the following section
- Require the @dadi/queue module from your project:
`require('@dadi/queue')`
- Run the project to start listening for messages:
`npm start`
Configuration
- queue
- broker
- workers
- logging
* **enabled**: (*boolean; default = false*) Enable or disable access logging
Message specific rate limiting
It is possible to rate limit message processing based on message content speifically. To do this, add configurations tobroker.throttle.messages
in the following format:{
"name": "example-throttle",
"regex": "^example-.*$",
"regexOpts": "i",
"unit": "second",
"value": 1,
"discard": true
}
Example config with multiple message specific configurations:
{
"queue": {
"host": "127.0.0.1",
"port": 6379
},
"broker": {
"queue": "myqueue",
"interval": [ 0, 1, 5, 10 ],
"retries": 10,
"timeout": 30,
"throttle": {
"workers": 5,
"queue": {
"unit": "second",
"value": 1
},
"messages": [
{
"name": "ten-per-second",
"regex": "^tps-.*$",
"regexOpts": "i",
"unit": "second",
"value": 10,
"discard": true
},
{
"name": "one-per-minute",
"regex": "^opm-.*$",
"regexOpts": "i",
"unit": "minute",
"value": 1
}
]
}
},
"workers": {
"path": "./workers"
},
"logging": {
"enabled": true,
"level": "info",
"path": "./log",
"filename": "myQueue",
"extension": "log",
"accessLog": {
"enabled": false
}
}
}
Sending messages
In most cases, the easiest way to send a message to the queue is to use @dadi/queue-wrapper from within your app.See the following related projects for other ways to interact with the queue:
Receiving messages
Messages sent to the queue will be received by the broker and routed to a worker. During this time the message will be unavailable to other worker processes.Simple addressing
A message will be routed to a worker module in the workers directory if one exists with a matching filename.For example, in the @dadi/queue module there is an example worker in a file called hello-world.js.
This worker would be executed when the broker receives the message:
hello-world
.Compound addressing
Messages can optionally contain multiple addresses separated by a colon. In this case, the broker will attempt to traverse a hierarchy in the workers folder.For example, the following message…
sms:send-reminder
…would be routed to the following worker…
sms/send-reminder.js
Message data
In addition to an address, messages can also contain data that will be passed to a worker when the message is processed.Any part of the message following the worker address is passed as data to the worker.
To continue the example above, the following message…
sms:send-reminder:123456
…would be routed to the following worker…
sms/send-reminder.js
…with the string '123456' passed as data.
Workers
A worker should export a function that receives 3 parameters:req
message
: The full message
address
: The parts of the message containing its route
data
: Any additional parts of the message
retries
: The remaining number of times that this message will be retried before being deleted
timeout
: A Date object of when this message will be placed back on the queue
age
: A Date object of when this message was received by the broker
sent
: A Date object of when this message was sentqueue
: An instance of the queue itself for sending further messagesdone
: A function to call when processing is complete
An example worker
// ./workers/hello-world.js
module.exports = (req, queue, done) => {
console.log('hello world')
done()
}
Success
On success, a worker should calldone()
, which will notify the broker to delete the message. This will also release the throttle if it is currently in operation.Error
On error, a worker should calldone(err)
, passing either an error string or an Error
object, which will notify the broker to log the error. The message will remain in the queue and will be retried if the message has any attempts remaining.Failure
Messages are deleted after they exceed the maximum number of retry attempts. Workers that need to perform additional processing when a message fails should testif (!req.retries)
in their error handling.Timeout
Workers must restrict their processing time to less then the timeout value specified in the config. After the timeout value the message will be unhidden from the queue and may be processed by other workers.Be aware of any 3rd party APIs and ensure the appropriate timeout values are set.
Why do I need a task queue?
Common uses
- Image processing
- Push notifications
- Big Data processing
- API integrations
- Sending email / SMS
- Replacing CRON
- Processing webhooks
Benefits
- Decoupling: by creating a layer in-between processes with an implicit, data-based interface
- Redundancy: by persisting data until it has been fully processed
- Scalability: increase the processing rate by simply adding another process
- Resiliency: messages can still be added to the queue even if the processing worker is offline
- Guarantees: delivery is guaranteed and messages are processed in the order received
- Scheduling: processing can be deferred until system resources are optimal
Case study
An online shop may interact with a number of external APIs when a customer places an order: CRM system, payment gateway, fraud protection, email confirmation, newsletter signup. Performing these interactions synchronously makes the checkout process slow, tightly coupled to the external APIs and error prone, due to the number of failure points.Using a task queue, each API interaction can become a worker module. On order confirmation, the checkout process simply sends the relevant messages to the queue, e.g. create-customer, create-transaction, etc., then shows the confirmation page. The user experience is fast, the API code is decoupled from the checkout and the workers are retried on error (and can perform an action after a number of attempts, such as sending a notification email).
License
DADI is a data centric development and delivery stack, built specifically in support of the principles of API first and COPE.Copyright notice (C) 2019 DADI+ Limited All rights reserved
This product is part of DADI.
DADI is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version ("the AGPL").
If you wish to use DADI outside the scope of the AGPL, please contact us at info@dadi.co for details of alternative licence arrangements.
This product may be distributed alongside other components available under different licences (which may not be AGPL). See those components themselves, or the documentation accompanying them, to determine what licences are applicable.
DADI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
The GNU Affero General Public License (AGPL) is available at http://www.gnu.org/licenses/agpl-3.0.en.html.
A copy can be found in the file license.md distributed with these files.
This copyright notice MUST APPEAR in all copies of the product!