본문 바로가기
Node.js

Stream

by 오늘만 사는 여자 2021. 9. 15.
728x90
반응형

Steam

- file

- network

 

Stream은 스트림 가능한 소스로부터 핸들러에게 해당 데이터를 여러 개의 작은 청크로 쪼개 처리할 수 있다.

큰 데이터를 처리해야 하거나, 비동기적으로만 얻을 수 있는 데이터를 처리해야 할 때 유용하다.

 

stream의 일반적인 구현 형태

const fs = require('fs')
const rs = fs.createReadStream('test.txt')

rs.on('data', data => {
  // Do Something with data..
})

rs.on('error', error => { })

rs.on('end', () => {})

data, error, end 등의 이벤트 핸들러를 달아 처리한다.

특별히 지정하지 않으면 data는 Buffer가 된다.

 

stream의 종류

1. Readable

- 스트림으로부터 읽을 수 있다.

* fs.createReadStream

* process.stdin

* 서버 입장의 HTTP 요청

* 클라이언트 입장의 HTTP 응답

 

 Writable

- 스트림에 출력할 수 잇다.

* fs.createWriteStream

* process.stdout

* 클라이언트 입장의 HTTP 요청

* 서버 입장의 HTTP 응답

 

2. Duplex

- 이 스트림에 입려을 받을 수 있고, 출력을 보낼 수도 있다.

* TCP sockets

* zlib streams

* crypto streams (암호화 알고리즘)

 

Transform

- 입력받은 스트림을 변환해 새로운 스트림으로 만든다.

* zlib streams (duplex 도 해당)

* crypto streams (duplex 도 해당)

 

스트림으로 큰 파일 처리하기 

예제 1

write-file-stream.js

// @ts-check

const fs = require('fs')

const ws = fs.createWriteStream('D:/node_workspace/src/test.txt')

ws.write('Hello, world!')

const NUM_MBYTES = 500
for (let i = 0; i < NUM_MBYTES; i += 1) {
  ws.write('a'.repeat(1024 * 1024))
}

read-file-stream.js

// @ts-check

const fs = require('fs')

const rs = fs.createReadStream('src/test.txt', {
  encoding: 'utf-8',
})

let chunkCount = 0

rs.on('data', (data) => {
  chunkCount += 1
  console.log(data[0])
})

rs.on('end', () => {
  console.log('Event: end')
  console.log(chunkCount)
})

 

예시 2

write-file-stream.js

// @ts-check

// aaaaaaaaaaaabbbbbbbbbbbbbbbbbbaaaaaaaaaaabbbbbbbbbbbbbaaaaaa... aaabbbb
// 위와 같은 파일에서, a의 연속 구간(a block)의 개수와, b의 연속 구간(b block)의 개수를 세는 프로그램
const fs = require('fs')

const ws = fs.createWriteStream('D:/node_workspace/src/test.txt')

const NUMB_BLOCKS = 500

/** @type {Object.<string, number>} */
const numBlocksPerCharacter = {
  a: 0,
  b: 0,
}
for (let i = 0; i < NUMB_BLOCKS; i += 1) {
  const blockLengths = Math.floor(800 + Math.random() * 200)
  const character = i % 2 === 0 ? 'a' : 'b'
  ws.write(character.repeat(1024 * blockLengths))

  numBlocksPerCharacter[character] += 1
}

console.log(numBlocksPerCharacter)

 

read-file-stream.js

// @ts-check

const { log } = console
const fs = require('fs')

const rs = fs.createReadStream('src/test.txt', {
  encoding: 'utf-8',
  highWaterMark: 65536 * 2,
})

/** @type {Object.<string, number>} */
const numBlocksPerCharacter = {
  a: 0,
  b: 0,
}

/**
 * @type {string | undefined}
 */
let preveCharacter
rs.on('data', (data) => {
  if (typeof data !== 'string') {
    return
  }
  for (let i = 0; i < data.length; i += 1) {
    if (data[i] !== preveCharacter) {
      const newCharacter = data[i]

      if (!newCharacter) {
        /* eslint-disable-next-line no-continue*/
        continue
      }

      preveCharacter = newCharacter
      numBlocksPerCharacter[newCharacter] += 1
    }
  }
})

rs.on('end', () => {
  log('Event: end')
  log(numBlocksPerCharacter)
})

 

stream 처리 시 유의점 - JSON 스트림 처리기 예시

 

[목표] 줄바꿈으로 분리된 JSON들을 읽어서, 정상 JSON들에 한해 data 값 모두 더하기

// @ts-check

const { log } = console
const fs = require('fs')

const rs = fs.createReadStream('src/jsons', {
  encoding: 'utf-8',
  highWaterMark: 20,
})

let totalSum = 0
let accumulatedJsonStr = ''

rs.on('data', (chunk) => {
  if (typeof chunk !== 'string') {
    return
  }

  accumulatedJsonStr += chunk
  log(chunk)
  const lastNewLineIdx = accumulatedJsonStr.lastIndexOf('\n')

  const jsonLinesStr = accumulatedJsonStr.substring(0, lastNewLineIdx)
  accumulatedJsonStr = accumulatedJsonStr.substring(lastNewLineIdx)

  totalSum += jsonLinesStr
    .split('\n')
    .map((jsonLine) => {
      try {
        return JSON.parse(jsonLine)
      } catch {
        return undefined
      }
    })
    .filter((json) => json)
    .map((json) => json.data)
    .reduce((sum, curr) => sum + curr, 0)
})

rs.on('end', () => {
  log(totalSum)
})

 

pipeline 과 promise

 

pipeline은 transform stream을 쉽게 활용하게 도와준다.

stream.pipeline(
 fs.createReadStream('input'),
 zlib.createGzip(),
 fs.createWriteStream('compressed.gz')
)

예제 pipeline

// @ts-check

const { log, error } = console
const fs = require('fs')
const stream = require('stream')
const zlib = require('zlib')

stream.pipeline(
  fs.createReadStream('src/jsons'),
  zlib.createGzip(),
  fs.createWriteStream('D:/node_workspace/src/jsons.gz'),
  (err) => {
    if (err) {
      error('pipeline failed', err)
    } else {
      log('Pipeline succeeded')

      //압축 풀어서 기존 파일과 확인
      stream.pipeline(
        fs.createReadStream('src/jsons.gz'),
        zlib.createGunzip(),
        fs.createWriteStream('src/jsons.unzipped'),
        (_err) => {
          if (_err) {
            error('Gunzip failed', _err)
          } else {
            log('Gunzip succeeded')
          }
        }
      )
    }
  }
)

상단 소스를 promise를  추가하여 간결하게 코드 구현

// @ts-check

const fs = require('fs')
const stream = require('stream')
const zlib = require('zlib')
const util = require('util')

async function gzip() {
  return util.promisify(stream.pipeline)(
    fs.createReadStream('src/jsons'),
    zlib.createGzip(),
    fs.createWriteStream('src/jsons.gz')
  )
}

async function unzip() {
  return util.promisify(stream.pipeline)(
    fs.createReadStream('src/jsons.gz'),
    zlib.createGunzip(),
    fs.createWriteStream('src/jsons.unzipped')
  )
}

async function main() {
  await gzip()
  await unzip()
}

main()

 

728x90
반응형

'Node.js' 카테고리의 다른 글

Express  (0) 2021.09.15
리팩토링 프로젝트  (0) 2021.09.15
Node.js 내장 객체  (0) 2021.09.15
npm, yarn 등 패키지 매니저  (0) 2021.09.14
Node.js 핵심 개념 정리  (0) 2021.09.10

댓글