影子的知识库

影子的知识库

  • 知识库
  • GitHub

›stream系列

JVM系列

  • JVM内存区域
  • 对象创建-布局-访问
  • 内存溢出实战
  • 内存区域回收
  • 四大引用
  • 垃圾回收算法
  • HotSpot回收算法细节

Java系列

  • java注解
  • springboot请求参数绑定
  • springboot请求参数校验框架
  • YAML语法
  • 动态代理
  • classpath和java命令
  • springboot-aop编程
  • springboot统一异常处理
  • springboot数据库和事务
  • springboot拦截器
  • springboot中的web配置
  • docker的简单开发
  • springboot自动配置
  • 数据库的隔离级别
  • springboot监控
  • java类加载
  • java-agent的相关内容
  • 类加载器详解
  • java的SecurityManager
  • maven学习

Node

    JS 基础

    • 语法基础和数据类型
    • 数据类型转换
    • 语句 表达式 运算符
    • 变量与对象
    • 函数
    • 数据处理
    • 常用 API
    • 重点知识

    ES6

    • 块级作用域
    • 字符串和正则表达式
    • 函数
    • 对象
    • Symbol
    • Set和Map
    • 迭代器和生成器
    • 类
    • 数组
    • Promise

    Node 基础

    • 模块系统
    • package.json
    • 内置对象
    • npm脚本的使用
    • Buffer
    • Stream
    • 事件循环机制
    • 示例代码

    stream系列

    • 流的缓冲
    • 可读流
    • 可写流
    • 双工流和转换流
    • 自定义流

后期计划

  • 学习计划
  • 专题研究计划
Edit

本文内容

本文主要记录 node 中可写流的相关内容

什么是可写流

可写流是对数据要被写入的目的地的一种抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者 TCP、HTTP 等网络响应。

所有可写流都实现了 stream.Writable 类定义的接口。

一般都是如下的用法:

const myStream = getWritableStreamSomehow();
myStream.write('一些数据');
myStream.write('更多数据');
myStream.end('完成写入数据');

close 事件

当流或其底层资源(比如文件描述符)被关闭时触发。 表明不会再触发其他事件,也不会再发生操作。

如果使用 emitClose 选项创建可写流,则它最终将一定会发出 'close' 事件。

这个事件不一定会发生,取决于流的实现者,所以我们用这个事件比较少

drain 事件

如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。

// 向可写流中写入数据一百万次。
// 留意背压(back-pressure)。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // 最后一次写入。
        writer.write(data, encoding, callback);
      } else {
        // 检查是否可以继续写入。 
        // 不要传入回调,因为写入还没有结束。
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 被提前中止。
      // 当触发 'drain' 事件时继续写入。
      writer.once('drain', write);
    }
  }
}

这个事件是比较核心的事件。实际上 stream.write(chunk) 会在缓冲区满了的时候返回 false ,表示说可写流的处理能力不够,让上游暂停输出。这是一种背压的机制,我们如果继续 stream.write(chunk),那么数据会继续堆积在可写流的缓冲里,突破 highWaterMark 导致内存占用不断升高。

当 stream.write(chunk) 返回 false,于是我们就暂停输出,转而监听 drain 事件。当 drain 事件发生时,就表示,可写流底层已经将缓冲区的数据处理完毕了,咱们可以接着往下输出了。

error 事件

如果在写入或管道数据时发生错误,则会触发 'error' 事件。 当调用时,监听器回调会传入一个 Error 参数。

除非在创建流时将 autoDestroy 选项设置为 true,否则在触发 'error' 事件时不会关闭流。

finish 事件

调用 stream.end() 且缓冲数据都已全部传给底层系统之后触发。

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`写入 #${i}!\n`);
}
writer.end('写入结尾\n');
writer.on('finish', () => {
  console.error('写入已完成');
});

这个事件表示我们所有的东西都写完了

pipe 事件

src 通过管道流入到可写流的来源流

当在可读流上调用 stream.pipe() 方法时会发出 'pipe' 事件,并将此可写流添加到其目标集。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.log('有数据正通过管道流入写入器');
  assert.equal(src, reader);
});
reader.pipe(writer);

也就是说,可写流在调用 pipe 方法的时候,在可写流和可读流上,都会触发 pipe 事件

unpipe 事件

