影子的知识库

影子的知识库

  • 知识库
  • GitHub

›stream系列

JVM系列

  • JVM内存区域
  • 对象创建-布局-访问
  • 内存溢出实战
  • 内存区域回收
  • 四大引用
  • 垃圾回收算法
  • HotSpot回收算法细节

Java系列

  • java注解
  • springboot请求参数绑定
  • springboot请求参数校验框架
  • YAML语法
  • 动态代理
  • classpath和java命令
  • springboot-aop编程
  • springboot统一异常处理
  • springboot数据库和事务
  • springboot拦截器
  • springboot中的web配置
  • docker的简单开发
  • springboot自动配置
  • 数据库的隔离级别
  • springboot监控
  • java类加载
  • java-agent的相关内容
  • 类加载器详解
  • java的SecurityManager
  • maven学习

Node

    JS 基础

    • 语法基础和数据类型
    • 数据类型转换
    • 语句 表达式 运算符
    • 变量与对象
    • 函数
    • 数据处理
    • 常用 API
    • 重点知识

    ES6

    • 块级作用域
    • 字符串和正则表达式
    • 函数
    • 对象
    • Symbol
    • Set和Map
    • 迭代器和生成器
    • 类
    • 数组
    • Promise

    Node 基础

    • 模块系统
    • package.json
    • 内置对象
    • npm脚本的使用
    • Buffer
    • Stream
    • 事件循环机制
    • 示例代码

    stream系列

    • 流的缓冲
    • 可读流
    • 可写流
    • 双工流和转换流
    • 自定义流

后期计划

  • 学习计划
  • 专题研究计划
Edit

本文内容

本文记录一些关于如何自定义流的内容

如何自定义流

声明一个新的 JavaScript 类,该类继承了四个基本流类之一(stream.Writeable、 stream.Readable、 stream.Duplex 或 stream.Transform),并确保调用了相应的父类构造函数:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({
      highWaterMark,
      autoDestroy: true,
      emitClose: true
    });
    // ...
  }
}

当继承流时,在传入基本构造函数之前,务必清楚使用者可以且应该提供哪些选项。 例如,如果实现需要 autoDestroy 和 emitClose 选项,则不允许使用者覆盖这些选项。 应明确要传入的选项,而不是隐式地传入所有选项。

自定义的流需要实现以下方法:

用例继承类需要实现的方法
可读流Readable_read()
可写流Writable_write()、_writev()、_final()
可读可写Duplex_read()、_write()、_writev()、_final()
对写入的数据进行操作,然后读取结果Transform_transform()、_flush()、_final()

简单实现

对于简单的案例,构造流可以不依赖继承。 直接创建 stream.Writable、 stream.Readable、 stream.Duplex 或 stream.Transform 的实例,并传入对应的方法作为构造函数选项。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

注:

也就是说,实现自定义流有 2 种方式:

  • 继承方式
    • 继承要实现的 4 种流之一
    • 自定义流的构造方法中调用基类的构造方法
    • 自定义流实现内部必须的方法,例如自定义可读流,需要实现 _read(size) 方法
  • 新建对象方式
    • new 基本流
    • 传入参数对象中包含 write、 read 等方法,用来覆盖 _write 和 _read 方法。具体有哪些方法,请参考上面的列表

这两种方案,下面那种稍微简单一点,但是可配置性较差。上面那种方案,我们可以配置更多的自定义流的属性和行为,更加灵活

自定义可读流

new stream.Readable([options])

可以传入的参数:

  • highWaterMark 从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384 (16kb), 对象模式的流默认为 16。
  • encoding 如果指定了,则使用指定的字符编码将 buffer 解码成字符串。 默认值: null。
  • objectMode 流是否可以是一个对象流。 也就是说 stream.read(n) 会返回对象而不是 Buffer。 默认值: false。
  • emitClose 流被销毁后是否应该触发 'close'。默认值: true。
  • read 对 stream._read() 方法的实现。
  • destroy 对 stream._destroy() 方法的实现。
  • autoDestroy 流是否应在结束后自动调用 .destroy()。默认值: false。
