Node.js 核心模块(二)

事件模块

Events 模块是 NodeJS 中非常重要的类,它有一个非常重要的类,即 EventEmitter。对于 NodeJS 来说,它通过 EventEmitter 类实现事件统一管理。

不过在实际开发,我们单独使用该模块的场景也不多,因为 NodeJS 本身就是基于事件驱动来实现异步操作。事件驱动底层的表现就是 EventEmitter 类。

很多核心模块本身就继承了这个类,所以它们也具备事件注册和发布的能力,使用的时候就无需单独引入 Event 模块。

events 与 EventEmitter

  • node.js 是基于事件驱动的异步操作架构,内置 events 模块
  • events 模块提供了 EventEmitter 类
  • node.js 中很多内置核心模块继承自 EventEmitter 类,例如 fs、http 等

EventEmitter 常见 API

  • on:添加事件被触发时调用的回调函数
  • emit:触发事件,按照注册的顺序同步调用每个事件监听器
  • once:添加事件被触发时调用的回调函数,只会执行一次
  • off:移除指定监听器
const EventEmitter = require('events')

const event = new EventEmitter()

// on
event.on('event', () => {
  console.log('event trigger 1')
})
event.on('event', () => {
  console.log('event trigger 2')
})

// 相同事件触发多次,会执行多次
event.emit('event')
event.emit('event')

console.log('---------------------')

// once
event.once('event-one', () => {
  console.log('event one trigger 1')
})
event.once('event-one', () => {
  console.log('event one trigger 2')
})

// 相同事件触发多次,只会执行一次
event.emit('event-one')
event.emit('event-one')

console.log('---------------------')

const callback = (...args) => {
  console.log('event off trigger', args)
}

event.on('event-off', callback)

// 函数传参
event.emit('event-off', 1, 2)
// 取消订阅
event.off('event-off', callback)
event.emit('event-off')

console.log('---------------------')

// 使用 function 定义的函数可以正确接收到 this
event.on('test', function () {
  console.log('event test trigger', this)
})

event.emit('test')

console.log('---------------------')
js

很多内置模块已经继承了 EventEmitter 模块,所以我们将来在使用相关模块实例对象时,可以直接调用上述提到的 API。

const fs = require('fs')

const crt = fs.createWriteStream()

crt.on('pipe', () => {})
js

内置模块通常已经预先定义了很多事件,所以我们可以通过事件驱动的方式来完成代码编写。

发布订阅模式

发布订阅模式定义对象间一对多的依赖关系,不同对象之间可以实现解耦。

发布订阅要素

  • 缓存队列,存放订阅者信息
  • 具有增加、删除订阅的能力
  • 状态改变时通知所有订阅者执行监听

发布订阅与观察者模式

发布订阅存在调度中心,观察者不存在。

状态发生改变时,发布订阅无须主动通知,由调度中心决定订阅内容如何执行。

代码实现

class PubSub {
  constructor() {
    this._events = {}
  }

  subscribe(event, callback) {
    if (!this._events[event]) this._events[event] = []

    this._events[event].push(callback)
  }

  publish(event, ...args) {
    const items = this._events[event]

    Array.isArray(items) &&
      items.forEach(function (callback) {
        callback.call(this, ...args)
      })
  }
}

const ps = new PubSub()

ps.subscribe('event', () => {
  console.log('event trigger 01')
})
ps.subscribe('event', () => {
  console.log('event trigger 02')
})

ps.publish('event')
js

EventEmitter 模拟实现

function $Event() {
  this._events = Object.create(null)
}

$Event.prototype.on = function (type, callback) {
  if (!this._events[type]) this._events[type] = []
  this._events[type].push(callback)
}

$Event.prototype.emit = function (type, ...args) {
  if (this._events && Array.isArray(this._events[type])) {
    this._events[type].forEach(callback => {
      callback.call(this, ...args)
    })
  }
}

