Node.js Streams
Steven Chetwynd
1729 Words … ⏲ Reading Time: 7 Minutes, 51 Seconds
2023-03-05 00:00 +0000
I’ve recently been working with streams in Node.js. Or, more accurately, over a year ago I was working with streams, and I’ve recently looked back on the code and realised it needs some major refactoring.
As part of that work I feel I need to explain:
- how to use a stream
- the advantages for my use case
- a cleaner implementation for my use case.
How to use a stream
In Node.js there are two ways to implement a stream:
- Extend from the stream class,
- Via the stream constructor.
Option 2 is the easiest way if the stream doesn’t require extra instance properties.
I’ve been using only Transform streams, to alter incoming data before outputting it to another stream, so all examples will use Transform streams. Readable, Writable, Transform, and Duplex streams are all documented in the Node.js API docs; I would thoroughly recommend using them as a reference when implementing streams.
Extend from the stream class:
import { Transform } from 'node:stream';
class MyStream extends Transform {
constructor() {
    // Very useful for working on objects, otherwise the chunk can only be
    // a buffer or a string
    super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
// do some alteration to the chunk,
const result = chunk * 5;
// push the result to the next stream
this.push(result);
// use the callback to pull the next chunk and recall this function
return callback();
}
}
const myStream = new MyStream();
Using the constructor:
import { Transform } from 'node:stream';
const myStream = new Transform({
transform(chunk, encoding, callback) {
// do some alteration to the chunk,
const result = chunk * 5;
// push the result to the next stream
this.push(result);
// use the callback to pull the next chunk and recall this function
return callback();
},
  // Very useful for working on objects, otherwise the chunk can only be
  // a buffer or a string
  objectMode: true,
});
It’s important to note here the change between _transform and transform for the method name, and also that using the constructor creates an instance of the class, whereas extending the class creates a new class which can be instantiated many times.
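As a quick sketch of that difference, reusing MyStream from above:
// extending the class: every call to new creates a fresh, independent stream
const streamA = new MyStream();
const streamB = new MyStream();
The constructor form, on the other hand, only ever gives us the single myStream instance; to get another one we would have to repeat the whole options object, or wrap it in a factory function.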
My Use Case
I need to load a large file, perform a process on each line of the file, and write the results to another file. The input file would be too big to load fully into memory, so I’ve used streams to process a small amount of data at a time.
Streams have this handy thing called back pressure built into them, which prevents data from building up in memory behind a process which is running slowly. If my process stream took 50ms to run but the input stream could read every 25ms, we would get a load of data built up in memory waiting to be processed. Back pressure stops this by only reading off the previous stream once the current processing is finished.
This is an oversimplification; there will be lots of optimisations done by V8 behind the curtain, stuff which I don’t understand.
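To make that a bit more concrete, here is a small demo of my own (not from the real code) where a deliberately slow transform only asks for the next chunk once its work is done, by delaying the callback:
import { Readable, Transform } from 'node:stream';

const slowStream = new Transform({
  transform(chunk, encoding, callback) {
    // simulate 50ms of work; _transform is not called again for the next
    // chunk until callback() has been invoked
    setTimeout(() => {
      this.push(chunk);
      callback();
    }, 50);
  },
  objectMode: true,
});

// each number is logged roughly 50ms apart: the transform only ever
// handles one chunk at a time
Readable.from([1, 2, 3, 4, 5])
  .pipe(slowStream)
  .on('data', (chunk) => console.log(chunk));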
Cool, so my processing can look like this:
import { Readable, Transform, pipeline } from 'node:stream';
const numbers = {
data: ['1', '2', '3', '4', '5'],
*[Symbol.iterator]() {
for (const number_ of numbers.data) {
yield number_;
}
},
};
const inputStream = Readable.from(numbers);
const myStream = new Transform({
transform(chunk, encoding, callback) {
const result = chunk * 5;
this.push(result);
return callback();
  },
  // objectMode lets us push numbers rather than only strings or buffers
  objectMode: true,
});
pipeline(inputStream, myStream, () => console.log('Done'));
I’m using Readable.from(Array) to simulate an input stream here, and pipeline to link all the streams together. I’ve not included a writable stream at the end because it’s not required for this demo.
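For completeness, if I did want the full read-a-file, process, write-a-file shape of my use case, the ends of the pipeline would be file streams. A rough sketch, where the paths and the trivial upper-casing transform are just placeholders:
import fs from 'node:fs';
import { Transform, pipeline } from 'node:stream';

// hypothetical file paths, purely for illustration
const input = fs.createReadStream('./input.txt');
const output = fs.createWriteStream('./output.txt');

const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // a stand-in for the real per-line processing
    this.push(chunk.toString().toUpperCase());
    return callback();
  },
});

pipeline(input, upperCase, output, (error) => {
  if (error) console.error(error);
  else console.log('Done');
});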
That’s great: now I can process each item in the stream one by one. But my processing involves an async operation, so I can stick an async on the transform method:
...
const myStream = new Transform({
async transform (chunk, encoding, callback) {
const result = await processChunk(chunk);
this.push(result);
return callback();
},
});
...
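processChunk here is a placeholder for whatever async work needs doing per chunk; for these demos a stand-in could be as simple as:
// a stand-in for the real async processing, purely for demonstration
function processChunk(chunk) {
  return new Promise((resolve) => setTimeout(() => resolve(chunk * 5), 50));
}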
Also great. However, the async function can handle multiple requests concurrently; this could be a call to an API or using Workers on different threads. The above would still work, but if I have 4 workers, I’m only making use of one at a time.
We can batch up the chunks, and when we get to myStream we iterate over the batch, processing each chunk like so:
...
let batcherBuffer = [];

const batcher = new Transform({
transform(chunk, encoding, callback) {
batcherBuffer.push(chunk);
if (batcherBuffer.length === 4) {
this.push([...batcherBuffer]);
batcherBuffer = [];
}
return callback();
},
flush(callback) {
this.push(batcherBuffer);
return callback();
},
objectMode: true,
});
const myStream = new Transform({
async transform(chunk, encoding, callback) {
const promises = [];
for (const data of chunk) {
const promise = processChunk(data);
promises.push(promise);
}
await Promise.all(promises);
this.push(chunk);
return callback();
},
objectMode: true,
});
pipeline(inputStream, batcher, myStream, () => console.log('Done'));
We’re getting closer here, but what we’ll see is that everything will be done in groups of 4, where the time taken for that group is the time taken by the longest process. Say the chunks take different times to process:
- chunk 1 takes 25ms
- chunk 2 takes 25ms
- chunk 3 takes 1000ms
- chunk 4 takes 25ms
With our current code, we have 3 chunk processors idle for 975ms waiting for the long process to finish. If we could have them processing more chunks during this time, we would speed up the overall processing of the file.
The first thing to do is delete the batcher we just added.
We’re now going to need a processor with state that can say if it’s finished its process, and a pool to manage those processors.
class Processor {
  ready = true;
  promise: Promise<number | undefined> = Promise.resolve(undefined);
process(data: number) {
this.ready = false;
    // createTimeout stands in for the real async work; it's defined in the
    // full example at the end of this post
    this.promise = createTimeout(data);
this.promise.then((result) => {
this.ready = true
return result;
});
return this.promise;
}
}
class ProcessorPool {
private processors: Processor[];
constructor(processors: Processor[]) {
this.processors = processors;
}
async process(data: number) {
const processor = await this.getReadyProcessor();
return processor?.process(data);
}
getProcessorPromises() {
return this.processors.map((processor) => processor.promise);
}
async getReadyProcessor() {
const promises = this.getProcessorPromises();
await Promise.any(promises);
return this.processors.find((processor) => processor.ready);
}
async waitForFreeProcessor() {
await this.getReadyProcessor();
return true;
}
flush() {
return Promise.all(this.getProcessorPromises());
}
}
const processorPool = new ProcessorPool([
new Processor(),
new Processor(),
new Processor(),
new Processor(),
]);
The Processor just wraps the promise so we can get its state. The ProcessorPool manages these processors: when process is called it waits for a free processor and passes the data on to it.
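Outside of the streams, using the pool on its own looks something like this (a sketch; createTimeout is the simulated work defined in the full example at the end):
// wait until one of the four processors is idle, then hand it some work
await processorPool.waitForFreeProcessor();
const result = await processorPool.process(1000);
console.log(result); // 1, once the simulated 1000ms of work has finished

// before moving on, wait for any in-flight work to finish
await processorPool.flush();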
const myStream = new Transform({
async transform(chunk, encoding, callback) {
await processorPool.waitForFreeProcessor();
processorPool.process(chunk).then(() => this.push(chunk));
return callback();
},
async flush(callback) {
await processorPool.flush();
return callback();
},
objectMode: true,
});
Before passing work to the processorPool we must wait for a free processor. This cannot be done as part of the process call because:
- adding an await to the process call will result in processing only 1 chunk at a time,
- not having the await will result in a buffer in the processorPool, with the entire file being read and put into that buffer.
A flush method must be added because when the input stream is empty the pipeline will try to close, but we still have data in the processors; the flush method empties that buffer. The then in the main transform method is then called, pushing the result into the output stream.
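With the batcher deleted, the wiring goes back to a plain pipeline (the ordering of the output is dealt with next):
pipeline(inputStream, myStream, () => console.log('Done'));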
There is one last potential problem. Taking our example from above:
- chunk 1 takes 25ms
- chunk 2 takes 25ms
- chunk 3 takes 1000ms
- chunk 4 takes 25ms
Because we are no longer doing the processing in order, the output would also be out of order: 1, 2, 4, 3, with 3 taking the longest so it ends up last. If order isn’t important, then we’re grand; if order is important, we need a re-orderer.
Re-ordering takes two steps: adding an index to the data coming into the streams, and then using that index to correct the output order.
class Orderer extends Transform {
ordererIndex = 0;
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
this.push({ index: this.ordererIndex, data: chunk });
this.ordererIndex++;
return callback();
  }
}
const orderer = new Orderer();
class ReOrderer extends Transform {
reOrdererIndex = 0;
reOrdererBuffer = new Map();
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
this.reOrdererBuffer.set(chunk.index, chunk.data);
while (this.reOrdererBuffer.has(this.reOrdererIndex)) {
      this.push(this.reOrdererBuffer.get(this.reOrdererIndex));
this.reOrdererBuffer.delete(this.reOrdererIndex);
this.reOrdererIndex++;
}
return callback();
}
}
const reOrderer = new ReOrderer();
In myStream we alter the processorPool.process call to send in chunk.data, then change the pipeline order to include the orderer and reOrderer, and we’re all set.
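Concretely, the two changes look roughly like this (the same wiring appears in the full file below):
// inside myStream's transform: process chunk.data, keep the index attached
processorPool
  .process(chunk.data)
  .then(() => this.push({ index: chunk.index, data: chunk.data }));

// the pipeline now wraps myStream with the orderer and reOrderer
pipeline(inputStream, orderer, myStream, reOrderer, () => console.log('Done'));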
There is one final problem, one which I’ve not solved: what if the reOrderer buffer gets too much data? We need to link together the reOrderer and the orderer so that when the buffer size in reOrderer gets too large it prevents the orderer from reading more data.
But that’s for another day.
The final file looks like:
import { Readable, Transform, pipeline } from 'node:stream';
const numbers = [
'1000',
'900',
'800',
'700',
'600',
'500',
'400',
'300',
'200',
'100',
'100',
'200',
'300',
'400',
'500',
'600',
'700',
'800',
'900',
'1000',
];
class Processor {
ready = true;
promise = Promise.resolve(1);
process(data: number) {
this.ready = false;
this.promise = createTimeout(data);
this.promise.then((result) => {
this.ready = true;
return result;
});
return this.promise;
}
}
class ProcessorPool {
private processors: Processor[];
constructor(processors: Processor[]) {
this.processors = processors;
}
async process(data: number) {
const processor = await this.getReadyProcessor();
return processor?.process(data);
}
getProcessorPromises() {
return this.processors.map((processor) => processor.promise);
}
async getReadyProcessor() {
const promises = this.getProcessorPromises();
await Promise.any(promises);
return this.processors.find((processor) => processor.ready);
}
async waitForFreeProcessor() {
await this.getReadyProcessor();
return true;
}
flush() {
return Promise.all(this.getProcessorPromises());
}
}
const processorPool = new ProcessorPool([
new Processor(),
new Processor(),
new Processor(),
new Processor(),
]);
const inputStream = Readable.from(numbers);
function createTimeout(wait: number): Promise<1> {
return new Promise((resolve) =>
setTimeout(() => {
return resolve(1);
}, wait)
);
}
class Orderer extends Transform {
ordererIndex = 0;
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
this.push({ index: this.ordererIndex, data: chunk });
this.ordererIndex++;
return callback();
}
}
const myStream = new Transform({
async transform(chunk, encoding, callback) {
await processorPool.waitForFreeProcessor();
processorPool
.process(chunk.data)
.then(() => this.push({ index: chunk.index, data: chunk.data }));
return callback();
},
async flush(callback) {
await processorPool.flush();
return callback();
},
objectMode: true,
});
class ReOrderer extends Transform {
reOrdererBuffer = new Map();
reOrdererIndex = 0;
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
this.reOrdererBuffer.set(chunk.index, chunk.data);
while (this.reOrdererBuffer.has(this.reOrdererIndex)) {
this.push(this.reOrdererBuffer.get(this.reOrdererIndex));
this.reOrdererBuffer.delete(this.reOrdererIndex);
this.reOrdererIndex++;
}
return callback();
}
}
pipeline(
inputStream,
new Orderer(),
myStream,
new ReOrderer(),
console.log
);
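If you want to actually see the re-ordered output of this demo, a simple object-mode Writable can be added as a final sink. This sketch replaces the pipeline call at the bottom of the file:
import { Writable } from 'node:stream';

const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    // each value arrives here after its simulated processing, back in the
    // order it was read from the input
    console.log(chunk);
    return callback();
  },
});

pipeline(inputStream, new Orderer(), myStream, new ReOrderer(), sink, (error) =>
  console.log(error ?? 'Done')
);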