const { Readable, Writable } = require('stream');
class SerialCharStream extends Readable {
    constructor(max, options) {
        super(options);
        this.max = max;
        this.num = 0;
    }
    _read(size) {
        console.log('size is: ' + size);
        while (this.max > 0) {
            console.log(this.push(String.fromCharCode('a'.charCodeAt(0) + this.num % 26)));
            this.num++;
            this.max--;
            size--;
            if (size < 1) {
                return;
            }
        }
        this.push(null);
    }
}
const rns = new SerialCharStream(100, {
    highWaterMark: 4
});
console.log(rns.read(5));

如上,就自定义了一个可读流,关键就是构造函数,以及 _read(size) 方法

更多详情,可以参阅 流的缓冲

自定义可写流

new stream.Writable([options])

可以传入的参数:

  • highWaterMark 当调用 stream.write() 开始返回 false 时的缓冲大小。 默认为 16384 (16kb), 对象模式的流默认为 16。
  • decodeStrings 是否把传入 stream._write()的 string 编码为 Buffer,使用的字符编码为调用 stream.write()时指定的。 不转换其他类型的数据(即不将 Buffer 解码为 string)。 设置为 false 将会阻止转换 string。 默认值: true。
  • defaultEncoding 当 stream.write()的参数没有指定字符编码时默认的字符编码。默认值: 'utf8'。
  • objectMode 是否可以调用 stream.write(anyObj)。 一旦设为 true,则除了字符串、 Buffer 或 Uint8Array,还可以写入流实现支持的其他 JavaScript 值。默认值: false。
  • emitClose 流被销毁后是否触发 'close' 事件。默认值: true。
  • write 对 stream._write() 方法的实现。
  • writev 对 stream._writev() 方法的实现。
  • destroy 对 stream._destroy()方法的实现。
  • final 对 stream._final()方法的实现。
  • autoDestroy 此流是否应在结束后自动调用 .destroy()。默认值: false.
const { Readable, Writable } = require('stream');
class OutputStream extends Writable {
    _write(chunk, enc, done) {
        console.log('_write is called: ' + chunk.toString().toUpperCase() + ' | time is: ' + process.uptime());
        setTimeout(() => {
            done();
        }, 10000); // 表示我们输出一次需要10秒钟
    }
}
var out = new OutputStream({ highWaterMark: 8 });
console.log(out.write('1111'));
console.log(out.writableBuffer);
console.log(out.write('222'));
console.log(out.write('3333'));
console.log(out.writableBuffer);

如上,就自定义了一个可写流,关键就是构造函数,以及 _write(size) 方法

更多详情,可以参阅 流的缓冲

_write(chunk, encoding, callback) 方法

  • chunk:要写入的 Buffer,从传给 stream.write() 的 string 转换而来。 如果流的 decodeStrings 选项为 false 或者流在对象模式下运行,则数据块将不会被转换,并且将是传给 stream.write() 的任何内容。
  • encoding:如果 chunk 是字符串,则指定字符编码。 如果 chunk 是 Buffer 或者流处于对象模式,则无视该选项。
  • callback:当数据块被处理完成后的回调函数。这个回调通知可写流:这次的底层输出的调用已经完成了,可以接着下一次的输出了。当出现错误时,也可以传递一个 error 参数。

这个方法是底层输出调用,只应该在可写流内部使用。当缓冲区有数据时,就会调用这个方法去输出到底层输出。当 callback 回调被调用之后,输出流就知道本次的底层输出完成了,如果缓冲区有数据,就会进行下一次的 _write() 调用去输出缓冲区里的数据,如果缓冲区里没有数据了,就会发送 drain 事件,表示底层输出完毕了,可以接着往缓冲区写数据了。

注意:一定要调用了 callback 才能通知流:本次输出完毕了。如果没有调用 callback,流就会认为还在输出中(有的输出相当耗时,例如网络IO),从而不会调用下一次输出

无论是成功完成写入还是写入失败出现错误,都必须调用 callback。 如果调用失败,则 callback 的第一个参数必须是 Error 对象。 如果写入成功,则 callback 的第一个参数为 null

在 writable._write() 被调用之后且 callback 被调用之前,所有对 writable.write() 的调用都会把要写入的数据缓冲起来。 当调用 callback 时,流将会触发 'drain'事件。 如果流的实现需要同时处理多个数据块,则应该实现 writable._writev() 方法。