$Event.prototype.off = function (type, callback) {
  if (this._events && this._events[type]) {
    this._events[type] = this._events[type].filter(
      item => item !== callback && item.link != callback
    )
  }
}

$Event.prototype.once = function (type, callback) {
  const _callback = function (...args) {
    callback.call(this, ...args)
    this.off(type, _callback)
  }
  _callback.link = callback
  this.on(type, _callback)
}

console.log('---------------------------------------')

const ev = new $Event()

const fn = function (...args) {
  console.log(`event01 process ${args}`)
}

ev.on('event01', fn)
ev.on('event01', () => {
  console.log('event01 process')
})

ev.emit('event01', 1, 2)
ev.off('event01', fn)

ev.emit('event01', 1, 2)

console.log('---------------------------------------')

ev.once('event02', fn)
ev.emit('event02', 1, 2)
ev.emit('event02', 1, 2)

console.log('---------------------------------------')

ev.once('event03', fn)
ev.off('event03', fn)
ev.emit('event03', 1, 2)
ev.emit('event03', 1, 2)
js

Stream 模块

基本概念

ls | grep *.js // 流操作的一种应用
js

上述代码将左侧执行的数据交由右侧进行处理,这种通过流操作数据的方式,无论是在空间还是时间上都会存在明显的效率提升。

Node.js 诞生之初就是为了提升 IO 性能,其中文件操作系统和网络模块就实现了流接口。

Node.js 中的流就是处理流式数据的抽象接口,Node.js 的 Stream 模块实现了用于处理流数据的对象。

优势

为什么在应用程序中要用流来处理数据?

  • 同步读取资源文件,用户需要等待数据读取完成
  • 资源文件最终一次性加载至内存,开销较大

针对上述问题,我们可以用流来操作数据,将资源文件分段处理,或者配合管道进行处理。

流处理数据的优势:

  • 时间效率:流的分段处理可以同时操作多个数据 chunk
  • 空间效率:同一时间流无需占据大内存空间
  • 使用方便:流配置管道,使扩展程序变的更简单

分类

Node.js 内置了 stream,它实现了流操作对象。

  • Readable:可读流,能够实现数据的读取
  • Writeable:可写流,能够实现数据的写操作
  • Duplex:双工流,即可读又可写
  • Transform:转换流,可读可写,还能实现数据转换

上述流操作对象是 Stream 实现的四个具体的抽象,所有流都继承自 EventEmitter 模块。

案例

const fs = require('fs')

const rs = fs.createReadStream('test.txt')
const ws = fs.createWriteStream('test1.txt')

rs.pipe(ws)
js

上述案例就是一个简单的读取写入操作。

四种流类型

可读流

可读流是专门生产供程序消费数据的流。

Node.js 最常见的数据生产方式就是读取磁盘文件或者取网络请求中的内容。

自定义可读流
  • 继承 stream 里的 Readable 类
  • 重写 _read 方法调用 push 产出数据

自定义可读流问题:

  • 底层数据读取完成之后如何处理?
    • 可以在读取的时候传递 null 值,告知数据已经读取完毕
  • 消费者如何获取可读流中的数据?
    • Readable 提供两种事件,readable 事件和 data 事件
    • Readable 存在两种模式,分别是流动模式和暂停模式,对于使用者来说,两者的区别在于消费数据的时候是否需要主动调用 read 方法来读取数据
 const { Readable } = require('stream')

// 定义数组存放数据,模拟底层数据
const source = ['yueluo', 'heora', 'yzq']

class $Readable extends Readable {
  constructor(source) {
    super()
    this.source = source
  }

  _read() {
    this.push(this.source.shift() || null)
  }
}

const readIns = new $Readable(source)

// readIns.on('readable', () => {
//   let data = null

