# pipeline-pipe

Creates a parallel transform from an async function.

Parallel transform and some utilities for Node Object Stream lovers

## Why

## Install

```sh
npm install pipeline-pipe
```

## pipe(fn, opts)

Example usage:

```js
// Example: scrape HTML pages and store their titles in a DB

const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3]),

    // Request HTML asynchronously, up to 16 requests in parallel
    pipe(async postId => {
      const json = await getPost(postId);
      return json;
    }, 16),

    // Synchronous transformation, like Array.prototype.map
    pipe(json => parseHTML(json.postBody).document.title),

    // Synchronous filtering, like Array.prototype.filter
    pipe(title => title.includes('important') ? title : null),

    // Asynchronous, up to 4 in parallel
    pipe(async title => {
      const result = await storeInDB(title);
      console.info(result);
    }, 4),

    (err) => console.info('All done!')
);
```
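
The numeric second argument in the example caps how many calls to the function are in flight at once. As a rough illustration of that idea outside of streams, here is a minimal sketch of a bounded-concurrency map over an array. `mapWithLimit` is a hypothetical helper written for this README, not part of pipeline-pipe and not how the library is implemented.

```js
// Hypothetical helper: runs fn over items with at most maxParallel calls
// in flight, and returns the results in input order.
async function mapWithLimit(items, fn, maxParallel = 10) {
  const results = new Array(items.length);
  let next = 0; // index of the next item that has not been claimed yet

  // Each worker repeatedly claims the next index and processes that item.
  async function worker() {
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]);
    }
  }

  // Start no more workers than there are items.
  const workerCount = Math.min(maxParallel, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Storing each result at its input index is what keeps the output ordered even when later items finish first.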

Types:

```ts
import { Transform, TransformOptions } from 'stream';

type ParallelTransformOptions =
  | number
  | TransformOptions & { maxParallel?: number, ordered?: boolean };

export default function pipe(
    fn: (data: any) => Promise<any> | any,
    opts?: ParallelTransformOptions,
): Transform;
```

| Option property | Default value | Description |
| --- | --- | --- |
| **`maxParallel`**  | `10` | Maximum number of parallel executions. |
| **`ordered`**      | `true` | Whether to preserve the order of streamed chunks. |

A number can be passed as `opts`. `pipe(fn, 20)` is the same as `pipe(fn, {maxParallel: 20})`.
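
To make the shorthand concrete, the following sketch shows one way the two forms of `opts` could be reduced to a single options object. `normalizeOpts` is a hypothetical name used only for illustration, and the defaults are taken from the table above. The library's internal handling may differ.

```js
// Hypothetical helper, not part of pipeline-pipe's public API.
// Accepts either a number or an options object and fills in the defaults.
function normalizeOpts(opts = {}) {
  // A bare number is shorthand for { maxParallel: <number> }.
  const given = typeof opts === 'number' ? { maxParallel: opts } : opts;
  return { maxParallel: 10, ordered: true, ...given };
}
```

Under this sketch, `normalizeOpts(20)` and `normalizeOpts({ maxParallel: 20 })` produce the same object.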

## Some utility functions

### pipeline(stream, stream, ...)

A promisified version of `require('stream').pipeline`. Equivalent to:

```js
const { promisify } = require('util');
const { pipeline: _pipeline } = require('stream');
const pipeline = promisify(_pipeline);
```

Example:

```js
const { pipeline, pipe } = require('pipeline-pipe');

// `await` must run inside an async function (or an ES module)
await pipeline(
    readable,
    pipe(chunk => chunk.replace('a', 'z')),
    pipe(chunk => storeInDB(chunk)),
);
console.log('All done!');
```
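
For comparison, Node.js 15 and later ship a promise-returning `pipeline` in the built-in `stream/promises` module. The sketch below uses only Node's standard library to show the same await-the-whole-pipeline pattern. `replaceFirstA` and its stages are hypothetical names for this example, and a plain `Transform` stands in for `pipe`.

```js
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises'); // built in since Node.js 15

// Hypothetical helper: runs a small pipeline and resolves with the collected
// output once every stage has finished.
async function replaceFirstA(words) {
  const out = [];
  await pipeline(
    Readable.from(words),
    // Replaces the first 'a' in each chunk, as in the example above.
    new Transform({
      objectMode: true,
      transform(chunk, _encoding, callback) {
        callback(null, chunk.replace('a', 'z'));
      },
    }),
    // Collects chunks in memory instead of storing them in a DB.
    new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        out.push(chunk);
        callback();
      },
    })
  );
  return out;
}
```

If any stage fails, the awaited promise rejects, so errors can be handled with an ordinary `try`/`catch`.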

### concat(size)

Concatenates sequential chunks into arrays of the specified size. This is useful when you post data in batches, as with the Elasticsearch Bulk API.

Example:

```js
const { pipeline, Readable } = require('stream');
const { concat, pipe } = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3, 4, 5]),
    concat(2),
    pipe(console.log),  // [ 1, 2 ]
                        // [ 3, 4 ]
                        // [ 5 ]
    (err) => console.info('All done!'),
);
```
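
To illustrate the batching behaviour, here is a minimal sketch of a comparable transform built only on Node's `stream` module. `concatSketch` is a hypothetical stand-in and not pipeline-pipe's actual source. It assumes object-mode chunks and emits whatever is left over when the input ends.

```js
const { Transform } = require('stream');

// Hypothetical stand-in for concat(size): groups incoming chunks into arrays.
function concatSketch(size) {
  let batch = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      batch.push(chunk);
      if (batch.length >= size) {
        const full = batch;
        batch = [];
        this.push(full); // emit a full batch downstream
      }
      callback();
    },
    flush(callback) {
      // Emit the final, possibly shorter, batch when the input ends.
      if (batch.length > 0) this.push(batch);
      callback();
    },
  });
}
```

The `flush` step is what produces a trailing short batch such as `[ 5 ]` in the example above.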

### split()

Creates a Transform that splits each incoming array chunk into its elements and passes them one by one to subsequent streams.

```js
const { pipeline, Readable } = require('stream');
const { split, pipe } = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3]),
    pipe(page => getPostsByPage(page)),
    pipe(json => json.posts),               // Returns an array of posts
    split(),                                // Splits the array into individual posts
    pipe(post => storeInDB(post.title)),    // Now the argument is a single post
    (err) => console.info('All done!')
);
```
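
The splitting step can be pictured with the following sketch, again written against Node's `stream` module only. `splitSketch` is a hypothetical name, and passing non-array chunks through unchanged is an assumption of this sketch rather than documented behaviour of `split()`.

```js
const { Transform } = require('stream');

// Hypothetical stand-in for split(): forwards array elements one at a time.
function splitSketch() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      // Assumption: chunks that are not arrays are passed through as they are.
      const items = Array.isArray(chunk) ? chunk : [chunk];
      for (const item of items) {
        this.push(item);
      }
      callback();
    },
  });
}
```

Used this way, `split` is the inverse of `concat`: one regroups chunks into arrays, the other flattens arrays back into single chunks.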

## License

MIT