[date: 2018-03-16 00:46] [visits: 26]

Node.js Readable流作为Duplex流的数据源

Stream是计算机中数据读写的的一种抽象,基本上每种编程语言都会有它的身影,Node.js也不列外,想要深入理解和使用Node.js,就必须掌握Stream在Node.js下的工作原理。

本篇文章不是为了介绍Node.js下的Stream的工作原理,而是基于大家对Stream有一定的了解,然后向大家介绍一种将Readable流作为Duplex流数据源的实现方式。

Stream概述

虽然不介绍Stream的工作原理,但还是简要描述Node.js下几种流对象的特点,主要有以下4种:

可以往流对象中写入数据,常用于往文件中写入数据,可类比为“游泳池的进水管道”

可以从流对象中读取数据,常用于从文件中读出数据,可类比为“游泳池的出水管道”

可读可写,将输入数据经过转换后输出,可类比为“游泳池灌水时的过滤设备”

可读可写,但读写相互独立,可类比为“A游泳池的进水管道与B游泳池的出水管道”

Duplex流的应用

通过对Stream的描述,大家对Duplex流的功能应该有大致了解,但什么情况下应该使用Duplex流,并没有明确标准,我个人的理解是:“Duplex流,在某些场景下,可以提供更高层面的抽象”,比如:

把一次HTTP事务的处理过程抽象为一个对象时,Duplex流就非常合适,request -> Duplex Stream Object -> response

之所以举这么个例子,是因为学习Koa时,对其中context对象印象深刻,也因此在自己用Node.js从零实现Server时,尝试模仿这种做法。

模仿Koa

在模仿Koa的过程中,设计context对象继承自Duplex,尝试将req对象作为context的read数据源,res对象作为context的write数据容器,代码如下:

const _ = require('lodash');
const Promise = require('bluebird');
const stream = require('stream');
const Events = require('events');

exports.Context = class Context extends stream.Duplex {
    constructor(req, res) {
        super();

        this._req = req;
        this._res = res;

        this._init();
    }

    _init() {
        // 用于异步通知
        this.emitter = new Events();
        this.reqFinished = false;

        this._req.on('readable', () => {
            this.emitter.emit('req-readable');
        });
        this._req.on('end', () => {
            this.reqFinished = true;
        });
        this._req.on('close', () => {
            this.reqFinished = true;
        });
    }

    // impl for duplex stream read
    _read(size) {
        if (_.includes(['GET', 'DELETE', 'HEAD', 'OPTIONS'], this._req.method)) {
            this.push(null); // push null表示没有更多数据可供读读取
            return;
        }

        let chunk = this._req.read(size); // 尝试从req中读取size大小的数据
        if (!chunk) {
            // chunk为null,包含两种情况
            // 1. 读取速度过快,req中数据准备不及时
            // 2. req中已经没有可读取数据
            if (this.reqFinished) {
                return this.push(null);
            }

            return this.emitter.once('req-readable', () => {
                this._read(size);
            });
        }

        this.push(chunk); // 从req中读取数据成功,返回给外层读取者
        if (chunk.length < size) { // req中数据全部读取完成,传达给外层读取者
            return this.push(null);
        }
    }

    // impl for duplex stream write
    _write() {}
};

以上代码未考虑异常情况,比如req对象的的error事件处理

如果对Node.js的流对象不够了解,要理解上述代码也许还会有障碍,建议大家可先了解一下Node.js Stream对象,推荐两篇不错的文章:

上面两篇文章对Node.js Stream讲的很透测,希望有机会自己也能写出这样的文章,加油~