//   // 打印值可能存在与预期不符的情况
//   // 这其实是因为 read 的工作机制问题
//   // 调用 read 时缓存区已经存在值,所以第一次打印的时候会打印出两个值
//   // read 方法可以传入指定数据长度,这样打印时会和预期会一致
//   // 暂停模式,我们需要手动调用 read 读取数据
//   while ((data = readIns.read()) !== null) {
//     console.log('readable', data.toString())
//   }
// })

// 流动模式,这种读取方式会依次读取数据,更符合预期
readIns.on('data', data => {
  console.log('data', data.toString())
})
js
消费数据
  • readable 事件:当流中存在可读取数据时触发
  • data 事件:当流中数据块传给消费者时触发
总结
  • 明确数据生产与消费流程
  • 利用 API 实现自定义的可读流
  • 明确数据消费的事件使用

可写流

可读流用来生产数据,可写流用来消费数据。通过可写流可以把数据写入到指定的地方, 常见的操作就是往磁盘文件中写入内容或者对 TCP、HTTP 的网络响应进行操作。

基本使用

const fs = require('fs')

// 1. 创建可读流,生产数据
const rs = fs.createReadStream('test.txt')

// 2. 修改字符编码,便于后续使用
rs.setEncoding('utf-8')

// 3. 创建可写流,消费数据
const ws = fs.createWriteStream('test2.txt')

// 4. 监听事件调用方法完成数据的消费
rs.on('data', chunk => {
  // 执行数据写入
  ws.write(chunk)
})
js
自定义可写流
  • 继承 stream 模块的 Writeable
  • 重写 _write 方法,调用 write 执行写入
const { Writable } = require('stream')

class $Writeable extends Writable {
  constructor() {
    super()
  }

  _write(chunk, _, done) {
    process.stdout.write(chunk.toString() + '-')
    process.nextTick(done)
  }
}

// 创建可写流用于消费数据
const ws = new $Writeable()

ws.write('yzq is a boy', 'utf-8', () => {
  console.log('write success')
})
js

可写流事件

  • pipe 事件:可读流调用 pipe() 方法向可写流传输数据时就会触发可写流的 pipe 事件,从而完成最终的数据写入操作
  • unpipe 事件:可读流调用 unpipe() 方法时触发,会在 read 方法返回 false,数据又可以继续写入的时候被触发,不会存在内存溢出等问题

双工流和转换流

双工流和转换流(Duplex && Transform)

Node.js 中 stream 是流操作的抽象接口集合。可读、可写、双工、转换是单一抽象具体实现。

流操作的核心功能就是处理数据,Node.js 诞生的初衷就是解决密集型 IO 事务。Node.js 中处理数据模块继承了流和 EventEmitter 模块。

双工流

Duplex 是双工流,既能生产又能消费。

自定义双工流:

  • 继承 Duplex 类
  • 重写 _read 方法,调用 push 生产数据
  • 重写 _write 方法,调用 write 消费数据
const { Duplex } = require('stream')

const source = ['heora', 'yueluo', 'yzq']

class $Duplex extends Duplex {
  constructor(source) {
    super(source)
    this.source = source
  }

  _read() {
    this.push(this.source.shift() || null)
  }

  _write(chunk, enc, next) {
    if (Buffer.isBuffer(chunk)) {
      chunk = chunk.toString()
    }
    process.stdout.write(chunk + '--')
    process.nextTick(next)
  }
}

const duplex = new $Duplex(source)

duplex.on('data', chunk => {
  console.log(chunk.toString())
})

// duplex.write('test', 'utf-8', () => {
//   console.log('duplex test: readable and writeable')
// })
js
转换流

Transform 也是一个双工流。Duplex 的读和写是相互独立的,它的读操作创建的数据不能被直接当作数据源使用,但是在 Transform 里这种操作是可以的。

自定义实现

  • 继承 Transform 类
  • 重写 _transform 方法,调用 push 和 callback
  • 重写 _flush 方法,处理剩余数据
const { Transform } = require('stream')

