Steam
- file
- network
Stream은 스트림 가능한 소스로부터 핸들러에게 해당 데이터를 여러 개의 작은 청크로 쪼개 처리할 수 있다.
큰 데이터를 처리해야 하거나, 비동기적으로만 얻을 수 있는 데이터를 처리해야 할 때 유용하다.
stream의 일반적인 구현 형태
const fs = require('fs')
const rs = fs.createReadStream('test.txt')
rs.on('data', data => {
// Do Something with data..
})
rs.on('error', error => { })
rs.on('end', () => {})
data, error, end 등의 이벤트 핸들러를 달아 처리한다.
특별히 지정하지 않으면 data는 Buffer가 된다.
stream의 종류
1. Readable
- 스트림으로부터 읽을 수 있다.
* fs.createReadStream
* process.stdin
* 서버 입장의 HTTP 요청
* 클라이언트 입장의 HTTP 응답
Writable
- 스트림에 출력할 수 잇다.
* fs.createWriteStream
* process.stdout
* 클라이언트 입장의 HTTP 요청
* 서버 입장의 HTTP 응답
2. Duplex
- 이 스트림에 입려을 받을 수 있고, 출력을 보낼 수도 있다.
* TCP sockets
* zlib streams
* crypto streams (암호화 알고리즘)
Transform
- 입력받은 스트림을 변환해 새로운 스트림으로 만든다.
* zlib streams (duplex 도 해당)
* crypto streams (duplex 도 해당)
스트림으로 큰 파일 처리하기
예제 1
write-file-stream.js
// @ts-check
const fs = require('fs')
const ws = fs.createWriteStream('D:/node_workspace/src/test.txt')
ws.write('Hello, world!')
const NUM_MBYTES = 500
for (let i = 0; i < NUM_MBYTES; i += 1) {
ws.write('a'.repeat(1024 * 1024))
}
read-file-stream.js
// @ts-check
const fs = require('fs')
const rs = fs.createReadStream('src/test.txt', {
encoding: 'utf-8',
})
let chunkCount = 0
rs.on('data', (data) => {
chunkCount += 1
console.log(data[0])
})
rs.on('end', () => {
console.log('Event: end')
console.log(chunkCount)
})
예시 2
write-file-stream.js
// @ts-check
// aaaaaaaaaaaabbbbbbbbbbbbbbbbbbaaaaaaaaaaabbbbbbbbbbbbbaaaaaa... aaabbbb
// 위와 같은 파일에서, a의 연속 구간(a block)의 개수와, b의 연속 구간(b block)의 개수를 세는 프로그램
const fs = require('fs')
const ws = fs.createWriteStream('D:/node_workspace/src/test.txt')
const NUMB_BLOCKS = 500
/** @type {Object.<string, number>} */
const numBlocksPerCharacter = {
a: 0,
b: 0,
}
for (let i = 0; i < NUMB_BLOCKS; i += 1) {
const blockLengths = Math.floor(800 + Math.random() * 200)
const character = i % 2 === 0 ? 'a' : 'b'
ws.write(character.repeat(1024 * blockLengths))
numBlocksPerCharacter[character] += 1
}
console.log(numBlocksPerCharacter)
read-file-stream.js
// @ts-check
const { log } = console
const fs = require('fs')
const rs = fs.createReadStream('src/test.txt', {
encoding: 'utf-8',
highWaterMark: 65536 * 2,
})
/** @type {Object.<string, number>} */
const numBlocksPerCharacter = {
a: 0,
b: 0,
}
/**
* @type {string | undefined}
*/
let preveCharacter
rs.on('data', (data) => {
if (typeof data !== 'string') {
return
}
for (let i = 0; i < data.length; i += 1) {
if (data[i] !== preveCharacter) {
const newCharacter = data[i]
if (!newCharacter) {
/* eslint-disable-next-line no-continue*/
continue
}
preveCharacter = newCharacter
numBlocksPerCharacter[newCharacter] += 1
}
}
})
rs.on('end', () => {
log('Event: end')
log(numBlocksPerCharacter)
})
stream 처리 시 유의점 - JSON 스트림 처리기 예시
[목표] 줄바꿈으로 분리된 JSON들을 읽어서, 정상 JSON들에 한해 data 값 모두 더하기
// @ts-check
const { log } = console
const fs = require('fs')
const rs = fs.createReadStream('src/jsons', {
encoding: 'utf-8',
highWaterMark: 20,
})
let totalSum = 0
let accumulatedJsonStr = ''
rs.on('data', (chunk) => {
if (typeof chunk !== 'string') {
return
}
accumulatedJsonStr += chunk
log(chunk)
const lastNewLineIdx = accumulatedJsonStr.lastIndexOf('\n')
const jsonLinesStr = accumulatedJsonStr.substring(0, lastNewLineIdx)
accumulatedJsonStr = accumulatedJsonStr.substring(lastNewLineIdx)
totalSum += jsonLinesStr
.split('\n')
.map((jsonLine) => {
try {
return JSON.parse(jsonLine)
} catch {
return undefined
}
})
.filter((json) => json)
.map((json) => json.data)
.reduce((sum, curr) => sum + curr, 0)
})
rs.on('end', () => {
log(totalSum)
})
pipeline 과 promise
pipeline은 transform stream을 쉽게 활용하게 도와준다.
stream.pipeline(
fs.createReadStream('input'),
zlib.createGzip(),
fs.createWriteStream('compressed.gz')
)
예제 pipeline
// @ts-check
const { log, error } = console
const fs = require('fs')
const stream = require('stream')
const zlib = require('zlib')
stream.pipeline(
fs.createReadStream('src/jsons'),
zlib.createGzip(),
fs.createWriteStream('D:/node_workspace/src/jsons.gz'),
(err) => {
if (err) {
error('pipeline failed', err)
} else {
log('Pipeline succeeded')
//압축 풀어서 기존 파일과 확인
stream.pipeline(
fs.createReadStream('src/jsons.gz'),
zlib.createGunzip(),
fs.createWriteStream('src/jsons.unzipped'),
(_err) => {
if (_err) {
error('Gunzip failed', _err)
} else {
log('Gunzip succeeded')
}
}
)
}
}
)
상단 소스를 promise를 추가하여 간결하게 코드 구현
// @ts-check
const fs = require('fs')
const stream = require('stream')
const zlib = require('zlib')
const util = require('util')
async function gzip() {
return util.promisify(stream.pipeline)(
fs.createReadStream('src/jsons'),
zlib.createGzip(),
fs.createWriteStream('src/jsons.gz')
)
}
async function unzip() {
return util.promisify(stream.pipeline)(
fs.createReadStream('src/jsons.gz'),
zlib.createGunzip(),
fs.createWriteStream('src/jsons.unzipped')
)
}
async function main() {
await gzip()
await unzip()
}
main()
'Node.js' 카테고리의 다른 글
Express (0) | 2021.09.15 |
---|---|
리팩토링 프로젝트 (0) | 2021.09.15 |
Node.js 내장 객체 (0) | 2021.09.15 |
npm, yarn 등 패키지 매니저 (0) | 2021.09.14 |
Node.js 핵심 개념 정리 (0) | 2021.09.10 |
댓글