如果在构造函数选项中设置 decodeStrings 属性为 false,则 chunk 会保持原样传入 .write(),它可能是字符串而不是 Buffer。 这是为了实现对某些特定字符串数据编码的支持。 在这种情况下, encoding 参数将指示字符串的字符编码。 否则,可以安全地忽略编码参数。

writable._writev(chunks, callback)

跟 _write(chunk,encoding,callback) 差不多,只不过是可以传入多个缓冲

  • chunks Object[] 格式,要写入的多个数据块。 每个数据块的格式为{ chunk: ..., encoding: ... }。
  • callback 当全部数据块被处理完成后的回调函数。

writable._destroy(err, callback)

_destroy() 方法会被 writable.destroy()调用。 它可以被子类重写,但不能直接调用

writable._final(callback)

  • callback 当结束写入所有剩余数据时的回调函数。

_final() 方法不能直接调用。 它应该由子类实现,且只能通过内部的 Writable 类的方法调用。

该方法会在流关闭之前被调用,且在 callback 被调用后触发 'finish' 事件。 主要用于在流结束之前关闭资源或写入缓冲的数据。

也就是说,流的执行是如下顺序:

  • 先调用 writable.end()
  • 可写流的所有数据都被写入完毕了
  • 自动调用 writable._final(callback) 里的 callback 回调
  • 触发 finish 事件

自定义双工流

双工流同时实现了可读流和可写流,例如 TCP socket 连接。

因为 JavaScript 不支持多重继承,所以使用 stream.Duplex 类实现双工流(而不是使用 stream.Readable 类和 stream.Writable 类)。

stream.Duplex 类的原型继承自 stream.Readable 和 stream.Writable,但是 instanceof 对这两个基础类都可用,因为重写了 stream.Writable 的 Symbol.hasInstance。

自定义的双工流必须调用 new stream.Duplex([options]) 构造函数并实现 readable._read() 和 writable._write() 方法。

new stream.Duplex(options)

  • allowHalfOpen 如果设为 false,则当可读端结束时,可写端也会自动结束。 默认为 true。
  • readableObjectMode 设置流的可读端为 objectMode。 如果 objectMode 为 true,则不起作用。 默认为 false。
  • writableObjectMode 设置流的可写端为 objectMode。 如果 objectMode 为 true,则不起作用。 默认为 false。
  • readableHighWaterMark 设置流的可读端的 highWaterMark。 如果已经设置了 highWaterMark,则不起作用。
  • writableHighWaterMark 设置流的可写端的 highWaterMark。 如果已经设置了 highWaterMark,则不起作用。
  • read 对 stream._read() 方法的实现。
  • write 对 stream._write() 方法的实现。

例如,实现一个可以将输入流输出 2 遍的双工流

const { Duplex } = require('stream');
class DoubleStream extends Duplex {
    constructor(options) {
        super(options);
        this._data = [];
    }

    _read(size) {
        while (this._data.length > 0) {
            var str = this._data.shift();
            if (!this.push(str)) {
                return;
            }
        }
        /**
         * 这里表示,这个双工流作为可写流,已经被写完了所有的数据
         * 也就是说,双工流已经接收完了所有的数据来源
         * 此时 this._data.length = 0 ,this.writableFinished = true
         * 表示双工流作为可读流已经被读取完了所有的字节了
         * 因此可以 this.push(null)  对双工流的可读流的部分触发 end 事件
         */
        if (this.writableFinished) {
            this.push(null);
        }
        setTimeout(this._read.bind(this), 100);
    }

    _write(chunk, encoding, callback) {
        // 这里就是将读取进行双份处理
        this._data.push(chunk);
        this._data.push(chunk);
        callback();
    }
}
var myDouble = new DoubleStream();
// 标准输入经过双工流处理,然后输出给标准输出,就像一个常规 UNIX 进程那样
process.stdin
    .pipe(myDouble)
    .pipe(process.stdout);
// 这里 end 触发表示双工流作为可读流,已经被标准输出读走了所有的数据
myDouble.once('end', () => {
    // 这里表示,不断检查标准输出是否已经输出完了:process.stdout.writableLength == 0
    // 常规的输出流调用 writable.end() 即可,并且还会触发 finish 事件
    // 但是 stdout 比较特殊,无法被关闭,也不会触发 finish 事件,因此使用缓冲区长度来判断
    (function checkAndExit() {
        if (process.stdout.writableLength == 0) {
            process.exit(0);
        } else {
            setTimeout(checkAndExit, 500);
        }
    })();
});


  • _data 用来保存内部数据
  • _write 的时候,push 两遍数据到 _data
  • _read 的时候,从 _data 中取数据