class $Transform extends Transform {
  constructor() {
    super()
  }

  _transform(chunk, _, callback) {
    this.push(chunk.toString().toUpperCase())
    callback(null)
  }
}

const transform = new $Transform()

transform.write('a')
transform.write('b')
transform.end('c')

transform.on('data', chunk => {
  console.log(chunk.toString())
})

transform.pipe(process.stdout)
js

总结

  • Readable 可读流
    • 专门生产数据的流
    • 常见的操作就是监听 readable 事件和 data 事件,其中 readable 事件需要我们主动调用 read 事件消耗数据,data 是流动模式,会一直读取数据
  • Writeable 可写流
  • 专门消费数据的流
  • 主要的方式就是调用 write 方法,然后再把数据源中的数据写入到指定位置
  • Duplex 双工流
    • 既可读又可写
    • 读写之间相互独立
  • Transform 转换流
    • 既可读又可写
    • 读写之间可以相互转换,可以自定义转换操作

文件可读流

创建和消费

// data 事件消费数据

const fs = require('fs')

const rs = fs.createReadStream('test.txt', {
  flags: 'r',
  encoding: null,
  fd: null,
  mode: 438,
  autoClose: true,
  start: 0,
  // end: 3,
  highWaterMark: 2
})

rs.on('data', chunk => {
  console.log(chunk.toString())

  rs.pause() // 切换暂停模式

  setTimeout(() => {
    rs.resume() // 切换流动模式
  }, 1000)
})
js
const fs = require('fs')

const rs = fs.createReadStream('test.txt', {
  flags: 'r',
  encoding: null,
  fd: null,
  mode: 438,
  autoClose: true,
  start: 0,
  // end: 3,
  highWaterMark: 4
})

rs.on('readable', () => {
  let data

  // while ((data = rs.read(2)) !== null) {
  //   console.log(data.toString())
  // }

  // while ((data = rs.read(4)) !== null) {
  //   console.log(data.toString())
  // }

  while ((data = rs.read(1)) !== null) {
    // _readableState 长度与 highWaterMark 密切相关
    console.log(data.toString(), rs._readableState.length)
  }
})
js

事件与应用

const fs = require('fs')

const rs = fs.createReadStream('test.txt', {
  flags: 'r',
  encoding: null,
  fd: null,
  mode: 438,
  autoClose: true,
  start: 0,
  // end: 3,
  highWaterMark: 4
})

rs.on('open', fd => {
  // open 操作并不是在数据被消费之后才被处理
  // 当我们调用 createReadStream 时就会触发 open 事件
  console.log(fd, 'file open')
})

rs.on('close', () => {
  // 默认情况下并不会被触发
  // 默认情况下为暂停模式,close 必须在数据被消费之后才会被触发
  console.log('file close')
})

let bufferArr = []

rs.on('data', chunk => {
  console.log(chunk)

  bufferArr.push(chunk)
})

rs.on('end', () => {
  // end 在 close 之间被执行
  console.log('file clear')

  console.log(Buffer.concat(bufferArr).toString())
})

rs.on('error', err => {
  console.log('has error', err)
})
js

文件可写流

基础使用

const fs = require('fs')

const ws = fs.createWriteStream('test3.txt', {
  flags: 'w',
  mode: 438,
  fd: null,
  encoding: 'utf8',
  start: 0,
  highWaterMark: 3 // default 16
})

// 因为我们使用过的是 fs 下的可写流
// 所以写入的内容通常是 字符串或者 buffer
ws.write('月落01', () => {
  console.log('write success')
})
ws.write('月落02', () => {
  console.log('write success')
})
js

事件相关

const fs = require('fs')

const ws = fs.createWriteStream('test3.txt', {
  flags: 'w',
  mode: 438,
  fd: null,
  encoding: 'utf8',
  start: 0,
  highWaterMark: 3 // default 16
})

ws.on('open', fd => {
  console.log('file open', fd)
})

