Přejít k navigační liště

Zdroják » JavaScript » Používáme Node.js streamy

Používáme Node.js streamy

Články JavaScript

Streamy jako takové jsou tu s námi již hodně dlouhou dobu. Co se týče Node.js, ve verzi 0.10 přišly streamy s novým API, a také díky nadstavbám nad tímto low-level mechanismem můžeme tuto techniku pohodlně používat. A jsou zde pádné důvody, proč nenechat streamy bez povšimnutí.

Nálepky:

Úvod

Není potřeba zabíhat příliš do teorie. Stream je podle wikipedie sekvence datových prvků dostupných v průběhu času (tzn. po “kouscích”, ne jako celek). Navíc mohou být řetězeny do pipeline, kdy se výstup jednoho streamu stane vstupem následujícího, respektive následujících.

Streamy jsou odvozeny z třídy EventEmitter. Mohou být buď Readable, Writable, nebo obojí (Duplex a Transform). Obecně se dá tok dat ve streamech vyjádřit nějak takto: zdroj (Readable)změna (Readable/Writable)cíl (Writable). Uveďme si praktickou ukázku, kde zkomprimujeme soubor:

var fs = require('fs');
var zlib = require('zlib');

fs.createReadStream('large-file.txt')                 //Readable
    .pipe(zlib.createGzip())                          //Transform
    .pipe(fs.createWriteStream('large-file.txt.gz')); //Writable

Jaké jsou výhody oproti tradičnímu řešení plného callbacků, kdy bychom nejprve načetli celý soubor do paměti, poté ho celý zkomprimovali a následně uložili na disk? Jak už jsme si řekli, streamy pracují s daty po kouscích (výchozí je 16kB, u fs.createReadStream 64kB). Náš kód tedy načte prvních 64kB souboru a pošle ho ke komprimaci. Když načítá druhý blok, ten první se již pilně komprimuje. Na nic se nečeká, procesy běží paralelně. Pokud by komprimovací stream nestíhal, je zde implementován mechanismus, který zbrzdí i stream před ním, tak aby se nic nebufferovalo zbytečně.

Představte si, že komprimujete obrovský soubor. Při načítání celého souboru (pokud se vám to vůbec povede, V8 má určitá omezení) budete plýtvat časem i pamětí. To povede ke značnému zpomalení vašeho skriptu a k nespokojenosti uživatele.

Samozřejmě se nemusíme omezovat pouze na jeden Transform stream. Co kdybychom chtěli data nejdříve šifrovat? Není problém, prostě jen před komprimaci vložíme šifrovací stream.

var fs = require('fs');
var crypto = require('crypto');
var zlib = require('zlib');

fs.createReadStream('large-file.txt')                 //Readable
    .pipe(crypto.createCipher('aes256', 'password'))  //Transform
    .pipe(zlib.createGzip())                          //Transform
    .pipe(fs.createWriteStream('large-file.txt.gz')); //Writable

Protože streamy nabízejí konzistentní rozhraní, jsou vhodnou technikou k abstrakci vašeho kódu. Pokud si například chcete vytvořit vlastní Transform stream, který bude logovat velikost dat, je vám úplně jedno zda data přicházejí ze souboru, ze socketu, nebo jsou náhodně generovaná. Kdybychom tedy nechtěli komprimovat soubor, ale data, která nám pošle TCP server, stačí nám k tomu minimální změna:

var fs = require('fs');
var zlib = require('zlib');
var net = require('net');

//fs.createReadStream('large-file.txt')
net.connect(12345)                                    //Duplex (dědí Readable)
    .pipe(zlib.createGzip())                          //Transform
    .pipe(fs.createWriteStream('large-file.txt.gz')); //Writable

Tato flexibilita nám umožňuje i pohodlně posílat data z jednoho zdroje do více cílů. Následující snippet čte vše, co napíšeme do konzole, a to následně uloží do souboru a zároveň pošle na TCP server.

var net = require('net');
var fs = require('fs');

process.stdin.pipe(net.connect(12345));
process.stdin.pipe(fs.createWriteStream('console.txt'));

Shrňme si to do krátkého přehledu. Streamy:

  • jsou paměťově méně náročné
  • jsou rychlé
  • poskytují abstrakci díky jednotnému rozhraní
  • nabízejí příjemnou flexibilitu a čistý a přehledný kód

Object mode

Streamy běžně pracují pouze s řetězci nebo buffery. Pokud to chceme změnit, máme možnost je přepnout do object módu. Můžeme tak pracovat se všemi možnými hodnotami, které v JavaScriptu jsou (objekty, pole, čísla, …).

var Readable = require('stream').Readable;
var stream = Readable({objectMode : true });

Readable

Readable stream představuje zdroj, ze kterého data lezou ven do jiných streamů. Jako příklad mužeme uvést http request na serveru, souborový read stream, nebo standardní vstup (stdin).

