Overview#
The WHATWG Streams Standard (or "web streams") defines an API for handling
streaming data. It is similar to the Node.js Streams API but emerged later
and has become the "standard" API for streaming data across many JavaScript
environments.
There are three primary types of objects:
ReadableStream - Represents a source of streaming data.
WritableStream - Represents a destination for streaming data.
TransformStream - Represents an algorithm for transforming streaming data.
Node.js streams interoperability#
Node.js streams can be converted to web streams and vice versa via the toWeb() and fromWeb() methods available on stream.Readable, stream.Writable, and stream.Duplex.
For more details, refer to the stream module documentation for those methods.
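As a sketch (the chunk values here are illustrative, not from the documentation), the conversion helpers can be used like this:

```javascript
import { Readable } from 'node:stream';

// Readable.toWeb() wraps a Node.js stream as a web ReadableStream;
// Readable.fromWeb() performs the reverse conversion. Writable and
// Duplex expose analogous toWeb()/fromWeb() pairs.
const nodeReadable = Readable.from(['hello', ' ', 'world']);
const webStream = Readable.toWeb(nodeReadable);

// The resulting web stream supports async iteration.
let result = '';
for await (const chunk of webStream)
  result += chunk;
console.log(result);  // 'hello world'
```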
API#
Class: ReadableStream#
new ReadableStream([underlyingSource [, strategy]])#
Added in: v16.5.0
underlyingSource <Object>
start <Function> A user-defined function that is invoked immediately when
the ReadableStream is created.
pull <Function> A user-defined function that is called repeatedly when the
ReadableStream internal queue is not full. The operation may be sync or
async. If async, the function will not be called again until the previously
returned promise is fulfilled.
cancel <Function> A user-defined function that is called when the
ReadableStream is canceled.
reason <any>
- Returns: A promise fulfilled with
undefined.
type <string> Must be 'bytes' or undefined.
autoAllocateChunkSize <number> Used only when type is equal to
'bytes'. When set to a non-zero value a view buffer is automatically
allocated to ReadableByteStreamController.byobRequest. When not set
one must use stream's internal queues to transfer data via the default
reader ReadableStreamDefaultReader.
strategy <Object>
highWaterMark <number> The maximum internal queue size before backpressure
is applied.
size <Function> A user-defined function used to identify the size of each
chunk of data.
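The constructor options above can be sketched with a hypothetical counter source (the values and highWaterMark are illustrative):

```javascript
import { ReadableStream } from 'node:stream/web';

// pull() is invoked whenever the internal queue has room, so the
// stream produces values lazily rather than all at once.
let nextValue = 1;
const stream = new ReadableStream({
  start(controller) {
    // Runs once, immediately, when the stream is constructed.
    controller.enqueue(0);
  },
  pull(controller) {
    if (nextValue > 3) {
      controller.close();
      return;
    }
    controller.enqueue(nextValue++);
  },
  cancel(reason) {
    // Runs only if the consumer cancels the stream early.
    console.log('canceled:', reason);
  },
}, { highWaterMark: 2 });

const received = [];
for await (const value of stream)
  received.push(value);
console.log(received);  // [0, 1, 2, 3]
```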
readableStream.locked#
Added in: v16.5.0
The readableStream.locked property is false by default, and is
switched to true while there is an active reader consuming the
stream's data.
readableStream.cancel([reason])#
Added in: v16.5.0
reason <any>
- Returns: A promise fulfilled with
undefined once cancelation has
been completed.
readableStream.pipeThrough(transform[, options])#
Added in: v16.5.0
transform <Object>
readable <ReadableStream> The ReadableStream to which
transform.writable will push the potentially modified data
it receives from this ReadableStream.
writable <WritableStream> The WritableStream to which this
ReadableStream's data will be written.
options <Object>
preventAbort <boolean> When true, errors in this ReadableStream
will not cause transform.writable to be aborted.
preventCancel <boolean> When true, errors in the destination
transform.writable do not cause this ReadableStream to be
canceled.
preventClose <boolean> When true, closing this ReadableStream
does not cause transform.writable to be closed.
signal <AbortSignal> Allows the transfer of data to be canceled
using an <AbortController>.
- Returns: <ReadableStream> From
transform.readable.
Connects this <ReadableStream> to the pair of <ReadableStream> and
<WritableStream> provided in the transform argument such that the
data from this <ReadableStream> is written in to transform.writable,
possibly transformed, then pushed to transform.readable. Once the
pipeline is configured, transform.readable is returned.
Causes the readableStream.locked to be true while the pipe operation
is active.
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
const {
ReadableStream,
TransformStream,
} = require('node:stream/web');
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
(async () => {
for await (const chunk of transformedStream)
console.log(chunk);
})();
readableStream.pipeTo(destination[, options])#
Added in: v16.5.0
destination <WritableStream> A <WritableStream> to which this
ReadableStream's data will be written.
options <Object>
preventAbort <boolean> When true, errors in this ReadableStream
will not cause destination to be aborted.
preventCancel <boolean> When true, errors in the destination
will not cause this ReadableStream to be canceled.
preventClose <boolean> When true, closing this ReadableStream
does not cause destination to be closed.
signal <AbortSignal> Allows the transfer of data to be canceled
using an <AbortController>.
- Returns: A promise fulfilled with
undefined
Causes the readableStream.locked to be true while the pipe operation
is active.
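A minimal pipeTo() sketch (the source and sink here are illustrative):

```javascript
import { ReadableStream, WritableStream } from 'node:stream/web';

// Pipe a ReadableStream directly into a WritableStream; the
// returned promise settles once the pipe completes.
const chunks = [];

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});

const writable = new WritableStream({
  write(chunk) {
    chunks.push(chunk);
  },
});

await readable.pipeTo(writable);
console.log(chunks);  // ['a', 'b']
```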
readableStream.tee()#
Returns a pair of new <ReadableStream> instances to which this
ReadableStream's data will be forwarded. Each will receive the
same data.
Causes the readableStream.locked to be true.
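For example (illustrative data), both branches produced by tee() observe the same sequence of chunks:

```javascript
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('x');
    controller.enqueue('y');
    controller.close();
  },
});

// tee() locks the original stream and returns two branches.
const [branchA, branchB] = stream.tee();

async function collect(readable) {
  const out = [];
  for await (const chunk of readable)
    out.push(chunk);
  return out;
}

// Consume both branches concurrently.
const [a, b] = await Promise.all([collect(branchA), collect(branchB)]);
console.log(a, b);  // ['x', 'y'] ['x', 'y']
```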
readableStream.values([options])#
Added in: v16.5.0
Creates and returns an async iterator usable for consuming this
ReadableStream's data.
Causes the readableStream.locked to be true while the async iterator
is active.
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());
Async Iteration#
The <ReadableStream> object supports the async iterator protocol using
for await syntax.
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());
The async iterator will consume the <ReadableStream> until it terminates.
By default, if the async iterator exits early (via either a break,
return, or a throw), the <ReadableStream> will be closed. To prevent
automatic closing of the <ReadableStream>, use the readableStream.values()
method to acquire the async iterator and set the preventCancel option to
true.
The <ReadableStream> must not be locked (that is, it must not have an existing
active reader). During the async iteration, the <ReadableStream> will be locked.
Transferring with postMessage()#
A <ReadableStream> instance can be transferred using a <MessagePort>.
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);
Class: ReadableStreamDefaultReader#
By default, calling readableStream.getReader() with no arguments
will return an instance of ReadableStreamDefaultReader. The default
reader treats the chunks of data passed through the stream as opaque
values, which allows the <ReadableStream> to work with generally any
JavaScript value.
readableStreamDefaultReader.cancel([reason])#
Added in: v16.5.0
reason <any>
- Returns: A promise fulfilled with
undefined.
Cancels the <ReadableStream> and returns a promise that is fulfilled
when the underlying stream has been canceled.
readableStreamDefaultReader.closed#
Added in: v16.5.0
- Type: <Promise> Fulfilled with
undefined when the associated
<ReadableStream> is closed or rejected if the stream errors or the reader's
lock is released before the stream finishes closing.
readableStreamDefaultReader.read()#
Added in: v16.5.0
- Returns: A promise fulfilled with an object:
Requests the next chunk of data from the underlying <ReadableStream>
and returns a promise that is fulfilled with the data once it is
available.
readableStreamDefaultReader.releaseLock()#
Added in: v16.5.0
Releases this reader's lock on the underlying <ReadableStream>.
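A typical manual read loop with the default reader might look like this (the source is illustrative):

```javascript
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  },
});

// read() resolves with { value, done }; releaseLock() frees the
// stream for other consumers once we are finished.
const reader = stream.getReader();
const values = [];
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  values.push(value);
}
reader.releaseLock();
console.log(values);  // [1, 2]
```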
Class: ReadableStreamBYOBReader#
The ReadableStreamBYOBReader is an alternative consumer for
byte-oriented <ReadableStream>s (those that are created with
underlyingSource.type set equal to 'bytes' when the
ReadableStream was created).
BYOB is short for "bring your own buffer". This is a
pattern that allows for more efficient reading of byte-oriented
data by avoiding extraneous copying.
import {
open,
} from 'node:fs/promises';
import {
ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)#
Added in: v16.5.0
Creates a new ReadableStreamBYOBReader that is locked to the
given <ReadableStream>.
readableStreamBYOBReader.cancel([reason])#
Added in: v16.5.0
reason <any>
- Returns: A promise fulfilled with
undefined.
Cancels the <ReadableStream> and returns a promise that is fulfilled
when the underlying stream has been canceled.
readableStreamBYOBReader.closed#
Added in: v16.5.0
- Type: <Promise> Fulfilled with
undefined when the associated
<ReadableStream> is closed or rejected if the stream errors or the reader's
lock is released before the stream finishes closing.
readableStreamBYOBReader.read(view[, options])#
view <Buffer> | <TypedArray> | <DataView>
options <Object>
min <number> When set, the returned promise will only be
fulfilled as soon as min number of elements are available.
When not set, the promise fulfills when at least one element
is available.
- Returns: A promise fulfilled with an object:
Requests the next chunk of data from the underlying <ReadableStream>
and returns a promise that is fulfilled with the data once it is
available.
Do not pass a pooled <Buffer> object instance into this method.
Pooled Buffer objects are created using Buffer.allocUnsafe()
or Buffer.from(), and are often returned by various node:fs module
callbacks. These types of Buffers use a shared underlying
<ArrayBuffer> object that contains all of the data from all of
the pooled Buffer instances. When a Buffer, <TypedArray>,
or <DataView> is passed into readableStreamBYOBReader.read(),
the view's underlying ArrayBuffer is detached, invalidating
all existing views that may exist on that ArrayBuffer. This
can have disastrous consequences for your application.
readableStreamBYOBReader.releaseLock()#
Added in: v16.5.0
Releases this reader's lock on the underlying <ReadableStream>.
Class: ReadableStreamDefaultController#
Added in: v16.5.0
Every <ReadableStream> has a controller that is responsible for
the internal state and management of the stream's queue. The
ReadableStreamDefaultController is the default controller
implementation for ReadableStreams that are not byte-oriented.
readableStreamDefaultController.close()#
Added in: v16.5.0
Closes the <ReadableStream> to which this controller is associated.
readableStreamDefaultController.desiredSize#
Added in: v16.5.0
Returns the amount of data remaining to fill the <ReadableStream>'s
queue.
readableStreamDefaultController.enqueue([chunk])#
Added in: v16.5.0
Appends a new chunk of data to the <ReadableStream>'s queue.
readableStreamDefaultController.error([error])#
Added in: v16.5.0
Signals an error that causes the <ReadableStream> to error and close.
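The controller methods above can be sketched together (the chunk values and highWaterMark are illustrative; the default size function counts each chunk as 1):

```javascript
import { ReadableStream } from 'node:stream/web';

// desiredSize starts at the highWaterMark and shrinks as chunks
// are enqueued into the internal queue.
const sizes = [];
const stream = new ReadableStream({
  start(controller) {
    sizes.push(controller.desiredSize);  // 2 (the highWaterMark)
    controller.enqueue('one');
    sizes.push(controller.desiredSize);  // 1
    controller.enqueue('two');
    controller.close();                  // no more chunks will follow
  },
}, { highWaterMark: 2 });

const out = [];
for await (const chunk of stream)
  out.push(chunk);
console.log(sizes, out);  // [2, 1] ['one', 'two']
```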
Class: ReadableByteStreamController#
Every <ReadableStream> has a controller that is responsible for
the internal state and management of the stream's queue. The
ReadableByteStreamController is for byte-oriented ReadableStreams.
readableByteStreamController.byobRequest#
Added in: v16.5.0
readableByteStreamController.close()#
Added in: v16.5.0
Closes the <ReadableStream> to which this controller is associated.
readableByteStreamController.desiredSize#
Added in: v16.5.0
Returns the amount of data remaining to fill the <ReadableStream>'s
queue.
readableByteStreamController.enqueue(chunk)#
Added in: v16.5.0
Appends a new chunk of data to the <ReadableStream>'s queue.
readableByteStreamController.error([error])#
Added in: v16.5.0
Signals an error that causes the <ReadableStream> to error and close.
Class: ReadableStreamBYOBRequest#
When using ReadableByteStreamController in byte-oriented
streams, and when using the ReadableStreamBYOBReader,
the readableByteStreamController.byobRequest property
provides access to a ReadableStreamBYOBRequest instance
that represents the current read request. The object
is used to gain access to the ArrayBuffer/TypedArray
that has been provided for the read request to fill,
and provides methods for signaling that the data has
been provided.
readableStreamBYOBRequest.respond(bytesWritten)#
Added in: v16.5.0
Signals that a bytesWritten number of bytes have been written
to readableStreamBYOBRequest.view.
readableStreamBYOBRequest.respondWithNewView(view)#
Added in: v16.5.0
Signals that the request has been fulfilled with bytes written
to a new Buffer, TypedArray, or DataView.
readableStreamBYOBRequest.view#
Added in: v16.5.0
Class: WritableStream#
The WritableStream is a destination to which stream data is sent.
import {
WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
},
});
await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])#
Added in: v16.5.0
underlyingSink <Object>
start <Function> A user-defined function that is invoked immediately when
the WritableStream is created.
write <Function> A user-defined function that is invoked when a chunk of
data has been written to the WritableStream.
close <Function> A user-defined function that is called when the
WritableStream is closed.
- Returns: A promise fulfilled with
undefined.
abort <Function> A user-defined function that is called to abruptly close
the WritableStream.
reason <any>
- Returns: A promise fulfilled with
undefined.
type <any> The type option is reserved for future use and must be
undefined.
strategy <Object>
highWaterMark <number> The maximum internal queue size before backpressure
is applied.
size <Function> A user-defined function used to identify the size of each
chunk of data.
writableStream.abort([reason])#
Added in: v16.5.0
reason <any>
- Returns: A promise fulfilled with
undefined.
Abruptly terminates the WritableStream. All queued writes will be
canceled with their associated promises rejected.
writableStream.close()#
Added in: v16.5.0
- Returns: A promise fulfilled with
undefined.
Closes the WritableStream when no additional writes are expected.
writableStream.getWriter()#
Added in: v16.5.0
Creates and returns a new writer instance that can be used to write
data into the WritableStream.
writableStream.locked#
Added in: v16.5.0
The writableStream.locked property is false by default, and is
switched to true while there is an active writer attached to this
WritableStream.
Transferring with postMessage()#
A <WritableStream> instance can be transferred using a <MessagePort>.
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);
Class: WritableStreamDefaultWriter#
new WritableStreamDefaultWriter(stream)#
Added in: v16.5.0
Creates a new WritableStreamDefaultWriter that is locked to the given
WritableStream.
writableStreamDefaultWriter.abort([reason])#
Added in: v16.5.0
reason <any>
- Returns: A promise fulfilled with
undefined.
Abruptly terminates the WritableStream. All queued writes will be
canceled with their associated promises rejected.
writableStreamDefaultWriter.close()#
Added in: v16.5.0
- Returns: A promise fulfilled with
undefined.
Closes the WritableStream when no additional writes are expected.
writableStreamDefaultWriter.closed#
Added in: v16.5.0
- Type: <Promise> Fulfilled with
undefined when the associated
<WritableStream> is closed or rejected if the stream errors or the writer's
lock is released before the stream finishes closing.
writableStreamDefaultWriter.desiredSize#
Added in: v16.5.0
The amount of data required to fill the <WritableStream>'s queue.
writableStreamDefaultWriter.ready#
Added in: v16.5.0
- Type: <Promise> Fulfilled with
undefined when the writer is ready
to be used.
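One common use of writableStreamDefaultWriter.ready is respecting backpressure: awaiting ready before each write keeps the internal queue at or below the highWaterMark. A sketch, with a deliberately slow (and purely illustrative) sink:

```javascript
import { WritableStream } from 'node:stream/web';
import { setTimeout as delay } from 'node:timers/promises';

const written = [];
const stream = new WritableStream({
  async write(chunk) {
    await delay(10);  // simulate a slow destination
    written.push(chunk);
  },
}, { highWaterMark: 1 });

const writer = stream.getWriter();
for (const chunk of ['a', 'b', 'c']) {
  await writer.ready;   // wait until the queue has room
  writer.write(chunk);  // intentionally not awaited
}
await writer.close();   // resolves after all queued writes flush
console.log(written);   // ['a', 'b', 'c']
```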
writableStreamDefaultWriter.releaseLock()#
Added in: v16.5.0
Releases this writer's lock on the underlying <WritableStream>.
writableStreamDefaultWriter.write([chunk])#
Added in: v16.5.0
chunk <any>
- Returns: A promise fulfilled with
undefined.
Appends a new chunk of data to the <WritableStream>'s queue.
Class: WritableStreamDefaultController#
The WritableStreamDefaultController manages the <WritableStream>'s
internal state.
writableStreamDefaultController.error([error])#
Added in: v16.5.0
Called by user-code to signal that an error has occurred while processing
the WritableStream data. When called, the <WritableStream> will be aborted,
with currently pending writes canceled.
writableStreamDefaultController.signal#
Class: TransformStream#
A TransformStream consists of a <ReadableStream> and a <WritableStream> that
are connected such that the data written to the WritableStream is received,
and potentially transformed, before being pushed into the ReadableStream's
queue.
import {
TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
Added in: v16.5.0
transformer <Object>
start <Function> A user-defined function that is invoked immediately when
the TransformStream is created.
transform <Function> A user-defined function that receives, and
potentially modifies, a chunk of data written to transformStream.writable,
before forwarding that on to transformStream.readable.
flush <Function> A user-defined function that is called immediately before
the writable side of the TransformStream is closed, signaling the end of
the transformation process.
readableType <any> the readableType option is reserved for future use
and must be undefined.
writableType <any> the writableType option is reserved for future use
and must be undefined.
writableStrategy <Object>
highWaterMark <number> The maximum internal queue size before backpressure
is applied.
size <Function> A user-defined function used to identify the size of each
chunk of data.
readableStrategy <Object>
highWaterMark <number> The maximum internal queue size before backpressure
is applied.
size <Function> A user-defined function used to identify the size of each
chunk of data.
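The flush() option is useful for transformers that buffer state across chunks. As a sketch (the batching logic and batchSize are hypothetical, not part of the API):

```javascript
import { TransformStream } from 'node:stream/web';

// Batch incoming chunks into pairs; flush() emits whatever is
// left over when the writable side closes.
const batchSize = 2;
let batch = [];
const batcher = new TransformStream({
  transform(chunk, controller) {
    batch.push(chunk);
    if (batch.length === batchSize) {
      controller.enqueue(batch);
      batch = [];
    }
  },
  flush(controller) {
    // Called just before the readable side closes.
    if (batch.length > 0)
      controller.enqueue(batch);
  },
});

// Write and read concurrently to avoid stalling on backpressure.
const writer = batcher.writable.getWriter();
const writes = (async () => {
  for (const n of [1, 2, 3]) await writer.write(n);
  await writer.close();
})();

const batches = [];
for await (const chunk of batcher.readable)
  batches.push(chunk);
await writes;
console.log(batches);  // [[1, 2], [3]]
```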
transformStream.readable#
Added in: v16.5.0
transformStream.writable#
Added in: v16.5.0
Transferring with postMessage()#
A <TransformStream> instance can be transferred using a <MessagePort>.
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
};
port2.postMessage(stream, [stream]);
Class: TransformStreamDefaultController#
The TransformStreamDefaultController manages the internal state
of the TransformStream.
transformStreamDefaultController.desiredSize#
Added in: v16.5.0
The amount of data required to fill the readable side's queue.
transformStreamDefaultController.enqueue([chunk])#
Added in: v16.5.0
Appends a chunk of data to the readable side's queue.
transformStreamDefaultController.error([reason])#
Added in: v16.5.0
Signals to both the readable and writable side that an error has occurred
while processing the transform data, causing both sides to be abruptly
closed.
transformStreamDefaultController.terminate()#
Added in: v16.5.0
Closes the readable side of the transform and causes the writable side
to be abruptly closed with an error.
Class: ByteLengthQueuingStrategy#
new ByteLengthQueuingStrategy(init)#
Added in: v16.5.0
byteLengthQueuingStrategy.highWaterMark#
Added in: v16.5.0
byteLengthQueuingStrategy.size#
Added in: v16.5.0
Class: CountQueuingStrategy#
new CountQueuingStrategy(init)#
Added in: v16.5.0
countQueuingStrategy.highWaterMark#
Added in: v16.5.0
countQueuingStrategy.size#
Added in: v16.5.0
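Both built-in strategies simply bundle a highWaterMark with a size() function: ByteLengthQueuingStrategy measures each chunk by its byteLength, while CountQueuingStrategy counts every chunk as 1. A sketch (the highWaterMark values are illustrative):

```javascript
import {
  ReadableStream,
  ByteLengthQueuingStrategy,
  CountQueuingStrategy,
} from 'node:stream/web';

const byBytes = new ByteLengthQueuingStrategy({ highWaterMark: 1024 });
const byCount = new CountQueuingStrategy({ highWaterMark: 16 });

console.log(byBytes.size(new Uint8Array(100)));  // 100
console.log(byCount.size(new Uint8Array(100)));  // 1

// Either can be passed as the strategy argument of a stream constructor:
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array(10));
    controller.close();
  },
}, byBytes);
```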
Class: TextEncoderStream#
new TextEncoderStream()#
Added in: v16.6.0
Creates a new TextEncoderStream instance.
textEncoderStream.encoding#
Added in: v16.6.0
The encoding supported by the TextEncoderStream instance.
textEncoderStream.readable#
Added in: v16.6.0
textEncoderStream.writable#
Added in: v16.6.0
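Piping strings through a TextEncoderStream yields UTF-8 encoded Uint8Array chunks. A sketch (the source stream is illustrative):

```javascript
import { TextEncoderStream, ReadableStream } from 'node:stream/web';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('hi');
    controller.close();
  },
});

// Strings go in on the writable side; encoded bytes come out
// on the readable side.
const encoded = [];
for await (const chunk of source.pipeThrough(new TextEncoderStream()))
  encoded.push(chunk);

console.log(encoded[0]);  // Uint8Array [ 104, 105 ]
```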
Class: TextDecoderStream#
new TextDecoderStream([encoding[, options]])#
Added in: v16.6.0
encoding <string> Identifies the encoding that this TextDecoder instance
supports. Default: 'utf-8'.
options <Object>
fatal <boolean> true if decoding failures are fatal.
ignoreBOM <boolean> When true, the TextDecoderStream will include the
byte order mark in the decoded result. When false, the byte order mark
will be removed from the output. This option is only used when encoding is
'utf-8', 'utf-16be', or 'utf-16le'. Default: false.
Creates a new TextDecoderStream instance.
textDecoderStream.encoding#
Added in: v16.6.0
The encoding supported by the TextDecoderStream instance.
textDecoderStream.fatal#
Added in: v16.6.0
The value will be true if decoding errors result in a TypeError being
thrown.
textDecoderStream.ignoreBOM#
Added in: v16.6.0
The value will be true if the decoding result will include the byte order
mark.
textDecoderStream.readable#
Added in: v16.6.0
textDecoderStream.writable#
Added in: v16.6.0
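A key property of TextDecoderStream is that it correctly reassembles multi-byte sequences split across chunk boundaries, which decoding each chunk independently would mangle. A sketch (the byte sequence is illustrative):

```javascript
import { TextDecoderStream, ReadableStream } from 'node:stream/web';

const euro = [0xE2, 0x82, 0xAC];  // UTF-8 bytes for '€'
const source = new ReadableStream({
  start(controller) {
    // Deliberately split the character across two chunks.
    controller.enqueue(new Uint8Array(euro.slice(0, 1)));
    controller.enqueue(new Uint8Array(euro.slice(1)));
    controller.close();
  },
});

let text = '';
for await (const chunk of source.pipeThrough(new TextDecoderStream()))
  text += chunk;
console.log(text);  // '€'
```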
Class: CompressionStream#
new CompressionStream(format)#
format <string> One of 'deflate', 'deflate-raw', or 'gzip'.
compressionStream.readable#
Added in: v17.0.0
compressionStream.writable#
Added in: v17.0.0
Class: DecompressionStream#
new DecompressionStream(format)#
format <string> One of 'deflate', 'deflate-raw', or 'gzip'.
decompressionStream.readable#
Added in: v17.0.0
decompressionStream.writable#
Added in: v17.0.0
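The two classes are naturally used together for a round trip; this sketch compresses bytes with gzip and restores them (the streamFrom and collect helpers are hypothetical, defined here for illustration):

```javascript
import {
  CompressionStream,
  DecompressionStream,
  ReadableStream,
} from 'node:stream/web';

// Wrap a single Uint8Array as a one-chunk ReadableStream.
function streamFrom(bytes) {
  return new ReadableStream({
    start(controller) {
      controller.enqueue(bytes);
      controller.close();
    },
  });
}

// Drain a stream of Uint8Array chunks into one Uint8Array.
async function collect(readable) {
  const parts = [];
  for await (const chunk of readable)
    parts.push(...chunk);
  return new Uint8Array(parts);
}

const input = new TextEncoder().encode('web streams');
const compressed = await collect(
  streamFrom(input).pipeThrough(new CompressionStream('gzip')));
const restored = await collect(
  streamFrom(compressed).pipeThrough(new DecompressionStream('gzip')));

console.log(new TextDecoder().decode(restored));  // 'web streams'
```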
Utility Consumers#
Added in: v16.7.0
The utility consumer functions provide common options for consuming
streams.
They are accessed using:
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';
const {
arrayBuffer,
blob,
buffer,
json,
text,
} = require('node:stream/consumers');
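Each consumer drains a stream and resolves with the assembled result; for instance, json() parses the concatenated text. A sketch (the streamOf helper is hypothetical, defined here for illustration):

```javascript
import { text, json } from 'node:stream/consumers';
import { ReadableStream } from 'node:stream/web';

// Build a ReadableStream from a fixed list of chunks.
function streamOf(...chunks) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    },
  });
}

const asText = await text(streamOf('hello ', 'world'));
const asJson = await json(streamOf('{"ok":', 'true}'));
console.log(asText);  // 'hello world'
console.log(asJson);  // { ok: true }
```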