本文内容
本文记录一些关于如何自定义流的内容
如何自定义流
声明一个新的 JavaScript 类,该类继承了四个基本流类之一(stream.Writeable、 stream.Readable、 stream.Duplex 或 stream.Transform),并确保调用了相应的父类构造函数:
const { Writable } = require('stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({
highWaterMark,
autoDestroy: true,
emitClose: true
});
// ...
}
}
当继承流时,在传入基本构造函数之前,务必清楚使用者可以且应该提供哪些选项。 例如,如果实现需要 autoDestroy 和 emitClose 选项,则不允许使用者覆盖这些选项。 应明确要传入的选项,而不是隐式地传入所有选项。
自定义的流需要实现以下方法:
| 用例 | 继承类 | 需要实现的方法 |
|---|---|---|
| 可读流 | Readable | _read() |
| 可写流 | Writable | _write()、_writev()、_final() |
| 可读可写 | Duplex | _read()、_write()、_writev()、_final() |
| 对写入的数据进行操作,然后读取结果 | Transform | _transform()、_flush()、_final() |
简单实现
对于简单的案例,构造流可以不依赖继承。 直接创建 stream.Writable、 stream.Readable、 stream.Duplex 或 stream.Transform 的实例,并传入对应的方法作为构造函数选项。
const { Writable } = require('stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
}
});
注:
也就是说,实现自定义流有 2 种方式:
- 继承方式
- 继承要实现的 4 种流之一
- 自定义流的构造方法中调用基类的构造方法
- 自定义流实现内部必须的方法,例如自定义可读流,需要实现
_read(size)方法- 新建对象方式
- new 基本流
- 传入参数对象中包含
write、read等方法,用来覆盖_write和_read方法。具体有哪些方法,请参考上面的列表这两种方案,下面那种稍微简单一点,但是可配置性较差。上面那种方案,我们可以配置更多的自定义流的属性和行为,更加灵活
自定义可读流
new stream.Readable([options])
可以传入的参数:
highWaterMark从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值:16384(16kb), 对象模式的流默认为16。encoding如果指定了,则使用指定的字符编码将 buffer 解码成字符串。 默认值:null。objectMode流是否可以是一个对象流。 也就是说stream.read(n)会返回对象而不是Buffer。 默认值:false。emitClose流被销毁后是否应该触发'close'。默认值:true。read对stream._read()方法的实现。destroy对stream._destroy()方法的实现。autoDestroy流是否应在结束后自动调用.destroy()。默认值:false。
const { Readable, Writable } = require('stream');
class SerialCharStream extends Readable {
constructor(max, options) {
super(options);
this.max = max;
this.num = 0;
}
_read(size) {
console.log('size is: ' + size);
while (this.max > 0) {
console.log(this.push(String.fromCharCode('a'.charCodeAt(0) + this.num % 26)));
this.num++;
this.max--;
size--;
if (size < 1) {
return;
}
}
this.push(null);
}
}
const rns = new SerialCharStream(100, {
highWaterMark: 4
});
console.log(rns.read(5));
如上,就自定义了一个可读流,关键就是构造函数,以及 _read(size) 方法
更多详情,可以参阅 流的缓冲
自定义可写流
new stream.Writable([options])
可以传入的参数:
highWaterMark当调用stream.write()开始返回false时的缓冲大小。 默认为16384(16kb), 对象模式的流默认为16。decodeStrings是否把传入stream._write()的string编码为Buffer,使用的字符编码为调用stream.write()时指定的。 不转换其他类型的数据(即不将Buffer解码为string)。 设置为false将会阻止转换string。 默认值:true。defaultEncoding当stream.write()的参数没有指定字符编码时默认的字符编码。默认值:'utf8'。objectMode是否可以调用stream.write(anyObj)。 一旦设为true,则除了字符串、Buffer或Uint8Array,还可以写入流实现支持的其他 JavaScript 值。默认值:false。emitClose流被销毁后是否触发'close'事件。默认值:true。write对stream._write()方法的实现。writev对stream._writev()方法的实现。destroy对stream._destroy()方法的实现。final对stream._final()方法的实现。autoDestroy此流是否应在结束后自动调用.destroy()。默认值:false.
const { Readable, Writable } = require('stream');
class OutputStream extends Writable {
_write(chunk, enc, done) {
console.log('_write is called: ' + chunk.toString().toUpperCase() + ' | time is: ' + process.uptime());
setTimeout(() => {
done();
}, 10000); // 表示我们输出一次需要10秒钟
}
}
var out = new OutputStream({ highWaterMark: 8 });
console.log(out.write('1111'));
console.log(out.writableBuffer);
console.log(out.write('222'));
console.log(out.write('3333'));
console.log(out.writableBuffer);
如上,就自定义了一个可写流,关键就是构造函数,以及 _write(size) 方法
更多详情,可以参阅 流的缓冲
_write(chunk, encoding, callback) 方法
chunk:要写入的Buffer,从传给stream.write()的string转换而来。 如果流的decodeStrings选项为false或者流在对象模式下运行,则数据块将不会被转换,并且将是传给stream.write()的任何内容。encoding:如果chunk是字符串,则指定字符编码。 如果chunk是Buffer或者流处于对象模式,则无视该选项。callback:当数据块被处理完成后的回调函数。这个回调通知可写流:这次的底层输出的调用已经完成了,可以接着下一次的输出了。当出现错误时,也可以传递一个 error 参数。
这个方法是底层输出调用,只应该在可写流内部使用。当缓冲区有数据时,就会调用这个方法去输出到底层输出。当 callback 回调被调用之后,输出流就知道本次的底层输出完成了,如果缓冲区有数据,就会进行下一次的 _write() 调用去输出缓冲区里的数据,如果缓冲区里没有数据了,就会发送 drain 事件,表示底层输出完毕了,可以接着往缓冲区写数据了。
注意:一定要调用了
callback才能通知流:本次输出完毕了。如果没有调用callback,流就会认为还在输出中(有的输出相当耗时,例如网络IO),从而不会调用下一次输出无论是成功完成写入还是写入失败出现错误,都必须调用
callback。 如果调用失败,则callback的第一个参数必须是Error对象。 如果写入成功,则callback的第一个参数为null
在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。 当调用 callback 时,流将会触发 'drain'事件。 如果流的实现需要同时处理多个数据块,则应该实现 writable._writev() 方法。
如果在构造函数选项中设置 decodeStrings 属性为 false,则 chunk 会保持原样传入 .write(),它可能是字符串而不是 Buffer。 这是为了实现对某些特定字符串数据编码的支持。 在这种情况下, encoding 参数将指示字符串的字符编码。 否则,可以安全地忽略编码参数。
writable._writev(chunks, callback)
跟 _write(chunk,encoding,callback) 差不多,只不过是可以传入多个缓冲
chunksObject[] 格式,要写入的多个数据块。 每个数据块的格式为{ chunk: ..., encoding: ... }。callback当全部数据块被处理完成后的回调函数。
writable._destroy(err, callback)
_destroy() 方法会被 writable.destroy()调用。 它可以被子类重写,但不能直接调用
writable._final(callback)
callback当结束写入所有剩余数据时的回调函数。
_final() 方法不能直接调用。 它应该由子类实现,且只能通过内部的 Writable 类的方法调用。
该方法会在流关闭之前被调用,且在 callback 被调用后触发 'finish' 事件。 主要用于在流结束之前关闭资源或写入缓冲的数据。
也就是说,流的执行是如下顺序:
- 先调用
writable.end() - 可写流的所有数据都被写入完毕了
- 自动调用
writable._final(callback)里的callback回调 - 触发
finish事件
自定义双工流
双工流同时实现了可读流和可写流,例如 TCP socket 连接。
因为 JavaScript 不支持多重继承,所以使用 stream.Duplex 类实现双工流(而不是使用 stream.Readable 类和 stream.Writable 类)。
stream.Duplex 类的原型继承自 stream.Readable 和 stream.Writable,但是 instanceof 对这两个基础类都可用,因为重写了 stream.Writable 的 Symbol.hasInstance。
自定义的双工流必须调用 new stream.Duplex([options]) 构造函数并实现 readable._read() 和 writable._write() 方法。
new stream.Duplex(options)
allowHalfOpen如果设为false,则当可读端结束时,可写端也会自动结束。 默认为true。readableObjectMode设置流的可读端为objectMode。 如果objectMode为true,则不起作用。 默认为false。writableObjectMode设置流的可写端为objectMode。 如果objectMode为true,则不起作用。 默认为false。readableHighWaterMark设置流的可读端的highWaterMark。 如果已经设置了highWaterMark,则不起作用。writableHighWaterMark设置流的可写端的highWaterMark。 如果已经设置了highWaterMark,则不起作用。read对stream._read()方法的实现。write对stream._write()方法的实现。
例如,实现一个可以将输入流输出 2 遍的双工流
const { Duplex } = require('stream');
class DoubleStream extends Duplex {
constructor(options) {
super(options);
this._data = [];
}
_read(size) {
while (this._data.length > 0) {
var str = this._data.shift();
if (!this.push(str)) {
return;
}
}
/**
* 这里表示,这个双工流作为可写流,已经被写完了所有的数据
* 也就是说,双工流已经接收完了所有的数据来源
* 此时 this._data.length = 0 ,this.writableFinished = true
* 表示双工流作为可读流已经被读取完了所有的字节了
* 因此可以 this.push(null) 对双工流的可读流的部分触发 end 事件
*/
if (this.writableFinished) {
this.push(null);
}
setTimeout(this._read.bind(this), 100);
}
_write(chunk, encoding, callback) {
// 这里就是将读取进行双份处理
this._data.push(chunk);
this._data.push(chunk);
callback();
}
}
var myDouble = new DoubleStream();
// 标准输入经过双工流处理,然后输出给标准输出,就像一个常规 UNIX 进程那样
process.stdin
.pipe(myDouble)
.pipe(process.stdout);
// 这里 end 触发表示双工流作为可读流,已经被标准输出读走了所有的数据
myDouble.once('end', () => {
// 这里表示,不断检查标准输出是否已经输出完了:process.stdout.writableLength == 0
// 常规的输出流调用 writable.end() 即可,并且还会触发 finish 事件
// 但是 stdout 比较特殊,无法被关闭,也不会触发 finish 事件,因此使用缓冲区长度来判断
(function checkAndExit() {
if (process.stdout.writableLength == 0) {
process.exit(0);
} else {
setTimeout(checkAndExit, 500);
}
})();
});
_data用来保存内部数据_write的时候,push两遍数据到_data_read的时候,从_data中取数据
输出:
(py3.5) czp@:~/workspace/knowledge-base/demos/node_start$ echo '哈哈爱斯达克交换机卡书法家看哈'| node hello.js
哈哈爱斯达克交换机卡书法家看哈
哈哈爱斯达克交换机卡书法家看哈
注:
这种双工流的逻辑是:
- 先作为可写流从上游接收输出,保存在自己内部的某个数据中
- 然后作为可读流为下游提供数据,从上一步保存的数据中进行输出
上面的代码,实际上有很大的问题,后续修改如下:
const { Duplex } = require('stream');
class DoubleStream extends Duplex {
constructor(options) {
super(options);
this._data = [];
}
_read(size) {
while (this._data.length > 0) {
var str = this._data.shift();
if (!this.push(str)) {
return;
}
}
if (this.writableFinished) {
this.push(null);
// 这里必须 return
return;
}
setTimeout(this._read.bind(this), 100);
}
_write(chunk, encoding, callback) {
// 这里就是将读取进行双份处理
this._data.push(chunk);
this._data.push(chunk);
callback();
}
}
var myDouble = new DoubleStream();
// 标准输入经过双工流处理,然后输出给标准输出,就像一个常规 UNIX 进程那样
process.stdin
.pipe(myDouble)
.pipe(process.stdout);
注意:
if (this.writableFinished) {
this.push(null);
// 这里必须 return
return;
}
setTimeout(this._read.bind(this), 100);
不加 return 的话,就会一直存在 timeOut 事件,导致进程无法退出。(所以我第一版代码里是最后用了 process.exit,那时候没有弄明白真正无法退出的原因,以为是 stdout 的问题)
自定义转换流
转换流是一种双工流,它会对输入做些计算然后输出。 例如 zlib 流和 crypto 流会压缩、加密或解密数据。
输出流的大小、数据块的数量都不一定会和输入流的一致。 例如, Hash 流在输入结束时只会输出一个数据块,而 zlib 流的输出可能比输入大很多或小很多。
继承 stream.Transform 类可用于实现一个转换流。
stream.Transform 类继承自 stream.Duplex,并且实现了自有的 writable._write() 和 readable._read() 方法。 自定义的转换流必须实现 transform._transform() 方法,transform._flush() 方法是可选的。
当使用转换流时,如果可读端的输出没有被消费,则写入流的数据可能会导致可写端被暂停。
注:
也就是说:
- 转换流就是双攻流
- 转换流自己实现了
_write()和_read(),用来控制转换的相关逻辑- 我们只需要实现
transform._transform()方法(和可选的.flush()),转换流就可以实现从上游读取,然后经过转换处理输出给下游的逻辑了- 我们上面的自定义双工流其实就可以算是一种转换流了,只不过是我们自己实现的 read 和 write,转换流这两个方法是内置的,重点是 transform 转换
我们用到的绝大部分双工流都是转换流,实现自定义转换流方法如下:
- 继承 Transform 类
- 实现 _transform() 方法
- 实现 _flush() 方法(可以不实现)
new stream.Transform([options])
options 同时传给 Writable 和 Readable 的构造函数。
transform对stream._transform()的实现。flush对stream._flush()的实现。
实现一个和上面双工流一样的程序,将输入流的数据输出 2 遍,使用转换流的话如下:
const { Transform } = require('stream');
class DoubleStream extends Transform {
constructor(options) {
super(options);
}
_transform(chunk, encoding, callback) {
this.push(chunk);
this.push(chunk);
callback();
}
}
var myDouble = new DoubleStream();
process.stdin
.pipe(myDouble)
.pipe(process.stdout);
输出:
(py3.5) czp@:~/workspace/knowledge-base/demos/node_start$ echo '哈哈爱斯达克交换机卡书法家看哈'| node hello.js
哈哈爱斯达克交换机卡书法家看哈
哈哈爱斯达克交换机卡书法家看哈
finish 和 end 事件
finish 事件来自 stream.Writable 类,end 事件来自 stream.Readable 类。 当调用了 stream.end() 并且 stream._transform() 处理完全部数据块之后,触发 'finish' 事件。
(表示作为可写流的部分,已经将所有的数据都写入了)
当调用了 transform._flush() 中的回调函数并且所有数据已经输出之后,触发 'end' 事件。
(表示作为可读流的分布,所有的数据都被读取完毕了)
transform._flush(callback)
callback:当剩余的数据被 flush 后的回调函数
某些情况下,转换操作可能需要在流的末尾发送一些额外的数据。 例如, zlib 压缩流时会储存一些用于优化输出的内部状态。 当流结束时,这些额外的数据需要被 flush 才算完成压缩。
自定义的转换流的 transform._flush() 方法是可选的。 当没有更多数据要被消费时,就会调用这个方法,但如果是在 'end' 事件被触发之前调用则会发出可读流结束的信号。
在 transform._flush() 的实现中, readable.push() 可能会被调用零次或多次。 当 flush 操作完成时,必须调用 callback 函数。
也就是说,这个方法是用来在转换流的末尾添加数据的,改写上面的例子如下:
const { Transform } = require('stream');
class DoubleStream extends Transform {
constructor(options) {
super(options);
}
_transform(chunk, encoding, callback) {
this.push(chunk);
this.push(chunk);
callback();
}
_flush(callback) {
this.push('嘻嘻嘻了呢\n');
callback();
}
}
var myDouble = new DoubleStream();
process.stdin
.pipe(myDouble)
.pipe(process.stdout);
最后输出了 嘻嘻嘻了呢 加上一个换行符
输出如下:
(py3.5) czp@:~/workspace/knowledge-base/demos/node_start$ echo '哈哈爱斯达克交换机卡书法家看哈'|node hello.js
哈哈爱斯达克交换机卡书法家看哈
哈哈爱斯达克交换机卡书法家看哈
嘻嘻嘻了呢
transform._transform(chunk, encoding, callback)
chunk要转换的Buffer,从传给stream.write()的string转换而来。 如果流的decodeStrings选项为false或者流在对象模式下运行,则数据块将不会被转换,并且将是传给stream.write()的任何内容。encoding如果数据块是一个字符串,则这是编码类型。 如果数据块是一个 buffer,则为特殊值'buffer'。在这种情况下忽略它。callback当chunk处理完成时的回调函数。
所有转换流的实现都必须提供 _transform() 方法来接收输入并生产输出。 transform._transform() 的实现会处理写入的字节,进行一些计算操作,然后使用 readable.push() 输出到可读流。
transform.push() 可能会被调用零次或多次用来从每次输入的数据块产生输出,调用的次数取决需要多少数据来产生输出的结果。
输入的数据块有可能不会产生任何输出。
当前数据被完全消费之后,必须调用 callback 函数。 当处理输入的过程中发生出错时, callback 的第一个参数传入 Error 对象,否则传入 null。 如果 callback 传入了第二个参数,则它会被转发到 readable.push()。 就像下面的例子:
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
上面这两个是等价的方法
transform._transform() 不能并行调用。 流使用了队列机制,无论同步或异步的情况下,都必须先调用 callback 之后才能接收下一个数据块。