输出:

(py3.5) czp@:~/workspace/knowledge-base/demos/node_start$ echo '哈哈爱斯达克交换机卡书法家看哈'| node hello.js
哈哈爱斯达克交换机卡书法家看哈
哈哈爱斯达克交换机卡书法家看哈

注:

这种双工流的逻辑是:

  • 先作为可写流从上游接收输出,保存在自己内部的某个数据中
  • 然后作为可读流为下游提供数据,从上一步保存的数据中进行输出

上面的代码,实际上有很大的问题,后续修改如下:

const { Duplex } = require('stream');
class DoubleStream extends Duplex {
    constructor(options) {
        super(options);
        this._data = [];
    }
    _read(size) {
        while (this._data.length > 0) {
            var str = this._data.shift();
            if (!this.push(str)) {
                return;
            }
        }
        if (this.writableFinished) {
            this.push(null);
            // 这里必须 return
            return;
        }
        setTimeout(this._read.bind(this), 100);
    }

    _write(chunk, encoding, callback) {
        // 这里就是将读取进行双份处理
        this._data.push(chunk);
        this._data.push(chunk);
        callback();
    }
}
var myDouble = new DoubleStream();
// 标准输入经过双工流处理,然后输出给标准输出,就像一个常规 UNIX 进程那样
process.stdin
    .pipe(myDouble)
    .pipe(process.stdout);

注意:

if (this.writableFinished) {
  this.push(null);
  // 这里必须 return
  return;
}
setTimeout(this._read.bind(this), 100);

不加 return 的话,就会一直存在 timeOut 事件,导致进程无法退出。(所以我第一版代码里是最后用了 process.exit,那时候没有弄明白真正无法退出的原因,以为是 stdout 的问题)

自定义转换流

转换流是一种双工流,它会对输入做些计算然后输出。 例如 zlib 流和 crypto 流会压缩、加密或解密数据。

输出流的大小、数据块的数量都不一定会和输入流的一致。 例如, Hash 流在输入结束时只会输出一个数据块,而 zlib 流的输出可能比输入大很多或小很多。

继承 stream.Transform 类可用于实现一个转换流。

stream.Transform 类继承自 stream.Duplex,并且实现了自有的 writable._write() 和 readable._read() 方法。 自定义的转换流必须实现 transform._transform() 方法,transform._flush() 方法是可选的。

当使用转换流时,如果可读端的输出没有被消费,则写入流的数据可能会导致可写端被暂停。

注:

也就是说:

  • 转换流就是双攻流
  • 转换流自己实现了 _write() 和 _read(),用来控制转换的相关逻辑
  • 我们只需要实现 transform._transform() 方法(和可选的 .flush()),转换流就可以实现从上游读取,然后经过转换处理输出给下游的逻辑了
  • 我们上面的自定义双工流其实就可以算是一种转换流了,只不过是我们自己实现的 read 和 write,转换流这两个方法是内置的,重点是 transform 转换

我们用到的绝大部分双工流都是转换流,实现自定义转换流方法如下:

  • 继承 Transform 类
  • 实现 _transform() 方法
  • 实现 _flush() 方法(可以不实现)

new stream.Transform([options])

options 同时传给 Writable 和 Readable 的构造函数。

  • transform 对 stream._transform() 的实现。
  • flush 对 stream._flush() 的实现。

实现一个和上面双工流一样的程序,将输入流的数据输出 2 遍,使用转换流的话如下:

const { Transform } = require('stream');
class DoubleStream extends Transform {
    constructor(options) {
        super(options);
    }
    _transform(chunk, encoding, callback) {
        this.push(chunk);
        this.push(chunk);
        callback();
    }
}
var myDouble = new DoubleStream();
process.stdin
    .pipe(myDouble)
    .pipe(process.stdout);

输出:

(py3.5) czp@:~/workspace/knowledge-base/demos/node_start$ echo '哈哈爱斯达克交换机卡书法家看哈'| node hello.js
哈哈爱斯达克交换机卡书法家看哈
哈哈爱斯达克交换机卡书法家看哈