src 要移除可写流管道的来源流

在可读流上调用 stream.unpipe() 方法时会发出 'unpipe'事件,从其目标集中移除此可写流。

当可读流通过管道流向可写流发生错误时,也会触发此事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.log('已移除可写流管道');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

也就是说,这个事件也是同时在两个流上触发的

writable.cork()

writable.cork() 方法强制把所有写入的数据都缓冲到内存中。 当调用 stream.uncork() 或 stream.end() 时,缓冲的数据才会被输出。

当写入大量小块数据到流时,内部缓冲可能失效,从而导致性能下降, writable.cork() 主要用于避免这种情况。 对于这种情况,实现了 writable._writev() 的流可以用更优的方式对写入的数据进行缓冲。

也就是说:有时候一个输出流,我们可能在往里面一直不停的写小块的数据,输出流底层其实是内部缓存只要有数据就往底层写的,不是缓冲到了最大值才写。所以有可能导致输出流一直调用底层调用写小块数据。这个 cork() 方法,就是强制将我们后续调用 writable.write(chunk) 的数据进行缓冲,不触发底层的输出,直到我们调用 uncork() 或者 writable.end() 时,才一次性输出内存里的缓冲

writable.destroy([error])

  • error 可选,使用 'error' 事件触发的错误。
  • 返回: this

销毁流。 可选地触发 'error',并且触发 'close' 事件(除非将 emitClose 设置为 false)。 调用该方法后,可写流就结束了,之后再调用 write() 或 end() 都会导致 ERR_STREAM_DESTROYED 错误。 这是销毁流的最直接的方式。 前面对 write() 的调用可能没有耗尽,并且可能触发 ERR_STREAM_DESTROYED 错误。 如果数据在关闭之前应该刷新,则使用 end() 而不是销毁,或者在销毁流之前等待 'drain' 事件。 实现者不应该重写此方法,而应该实现 writable._destroy()。

说白了,一般别用这个方法,一般就用 end 完事儿了

writable.destroyed

在调用了 writable.destroy() 之后为 true。

writable.end([chunk[, encoding]] [, callback])
  • chunk 要写入的数据。 对于非对象模式的流, chunk 必须是字符串、 Buffer、或 Uint8Array。 对于对象模式的流, chunk 可以是任何 JavaScript 值,除了 null。
  • encoding 如果 chunk 是字符串,则指定字符编码。
  • callback 当流结束时的回调函数。
  • 返回: this

调用 writable.end() 表明已没有数据要被写入可写流。 可选的 chunk 和 encoding 参数可以在关闭流之前再写入一块数据。 如果传入了 callback 函数,则会做为监听器添加到 'finish' 事件。

调用 stream.end() 之后再调用 stream.write() 会导致错误。

writable.end([chunk[, encoding]] [, callback])

  • chunk 要写入的数据。 对于非对象模式的流, chunk 必须是字符串、 Buffer、或 Uint8Array。 对于对象模式的流, chunk 可以是任何 JavaScript 值,除了 null。
  • encoding 如果 chunk 是字符串,则指定字符编码。
  • callback 当流结束时的回调函数。
  • 返回: this

调用 writable.end() 表明已没有数据要被写入可写流。 可选的 chunk 和 encoding 参数可以在关闭流之前再写入一块数据。 如果传入了 callback 函数,则会做为监听器添加到 'finish' 事件。

调用 stream.end()之后再调用 stream.write()会导致错误。

writable.setDefaultEncoding(encoding)

  • encoding 默认的字符编码。
  • 返回: this

为可写流设置默认的 encoding

writable.uncork()

writable.uncork() 方法将调用 stream.cork() 后缓冲的所有数据输出到目标。

当使用 writable.cork() 和 writable.uncork() 来管理流的写入缓冲时,建议使用 process.nextTick() 来延迟调用 writable.uncork()。 通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 进行批处理。

stream.cork();
stream.write('一些 ');
stream.write('数据 ');
process.nextTick(() => stream.uncork());

如果一个流上多次调用 writable.cork(),则必须调用同样次数的 writable.uncork() 才能输出缓冲的数据。

stream.cork();
stream.write('一些 ');
stream.cork();
stream.write('数据 ');
process.nextTick(() => {
  stream.uncork();
  // 数据不会被输出,直到第二次调用 uncork()。
  stream.uncork();
});

