Šta su Streams u Node.js?
Streams omogućava efikasniju obradu velikih količina podataka, i može se posmatrati kao neka “reka” podataka. Streams “čuva” memoriju jer obradom podataka u komadićima rad s velikim količinama podataka ne zauzimanja velike količine memorije odjednom. Streams brzo obradjuje podatake, jer se podaci mogu obraditi čim prvi komad stigne, bez čekanja na kompletan set podataka, što ubrzava proces obrade.
Vrste Streams-a
- Readable Streams: Omogućavaju čitanje podataka, poput vode koja teče iz česme.
- Writable Streams: Omogućavaju pisanje podataka, poput sipanja vode u sudoperu.
- Duplex Streams: Mogu istovremeno čitati i pisati podatke, slično korišćenju telefona.
- Transform Streams: Poseban tip Duplex Streams koji može menjati podatke dok prolaze kroz stream, slično filteru za vodu.
Streams u Node.js su snažan alat za efikasnu obradu podataka u realnom vremenu. Njihova sposobnost da obrade velike količine podataka u komadićima, uz istovremeno čuvanje resursa i ubrzavanje procesa, čini ih nezamenjivim u razvoju aplikacija.
Upisivanje
Upisivanje uz pomoć streama se najbolje objašnjava na primerima, te ćemo napisati programe (sa i bez korišćenja streams-a) koji imaju cilj da upišu u odgovarajući fajl milion brojeva svaki u novi red.
NAPOMENA: Izrvšenje koda sa korišćenjem streams-a je brže i do 30 puta!!!
Primer bez korišćenja streams-a
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
const fs = require('fs').promises; (async () => { try { const fd = await fs.open('bezStreamsPromisesBuffer.txt', 'w'); for (let i = 0; i < 1000000; i++) { // Kreiramo buffer sa podatkom koji želimo da upišemo const buffer = Buffer.from(`Broj: ${i}\n`, 'utf8'); // Asinhrono upisujemo buffer u fajl await fs.write(fd, buffer); } await fd.close(); console.log('Fajl je uspešno sačuvan koristeći promises i Buffer.'); } catch (error) { console.error('Došlo je do greške:', error); } })(); |
Primer sa korišćenjem streams-a
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
const fs = require('fs'); const stream = fs.createWriteStream('saStreams.txt'); for (let i = 0; i < 1000000; i++) { stream.write(`Broj: ${i}\n`); } stream.end(); stream.on('finish', () => { console.log('Fajl je uspešno sačuvan sa Streams!'); }); stream.on('error', (err) => { console.error('Došlo je do greške:', err); }); |
U prethodnom primeru ne koristimo buffer pa se string “Broj: ${i}\n” implicitno konvertuje u binarne brojeve pre nego što se upiše u tok, ali ovo može da se napiše i drugačije (“efikasnije”), ako se eksplicitno kreira Buffer objekat koristeći Buffer.from() metod i onda se on iskoristi da konvertuje stringove u Buffer objekte pre upisivanja u tok:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
const fs = require('fs'); const stream = fs.createWriteStream('saStreams.txt'); for (let i = 0; i < 1000000; i++) { // Eksplicitno kreiranje Buffer-a const buffer = Buffer.from(`Broj: ${i}\n`, 'utf8'); stream.write(buffer); } stream.end(); stream.on('finish', () => { console.log('Fajl je uspešno sačuvan sa Streams!'); }); stream.on('error', (err) => { console.error('Došlo je do greške:', err); }); |
Drain dogadjaj
Ako koristite writable stream (tok za pisanje) da biste upisali podatke u neki izlaz, može se desiti da pišete brže nego što izlaz može da prihvati podatke, pa to može dovesti do gomilanja podataka u memorijskom baferu. Da bi se to izbeglo postupak je sledeći: pri pisanju podataka u writable stream, možemo pratiti da li je strim ispraznio svoj bafer pomoću događaja “drain”. Kada se emituje dogadjaj “drain” to znači da strim signalizira da je spreman da prihvati još podataka, pa tek tada možemo nastaviti sa pisanjem podataka u strim bez straha od preopterećenja bafera.
Primer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
const stream = nekiWritableStream(); function writeData(data) { if (!stream.write(data)) { console.log('Bafer je pun, čekam na događaj "drain"...'); stream.once('drain', () => { console.log('Bafer je ispražnjen, možete nastaviti sa pisanjem.'); writeData(noviPodaci); // Nastavljamo sa pisanjem nakon što se bafer isprazni }); } } // Primer korišćenja writeData(nekiPodaci); |
U ovom primeru, kada stream.write(data) vrati false, to znači da je bafer strima pun i treba sačekati na “drain” događaj pre nego što nastavimo sa pisanjem. Kada se događaj “drain” emituje, možemo sigurno nastaviti sa pisanjem podataka.