Diagrama de integração falha

Streams o que são e como usar no NodeJS

Netflix, Prime Video, Crunchyroll todos serviços de streaming, mas você sabe o que uma stream? E mais, você sabe o que uma stream no NodeJS?

Streams

Pensei na Netflix, você não espera um episódio inteiro de uma série baixar para você começar a assistir, por que você está consumindo uma stream de um vídeo.

Uma stream é um fluxo de dados. Em vez de você consumir um dado inteiro de uma vez, como um conteúdo de um arquivo ou um vídeo, você consume o dado em partes.

Memória

Mas, por que eu preciso consumir os dados em partes em vez de consumir tudo de uma vez?

Hoje pode não parece tanto, mas memória era algo escasso. Então quando você criava software, o quanto de memória era usado era uma grande preocupação. Se você precisasse criar um software que para inicializar ele consumisse 30MB de memória e analisasse um arquivo que tivesse 2GB de tamanho o seu software pararia de funcionar por não ter memória suficiente para carregar um arquivo tão grande.

Diagrama de integração falha

Agora se fosse utilizado um stream em vez de carregar o arquivo completamente não teria esse problema de falta de memoria, pois a arquivo seria enviado em partes, chamadas chunk, e processado a medida que chegam.

Streams no NodeJS

O NodeJS possui um módulo para trabalhar com streams desde a primeira versão: node:stream.

O NodeJS possui 3 tipos de streams diferentes:

Readable Stream

Esse tipo de stream é utilizado quando queremos ler de uma fonte de dados. A stream pode operar em dois modos:

  • flowing mode: nesse modo os dados são lidos automaticamente através de eventos “data”
  • paused mode: nesse modo o método read() deve ser chamado explicitamente para ler cada dado da stream.

Por padrão toda Readable Stream começa no paused mode, mas isso pode ser mudado ao chamar o métodos resume(), pipe() ou criando um handler para evento data.

Para criar uma stream de leitura (readable) precisamos importar Readable do módulo node:stream e podemos usar o método from() que recebe um Iterable ou um AsyncIterable.

As streams herdam da classe EventEmitter então podemos usar o método on para lidar com eventos da stream. Alguns eventos que ocorrem com uma Readable Stream são:

  • data: dados são recebidos.
  • end: todos os dados foram lidos. Não há mais dados para consumir.
  • close: a stream foi fechada.
  • error: algum erro ocorreu durante o fluxo de dados
import { Readable } from "node:stream";

const readableStream = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

readableStream.on("data", (chunk) => {
    console.log("Dado recebido:", chunk);

    if (chunk === 7) {
        // Simula uma condição para fechar a stream
        if (Math.random() > 0.5) {
            // fecha a stream sem erro (evento "close")
            readableStream.destroy();
        } else {
            // fecha a stream com erro (evento "error")
            readableStream.destroy(new Error("Erro forçado na stream"));
        }
    }
});

readableStream.on("end", () => {
    console.log("Todos os dados consumidos.");
});

readableStream.on("close", () => {
    console.log("Stream foi fechada.");
});

readableStream.on("error", (err) => {
    console.error("Erro na stream:", err);
});

Se for necessário criar sua própria Readable Stream personalizada, você precisa implementar a interface da classe Readable. Isso pode ser feito herdando a classe Readable e implementando o método _read().

import { Readable } from "node:stream";

class MyStream extends Readable {
    data = [1, 2, 3, 4, 5];

       constructor(options = {}) {
          super(options);
       }

    _read() {
        const chunk = this.data.shift();
        if (chunk) {
            const dataToSend = String(chunk);
            this.push(dataToSend); // envia os dados do tipo string ou Buffer
            return;
        }

        this.push(null); // sinaliza o fim da stream
    }
}

const myStream = new MyStream();

myStream.on("data", (chunk) => {
    // Os dados chegam como um tipo Buffer
    console.log(chunk.toString()); // convertendo para string
});

Para casos mais simples pode ser diretamente uma instancia de Readable e implementando o método read().

import { Readable } from "node:stream";

const data = [1, 2, 3, 4, 5];

const myStream = new Readable({
    read() {
        const chunk = data.shift();
        if (chunk) {
            const dataToSend = String(chunk);
            this.push(dataToSend);
            return;
        }
    }
});

myStream.on("data", (chunk) => {
    console.log(chunk.toString());
});

Writable Stream

Esse tipo de stream é utilizado quando queremos escrever dados em algum lugar.