ws.write('heora')

// 对于文件可写流来说,只有在调用 end 数据写入操作全部完成之后才会执行
ws.on('close', () => {
  console.log('file close')
})

// end 执行意味数据写入操作完成
// 不能再 end 之后执行写入操作,否则会报错
ws.end()

ws.on('error', () => {
  console.log('file error')
})
js

write 执行流程

const fs = require('fs')

const ws = fs.createWriteStream('test.txt', {
  highWaterMark: 3
})

let flag = ws.write('1')
console.log(flag) // true

flag = ws.write('2')
console.log(flag) // true

flag = ws.write('3')
console.log(flag) // false

// 如果 flag 为 false,并不意味当前数据不能被执行写入(flag 值仅代表上游产量问题)

// 1. 第一次调用 write 时,会把数据直接写入到文件中
// 2. 第二次调用 write,会把数据写入到缓存中
// 3. 生产速度和消费速度是不同的,一般情况下生产速度要比消费速度快很多
//    例如 highWaterMark 设置为 3 字节,假设生产者给出 5 个字节执行写入,那么在某个时间点就会超过水位线。
//    一旦超出水位线,write 结果就会返回 false 告知,仅代表警戒作用,不代表会溢出
// 4. 当 flag 之后,并不意味着当前次数据不能被写入,但是我们应该告知数据生产者,当前的消费速度已经跟不上生产速度,
//    所以这个时候,一般我们会将可读流的模式修改为暂停模式
// 5. 当数据生产者暂停之后,消费者会慢慢消费缓存中数据,直到可以再次执行写入操作
// 6. 当缓冲区可以继续写入数据时,应该如何告知生产者? 使用 drain 事件

ws.on('drain', () => {
  console.log('drain trigger')
})
js

可以对上述代码进行调试,阅读其源码实现。

{
  // 使用 IntelliSense 了解相关属性。
  // 悬停以查看现有属性的描述。
  // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
  "version": "0.2.0",
  "configurations": [
    {
      "type": "node",
      "request": "launch",
      "name": "启动程序",
      "skipFiles": [
        // "<node_internals>/**"
      ],
      "program": "${workspaceFolder}/node/core_module/_stream/write_drain.js"
    }
  ]
}
json

需要注意的是,我们需要注释掉 skipFiles 中的默认配置,只有这样只能单步调试进入源码中。

控制写入速度

实际业务开发中 drain 并不常用,通常我们会使用 pipe 方法。

drain 主要的作用是控制写入速度或按需完成限流。具体的手段就是指定 highWaterMark,然后配置 write 函数的返回值进行使用。

// 假设我们有一组数据,我们需要讲数据写入指定文件
// 1. 一次性写入,调用 write 方法
// 2. 分批写入

const fs = require('fs')

// 1. 一次性写入。针对大内存来说,这种操作是不友好的,会存在短时间溢出,瞬间撑满的情况
// const ws = fs.createWriteStream('test.txt')
// ws.write('月落森森')

// ------------------------

// 2. 分批执行写入
// const ws = fs.createWriteStream('test.txt')

// const source = '月落森森'.split('')

// // 其实还是一次性写入
// const executeWrite = () => {
//   while (source.length) {
//     ws.write(source.shift())
//   }
// }

// executeWrite()

// ------------------------

// 3. 控制写入速度
const ws = fs.createWriteStream('test.txt', {
  highWaterMark: 3
})

const source = '月落森森'.split('')

let falg

const executeWrite = () => {
  falg = true
  while (source.length && falg) {
    falg = ws.write(source.shift())
  }
}

executeWrite()

ws.on('drain', () => {
  console.log('darin trigger')

  setTimeout(executeWrite, 1 * 1000)
})
js

使用 drain 控制写入速度并不是最好的方案,通常我们使用的还是 pipe。