writable.writable

如果调用 writable.write() 是安全的,则为 true。

writable.writableEnded

在调用了 writable.end() 之后为 true。 此属性不表明数据是否已刷新,对此请使用 writable.writableFinished

也就是说,这个方法只是表示是否调用过了 writable.end() ,并不表示数据已经写完了(可能底层还在写最后一批数据)

writable.writableFinished

在刚要触发 'finish' 事件之前,立即设置为 true。(也就是 finish 事件的上个瞬间)

writable.writableHighWaterMark

返回构造可写流时传入的 highWaterMark 的值。

这个值就是缓冲区的字节数大小

writable.writableLength

此属性包含准备写入的队列中的字节数(或对象)。 该值提供有关 highWaterMark 状态的内省数据。

也就是,当前缓冲了的字节数量。

只有已经完全输出成功了的才会从缓冲区移除,也就是说,

const { Readable, Writable } = require('stream');
class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // console.log('_write is called: ' + chunk.toString().toUpperCase() + ' | time is: ' + process.uptime());
        setTimeout(() => {
            done();
        }, 10000); // 表示我们输出一次需要10秒钟
    }
}
var out = new OutputStream({ highWaterMark: 8 });
console.log(out.write('1111'));
console.log(out.writableLength);
console.log(out.write('222'));
console.log(out.write('3333'));
setTimeout(()=>{
    console.log(out.writableLength);
},1000);

将会打印:缓冲区长度:11。我们一共输出了 11 个字节,一共包含了 3 轮输出。在第一秒之内,就会开始输出第一轮的 4 个字节。再一秒过后才输出 out.writableLength,这时候第一轮的 4 个字节的底层输出还没有输出完毕,因此仍然计算在 writableLength 内。

writable.writableObjectMode

获取用于给定 Writable 流的 objectMode 属性。

writable.write(chunk[, encoding] [, callback])

  • chunk 要写入的数据。 对于非对象模式的流, chunk 必须是字符串、 Buffer 或 Uint8Array。 对于对象模式的流, chunk 可以是任何 JavaScript 值,除了 null。
  • encoding 如果 chunk 是字符串,则指定字符编码。
  • callback 当数据块被输出到目标后的回调函数。
  • 返回: 布尔值,如果流需要等待 'drain' 事件触发才能继续写入更多数据,则返回 false,否则返回 true。

writable.write() 写入数据到流,并在数据被底层完全输出之后调用 callback。 如果发生错误,则 callback 可能被调用也可能不被调用。 为了可靠地检测错误,可以为 'error' 事件添加监听器。

在接收了 chunk 后,如果内部的缓冲小于创建流时配置的 highWaterMark,则返回 true 。 如果返回 false ,则应该停止向流写入数据,直到 'drain' 事件被触发。

这就是流的背压机制,当我们收到 false 后,应该立刻停止向可写流继续输出输出,直到接收到 drain 事件,他表示可写流已经将所有的数据输出完毕了,我们可以继续往里面输出数据了。

当流还未被排空时,也是可以调用 write(),Node.js 会缓冲所有被写入的数据块,直到达到最大内存占用,这时它会无条件中止。这种就失去了流最大的好处,导致内存占用高,而且垃圾回收效率更低,性能更差。

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// 在回调函数被执行后再进行其他的写入。
write('hello', () => {
  console.log('完成写入,可以进行更多的写入');
});
Last updated on 11/8/2020
← 可读流双工流和转换流 →
  • 什么是可写流
  • close 事件
  • drain 事件
  • error 事件
  • finish 事件
  • pipe 事件
  • unpipe 事件
  • writable.cork()
  • writable.destroy([error])
  • writable.destroyed
  • writable.end([chunk[, encoding]] [, callback])
  • writable.setDefaultEncoding(encoding)
  • writable.uncork()
  • writable.writable
  • writable.writableEnded
  • writable.writableFinished
  • writable.writableHighWaterMark
  • writable.writableLength
  • writable.writableObjectMode
  • writable.write(chunk[, encoding] [, callback])
影子的知识库
Docs
Getting Started (or other categories)Guides (or other categories)API Reference (or other categories)
Community
User ShowcaseStack OverflowProject ChatTwitter
More
BlogGitHub
Copyright © 2020 Cen ZhiPeng