本文内容
本文主要记录 node 中的流(Stream)的缓冲的相关内容(仅包括普通的流,不涵盖对象流的内容)
可读流缓冲区
node 中的可读流和可写流都有缓冲区,实际上来说就是一个 BufferList,可以分别使用的 writable.writableBuffer 或 readable.readableBuffer 来获取。
这个缓冲区就是一系列的 Buffer 的列表,可读流的 BufferList 中的每个 Buffer 都是我们在调用 readable.push(chunk) 时的那个 chunk 缓冲。对于普通的流来说,highWaterMark 指定了缓冲区字节的总数。 对于对象模式的流, highWaterMark 指定了对象的总数。
_read(size) 方法
这个方法是流的底层读取方法,表示可读流要去数据源获取多少个字节的数据。因为一个可读流必然是能被读取出一系列字节的流,那么这些字节其实也是有一个来源的,可能是从其它地方读取的,可能是自己生成的,总之是有一个来源途径。这个 _read(size) 就表示:可读流从底层来源获取到 size 个字节的数据存储在自己的缓冲区中。实际上我们都是使用 readable.read(size) 去读取可读流的,而我们读取的其实就是可读流的缓冲区,如果缓冲区的内容不够我们读的,那自然需要可读流去底层读取内容填充到缓冲区中去(调用 _read(size)),可读流的工作方式就是类似于这种生产者消费者模型的方式。
当我们调用 readable.read(size) 的时候,如果缓冲区没有到达 highWaterMark 字节(其实只有第一次读和读到末尾时才会出现这种情况),流就会调用底层的 _read(size0) 方法,这里的 size0 规则:大于等于 size 但最接近 size 的2的n次方、highWaterMark,这两者中的较大的值,并且将新的 highWaterMark 更新为本次读取的值,也就是确保本次读取不会减少缓冲区的大小,而且 highWaterMark 会动态增大。
也就是说:
highWaterMark = 4,size = 5,那么将会调用_read(8),同时highWaterMark调整为 8highWaterMark = 4,size = 2,那么将会调用_read(4)
_read(size0)读取完毕后,缓冲区大小有可能仍旧小于 highWaterMark 字节,此时会再次调用 _read(size0) 确保缓冲区缓冲了 highWaterMark 字节。
highWaterMark
它定义了缓冲区应该保存的字节数量,当缓冲区即将满溢时,我们调用 readable.push(chunk) 将返回 false ,告知我们不应该再读取数据到流的缓冲区去了。例如:
const { Readable, Writable } = require('stream');
class SerialCharStream extends Readable {
constructor(max, options) {
super(options);
this.max = max;
this.num = 0;
}
_read(size) {
console.log('size is: ' + size);
while (this.max > 0) {
console.log(this.push(String.fromCharCode('a'.charCodeAt(0) + this.num % 26)));
this.num++;
this.max--;
size--;
if (size < 1) {
return;
}
}
this.push(null);
}
}
const rns = new SerialCharStream(100, {
highWaterMark: 4
});
console.log(rns.read(5));
上述代码定义了一个可读流,它会循环地生成 abcd...abcd 字符。每次底层读取时都会输出读取 readable.push 时的返回值
const rns = new SerialCharStream(100, {highWaterMark: 4 }); 定义了一个可以读取100个字符的可读流,缓冲区大小定义为 4
console.log(rns.read(5)); 表示我们要从流中读取 5 个字节。
最后输出如下:
size is: 8
true
true
true
true
true
true
true
false
<Buffer 61 62 63 64 65>
size is: 8
true
true
true
true
false
false
false
false
size is: 8 表示 highWaterMark 此次调整为 8 了,因为 rns.read(5) 超出了 highWaterMark 的大小,所以 highWaterMark 会自动调大。
第二次 size is: 8 是表示:经过第一次的底层读取 8 个字节后,再被 rns.read(5) 读取走了 5 个字节,此时缓冲区大小未满,那么就会再次调用 _read(highWaterMark) 去缓存数据。而在此次读取途中就会读满缓冲区,因此第二次读取的时候输出了前面 4 个 true,后面 4 个 false。符合标准的流在 _read(size) 方法中,第一次返回 this.push(chunk) 为 false 时就应该停止读取,从方法中返回,以此保证不会超出缓冲区的大小。
从以上可以看出,可读流的缓冲区基本上会保持在 highWaterMark 的大小,除非是第一次读取、以及读取到了流的末尾。
我们在自定义流的时候,如果要实现最标准的流,一定要确保 _read(size) 方法在第一次 this.push(chunk) 为 false 时就返回。
push(chunk) 方法
可读流的底层方法,将 Buffer 添加到可读流的缓冲区。可能这么说会让人比较疑惑,为什么可读流要有往里面添加数据的方法,但是再仔细想想就能明白:可读流是需要产生字节内容的,而这个内容本身是怎么来的呢?肯定是从其它的流或者内容中添加进来的,而 push(chunk) 就是可读流的这么一个添加机制。我们往可读流里 push(chunk) 了什么内容,就决定了我们可以从可读流中读取到什么内容。
正常来说,我们只应该在自定义可读流的 _read(size) 方法里使用 push(chunk) 方法,在通常的使用一个现成的流的时候,基本不应该使用这个方法,除非你确实很清楚你在干什么。
当可读流处在暂停模式时,使用 readable.push() 添加的数据可以在触发 'readable' 事件时通过调用 readable.read()读取。
当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。但是这种情况下,流的读取可能不是按照顺序来的了,因为我们是先触发了 data 事件,后 push ,那么后面 push 触发的 data 事件我们无法控制顺序。
push(chunk)会往可读流的BufferList添加这个chunk,也就是相当于可读流读取到了底层的数据到缓冲区。会触发data事件或者readable事件push(chunk)返回 false 的时候表示缓冲区已经满了,这时候在我们自定义可读流的_read(size)方法中,就应该立刻返回push(null)表示我们要结束这个可读流了,会触发end事件。所以我们自定义流里面可以在需要结束流时进行push(null)调用。
可写流缓冲区
和可读流的缓冲区一样,可写流的内部也有一个缓冲区 WritableBuffer,它是一个 BufferList 的结构,存储了一系列的 Buffer 对象。每个 Buffer 对象其实就是咱们在调用 writable.write(chunk) 时的 chunk。
可写流的作用就是:我们调用 write 之类的方法,往可写流中写数据,而可写流最终将数据输出到了某个地方,例如标准输出流、文件等。
可写流的缓冲区的作用就是:平衡我们往可写流的写入速度和可写流往目的地的输出速度。例如可写流向文件输出时,速度会比较慢,因为需要经过底层的系统调用,而且磁盘IO本就很慢,通常来说比咱们内存速度低一两个数量级。有缓冲区之后,我们就往缓冲区写入数据,可写流从缓冲区中拿走数据进行底层的系统调用进行输出。这样有以下好处:
- 减少了系统调用的次数,因为每次会拿走一整块的缓冲内容,单次写入多,那么写入的次数肯定少。这样的话性能自然会有提升(底层系统调用相当耗费资源和性能)
- 如果底层写入慢,因为有了缓冲区,我们在缓冲区满了的时候,就知道不再往可写流里写入数据了,那么就可以去处理进程的其它任务,这样就不需要浪费CPU资源
_write(chunk, encoding, callback) 方法
chunk:要写入的Buffer,从传给stream.write()的string转换而来。 如果流的decodeStrings选项为false或者流在对象模式下运行,则数据块将不会被转换,并且将是传给stream.write()的任何内容。encoding:如果chunk是字符串,则指定字符编码。 如果chunk是Buffer或者流处于对象模式,则无视该选项。callback:当数据块被处理完成后的回调函数。这个回调通知可写流:这次的底层输出的调用已经完成了,可以接着下一次的输出了。当出现错误时,也可以传递一个 error 参数。
这个方法是底层输出调用,只应该在可写流内部使用。当缓冲区有数据时,就会调用这个方法去输出到底层输出。当 callback 回调被调用之后,输出流就知道本次的底层输出完成了,如果缓冲区有数据,就会进行下一次的 _write() 调用去输出缓冲区里的数据,如果缓冲区里没有数据了,就会发送 drain 事件,表示底层输出完毕了,可以接着往缓冲区写数据了。
注意:一定要调用了
callback才能通知流:本次输出完毕了。如果没有调用callback,流就会认为还在输出中(有的输出相当耗时,例如网络IO),从而不会调用下一次输出
writable.write(chunk[, encoding] [, callback])
chunk:要写入的数据。 对于非对象模式的流,chunk必须是字符串、Buffer或Uint8Array。 对于对象模式的流,chunk可以是任何 JavaScript 值,除了null。encoding:如果chunk是字符串,则指定字符编码。callback:当数据块被输出到目标后的回调函数。- 返回:如果流需要等待
drain事件触发才能继续写入更多数据,则返回false,否则返回true。其实就是说,返回false时,我们需要停止继续往输出流中写数据了,此时缓冲的数据还未写完,继续写缓冲区将会溢出。实际上继续写也是可以,最终来看就是内存占用越来越高,直到最后内存实在撑不住了就炸裂了。如果内存足够的话,可能数据最终还是会写完而且内存不会溢出,从内存占用来看就是内存一开始越来越高(因为输出流中积压了太多数据而写入的底层调用过慢),后来内存慢慢开始降低(可能往输出流写入的数据此时已经全部在输出流的缓冲区了,也就是缓冲区不会再变大了,内存就会开始降低,因为底层还在一直写出),直到最后完成
writable.write() 写入数据到流,并在数据被完全处理之后调用 callback。 如果发生错误,则 callback 可能被调用也可能不被调用。 为了可靠地检测错误,可以为 'error' 事件添加监听器。
在接收了 chunk 后,如果内部的缓冲小于创建流时配置的 highWaterMark,则返回 true 。 如果返回 false ,则应该停止向流写入数据,直到 'drain' 事件被触发。也就是说:一旦 write 返回了 false,我们就应该停止写入,直到触发了 'drain' 事件。
const { Readable, Writable } = require('stream');
class OutputStream extends Writable {
_write(chunk, enc, done) {
console.log('_write is called: ' + chunk.toString().toUpperCase() + ' | time is: ' + process.uptime());
setTimeout(() => {
done();
}, 10000); // 表示我们输出一次需要10秒钟
}
}
var out = new OutputStream({ highWaterMark: 8 });
console.log(out.write('1111'));
console.log(out.writableBuffer);
console.log(out.write('222'));
console.log(out.write('3333'));
console.log(out.writableBuffer);
输出如下:
_write is called: 1111 | time is: 0.138032964
true
[]
true
false
[
{
chunk: <Buffer 32 32 32>,
encoding: 'buffer',
isBuf: true,
callback: [Function: nop],
next: {
chunk: <Buffer 33 33 33 33>,
encoding: 'buffer',
isBuf: true,
callback: [Function: nop],
next: null
}
},
{
chunk: <Buffer 33 33 33 33>,
encoding: 'buffer',
isBuf: true,
callback: [Function: nop],
next: null
}
]
_write is called: 222 | time is: 10.15297613
_write is called: 3333 | time is: 20.158242229
可以看到,第一次调用 write,立刻就会进行底层的输出,然后后续调用都加入到了缓冲区,直到 10 秒后,第一次底层输出完毕了,才会继续输出缓冲区的数据
highWaterMark
定义了可写流的缓冲区字节数。当我们 write(chunk) 时缓冲区即将大于 highWaterMark 时,就将会返回 false 以此机制来控制写入速度。pipe() 方法之所以能协调两个流的速度,也是靠了这种背压机制。