本文内容
本文主要记录 node 中可读流的相关内容
什么是可读流
可读流就是一个能够产生字节序列,并且被我们读取的流。
所有可写流都实现了 stream.Readable 类定义的接口。
重点就是:可读流会产生字节序列被我们读取。这个字节序列可能是从别处来的,也可能是可读流自己生成的,总之可读流可以产生字节序列。
两种读取模式
可读流有两种读取模式:流动模式(flowing)或暂停模式(paused)
- 在流动模式中,数据自动从底层系统读取,并通过
EventEmitter接口的事件尽可能快地被提供给应用程序。 - 在暂停模式中,必须显式调用
stream.read()读取数据块。
也就是说,流动模式中,可读流自动的进行 read 调用,并且将读取到的数据,通过 data 事件发送出来。我们的程序就监控 data 事件即可获取到读取的内容。而暂停模式中我们要在触发 readable 事件时手动读取,我们可以对读取做出更多的控制(例如控制读取的字节数,对于一些二进制协议可能很有必要)
当我们创建可读流时,流最开始都处于暂停模式,也就是说,不会自动开始读取。
我们可以通过以下方式切换到流动模式:
- 添加
'data'事件句柄,也就是调用readable.on('data',callback)添加了对data的监听后,就会切换为流动模式。 - 调用
stream.resume()方法。 - 调用
stream.pipe()方法将数据发送到可写流
可以通过以下方式切换回暂停模式:
- 如果没有管道目标,则调用
stream.pause() - 如果有管道目标,则移除所有管道目标。调用
stream.unpipe()可以移除多个管道目标
只有提供了消费或忽略数据的机制后,可读流才会产生数据。 如果消费的机制被禁用或移除,则可读流会停止产生数据。
为了向后兼容,移除 'data' 事件句柄不会自动地暂停流。 如果有管道目标,一旦目标变为 drain 状态并请求接收数据时,则调用 stream.pause() 也不能保证流会保持暂停模式(总之就是:移除管道目标,然后调用了 pause 才能保证流暂停了)
如果可读流切换到流动模式,且没有可用的消费者来处理数据,则数据将会丢失。 例如,当调用 readable.resume() 时,没有监听 'data' 事件或 'data' 事件句柄已移除。
添加 'readable' 事件句柄会使流自动停止流动,并通过 readable.read() 消费数据。 如果 'readable' 事件句柄被移除,且存在 'data' 事件句柄,则流会再次开始流动。
选择一种 API 风格
可读流的 API 有很多个版本混合在其中,混合使用 on('data')、 on('readable')、 pipe() 或异步迭代器,会导致不明确的行为。所以我们在使用的时候应该选择一种统一的 API
对于大多数用户,建议使用 readable.pipe(),因为它是消费流数据最简单的方式。 如果开发者需要精细地控制数据的传递与产生,可以使用 EventEmitter、 readable.on('readable')/readable.read() 或 readable.pause()/readable.resume()。
close 事件
当流或其底层资源(比如文件描述符)被关闭时触发 'close' 事件。 该事件表明不会再触发其他事件,也不会再发生操作。
如果使用 emitClose 选项创建可读流,那么流在最后一定会触发 close 事件。
这个事件是流最后一次触发的事件了,触发后等于说流就再也不可用了,大多数是文件描述符被关闭了。
这个事件不一定会发生,取决于流的实现者,所以我们用这个事件比较少
data事件
当流将数据块传送给消费者后触发。 当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
(实际上就是当流内部调用了 readable.push(chunk) 的时候就会触发 data 事件)
将 'data' 事件监听器附加到尚未显式暂停的流将会使流切换为流动模式。 数据将会在可用时立即传递。
如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer。例如:
// 默认情况接收的是 Buffer 也就是字节数组
const readable = getReadableStreamSomehow();
// 如果调用了下面这句,那就会传入字符串,相当于是帮你编码了
// readable.setEncoding('utf8');
readable.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 个字节的数据`);
});
end 事件
当可读流没有更多可用数据,并且已读取所有可用数据时发出。
(也就是流已经完全读完了,实际来说是流内部调用了 readable.push(null))
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 个字节的数据`);
});
readable.on('end', () => {
console.log('已没有数据');
});
error 事件
'error' 事件可能随时由 Readable 实现触发。 通常,如果底层的流由于底层内部的故障而无法生成数据,或者流的实现尝试推送无效的数据块,则可能会发生这种情况。
监听器回调将会传入一个 Error 对象。紧随其后的事件通常是 close 事件,但是不能保证这一点。
(实际就是说,底层去拿数据的时候出错了,例如一个读 socket 的可读流,忽然 tcp 连接断了)
pause 事件
可读流暂停的时候会发出。通常是我们调用 readable.pause() 或者是流收到了背压的影响:可读流往可写流里写入时,可写流缓冲满了,此时可读流就会主动调用 pause() ,直到发生 drain 事件,再重新启动 resume()。一般没啥用这个事件,调试的时候可能会有点作用。
readable 事件
表示流目前处于可读状态,这个事件代表流的状态的更新。一般利用 readable 事件读取流的方式如下:
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// 有数据可读取。
let data;
while (data = this.read()) {
console.log(data);
}
});
有几个要点:
- 暂停模式触发
readable事件后,我们必须要一直循环读取数据,一直读到null为止,所以这种处理模式中一定有一个类似于while(data !== null){... data = stream.read()}的循环结构,保证触发readable事件后将数据读取完毕 - 当到达流数据的尽头时,
'readable'事件也会触发,但是在'end'事件之前触发。 'readable'事件表明流有新的动态:要么有新的数据,要么到达流的尽头。 对于前者,stream.read()会返回可用的数据。 对于后者,stream.read()会返回null。
也就是说在 readable 事件中,我们一般要读到 null 时才从回调方法中退出来。此时代表着流的可读缓冲区里已经没数据了,需要等待流的底层读取获取到更多数据到缓冲。等待流的缓冲中又有数据后,就会再次触发 readable 事件。
下面是我自己写的一个例子,循环读取 10、20、30 个字节:
const fs = require('fs')
const rs = fs.createReadStream(__filename);
var readerCall = function () {
var stage = 0;
return function () {
var data = this.read(nextReadSize(stage));
while (data !== null) {
stage++;
console.log('File size:', data.length);
data = this.read(nextReadSize(stage));
}
}
}
rs.on('readable', readerCall());
function nextReadSize(stage) {
return ((stage % 3) + 1) * 10;
}
// 10 20 30 的循环读取,使用闭包来保存读取状态
当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
resume 事件
当调用 resume() 或者是流收到背压,下游传来了 drain 事件,流就会重新被唤醒
readable.destroy([error])
error将会在'error'事件中的回调里作为那个error参数- 返回:
this
销毁流。 可选地触发 'error' 事件,并触发 'close' 事件(除非将 emitClose 设置为 false)。 在此调用之后,可读流将会释放所有内部的资源,并且将会忽略对 push() 的后续调用。 实现者不应该重写此方法,而应该实现 readable._destroy()
readable.destroyed
在调用 readable.destroy()之后为 true。
readable.isPaused()
readable.isPaused() 方法返回可读流当前的操作状态。 主要用于 readable.pipe() 底层的机制。 大多数情况下无需直接使用该方法。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()
readable.pause() 方法使流动模式的流停止触发 'data' 事件,并切换出流动模式(切成暂停模式)。 任何可用的数据都会保留在内部缓存中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 字节的数据`);
readable.pause();
console.log('暂停一秒');
setTimeout(() => {
console.log('数据重新开始流动');
readable.resume();
}, 1000);
});
如果存在 'readable' 事件监听器,则 readable.pause() 方法不会造成任何效果
readable.pipe(destination[, options])
destination数据写入的目标(可写流)options管道选项。end当读取器结束时终止写入器。默认值:true,也就是我们可以传入 {end: false}
- 返回目标可写流,如果是
Duplex流或Transform流则可以形成管道链(也就是可以接着再调用pipe形成一种流水线一样的处理方式)
readable.pipe() 方法绑定可写流到可读流,将可读流自动切换到流动模式,并将可读流的所有数据推送到绑定的可写流。 数据流会被自动管理,所以即使可读流更快,目标可写流也不会超负荷。
注:本方法最主要的作用就是,可以自动的进行管道连接,并且自动平衡两个流的速度,就类似于 UNIX 的管道一样
例子,将可读流的所有数据通过管道推送到 file.txt 文件:
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readable 的所有数据都推送到 'file.txt'。
readable.pipe(writable);
可以在单个可读流上绑定多个可写流。(例如读取多个文件进行单词技术)
readable.pipe() 会返回目标流的引用,这样就可以对流进行链式地管道操作:
const fs = require('fs');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
默认情况下,当来源可读流触发 'end' 事件时,目标可写流也会调用 stream.end() 结束写入。 若要禁用这种默认行为, end 选项应设为 false,这样目标流就会保持打开:
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('结束');
});
如果可读流在处理期间发送错误,则可写流目标不会自动关闭。 如果发生错误,则需要手动关闭每个流以防止内存泄漏。
process.stderr 和 process.stdout 可写流在 Node.js 进程退出之前永远不会关闭,无论指定的选项如何。
readable.read([size])
size要读取的数据的字节数- 返回: string | Buffer | null | any
从内部缓冲拉取并返回数据。 如果没有可读的数据,则返回 null。 默认情况下, readable.read() 返回的数据是 Buffer 对象,除非使用 readable.setEncoding() 指定字符编码或流处于对象模式。
可选的 size 参数指定要读取的特定字节数。 如果无法读取 size 个字节,并且流还没有结束的话,则会返回 null,如果此时流已经结束了,那么将会把剩下的数据都读出来。
如果没有指定 size 参数,则返回内部缓冲中的所有数据。
readable.read() 应该只对处于暂停模式的可读流调用。 在流动模式中, readable.read() 会自动调用直到内部缓冲的数据完全耗尽。
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
console.log(`接收到 ${chunk.length} 字节的数据`);
}
});
使用 readable.read() 处理数据时, while 循环是必需的。 只有在 readable.read() 返回 null 之后,才会触发 'readable'
readable.readable
如果可以安全地调用 readable.read(),则为 true
readable.readableEncoding
获取用于给定可读流的 encoding 属性。 可以使用 readable.setEncoding() 方法设置 encoding 属性
readable.readableEnded
当 'end' 事件被触发后变为 true
readable.readableHighWaterMark
返回构造可读流时传入的 highWaterMark 的值
readable.readableLength
此属性包含准备读取的队列中的字节数(或对象数)。 该值提供有关 highWaterMark 状态的内省数据。
说白了,返回的是缓冲中当前的字节数。
readable.setEncoding(encoding)
encoding字符编码- 返回: this
对可读流设置字符编码。
默认情况下没有设置字符编码,流数据返回的是 Buffer 对象。 如果设置了字符编码,则流数据返回指定编码的字符串。 例如,调用 readable.setEncoding('utf-8') 会将数据解析为 UTF-8 数据,并返回字符串,调用 readable.setEncoding('hex') 则会将数据编码成十六进制字符串。
可读流将会正确地处理通过流传递的多字节字符,否则如果简单地从流中作为 Buffer 对象拉出,则会被不正确地解码。(内部会缓冲被截断的数据,我们在读取时很可能不是一个完整的字符,设置编码后会自动处理这种情况)
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('读取到 %d 个字符的字符串数据', chunk.length);
});
readable.unpipe([destination])
destination要移除管道的可写流- 返回: this
readable.unpipe() 方法解绑之前使用 stream.pipe() 方法绑定的可写流。
注:
如果没有指定
destination, 则解绑所有管道如果指定了
destination, 但它没有建立管道,则不起作用