[date: 2018-05-03 23:31] [visits: 22]

Node.js Stream pipe细节

流的pipe操作主要用于readable流向writable流传递数据,之前只是从经验角度去猜测pipe的工作原理,但遇到问题时才发现不靠谱的猜测并不能为解决问题带来帮助,所以花些时间了解一下pipe的工作原理,本文主要从源代码角度解释pipe的几个内部细节。

支持多个pipe下游

通过阅读源代码,发现同一个readable流支持多个pipe下游,pipe函数的第一段逻辑就是在自己的_readableState中维护所有pipe下游的相关信息,主要包括pipes与pipesCount,当pipesCount为1时,pipes直接指向下游流对象,当pipesCount大于1时,pipes是所有pipe下游流对象数组,相关代码:

var state = this._readableState;

switch (state.pipesCount) {
    case 0:
        state.pipes = dest;
        break;
    case 1:
        state.pipes = [state.pipes, dest];
        break;
    default:
        state.pipes.push(dest);
        break;
}
state.pipesCount += 1;

测试多个下游,新建test.js:

const fs = require('fs');
const Readable = require('stream').Readable;

let src = new Readable();
src.push('some data');
src.push(null);

src.pipe(fs.createWriteStream('./dest1'));
src.pipe(fs.createWriteStream('./dest2'));

执行node test.js会发现目录下多出dest1与dest2文件,内容都是'some data',即证实一个readable流支持pipe给多个writable流。

选择性触发下游end事件

通过pipe,数据是自发的流动不需要人为干涉,当上游数据传输完成时其end事件触发,下游的end事件是否触发由pipe方法的第二个参数决定,相关代码:

var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
    dest !== process.stdout &&
    dest !== process.stderr;

var endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
    process.nextTick(endFn);
else
    src.once('end', endFn);

dest.on('unpipe', onunpipe);

function onend() {
    debug('onend');
    dest.end();
}

pipeOpts就是pipe方法的第二个参数,pipeOpts.end默认为true,表明自动触发下游的end事件,但通过代码发现针对两个特殊的下游对象process.stdout与process.stderr,pipe会忽略pipeOpts.end参数而选择不触发下游end事件。

自动与手动unpipe

pipe是把上游与下游通过管道连起来传输数据,那么unpipe就是从两者之间切断管道,具体做法是在pipe内注册下游的unpipe事件处理函数,当用户调用unpipe时触发下游的unpipe事件,从而移除所有之前在pipe中绑定到上下游的事件,相关代码:

dest.on('unpipe', onunpipe);
function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
        if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
            unpipeInfo.hasUnpiped = true;
            cleanup();
        }
    }
}

function cleanup() {
    debug('cleanup');

    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    dest.removeListener('drain', ondrain);
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', unpipe);
    src.removeListener('data', ondata);

    cleanedUp = true;

    if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain))
        ondrain();
}

数据传递与背压反馈

数据传递过程比较简单,主要是监听上游的ondata事件获取数据,再将数据写入下游,这个过程是自动触发的,但上下游的数据处理速度也许有所差异,比如上游的读取速度是10M/s,而下游的处理速度是5M/s,这种情况会通过背压反馈让上游的速度相应调整到5M/s左右。

针对writable流,数据处理不过来时会先写入一个内部缓冲buffer,当缓冲buffer中的数据大小超过_writableState.highWaterMark时,write操作会返回false以提醒调用者数据处理不过来需要暂停数据写入,等到缓冲数据处理完成时,会触发drain事件通知调用者继续写入。

如果不顾write操作返回false而单方面一直写入,则缓冲区大小会一直增加,相应的应用占用内存也会激增,最终可能导致应用OOM异常退出。

针对writable流的这个特性,pipe操作会动态调整上游的读取速度以适应下游所需,当发现下游write返回false时,就暂停上游的数据读取直到触发drain事件,相关代码:

var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);

// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);

function ondata(chunk) {
    debug('ondata');
    increasedAwaitDrain = false;
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
        // If the user unpiped during `dest.write()`, it is possible
        // to get stuck in a permanently paused state if that write
        // also returned false.
        // => Check whether `dest` is still a piping destination.
        if (((state.pipesCount === 1 && state.pipes === dest) ||
                (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
            !cleanedUp) {
            debug('false write response, pause', src._readableState.awaitDrain);
            src._readableState.awaitDrain++;
            increasedAwaitDrain = true;
        }
        src.pause();
    }
}

function pipeOnDrain(src) {
    return function() {
        var state = src._readableState;
        debug('pipeOnDrain', state.awaitDrain);
        if (state.awaitDrain)
            state.awaitDrain--;
        if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
            state.flowing = true;
            flow(src);
        }
    };
}

function flow(stream) {
    const state = stream._readableState;
    debug('flow', state.flowing);
    while (state.flowing && stream.read() !== null);
}

异常处理

异常处理主要是处理下游的error、close、finish事件,发生上述事件时及时unpipe即可。

总结

学习pipe的原因是因为在实践过程中不确定pipe下游end事件的触发时机与条件,主要原因是不了解pipe以及文档看的不仔细,刚好借此机会对流有了进一步的认识,收获不少。

同时由于不单单是自己看一遍,还用写博客的方式总结了一番,理解的更细致,印象也更为深刻。

写博客的这两个月,越发觉得写博客是一个自我提升非常有效的方式,加油!

附:Github完整代码