Pojďme si jeden vlastní vytvořit. Můžeme si třeba vygenerovat Fibonacciho posloupnost do čísla 100 000. Nejjednodušší způsob vypadá nějak takto:

var Readable = require('stream').Readable;

//generuje n-tý prvek fibonacciho posloupnosti
function fib(n) {
    if (n === 0) return 0;
    if (n === 1) return 1;
    return fib(n - 1) + fib(n - 2);
}

var fibonacci = Readable();

//zdroj dat pro další streamy
fibonacci._read = function (size) {
    var i = 0, num = 0;

    while (true) {
        num = fib(i++);
        if (num > 100000) break;
        this.push(num + '\r\n');
    }

    this.push(null); //již nebudeme posílat data
};

fibonacci.pipe(process.stdout); //vypíšeme čísla do konzole

Jak vidíme, vše, co potřebujeme, je implementovat metodu _read, kde voláme posíláme data pomocí push. Parametrem size v metodě _read() říkají následující streamy, kolik dat od našeho streamu vyžadují. V našem příkladu toto ignorujeme.

Druhým způsobem je napsat si vlastní třídu, která bude z Readable dědit. Takto například mužeme posílat parametry do konstruktoru naší třídy a ovlivňovat tak chování streamu. Zápis se jen trochu prodlouží.

var Readable = require('stream').Readable;

function fib(n) {
    if (n === 0) return 0;
    if (n === 1) return 1;
    return fib(n - 1) + fib(n - 2);
}

require('util').inherits(Fibonacci, Readable);

function Fibonacci(to) {
    if (!(this instanceof Fibonacci)) return new Fibonacci(to);
    Readable.call(this);

    this._to = to || 100000;
}

Fibonacci.prototype._read = function (size) {
    var i = 0, num = 0;

    while (true) {
        num = fib(i++);
        if (num > this._to) break;
        this.push(num + '\r\n');
    }

    this.push(null);
};

var fibonacci = new Fibonacci(300000);

fibonacci.pipe(process.stdout);

Tento skript nám vypíše všechna čísla fibonacciho posloupnosti menší než 300 000.

Pojďme si ještě přiblížit metodu .pipe(destination, options). Jejím úkolem je propojit výstup z Readable streamu (vytáhnout data) se vstupem následujícího writable streamu (zapsat je do něj). Automaticky řídí tok dat tak, aby nezahltila cíl daty z příliš rychlého zdroje. Její návratovou hodnotou je destination stream, což nám dává možnost streamy pohodlně řetězit za sebou, tak jak jsme si ukazovali v úvodu.

Pokud neřekneme jinak, .pipe po předání všech dat uzavře destination stream, takže už do něj nelze zapisovat data (pokus skončí chybou). Pokud chceme stream používat opakovaně, stačí předat argument options s vlastností end nastavenou na true: input.pipe(output, { end : false }).

Jak už víme, Readable, stejně jako ostatní streamy, je odvozen od EventEmitter, tudíž nám nic nebrání navázat listenery na různé události:

  • readable – nastane, když jsou data připravena ke čtení
  • data – data se předávají listeneru hned, jak jsou k dispozici (nejrychlejší způsob, jak číst data ze streamu)
readable.on('data', function (chunk) {
    console.log('%d bajtů dat', chunk.length);
});
  • end – nastane, když již byla všechna data předána
  • close – pokud se například uzavře soubor před tím, než se stihla přečíst všechna data, nastane tato událost
  • error – chyba při čtení se předá listeneru této události

Writable

Writable stream představuje cíl, do kterého data zapisujeme. Příkladem může být http response na serveru, souborový write stream, nebo standardní výstup (stdout).

Čistý Writable stream nemá metodu .pipe, protože neumožňuje, aby z něho byla data čtena. Vše, co potřebujeme k tomu, abychom si vytvořili vlastní Writable stream, je implementovat metodu ._write(chunk, enc, done).

var Writable = require('stream').Writable;

var timelog = Writable();

timelog._write = function (chunk, enc, done) {
    var time = (new Date()).toLocaleTimeString();
    console.log('[%s]: %s', time, chunk.toString());
    done();
};

process.stdin.pipe(timelog);

V parametru chunk dostaneme blok dat, která nám předal předchozí stream. V enc je uloženo kódování řetězce nebo slovo buffer, pokud se pracuje s Buffery. A jako poslední nesmí chybět callback, kterým řekneme, že jsme hotovi.

Samozřejmě i z Writable streamu můžeme odvodit naší třídu. Postup je téměř totožný, jak jsme si ukázali u Readable, jen místo _read() implementujeme _write().

Do Writable streamu můžeme zapsat data nejen pomocí readable.pipe(), ale i “ručně” pomocí writable.write(data, [encoding], [done]) respektive writable.end([data], [encoding], [done]). Jak už z názvu vyplývá, druhá jmenovaná zápis do streamu zároveň ukončí. Zavolání write po end vyvolá chybu.

