Nodejs Streams

Steven Chetwynd

Javascript, Nodejs

Development


2023-03-05 00:00 +0000


I’ve recently been working with streams in Node.js. More accurately, I was working with streams over a year ago, and I’ve recently looked back at the code and realised it needs some major refactoring.

As part of that work I feel I need to explain:

  • how to use a stream
  • the advantages for my use case
  • a cleaner implementation for my use case.

How to use a stream

In Node there are two ways to implement a stream:

  1. Extend one of the stream classes
  2. Pass options to the stream constructor

Option 2 is the easiest way if the stream doesn’t require extra instance properties.

I’ve only been using Transform streams, to alter incoming data before outputting it to another stream, so all the examples will use a Transform stream. Readable, Writable, Transform, and Duplex streams are all documented in the Node.js API docs, and I would fully recommend using them as a reference when implementing streams.

Extend from stream object:

import { Transform } from 'node:stream';

class MyStream extends Transform {
    constructor() {
        // Very useful for working on objects, otherwise the chunk can only be
        // a buffer or a string
        super({ objectMode: true })
    }

    _transform(chunk, encoding, callback) {
        // do some alteration to the chunk,
        const result = chunk * 5;

        // push the result to the next stream
        this.push(result);

        // use the callback to pull the next chunk and recall this function
        return callback();
    }
}

const myStream = new MyStream();

Using constructor

import { Transform } from 'node:stream';

const myStream = new Transform({
    transform(chunk, encoding, callback) {
        // do some alteration to the chunk,
        const result = chunk * 5;

        // push the result to the next stream
        this.push(result);

        // use the callback to pull the next chunk and recall this function
        return callback();
    },
    // Very useful for working on objects, otherwise the chunk can only be
    // a buffer or a string
    objectMode: true,
});

It’s important to note two things here. First, the method is named _transform when extending the class but transform when passed to the constructor. Second, using the constructor creates a single instance, whereas extending the class creates a new class which can be instantiated many times.
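To make the second point concrete, here is a minimal sketch of a subclass that carries an extra instance property and is instantiated twice. The class name `Multiplier` and its `factor` parameter are made up for this illustration.

```javascript
import { Transform } from 'node:stream';

// Hypothetical subclass: `factor` is an instance property, which is the
// main reason to extend Transform instead of passing constructor options.
class Multiplier extends Transform {
    constructor(factor) {
        super({ objectMode: true });
        this.factor = factor;
    }

    _transform(chunk, encoding, callback) {
        // A value passed as the second argument is pushed downstream.
        callback(null, chunk * this.factor);
    }
}

// One class, several independent instances.
const double = new Multiplier(2);
const triple = new Multiplier(3);

double.write(5);
triple.write(5);

console.log(double.read(), triple.read()); // 10 15
```

Passing the result to `callback` is shorthand for calling `this.push(result)` followed by `callback()`.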

My Use Case

I need to load a large file, perform a process on each line of the file, write the results to another file. The input file would be too big to load fully into memory, so I’ve used streams to process a small amount of data at a time.

Streams have a handy feature called backpressure built into them. It prevents data from building up in memory behind a process which is running slowly. If my processing stream took 50ms per chunk but the input stream could produce a chunk every 25ms, we would get a load of data built up in memory waiting to be processed. Backpressure stops this by only reading from the previous stream once the current processing has finished.
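The signal behind backpressure can be seen directly on a writable stream. The sketch below assumes a deliberately slow sink (the `slowSink` name, the 50ms delay and the `highWaterMark` of 2 are all made up for the demo) and shows `write()` returning false once its buffer is full. `pipeline` and `pipe` react to that signal for you.

```javascript
import { Writable } from 'node:stream';

// Hypothetical slow consumer: every write takes 50ms to complete.
const slowSink = new Writable({
    objectMode: true,
    // How many objects may queue up before write() starts returning false.
    highWaterMark: 2,
    write(chunk, encoding, callback) {
        setTimeout(callback, 50);
    },
});

const acceptedFirst = slowSink.write('line 1');
const acceptedSecond = slowSink.write('line 2');

console.log(acceptedFirst); // true: still room in the buffer
console.log(acceptedSecond); // false: buffer is full, the producer should pause

// 'drain' fires once the queued writes have been processed.
slowSink.once('drain', () => console.log('drained, safe to write again'));
```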

This is an oversimplification; there are lots of optimisations done by Node behind the curtain (internal buffering, for one), stuff which I don’t understand.

Cool, so my processing can look like this:

import { Readable, Transform, pipeline } from 'node:stream';

const numbers = {
    data: ['1', '2', '3', '4', '5'],
    *[Symbol.iterator]() {
        for (const number_ of numbers.data) {
            yield number_;
        }
    },
};

const inputStream = Readable.from(numbers);

const myStream = new Transform({
    transform(chunk, encoding, callback) {
        const result = chunk * 5;
        this.push(result);
        return callback();
    },
    // Needed so that a number can be pushed, not just buffers or strings
    objectMode: true,
});

pipeline(inputStream, myStream, () => console.log('Done'));

I’m using Readable.from(iterable) to simulate an input stream here, and pipeline to link all the streams together. I’ve not included a writable stream at the end because it’s not required for a demo this small.
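For completeness, a version with a writable stream at the end might look like the sketch below. The `sink`, `results` and `timesFive` names are illustrative, and the sink just collects values in an array where a real one would write to a file.

```javascript
import { Readable, Transform, Writable, pipeline } from 'node:stream';

const input = Readable.from(['1', '2', '3', '4', '5']);

const timesFive = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
        callback(null, chunk * 5);
    },
});

// Illustrative sink: collects each result in memory.
const results = [];
const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
        results.push(chunk);
        callback();
    },
});

pipeline(input, timesFive, sink, (error) => {
    if (error) {
        console.error('Pipeline failed', error);
        return;
    }
    console.log(results); // [ 5, 10, 15, 20, 25 ]
});
```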

That’s great, now I can process each item in the stream one by one. But my processing involves an async operation, so I can stick an async on the transform method:

...
const myStream = new Transform({
    async transform (chunk, encoding, callback) {
        const result = await processChunk(chunk);
        this.push(result);
        return callback();
    },
});
...

Also great. However, the async function can handle multiple requests concurrently; this could be a call to an API or using Workers on different threads. The above would still work, but if I have 4 workers I’m only making use of one at a time.

We can batch up the chunks, and when a batch gets to myStream, iterate over its chunks and process each of them, like so:

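...
let batcherBuffer = [];

const batcher = new Transform({
    transform(chunk, encoding, callback) {
        batcherBuffer.push(chunk);

        // Once 4 chunks have been collected, pass them on as one batch
        if (batcherBuffer.length === 4) {
            this.push([...batcherBuffer]);
            batcherBuffer = [];
        }

        return callback();
    },
    flush(callback) {
        // Pass on any chunks left over when the input ends
        if (batcherBuffer.length > 0) {
            this.push(batcherBuffer);
        }
        return callback();
    },
    objectMode: true,
});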

const myStream = new Transform({
    async transform(chunk, encoding, callback) {
        const promises = [];

        for (const data of chunk) {
            const promise = processChunk(data);
            promises.push(promise);
        }

        // Wait for the whole batch, then pass the results on
        const results = await Promise.all(promises);

        this.push(results);
        return callback();
    },
    objectMode: true,
});

pipeline(inputStream, batcher, myStream, () => console.log('Done'));

We’re getting closer here, but what we’ll see is that everything is done in groups of 4, where the time taken for each group is the time taken by its longest process. Say the chunks take different times to process:

  1. takes 25ms
  2. takes 25ms
  3. takes 1000ms
  4. takes 25ms

With our current code, we have 3 chunk processors idle for 975ms waiting for the long process to finish. If we could have them processing more chunks during this time, we would speed up the overall processing of the file.
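The difference can be worked out with a bit of arithmetic. The sketch below is a simplified model that ignores all stream overhead, and its two functions are hypothetical helpers written for this comparison. It takes the four durations above, repeated once, and compares fixed batches of 4 with handing each chunk to whichever of 4 workers is free first.

```javascript
// Total time when chunks are processed in fixed batches: each batch takes
// as long as its slowest chunk.
function batchedTime(durations, batchSize) {
    let total = 0;
    for (let i = 0; i < durations.length; i += batchSize) {
        total += Math.max(...durations.slice(i, i + batchSize));
    }
    return total;
}

// Total time when each chunk goes to whichever worker becomes free first.
function pooledTime(durations, workers) {
    const freeAt = new Array(workers).fill(0);
    for (const duration of durations) {
        const next = freeAt.indexOf(Math.min(...freeAt));
        freeAt[next] += duration;
    }
    return Math.max(...freeAt);
}

const durations = [25, 25, 1000, 25, 25, 25, 1000, 25];

console.log(batchedTime(durations, 4)); // 2000
console.log(pooledTime(durations, 4)); // 1025
```

In the batched model each group of four costs a full 1000ms, while in the pooled model the two slow chunks overlap almost entirely.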

First thing is to delete the batcher we just added.

We’re now going to need a processor with state that can say whether it has finished its process, and a pool to manage those processors.

class Processor {
    ready = true;
    promise: Promise<number> = Promise.resolve(1);

    process(data: number) {
        this.ready = false;
        this.promise = createTimeout(data);
        // Mark this processor as free again once the work has finished
        this.promise.then((result) => {
            this.ready = true;
            return result;
        });
        return this.promise;
    }
}

class ProcessorPool {
    private processors: Processor[];

    constructor(processors: Processor[]) {
        this.processors = processors;
    }

    async process(data: number) {
        const processor = await this.getReadyProcessor();
        return processor?.process(data);
    }

    getProcessorPromises() {
        return this.processors.map((processor) => processor.promise);
    }

    async getReadyProcessor() {
        const promises = this.getProcessorPromises();
        await Promise.any(promises);

        return this.processors.find((processor) => processor.ready);
    }

    async waitForFreeProcessor() {
        await this.getReadyProcessor();
        return true;
    }

    flush() {
        return Promise.all(this.getProcessorPromises());
    }
}

const processorPool = new ProcessorPool([
    new Processor(),
    new Processor(),
    new Processor(),
    new Processor(),
]);

The Processor just wraps the promise so we can get the state. The ProcessorPool manages these processors: when process is called, it waits for a free processor and passes the data on to it.

const myStream = new Transform({
    async transform(chunk, encoding, callback) {
        await processorPool.waitForFreeProcessor();

        processorPool.process(chunk).then(() => this.push(chunk));

        return callback();
    },
    async flush(callback) {
        await processorPool.flush();
        return callback();
    },
    objectMode: true,
});

Before passing work to the processorPool we must wait for a free processor. This cannot be done as part of the process call because:

  • adding an await to the process call will result in processing only 1 chunk at a time
  • not having the await means nothing holds the input back, so the entire file gets read and queued up inside the processorPool.

A flush method must be added because when the input stream is empty the pipeline will try to close, but we may still have data in the processors. The flush method waits for those processors to finish. The then in the main transform method is then called, pushing the result into the output stream.
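A different way to get a similar effect, shown only as a rough sketch and not as the implementation used in this post, is to keep the in-flight promises in a Set and hold back the transform callback while the Set is full. The names `concurrentTransform`, `sleep` and `sink` are made up for the example, and a failed task simply destroys the stream.

```javascript
import { Readable, Transform, Writable, pipeline } from 'node:stream';

// Hypothetical helper: wraps an async `worker` function in a Transform that
// keeps at most `limit` calls running at once.
function concurrentTransform(worker, limit) {
    const inFlight = new Set();

    return new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) {
            const task = worker(chunk)
                .then((result) => this.push(result))
                .catch((error) => this.destroy(error))
                .finally(() => inFlight.delete(task));
            inFlight.add(task);

            if (inFlight.size < limit) {
                // Room for more work: ask for the next chunk straight away.
                callback();
            } else {
                // Full: ask for the next chunk once any task has settled.
                Promise.race(inFlight).then(() => callback());
            }
        },
        flush(callback) {
            // The input has ended, but some tasks may still be running.
            Promise.all(inFlight).then(() => callback());
        },
    });
}

// Stand-in for real work: waits `ms` milliseconds, then returns `ms`.
const sleep = (ms) => new Promise((resolve) => setTimeout(() => resolve(ms), ms));

const results = [];
const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
        results.push(chunk);
        callback();
    },
});

pipeline(Readable.from([300, 20, 20, 20, 20]), concurrentTransform(sleep, 2), sink, (error) => {
    if (error) {
        console.error('Pipeline failed', error);
        return;
    }
    console.log(results);
});
```

With a limit of 2, the short tasks should normally finish while the 300ms task is still running, so it tends to come out last. As with the pool above, the output follows completion order, not input order.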

There is one last possible problem. Taking our example from above:

  1. takes 25ms
  2. takes 25ms
  3. takes 1000ms
  4. takes 25ms

Because we are no longer doing the processing in order, the output would also be out of order: 1, 2, 4, 3, with 3 taking the longest so it ends up last. If order isn’t important, then we’re grand. If order is important, we need a re-orderer.

Re-ordering takes 2 steps, adding the index to the data coming into the streams, and then using that order to correct the output order.

class Orderer extends Transform {
    ordererIndex = 0;

    constructor() {
        super({ objectMode: true });
    }

    _transform(chunk, encoding, callback) {
        this.push({ index: this.ordererIndex, data: chunk });
        this.ordererIndex++;
        return callback();
    }
}

const orderer = new Orderer();

class ReOrderer extends Transform {
    reOrdererIndex = 0;
    reOrdererBuffer = new Map();

    constructor() {
        super({ objectMode: true });
    }

    _transform(chunk, encoding, callback) {
        this.reOrdererBuffer.set(chunk.index, chunk.data);

        while (this.reOrdererBuffer.has(this.reOrdererIndex)) {
            this.push(this.reOrdererBuffer.get(this.reOrdererIndex));
            this.reOrdererBuffer.delete(this.reOrdererIndex);
            this.reOrdererIndex++;
        }

        return callback();
    }
}

const reOrderer = new ReOrderer();

In myStream we alter the processorPool.process call to send in chunk.data, then change the pipeline order to include the orderer and reOrderer and we’re all set.

There is one final problem, one which I’ve not solved: what if the reOrderer buffer gets too much data? We need to link together the reOrderer and the orderer so that when the buffer in the reOrderer gets too large it prevents the orderer from reading more data.

But that’s for another day.
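For what it’s worth, one possible direction is a small shared "window" object that both streams hold a reference to. This is an untested idea, not part of the final file, and the `OrderWindow` class and its method names are invented for this sketch. The orderer asks for a slot before tagging a chunk, and the re-orderer gives a slot back each time it emits a chunk in order.

```javascript
// Hypothetical helper shared by the orderer and the re-orderer. It lets at
// most `size` chunks sit between the two at any time, which also caps how
// many chunks the re-orderer can ever have to hold.
class OrderWindow {
    constructor(size) {
        this.size = size;
        this.inWindow = 0; // chunks tagged by the orderer but not yet emitted in order
        this.waiting = null; // continuation of an orderer that is currently blocked
    }

    // The orderer calls this before tagging a chunk; `next` runs once there is room.
    acquire(next) {
        if (this.inWindow < this.size) {
            this.inWindow++;
            next();
        } else {
            this.waiting = next;
        }
    }

    // The re-orderer calls this each time it pushes a chunk out in order.
    release() {
        this.inWindow--;
        if (this.waiting) {
            const next = this.waiting;
            this.waiting = null;
            this.inWindow++;
            next();
        }
    }
}

const orderWindow = new OrderWindow(2);
const admitted = [];

orderWindow.acquire(() => admitted.push(0));
orderWindow.acquire(() => admitted.push(1));
orderWindow.acquire(() => admitted.push(2)); // blocked: the window is full

console.log(admitted); // [ 0, 1 ]

orderWindow.release(); // the re-orderer emitted chunk 0

console.log(admitted); // [ 0, 1, 2 ]
```

In the Orderer, _transform would do its push and callback() inside the function passed to `acquire`, and the ReOrderer would call `release()` after each push in its while loop. The chunk the re-orderer is waiting for has always been admitted already, so holding the orderer back should not block it from arriving.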

The final file looks like:

import { Readable, Transform, pipeline } from 'node:stream';

const numbers = [
    '1000',
    '900',
    '800',
    '700',
    '600',
    '500',
    '400',
    '300',
    '200',
    '100',
    '100',
    '200',
    '300',
    '400',
    '500',
    '600',
    '700',
    '800',
    '900',
    '1000',
];

class Processor {
    ready = true;
    promise = Promise.resolve(1);

    process(data: number) {
        this.ready = false;
        this.promise = createTimeout(data);
        this.promise.then((result) => {
            this.ready = true;
            return result;
        });
        return this.promise;
    }
}

class ProcessorPool {
    private processors: Processor[];

    constructor(processors: Processor[]) {
        this.processors = processors;
    }

    async process(data: number) {
        const processor = await this.getReadyProcessor();
        return processor?.process(data);
    }

    getProcessorPromises() {
        return this.processors.map((processor) => processor.promise);
    }

    async getReadyProcessor() {
        const promises = this.getProcessorPromises();
        await Promise.any(promises);

        return this.processors.find((processor) => processor.ready);
    }

    async waitForFreeProcessor() {
        await this.getReadyProcessor();
        return true;
    }

    flush() {
        return Promise.all(this.getProcessorPromises());
    }
}

const processorPool = new ProcessorPool([
    new Processor(),
    new Processor(),
    new Processor(),
    new Processor(),
]);

const inputStream = Readable.from(numbers);

function createTimeout(wait: number): Promise<1> {
    return new Promise((resolve) =>
        setTimeout(() => {
            return resolve(1);
        }, wait)
    );
}

class Orderer extends Transform {
    ordererIndex = 0;

    constructor() {
        super({ objectMode: true });
    }

    _transform(chunk, encoding, callback) {
        this.push({ index: this.ordererIndex, data: chunk });
        this.ordererIndex++;
        return callback();
    }
}

const myStream = new Transform({
    async transform(chunk, encoding, callback) {
        await processorPool.waitForFreeProcessor();

        processorPool
            .process(chunk.data)
            .then(() => this.push({ index: chunk.index, data: chunk.data }));

        return callback();
    },
    async flush(callback) {
        await processorPool.flush();
        return callback();
    },
    objectMode: true,
});

class ReOrderer extends Transform {
    reOrdererBuffer = new Map();
    reOrdererIndex = 0;

    constructor() {
        super({ objectMode: true });
    }

    _transform(chunk, encoding, callback) {
        this.reOrdererBuffer.set(chunk.index, chunk.data);

        while (this.reOrdererBuffer.has(this.reOrdererIndex)) {
            this.push(this.reOrdererBuffer.get(this.reOrdererIndex));
            this.reOrdererBuffer.delete(this.reOrdererIndex);
            this.reOrdererIndex++;
        }

        return callback();
    }
}

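const reOrderer = new ReOrderer();

// Nothing is piped after the re-orderer, so read its output here. Left
// unread, its internal buffer can fill up and stall the whole pipeline.
reOrderer.on('data', (result) => console.log(result));

pipeline(inputStream, new Orderer(), myStream, reOrderer, (error) => {
    if (error) {
        console.error(error);
    }
});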