pulls
pulls - readable streams for Node.js and Browsers.Naturally readable streams are everything that implements async iterable protocol or iterable protocol.
Streams are:
- Array
- Async Iterable
- Iterable objects
- Generators
- and Node.js readable streams.
It's lightweight with 200LOC and fast - more than 2M/s throughput.
Basic usage examples
Consume array as stream for ex. and pipeline asmap -> filter -> reduce
import {pipeline, stream} from "pulls";
import {map} from "pulls/map";
import {filter} from "pulls/filter";
import {reduce} from "pulls/reduce";
import {tap} from "pulls/tap";
import {sequence} from "pulls/sequence";
const expected = await pipeline(
stream([1, 2, 3]), // automatically converts array to stream as async interable
map((x) => x * 2), // multiple; = 2, 4, 6
filter((x) => x > 2), // filter only x > 2; = 4, 6
// types input:number, output:number[]
reduce<number, number[]>((acc, value) => [...acc, value], []), // collect to array as expected
tap(console.log) // prints output
);
// expected: [4, 6]
expect(expected).toEqual([4, 6])
One more example with async stream.
It takes 1s to calculate 1M dataset.// consume async iterator as stream and sum some values of 1m
const expected = await pipeline(
sequence(1000000), // creates low CPU/memory stream
map((x) => x * 2), // multiple; = 2, 4, 6...
filter((x) => x > 2), // filter only x > 2; = 4, 6....
// types input:number, output:number
reduce<number, number>((acc, value) => acc + value, 0), // sum values
tap(console.log) // prints output; 999998999998
);
// expected: 999998999998
expect(expected).toEqual(999998999998)
It's pipeline to consume async generator as stream as ex.
import {pipeline} from "pulls";
import {tap} from "pulls/tap";
// consume generator/async iterable protocol as stream
async function* Generate123() {
yield 1
yield 2
yield Promise.resolve(3)
}
const expected: number[] = [];
const collect = (value:number) => expected.push(value);
await pipeline(
Generate123(), // consume generator as stream
tap<number>(collect) // collect output to array as expected
)
// expected: [1, 2, 3]
expect(expected).toEqual([1, 2, 3])
Guide
TBDDocumentation
TBDStream examples
To get a point for a readable sources just check the information that I mentioned before. async iterable protocol or iterable protocol and more about iterating over async iterables. Or it could be enough to check examples here.// stream 1
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
i: 0,
next() {
if (this.i < 3) {
return Promise.resolve({ value: this.i++, done: false });
}
return Promise.resolve({ done: true });
}
};
}
};
// stream 2
const myAsyncIterable = {
async* [Symbol.asyncIterator]() {
yield "hello";
yield "async";
yield "iteration!";
}
};
// stream 3
let mySyncIterator = {
next: function() {
// ...
},
[Symbol.iterator]: function() { return this; }
};
// stream 4 / Node.js
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
// stream 5 / Node.js
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
// stream 6 / Node.js
import {Readable} from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
// stream 7 / Node.js
import {Readable} from 'stream';
const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
Node.js streams cooperation
Node.js streams are readable and implements async iterable protocol and could be consumed in pipelines.import {pipeline} from "pulls";
import {Readable} from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}
const readableStream = Readable.from(gen(), {encoding: 'utf8'});
// example
await pipeline(readableStream, /* map(...), filter(...) */)
Good article how to convert async iterable to Node.js readable and back could help you to manage Node.js streams.