var fs = require('fs');
var file = fs.createWriteStream('file.txt');

file.write('Ahoj světe!\r\n');
file.write('Foo bar');
file.end();

I zde můžeme sledovat různé události. Kromě error to jsou:

  • drain – pokud stream nestíhá zapisovaná data zpracovávat a musí si je ukládat do interního bufferu, tato událost nastane vždy, když se buffer vyprázdní a stream je připraven přijímat další data
  • finish – když je zavolána metoda end() a všechna data jsou zpracována, nastane tato událost
  • pipe – nastane vždy, když je stream “připajpován” k nějakému Readable a ten je také předán listenerovi
  • unpipe – stejný případ jako u pipe, pouze se to týká metody readable.unpipe().

Duplex

Duplex streamy implementují jak Readable, tak Writable rozhraní. Ta jsou však od sebe oddělená, mají každý samostatný buffer; čtení a zápis se provádí nezávisle. Příkladem je například třída Socket. U implementace Duplex streamu musíte nadefinovat jak _read([size]), tak _write(chunk, enc, done).

Transform

Transform streamy jsou speciální odvozeninou Duplex streamů. Cílem je na vstupních datech něco vykonat a poslat je na výstup (transformovat). Jako příklad mužeme uvést všechny streamy v zlib a crypto.

Místo _read a _write jsou zde metody _transform(chunk, enc, done), volaná nad každým dostupným blokem dat, a nepovinná _flush(done), volaná, když stream končí.

var Transform = require('stream').Transform;
var fs = require('fs');

require('util').inherits(FindAndReplace, Transform);

function FindAndReplace(find, replace) {
    if (!(this instanceof FindAndReplace))
        return new FindAndReplace(find, replace);
    Transform.call(this);

    this._find = new RegExp(find, 'g');
    this._replace = replace;
    this.count = 0;
}

FindAndReplace.prototype._transform = function (chunk, enc, done) {
    var str = chunk.toString();
    this.count += str.match(this._find).length;
    str = str.replace(this._find, this._replace);

    this.push(new Buffer(str));
    done();
};

FindAndReplace.prototype._flush = function (done) {
    console.log('%d krát nahrazeno', this.count);
    done();
};

fs.createReadStream('file.txt')
    .pipe(new FindAndReplace('PHP', 'JavaScript'))
    .pipe(fs.createWriteStream('file.txt', { flags : 'r+' }));

Účelem Transform streamu nemusí být pouze data změnit. Můžeme například data jen analyzovat a posílat je dál tak, jak jsou. I v metodě _flush můžeme data přidávat pomocí _push. To se hodí například, když potřebujete provést operaci nad daty jako celkem. V tom případě si v _transform ukládáte bloky do proměnné a nikam je nepředáváte. To uděláte až ve _flush.

Nadstavby

Existuje řada užitečných knihoven, které tvorbu streamů zjednodušují. Zde jsou některé z nich:

  • from – zjednodušuje vytváření Readable streamů
  • node-writable – usnadňuje vytváření Writable streamů
  • through2 – zjednodušuje vytváření Transform streamů
  • duplexer – z jednoho Readable a jednoho Writable vytvoří jeden Duplex stream
  • event-stream – nabízí řadu užitečných funkcí pro práci se streamy

Závěr

Cílem tohoto článku nebylo probrat streamy úplně do hloubky. Pokud vás na to však navnadil, doporučuji přečíst si Stream handbook a samozřejmě dokumentaci. Pokud chcete vidět příklad užití streamů v praxi, tento článek je přesně pro vás.

Ještě poznámka na úplný závěr. Pokud chcete nové Streams API i ve starších verzích Node.js než je 0.10, je tu pro vás fallback v podobě knihovny readable-stream.

Komentáře

Subscribe
Upozornit na
guest
1 Komentář
Nejstarší
Nejnovější Most Voted
Inline Feedbacks
View all comments
Netolish

Při kombinaci šifrování + komprese je lépe nejdříve použít kompresi a až potom šifrování. Šifrování totiž větišinou vnese takou entropii, že je účinnost komprese šifrovaných dat protí původnímu ‚plain-textu‘ mizivá.

Enum a statická analýza kódu

Mám jednu univerzální radu pro začínající programátorty. V učení sice neexistují rychlé zkratky, ovšem tuhle radu můžete snadno začít používat a zrychlit tak tempo učení. Tou tajemnou ingrediencí je statická analýza kódu. Ukážeme si to na příkladu enum.

Pocta C64

Za prvopočátek své programátorské kariéry vděčím počítači Commodore 64. Tehdy jsem genialitu návrhu nemohl docenit. Dnes dokážu lehce nahlédnout pod pokličku. Chtěl bych se o to s vámi podělit a vzdát mu hold.