finish 和 end 事件

finish 事件来自 stream.Writable 类,end 事件来自 stream.Readable 类。 当调用了 stream.end() 并且 stream._transform() 处理完全部数据块之后,触发 'finish' 事件。

(表示作为可写流的部分,已经将所有的数据都写入了)

当调用了 transform._flush() 中的回调函数并且所有数据已经输出之后,触发 'end' 事件。

(表示作为可读流的分布,所有的数据都被读取完毕了)

transform._flush(callback)

callback:当剩余的数据被 flush 后的回调函数

某些情况下,转换操作可能需要在流的末尾发送一些额外的数据。 例如, zlib 压缩流时会储存一些用于优化输出的内部状态。 当流结束时,这些额外的数据需要被 flush 才算完成压缩。

自定义的转换流的 transform._flush() 方法是可选的。 当没有更多数据要被消费时,就会调用这个方法,但如果是在 'end' 事件被触发之前调用则会发出可读流结束的信号。

在 transform._flush() 的实现中, readable.push() 可能会被调用零次或多次。 当 flush 操作完成时,必须调用 callback 函数。

也就是说,这个方法是用来在转换流的末尾添加数据的,改写上面的例子如下:

const { Transform } = require('stream');
class DoubleStream extends Transform {
    constructor(options) {
        super(options);
    }
    _transform(chunk, encoding, callback) {
        this.push(chunk);
        this.push(chunk);
        callback();
    }
    _flush(callback) {
        this.push('嘻嘻嘻了呢\n');
        callback();
    }
}
var myDouble = new DoubleStream();
process.stdin
    .pipe(myDouble)
    .pipe(process.stdout);

最后输出了 嘻嘻嘻了呢 加上一个换行符

输出如下:

(py3.5) czp@:~/workspace/knowledge-base/demos/node_start$ echo '哈哈爱斯达克交换机卡书法家看哈'|node hello.js
哈哈爱斯达克交换机卡书法家看哈
哈哈爱斯达克交换机卡书法家看哈
嘻嘻嘻了呢

transform._transform(chunk, encoding, callback)

  • chunk 要转换的 Buffer,从传给 stream.write() 的 string 转换而来。 如果流的 decodeStrings 选项为 false 或者流在对象模式下运行,则数据块将不会被转换,并且将是传给 stream.write() 的任何内容。
  • encoding 如果数据块是一个字符串,则这是编码类型。 如果数据块是一个 buffer,则为特殊值 'buffer'。在这种情况下忽略它。
  • callback 当 chunk 处理完成时的回调函数。

所有转换流的实现都必须提供 _transform() 方法来接收输入并生产输出。 transform._transform() 的实现会处理写入的字节,进行一些计算操作,然后使用 readable.push() 输出到可读流。

transform.push() 可能会被调用零次或多次用来从每次输入的数据块产生输出,调用的次数取决需要多少数据来产生输出的结果。

输入的数据块有可能不会产生任何输出。

当前数据被完全消费之后,必须调用 callback 函数。 当处理输入的过程中发生出错时, callback 的第一个参数传入 Error 对象,否则传入 null。 如果 callback 传入了第二个参数,则它会被转发到 readable.push()。 就像下面的例子:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
};

上面这两个是等价的方法

transform._transform() 不能并行调用。 流使用了队列机制,无论同步或异步的情况下,都必须先调用 callback 之后才能接收下一个数据块。

Last updated on 11/8/2020
← 双工流和转换流学习计划 →
  • 如何自定义流
    • 简单实现
  • 自定义可读流
    • new stream.Readable([options])
  • 自定义可写流
    • new stream.Writable([options])
    • _write(chunk, encoding, callback) 方法
    • writable._writev(chunks, callback)
    • writable._destroy(err, callback)
    • writable._final(callback)
  • 自定义双工流
    • new stream.Duplex(options)
  • 自定义转换流
    • new stream.Transform([options])
    • finish 和 end 事件
    • transform._flush(callback)
    • transform._transform(chunk, encoding, callback)
影子的知识库
Docs
Getting Started (or other categories)Guides (or other categories)API Reference (or other categories)
Community
User ShowcaseStack OverflowProject ChatTwitter
More
BlogGitHub
Copyright © 2020 Cen ZhiPeng