它是一个管道,我们只需要按照相应的类型,把可读流的东西交给可写流、转换流、双工流或自己去进行处理。

背压机制

Node.js 的 stream 已经实现了一种可以保证数据平滑流动过的背压机制。

我们需要了解的是 Node.js 数据读写的过程以及背压机制解决了什么问题,还有就是 pipe 方法内部实现原理。

数据读写可能存在的问题:

const fs = require('fs')

const rs = fs.createReadStream('test1.txt')
const ws = fs.createWriteStream('test2.txt')

rs.on('data', chunk => {
  ws.write(chunk)
})
js

上述代码的作用就是实现文件拷贝。这样写理论上不存在问题,但是数据读取的速度是远远大于数据写入速度的,消费者的速度往往跟不上生产者的速度。

这样就会出现产能过剩的问题,writeable 内部维护了一个队列,当它不能实时消费上游传递的数据时,它就会尝试把不能被消化掉的数据,先缓存到队列中,但是因为队列大小存在上限。因此读写的过程中,如果不去实现背压机制,那么很有可能就会出现以下问题:

  • 内存溢出
  • GC 频繁调用
  • 影响其他进程工作

基于这些问题我们就存在一套可以使数据平滑流动的机制,这其实就是背压机制。

默认 Readable 缓存空间是 16kb,在我们的文件可读流中进行了重新定义,为 64 kb。

从消费者来看,可读流就是一个水池,在它之中装满了想要消费的数据,在池子外部有一个开关,如果我们采用流动模式相当于一直防水,直到流完为止。

如果在这个过程中,用水的人跟不上放水的速度,所以这时用水的人在处理不了的情况下,就会想办法告诉可读流,当前实在处理不了,需要先停一会儿,消化消化。此时,可读流就可以调用 pause 方法将流动模式切换为暂停模式,然后消费者就会慢慢处理缓存的水资源,等水资源消费差不多之后,就会告诉水池可以继续放水。

但是用水的人应该如何通知放水的人?我们先来看一下数据的写操作。

数据从上游生产者传递过来,然后可写流调用 write 方法消费数据,在可写流内部同样存在内存空间充当缓存队列,他同样也具有水位线。如果某个时刻上游的数据超过上限,说明无法消费更多的水资源,此时 write 方法调用之后会返回 false 给上游的生产者,让他暂停放水。等到可写流将缓存中的数据消费差不多之后,就会触发 drain 事件告诉上游的生产者可以继续放水。这个时候就可以调用 resume 方法再次打开阀门即可。如此往复,保证数据的平滑流动,既不会出现内存被撑爆的情况,也不会在某个时刻无水可用。这其实也是 pipe 方法内部的实现原理。

代码实现:

const fs = require('fs')

const rs = fs.createReadStream('test1.txt', {
  highWaterMark: 4 // 默认为 64 kb,文件可写流默认为 16 kb,两者之间存在 4:1 的关系
})
const ws = fs.createWriteStream('test2.txt', {
  highWaterMark: 1
})

// 1. 流动模式,一次性写入
// rs.on('data', chunk => {
//   ws.write(chunk, () => {
//     console.log('write done')
//   })
// })

// ----------------------

let flag = true

rs.on('data', chunk => {
  flag = ws.write(chunk, () => {
    console.log('write done')
  })

  if (!flag) rs.pause()
})

ws.on('drain', () => {
  rs.resume()
})
js

上述代码就是 pipe 方法的实现原理,实际开发过程中我们直接这样使用的机会并不多,除非是我们自己想拿到每一个数组,单独对数据进行处理。 最常用的还是使用 pipe 方法。

const fs = require('fs')

const rs = fs.createReadStream('test1.txt', {
  highWaterMark: 4 // 默认为 64 kb,文件可写流默认为 16 kb,两者之间存在 4:1 的关系
})
const ws = fs.createWriteStream('test2.txt', {
  highWaterMark: 1
})

rs.pipe(ws)
js