Node.js Readable流作为Duplex流的数据源
Stream是计算机中数据读写的的一种抽象,基本上每种编程语言都会有它的身影,Node.js也不列外,想要深入理解和使用Node.js,就必须掌握Stream在Node.js下的工作原理。
本篇文章不是为了介绍Node.js下的Stream的工作原理,而是基于大家对Stream有一定的了解,然后向大家介绍一种将Readable流作为Duplex流数据源的实现方式。
Stream概述
虽然不介绍Stream的工作原理,但还是简要描述Node.js下几种流对象的特点,主要有以下4种:
- Writable Stream,可写流
可以往流对象中写入数据,常用于往文件中写入数据,可类比为“游泳池的进水管道”
- Readable Stream,可读流
可以从流对象中读取数据,常用于从文件中读出数据,可类比为“游泳池的出水管道”
- Transform Stream,转换流
可读可写,将输入数据经过转换后输出,可类比为“游泳池灌水时的过滤设备”
- Duplex Stream,双工流
可读可写,但读写相互独立,可类比为“A游泳池的进水管道与B游泳池的出水管道”
Duplex流的应用
通过对Stream的描述,大家对Duplex流的功能应该有大致了解,但什么情况下应该使用Duplex流,并没有明确标准,我个人的理解是:“Duplex流,在某些场景下,可以提供更高层面的抽象”,比如:
把一次HTTP事务的处理过程抽象为一个对象时,Duplex流就非常合适,request -> Duplex Stream Object -> response
之所以举这么个例子,是因为学习Koa时,对其中context对象印象深刻,也因此在自己用Node.js从零实现Server时,尝试模仿这种做法。
模仿Koa
在模仿Koa的过程中,设计context对象继承自Duplex,尝试将req对象作为context的read数据源,res对象作为context的write数据容器,代码如下:
const _ = require('lodash');
const Promise = require('bluebird');
const stream = require('stream');
const Events = require('events');
exports.Context = class Context extends stream.Duplex {
constructor(req, res) {
super();
this._req = req;
this._res = res;
this._init();
}
_init() {
// 用于异步通知
this.emitter = new Events();
this.reqFinished = false;
this._req.on('readable', () => {
this.emitter.emit('req-readable');
});
this._req.on('end', () => {
this.reqFinished = true;
});
this._req.on('close', () => {
this.reqFinished = true;
});
}
// impl for duplex stream read
_read(size) {
if (_.includes(['GET', 'DELETE', 'HEAD', 'OPTIONS'], this._req.method)) {
this.push(null); // push null表示没有更多数据可供读读取
return;
}
let chunk = this._req.read(size); // 尝试从req中读取size大小的数据
if (!chunk) {
// chunk为null,包含两种情况
// 1. 读取速度过快,req中数据准备不及时
// 2. req中已经没有可读取数据
if (this.reqFinished) {
return this.push(null);
}
return this.emitter.once('req-readable', () => {
this._read(size);
});
}
this.push(chunk); // 从req中读取数据成功,返回给外层读取者
if (chunk.length < size) { // req中数据全部读取完成,传达给外层读取者
return this.push(null);
}
}
// impl for duplex stream write
_write() {}
};
以上代码未考虑异常情况,比如req对象的的error事件处理
如果对Node.js的流对象不够了解,要理解上述代码也许还会有障碍,建议大家可先了解一下Node.js Stream对象,推荐两篇不错的文章:
上面两篇文章对Node.js Stream讲的很透测,希望有机会自己也能写出这样的文章,加油~