Para criar uma stream de escrita (writable), importamos Writable do módulo node:stream e criamos uma instância com um método write().

O método write() recebe 3 argumentos:

  • chunk: os dados que serão escritos em algum lugar. O chunk é uma variável do tipo Buffer.
  • encoding: a codificação usada.
  • callback: função para continuar o fluxo ou parar caso tenho um error.
import { Writable } from "node:stream";

const writableStream = new Writable({
    write(chunk, _encoding, callback) {
        console.log("Dado recebido:", chunk.toString());

        callback();
    }
});

writableStream.write("Olá");
writableStream.write("Mundo!");

Streams operam com string, Buffer, TypedArray ou DataView. Caso seja necessário trabalhar com outros tipos é preciso usar objectMode: true.

import { Writable } from "node:stream";

const writableStream = new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
                // chunk agora é um object
        console.log("Dado recebido:", chunk);

        callback();
    }
});

writableStream.write({ message: "Olá" });
writableStream.write({ message: "Mundo!" });

Para criar uma writable stream customizada é necessário herdar de Writable e implementar o método _write e/ou _writev.

import { Writable } from "node:stream";

class MyStream extends Writable {
    constructor(options = {}) {
        super(options);
    }

    _write(chunk, _encoding, callback) {
        console.log("Dado recebido:", chunk);
        callback();
    }
}

const writableStream = new MyStream({ objectMode: true });

writableStream.write("Olá");
writableStream.write({ message: "Mundo!" });

Duplex e Transform Stream

As duplex streams são streams que implementam o protocolo Readable e Writable, ou seja, ela funciona como uma stream de escrita e leitura.

Dentro das Duplex streams, temos as Transform streams que são streams de transformação. Essas são usadas quando queremos manipular um dados que está passando pelo nossa stream.

Vamos a um exemplo, vamos criar uma stream de leitura (readable) e conectar a uma stream de transformação (transform) usando o método pipe e vamos mandar para o console usando o process.stdout que é uma stream de escrita.

import { Readable, Transform } from "node:stream";

const dados = ["javascript", "nodejs", "java", "elixir", "c++"];
const readableStream = Readable.from(dados);

const transformStream = new Transform({
    transform(chunk, _encoding, callback) {
        const dado = chunk.toString();
        const dadoTransformado = `${dado.toUpperCase()}n`;

        // callback recebe dois parâmetros: erro e o dado transformado
        callback(null, dadoTransformado);
    }
});

// o método pipe conecta streams
readableStream.pipe(transformStream).pipe(process.stdout);

Resumindo: lemos os dados com uma readable stream, transformamos os dados com uma stream duplex stream (Transform) e escrevemos o conteúdo no console usando uma writable stream (process.stdout).

Por que usar?

Vamos usar numa situação mais real para você entender o porquê usar uma stream no NodeJS.

Vamos fazer a análise de uma base de dados csv, baixei uma base de dados sobre o preço do combustível.

Vamos usar o node:fs para ler o arquivo e escrever no console.

import fs from "node:fs"

const arquivo = fs.readFileSync("ca-2004-01.csv");
const linhas = arquivo.toString().split("n");
linhas.map(linha => {
  console.log(linha);
});

node index.js

Ao rodar o código funciona perfeitamente.

Vamos tentar novamente, mas limitaremos a memória usada pelo programa para 100MB. Afinal, não teremos gigas de memória em todo ambiente de execução.

node --max-old-space-size=100 index.js

Dessa vez tivemos um erro do motor v8, usado pelo NodeJS para executar o javascript, por falta de memória para o nosso programa.

Erro v8

Agora vamos usar as streams. O módulo node:fs tem uma função para ler um arquivo como uma stream.

import fs from "node:fs"

const readableStream = fs.createReadStream("ca-2004-01.csv");

readableStream.pipe(process.stdout);

node --max-old-space-size=100 index.js

Agora o código funcionou e você vai ver os dados no console.

No primeiro caso, como limitamos a memória do nosso programa, ele quebra ao carregar o arquivo, porque ele precisa de mais de 100MB para isso.

Já com streams lemos e processamos o arquivo em partes e não todo de uma vez. Carregamos 64KB do arquivo e mandamos escrever no console, depois carregamos mais 64KB e mandamos para o console e repetimos até o fim dos dados. Com isso, não usamos o nosso limite de 100MB inteiro e o programa finaliza com sucesso.

Similar Posts