@heroku/kinesis

A stream implementation of Amazon's Kinesis forked from mhart/kinesis

Downloads in past

Stats

StarsIssuesVersionUpdatedCreatedSize
@heroku/kinesis
102.0.27 years ago8 years agoMinified + gzip package size for @heroku/kinesis in KB

Readme

kinesis

Build Status
A Node.js stream implementation of Amazon's Kinesis.
Allows the consumer to pump data directly into (and out of) a Kinesis stream.
This makes it trivial to setup Kinesis as a logging sink with Bunyan, or any other logging library.
For setting up a local Kinesis instance (eg for testing), check out Kinesalite.

NB: API has changed from 0.x to 1.x

Example

var fs = require('fs'),
    Transform = require('stream').Transform,
    kinesis = require('kinesis'),
    KinesisStream = kinesis.KinesisStream

// Uses credentials from process.env by default

kinesis.listStreams({region: 'us-west-1'}, function(err, streams) {
  if (err) throw err

  console.log(streams)
  // ["http-logs", "click-logs"]
})


var kinesisSink = kinesis.stream('http-logs')
// OR new KinesisStream('http-logs')

fs.createReadStream('http.log').pipe(kinesisSink)


var kinesisSource = kinesis.stream({name: 'click-logs', oldest: true})

// Data is retrieved as Record objects, so let's transform into Buffers
var bufferify = new Transform({objectMode: true})
bufferify._transform = function(record, encoding, cb) {
  cb(null, record.Data)
}

kinesisSource.pipe(bufferify).pipe(fs.createWriteStream('click.log'))


// Create a new Kinesis stream using the raw API
kinesis.request('CreateStream', {StreamName: 'test', ShardCount: 2}, function(err) {
  if (err) throw err

  kinesis.request('DescribeStream', {StreamName: 'test'}, function(err, data) {
    if (err) throw err

    console.dir(data)
  })
})

API

kinesis.stream(options)

new KinesisStream(options)

Returns a readable and writable Node.js stream for the given Kinesis stream
options include:
- region: a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default) - credentials: an object with accessKeyId/secretAccessKey properties (resolved from env, file or IAM by default) - shards: an array of shard IDs, or shard objects. If not provided, these will be fetched and cached. - oldest: if truthy, then will start at the oldest records (using TRIM_HORIZON) instead of the latest - writeConcurrency: how many parallel writes to allow (1 by default) - cacheSize: number of PartitionKey-to-SequenceNumber mappings to cache (1000 by default) - agent: HTTP agent used (uses Node.js defaults otherwise) - timeout: HTTP request timeout (uses Node.js defaults otherwise) - initialRetryMs: first pause before retrying under the default policy (50 by default) - maxRetries: max number of retries under the default policy (10 by default) - errorCodes: array of Node.js error codes to retry on (`'EADDRINFO',
'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'EMFILE']` by default)
- errorNames: array of Kinesis exceptions to retry on
(`['ProvisionedThroughputExceededException', 'ThrottlingException']` by default)
- retryPolicy: a function to implement a retry policy different from the default one - logger: an object which implements a log method, e.g. console.

kinesis.listStreams(options, callback)

Calls the callback with an array of all stream names for the AWS account

kinesis.request(action, data, options, callback)

Makes a generic Kinesis request with the given action (eg, ListStreams) and data as the body.
options include:
- region: a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default) - credentials: an object with accessKeyId/secretAccessKey properties (resolved from env, file or IAM by default) - agent: HTTP agent used (uses Node.js defaults otherwise) - timeout: HTTP request timeout (uses Node.js defaults otherwise) - initialRetryMs: first pause before retrying under the default policy (50 by default) - maxRetries: max number of retries under the default policy (10 by default) - errorCodes: array of Node.js error codes to retry on (`'EADDRINFO',
'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'EMFILE']` by default)
- errorNames: array of Kinesis exceptions to retry on
(`['ProvisionedThroughputExceededException', 'ThrottlingException']` by default)
- retryPolicy: a function to implement a retry policy different from the default one