知识共享许可协议
本作品采用知识共享署名-非商业性使用 3.0 未本地化版本许可协议进行许可。

Node.js v0.10.18 手册 & 文档


#

稳定度: 2 - 不稳定

A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter

流是一个抽象接口,被 Node 中的很多对象所实现。比如对一个 HTTP 服务器的请求是一个流,stdout 也是一个流。流是可读、可写或兼具两者的。所有流都是 EventEmitter 的实例。

You can load the Stream base classes by doing require('stream'). There are base classes provided for Readable streams, Writable streams, Duplex streams, and Transform streams.

您可以通过 require('stream') 加载 Stream 基类,其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类。

This document is split up into 3 sections. The first explains the parts of the API that you need to be aware of to use streams in your programs. If you never implement a streaming API yourself, you can stop there.

本文档分为三个章节。第一章节解释了您在您的程序中使用流时需要了解的那部分 API,如果您不打算自己实现一个流式 API,您可以只阅读这一章节。

The second section explains the parts of the API that you need to use if you implement your own custom streams yourself. The API is designed to make this easy for you to do.

第二章节解释了当您自己实现一个流时需要用到的那部分 API,这些 API 是为了方便您这么做而设计的。

The third section goes into more depth about how streams work, including some of the internal mechanisms and functions that you should probably not modify unless you definitely know what you are doing.

第三章节深入讲解了流的工作方式,包括一些内部机制和函数,除非您明确知道您在做什么,否则尽量不要改动它们。

面向流消费者的 API#

Streams can be either Readable, Writable, or both (Duplex).

流可以是可读(Readable)或可写(Writable),或者兼具两者(Duplex,双工)的。

All streams are EventEmitters, but they also have other custom methods and properties depending on whether they are Readable, Writable, or Duplex.

所有流都是 EventEmitter,但它们也具有其它自定义方法和属性,取决于它们是 Readable、Writable 或 Duplex。

If a stream is both Readable and Writable, then it implements all of the methods and events below. So, a Duplex or Transform stream is fully described by this API, though their implementation may be somewhat different.

如果一个流既可读(Readable)也可写(Writable),则它实现了下文所述的所有方法和事件。因此,这些 API 同时也涵盖了 DuplexTransform 流,即便它们的实现可能有点不同。

It is not necessary to implement Stream interfaces in order to consume streams in your programs. If you are implementing streaming interfaces in your own program, please also refer to API for Stream Implementors below.

为了消费流而在您的程序中自己实现 Stream 接口是没有必要的。如果您确实正在您自己的程序中实现流式接口,请同时参考下文面向流实现者的 API

Almost all Node programs, no matter how simple, use Streams in some way. Here is an example of using Streams in a Node program:

几乎所有 Node 程序,无论多简单,都在某种途径用到了流。这里有一个使用流的 Node 程序的例子:

var http = require('http');

var server = http.createServer(function (req, res) {
  // req is an http.IncomingMessage, which is a Readable Stream
  // res is an http.ServerResponse, which is a Writable Stream

var server = http.createServer(function (req, res) {
  // req 为 http.IncomingMessage,是一个可读流(Readable Stream)
  // res 为 http.ServerResponse,是一个可写流(Writable Stream)

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  var body = '';
  // 我们打算以 UTF-8 字符串的形式获取数据
  // 如果您不设置编码,您将得到一个 Buffer 对象
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', function (chunk) {
    body += chunk;
  })

  // 一旦监听器被添加,可读流会触发 'data' 事件
  req.on('data', function (chunk) {
    body += chunk;
  })

  // the end event tells you that you have entire body
  req.on('end', function () {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh!  bad json!
      res.statusCode = 400;
      return res.end('error: ' + er.message);
    }

  // 'end' 事件表明您已经得到了完整的 body
  req.on('end', function () {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh!  bad json!
      res.statusCode = 400;
      return res.end('错误: ' + er.message);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  })
})

    // 向用户回写一些有趣的信息
    res.write(typeof data);
    res.end();
  })
})

server.listen(1337);

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// 错误: Unexpected token o

类: stream.Readable#

The Readable stream interface is the abstraction for a source of data that you are reading from. In other words, data comes out of a Readable stream.

Readable(可读)流接口是对您正在读取的数据的来源的抽象。换言之,数据出自一个 Readable 流。

A Readable stream will not start emitting data until you indicate that you are ready to receive it.

在您表明您就绪接收之前,Readable 流并不会开始发生数据。

Readable streams have two "modes": a flowing mode and a paused mode. When in flowing mode, data is read from the underlying system and provided to your program as fast as possible. In paused mode, you must explicitly call stream.read() to get chunks of data out. Streams start out in paused mode.

Readable 流有两种“模式”:流动模式暂停模式。当处于流动模式时,数据由底层系统读出,并尽可能快地提供给您的程序;当处于暂停模式时,您必须明确地调用 stream.read() 来取出若干数据块。流默认处于暂停模式。

Note: If no data event handlers are attached, and there are no pipe() destinations, and the stream is switched into flowing mode, then data will be lost.

注意:如果没有绑定 data 事件处理器,并且没有 pipe() 目标,同时流被切换到流动模式,那么数据会流失。

You can switch to flowing mode by doing any of the following:

您可以通过下面几种做法切换到流动模式:

You can switch back to paused mode by doing either of the following:

您可以通过下面其中一种做法切换回暂停模式:

  • If there are no pipe destinations, by calling the pause() method.
  • If there are pipe destinations, by removing any 'data' event handlers, and removing all pipe destinations by calling the unpipe() method.

  • 如果没有导流目标,调用 pause() 方法。

  • 如果有导流目标,移除所有 ['data' 事件][] 处理器、调用 unpipe() 方法移除所有导流目标。

Note that, for backwards compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.

请注意,为了向后兼容考虑,移除 'data' 事件监听器并不会自动暂停流。同样的,当有导流目标时,调用 pause() 并不能保证流在那些目标排空并请求更多数据时维持暂停状态。

Examples of readable streams include:

一些可读流的例子:

事件: 'readable'#

When a chunk of data can be read from the stream, it will emit a 'readable' event.

当一个数据块可以从流中被读出时,它会触发一个 'readable' 事件。

In some cases, listening for a 'readable' event will cause some data to be read into the internal buffer from the underlying system, if it hadn't already.

在某些情况下,假如未准备好,监听一个 'readable' 事件会使得一些数据从底层系统被读出到内部缓冲区中。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // 现在有数据可以读了
})

Once the internal buffer is drained, a readable event will fire again when more data is available.

当内部缓冲区被排空后,一旦更多数据时,一个 readable 事件会被再次触发。

事件: 'data'#

  • chunk Buffer | String The chunk of data.

  • chunk Buffer | String 数据块。

Attaching a data event listener to a stream that has not been explicitly paused will switch the stream into flowing mode. Data will then be passed as soon as it is available.

绑定一个 data 事件监听器到一个未被明确暂停的流会将流切换到流动模式,数据会被尽可能地传递。

If you just want to get all the data out of the stream as fast as possible, this is the best way to do so.

如果您想从流尽快取出所有数据,这是最理想的方式。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('得到了 %d 字节的数据', chunk.length);
})

事件: 'end'#

This event fires when no more data will be provided.

该事件会在没有更多数据能够提供时被触发。

Note that the end event will not fire unless the data is completely consumed. This can be done by switching into flowing mode, or by calling read() repeatedly until you get to the end.

请注意,end 事件在数据被完全消费之前不会被触发。这可通过切换到流动模式,或者在到达末端前不断调用 read() 来实现。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('得到了 %d 字节的数据', chunk.length);
})
readable.on('end', function() {
  console.log('读取完毕。');
});

事件: 'close'#

Emitted when the underlying resource (for example, the backing file descriptor) has been closed. Not all streams will emit this.

当底层数据源(比如,源头的文件描述符)被关闭时触发。并不是所有流都会触发这个事件。

事件: 'error'#

Emitted if there was an error receiving data.

当数据接收时发生错误时触发。

readable.read([size])#

  • size Number Optional argument to specify how much data to read.
  • Return String | Buffer | null

  • size Number 可选参数,指定要读取多少数据。

  • 返回 String | Buffer | null

The read() method pulls some data out of the internal buffer and returns it. If there is no data available, then it will return null.

read() 方法从内部缓冲区中拉取并返回若干数据。当没有更多数据可用时,它会返回 null

If you pass in a size argument, then it will return that many bytes. If size bytes are not available, then it will return null.

若您传入了一个 size 参数,那么它会返回相当字节的数据;当 size 字节不可用时,它则返回 null

If you do not specify a size argument, then it will return all the data in the internal buffer.

若您没有指定 size 参数,那么它会返回内部缓冲区中的所有数据。

This method should only be called in paused mode. In flowing mode, this method is called automatically until the internal buffer is drained.

该方法仅应在暂停模式时被调用。在流动模式中,该方法会被自动调用直到内部缓冲区排空。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('得到了 %d 字节的数据', chunk.length);
  }
});

If this method returns a data chunk, then it will also trigger the emission of a 'data' event.

当该方法返回了一个数据块,它同时也会触发 'data' 事件

readable.setEncoding(encoding)#

  • encoding String The encoding to use.
  • Return: this

  • encoding String 要使用的编码。

  • 返回: this

Call this function to cause the stream to return strings of the specified encoding instead of Buffer objects. For example, if you do readable.setEncoding('utf8'), then the output data will be interpreted as UTF-8 data, and returned as strings. If you do readable.setEncoding('hex'), then the data will be encoded in hexadecimal string format.

调用此函数会使得流返回指定编码的字符串而不是 Buffer 对象。比如,当您 readable.setEncoding('utf8'),那么输出数据会被作为 UTF-8 数据解析,并以字符串返回。如果您 readable.setEncoding('hex'),那么数据会被编码成十六进制字符串格式。

This properly handles multi-byte characters that would otherwise be potentially mangled if you simply pulled the Buffers directly and called buf.toString(encoding) on them. If you want to read the data as strings, always use this method.

该方法能正确处理多字节字符。假如您不这么做,仅仅直接取出 Buffer 并对它们调用 buf.toString(encoding),很可能会导致字节错位。因此如果您打算以字符串读取数据,请总是使用这个方法。

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('得到了 %d 个字符的字符串数据', chunk.length);
})

readable.resume()#

  • Return: this

  • 返回: this

This method will cause the readable stream to resume emitting data events.

该方法让可读流继续触发 data 事件。

This method will switch the stream into flowing mode. If you do not want to consume the data from a stream, but you do want to get to its end event, you can call readable.resume() to open the flow of data.

该方法会将流切换到流动模式。如果您不想从流中消费数据,但您得到它的 end 事件,您可以调用 readable.resume() 来启动数据流。

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
  console.log('到达末端,但并未读取任何东西');
})

readable.pause()#

  • Return: this

  • 返回: this

This method will cause a stream in flowing mode to stop emitting data events, switching out of flowing mode. Any data that becomes available will remain in the internal buffer.

该方法会使一个处于流动模式的流停止触发 data 事件,切换到非流动模式,并让后续可用数据留在内部缓冲区中。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('取得 %d 字节数据', chunk.length);
  readable.pause();
  console.log('接下来 1 秒内不会有数据');
  setTimeout(function() {
    console.log('现在数据会再次开始流动');
    readable.resume();
  }, 1000);
})

readable.pipe(destination, [options])#

  • destination Writable Stream The destination for writing data
  • options Object Pipe options

    • end Boolean End the writer when the reader ends. Default = true
  • destination Writable Stream 写入数据的目标

  • options Object 导流选项
    • end Boolean 在读取者结束时结束写入者。缺省为 true

This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.

该方法从可读流中拉取所有数据,并写入到所提供的目标。该方法能自动控制流量以避免目标被快速读取的可读流所淹没。

Multiple destinations can be piped to safely.

可以导流到多个目标。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 所有来自 readable 的数据会被写入到 'file.txt'
readable.pipe(writable);

This function returns the destination stream, so you can set up pipe chains like so:

该函数返回目标流,因此您可以建立导流链:

var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

For example, emulating the Unix cat command:

例如,模拟 Unix 的 cat 命令:

process.stdin.pipe(process.stdout);

By default end() is called on the destination when the source stream emits end, so that destination is no longer writable. Pass { end: false } as options to keep the destination stream open.

缺省情况下当来源流触发 end 时目标的 end() 会被调用,所以此时 destination 不再可写。传入 { end: false } 作为 options 可以让目标流保持开启状态。

This keeps writer open so that "Goodbye" can be written at the end.

这将让 writer 保持开启,因此最后可以写入 "Goodbye"。

reader.pipe(writer, { end: false });
reader.on('end', function() {
  writer.end('Goodbye\n');
});

Note that process.stderr and process.stdout are never closed until the process exits, regardless of the specified options.

请注意 process.stderrprocess.stdout 在进程结束前都不会被关闭,无论是否指定选项。

readable.unpipe([destination])#

  • destination Writable Stream Optional specific stream to unpipe

  • destination Writable Stream 可选,指定解除导流的流

This method will remove the hooks set up for a previous pipe() call.

该方法会移除之前调用 pipe() 所设定的钩子。

If the destination is not specified, then all pipes are removed.

如果不指定目标,所有导流都会被移除。

If the destination is specified, but no pipe is set up for it, then this is a no-op.

如果指定了目标,但并没有与之建立导流,则什么事都不会发生。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 来自 readable 的所有数据都会被写入 'file.txt',
// 但仅发生在第 1 秒
readable.pipe(writable);
setTimeout(function() {
  console.log('停止写入到 file.txt');
  readable.unpipe(writable);
  console.log('自行关闭文件流');
  writable.end();
}, 1000);

readable.unshift(chunk)#

  • chunk Buffer | String Chunk of data to unshift onto the read queue

  • chunk Buffer | String 要插回读取队列开头的数据块

This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.

该方法在许多场景中都很有用,比如一个流正在被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”回来源,以便流能将它传递给其它消费者。

If you find that you must often call stream.unshift(chunk) in your programs, consider implementing a Transform stream instead. (See API for Stream Implementors, below.)

如果您发现您需要在您的程序中频繁调用 stream.unshift(chunk),请考虑实现一个 Transform 流。(详见下文面向流实现者的 API。)

// 取出以 \n\n 分割的头部并将多余部分 unshift() 回去
// callback 以 (error, header, stream) 形式调用
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // 找到头部边界
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = new Buffer(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // 现在可以从流中读取消息的主体了
        callback(null, header, stream);
      } else {
        // 仍在读取头部
        header += str;
      }
    }
  }
}

readable.wrap(stream)#

  • stream Stream An "old style" readable stream

  • stream Stream 一个“旧式”可读流

Versions of Node prior to v0.10 had streams that did not implement the entire Streams API as it is today. (See "Compatibility" below for more information.)

Node v0.10 版本之前的流并未实现现今所有流 API。(更多信息详见下文“兼容性”章节。)

If you are using an older Node library that emits 'data' events and has a pause() method that is advisory only, then you can use the wrap() method to create a Readable stream that uses the old stream as its data source.

如果您正在使用早前版本的 Node 库,它触发 'data' 事件并且有一个仅作查询用途的 pause() 方法,那么您可以使用 wrap() 方法来创建一个使用旧式流作为数据源的 Readable 流。

You will very rarely ever need to call this function, but it exists as a convenience for interacting with old Node programs and libraries.

您可能很少需要用到这个函数,但它会作为与旧 Node 程序和库交互的简便方法存在。

For example:

例如:

myReader.on('readable', function() {
myReader.read(); // etc.
});

类: stream.Writable#

The Writable stream interface is an abstraction for a destination that you are writing data to.

Writable(可写)流接口是对您正在写入数据至一个目标的抽象。

Examples of writable streams include:

一些可写流的例子:

writable.write(chunk, [encoding], [callback])#

  • chunk String | Buffer The data to write
  • encoding String The encoding, if chunk is a String
  • callback Function Callback for when this chunk of data is flushed
  • Returns: Boolean True if the data was handled completely.
  • chunk {String | Buffer} 要写入的数据
  • encoding {String} 编码,假如 chunk 是一个字符串
  • callback {Function} 数据块写入后的回调
  • 返回: {Boolean} 如果数据已被全部处理则 true

This method writes some data to the underlying system, and calls the supplied callback once the data has been fully handled.

该方法向底层系统写入数据,并在数据被处理完毕后调用所给的回调。

The return value indicates if you should continue writing right now. If the data had to be buffered internally, then it will return false. Otherwise, it will return true.

返回值表明您是否应该立即继续写入。如果数据需要滞留在内部,则它会返回 false;否则,返回 true

This return value is strictly advisory. You MAY continue to write, even if it returns false. However, writes will be buffered in memory, so it is best not to do this excessively. Instead, wait for the drain event before writing more data.

返回值所表示的状态仅供参考,您【可以】在即便返回 false 的时候继续写入。但是,写入的数据会被滞留在内存中,所以最好不要过分地这么做。最好的做法是等待 drain 事件发生后再继续写入更多数据。

事件: 'drain'#

If a writable.write(chunk) call returns false, then the drain event will indicate when it is appropriate to begin writing more data to the stream.

如果一个 writable.write(chunk) 调用返回 false,那么 drain 事件则表明可以继续向流写入更多数据。

// 向所给可写流写入 1000000 次数据。
// 注意后端压力。
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // 最后一次!
        writer.write(data, encoding, callback);
      } else {
        // 检查我们应该继续还是等待
        // 不要传递回调,因为我们还没完成。
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 不得不提前停止!
      // 一旦它排空,继续写入数据
      writer.once('drain', write);
    }
  }
}

writable.cork()#

Forces buffering of all writes.

强行滞留所有写入。

Buffered data will be flushed either at .uncork() or at .end() call.

滞留的数据会在 .uncork().end() 调用时被写入。

writable.uncork()#

Flush all data, buffered since .cork() call.

写入所有 .cork() 调用之后滞留的数据。

writable.end([chunk], [encoding], [callback])#

  • chunk String | Buffer Optional data to write
  • encoding String The encoding, if chunk is a String
  • callback Function Optional callback for when the stream is finished

  • chunk String | Buffer 可选,要写入的数据

  • encoding String 编码,假如 chunk 是一个字符串
  • callback Function 可选,流结束后的回调

Call this method when no more data will be written to the stream. If supplied, the callback is attached as a listener on the finish event.

当没有更多数据会被写入到流时调用此方法。如果给出,回调会被用作 finish 事件的监听器。

Calling write() after calling end() will raise an error.

在调用 end() 后调用 write() 会产生错误。

// 写入 'hello, ' 然后以 'world!' 结束
http.createServer(function (req, res) {
  res.write('hello, ');
  res.end('world!');
  // 现在不允许继续写入了
});

事件: 'finish'#

When the end() method has been called, and all data has been flushed to the underlying system, this event is emitted.

end() 方法被调用,并且所有数据已被写入到底层系统,此事件会被触发。

var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
  writer.write('hello, #' + i + '!\n');
}
writer.end('this is the end\n');
write.on('finish', function() {
  console.error('已完成所有写入。');
});

事件: 'pipe'#

  • src Readable Stream source stream that is piping to this writable

  • src Readable Stream 导流到本可写流的来源流

This is emitted whenever the pipe() method is called on a readable stream, adding this writable to its set of destinations.

该事件发生于可读流的 pipe() 方法被调用并添加本可写流作为它的目标时。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', function(src) {
  console.error('某些东西正被导流到 writer');
  assert.equal(src, reader);
});
reader.pipe(writer);

事件: 'unpipe'#

This is emitted whenever the unpipe() method is called on a readable stream, removing this writable from its set of destinations.

该事件发生于可读流的 unpipe() 方法被调用并将本可写流从它的目标移除时。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', function(src) {
  console.error('某写东西停止导流到 writer 了');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

类: stream.Duplex#

Duplex streams are streams that implement both the Readable and Writable interfaces. See above for usage.

双工(Duplex)流同时实现了 ReadableWritable 的接口。详见下文用例。

Examples of Duplex streams include:

一些双工流的例子:

类: stream.Transform#

Transform streams are Duplex streams where the output is in some way computed from the input. They implement both the Readable and Writable interfaces. See above for usage.

转换(Transform)流是一种输出由输入计算所得的双工流。它们同时实现了 ReadableWritable 的接口。详见下文用例。

Examples of Transform streams include:

一些转换流的例子:

面向流实现者的 API#

To implement any sort of stream, the pattern is the same:

无论实现任何形式的流,模式都是一样的:

  1. Extend the appropriate parent class in your own subclass. (The util.inherits method is particularly helpful for this.)
  2. Call the appropriate parent class constructor in your constructor, to be sure that the internal mechanisms are set up properly.
  3. Implement one or more specific methods, as detailed below.

  4. 在您的子类中扩充适合的父类。(util.inherits 方法对此很有帮助。)

  5. 在您的构造函数中调用父类的构造函数,以确保内部的机制被正确初始化。
  6. 实现一个或多个特定的方法,参见下面的细节。

The class to extend and the method(s) to implement depend on the sort of stream class you are writing:

所扩充的类和要实现的方法取决于您要编写的流类的形式:

Use-case

Class

Method(s) to implement

Reading only

Readable

_read

Writing only

Writable

_write

Reading and writing

Duplex

_read, _write

Operate on written data, then read the result

Transform

_transform, _flush

使用情景

要实现的方法

只读

Readable

_read

只写

Writable

_write

读写

Duplex

_read, _write

操作被写入数据,然后读出结果

Transform

_transform, _flush

In your implementation code, it is very important to never call the methods described in API for Stream Consumers above. Otherwise, you can potentially cause adverse side effects in programs that consume your streaming interfaces.

在您的实现代码中,十分重要的一点是绝对不要调用上文面向流消费者的 API 中所描述的方法,否则可能在消费您的流接口的程序中产生潜在的副作用。

类: stream.Readable#

stream.Readable is an abstract class designed to be extended with an underlying implementation of the _read(size) method.

stream.Readable 是一个可被扩充的、实现了底层方法 _read(size) 的抽象类。

Please see above under API for Stream Consumers for how to consume streams in your programs. What follows is an explanation of how to implement Readable streams in your programs.

请阅读前文面向流消费者的 API 章节了解如何在您的程序中消费流。文将解释如何在您的程序中自己实现 Readable 流。

例子: 一个计数流#

This is a basic example of a Readable stream. It emits the numerals from 1 to 1,000,000 in ascending order, and then ends.

这是一个 Readable 流的基本例子。它将从 1 至 1,000,000 递增地触发数字,然后结束。

var Readable = require('stream').Readable;
var util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
  Readable.call(this, opt);
  this._max = 1000000;
  this._index = 1;
}

function Counter(opt) {
  Readable.call(this, opt);
  this._max = 1000000;
  this._index = 1;
}

Counter.prototype._read = function() {
  var i = this._index++;
  if (i > this._max)
    this.push(null);
  else {
    var str = '' + i;
    var buf = new Buffer(str, 'ascii');
    this.push(buf);
  }
};

例子: SimpleProtocol v1 (Sub-optimal)#

This is similar to the parseHeader function described above, but implemented as a custom stream. Also, note that this implementation does not convert the incoming data to a string.

这个有点类似上文提到的 parseHeader 函数,但它被实现成一个自定义流。同样地,请注意这个实现并未将传入数据转换成字符串。

However, this would be better implemented as a Transform stream. See below for a better implementation.

实际上,更好的办法是将它实现成一个 Transform 流。更好的实现详见下文。

// 简易数据协议的解析器。
// “header”是一个 JSON 对象,后面紧跟 2 个 \n 字符,以及
// 消息主体。
//
// 注意: 使用 Transform 流能更简单地实现这个功能!
// 直接使用 Readable 并不是最佳方式,详见 Transform
// 章节下的备选例子。

var Readable = require('stream').Readable;
var util = require('util');

var Readable = require('stream').Readable;
var util = require('util');

util.inherits(SimpleProtocol, Readable);

util.inherits(SimpleProtocol, Readable);

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  // source is a readable stream, such as a socket or file
  this._source = source;

  // source 是一个可读流,比如嵌套字或文件
  this._source = source;

  var self = this;
  source.on('end', function() {
    self.push(null);
  });

  var self = this;
  source.on('end', function() {
    self.push(null);
  });

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', function() {
    self.read(0);
  });

  // 当 source 可读时做点什么
  // read(0) 不会消费任何字节
  source.on('readable', function() {
    self.read(0);
  });

  this._rawHeader = [];
  this.header = null;
}

  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

    if (split === -1) {
      // 继续等待 \n\n
      // 暂存数据块,并再次尝试
      this._rawHeader.push(chunk);
      this.push('');
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // 现在,我们得到了一些多余的数据,所以需要 unshift
      // 将多余的数据放回读取队列以便我们的消费者能够读取
      var b = chunk.slice(split);
      this.unshift(b);

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);
    }
  } else {
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

      // 并让它们知道我们完成了头部解析。
      this.emit('header', this.header);
    }
  } else {
    // 从现在开始,仅需向我们的消费者提供数据。
    // 注意不要 push(null),因为它表明 EOF。
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

// 用法:
// var parser = new SimpleProtocol(source);
// 现在 parser 是一个会触发 'header' 事件并提供已解析
// 的头部的可读流。

new stream.Readable([options])#

  • options Object

    • highWaterMark Number The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default=16kb, or 16 for objectMode streams
    • encoding String If specified, then buffers will be decoded to strings using the specified encoding. Default=null
    • objectMode Boolean Whether this stream should behave as a stream of objects. Meaning that stream.read(n) returns a single value instead of a Buffer of size n
  • options Object

    • highWaterMark Number 停止从底层资源读取前内部缓冲区最多能存放的字节数。缺省为 16kb,对于 objectMode 流则是 16
    • encoding String 若给出,则 Buffer 会被解码成所给编码的字符串。缺省为 null
    • objectMode Boolean 该流是否应该表现为对象的流。意思是说 stream.read(n) 返回一个单独的对象,而不是大小为 n 的 Buffer

In classes that extend the Readable class, make sure to call the Readable constructor so that the buffering settings can be properly initialized.

请确保在扩充 Readable 类的类中调用 Readable 构造函数以便缓冲设定能被正确初始化。

readable._read(size)#

  • size Number Number of bytes to read asynchronously

  • size Number 异步读取的字节数

Note: Implement this function, but do NOT call it directly.

注意:实现这个函数,但【不要】直接调用它。

This function should NOT be called directly. It should be implemented by child classes, and only called by the internal Readable class methods.

这个函数【不应该】被直接调用。它应该被子类所实现,并仅被 Readable 类内部方法所调用。

All Readable stream implementations must provide a _read method to fetch data from the underlying resource.

所有 Readable 流的实现都必须提供一个 _read 方法来从底层资源抓取数据。

This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.

该方法以下划线开头是因为它对于定义它的类是内部的,并且不应该被用户程序直接调用。但是,你应当在您的扩充类中覆盖这个方法。

When data is available, put it into the read queue by calling readable.push(chunk). If push returns false, then you should stop reading. When _read is called again, you should start pushing more data.

当数据可用时,调用 readable.push(chunk) 将它加入到读取队列。如果 push 返回 false,那么您应该停止读取。当 _read 被再次调用,您应该继续推出更多数据。

The size argument is advisory. Implementations where a "read" is a single call that returns data can use this to know how much data to fetch. Implementations where that is not relevant, such as TCP or TLS, may ignore this argument, and simply provide data whenever it becomes available. There is no need, for example to "wait" until size bytes are available before calling stream.push(chunk).

参数 size 仅作查询。“read”调用返回数据的实现可以通过这个参数来知道应当抓取多少数据;其余与之无关的实现,比如 TCP 或 TLS,则可忽略这个参数,并在可用时返回数据。例如,没有必要“等到” size 个字节可用时才调用 stream.push(chunk)

readable.push(chunk, [encoding])#

  • chunk Buffer | null | String Chunk of data to push into the read queue
  • encoding String Encoding of String chunks. Must be a valid Buffer encoding, such as 'utf8' or 'ascii'
  • return Boolean Whether or not more pushes should be performed

  • chunk Buffer | null | String 推入读取队列的数据块

  • encoding String 字符串块的编码。必须是有效的 Buffer 编码,比如 utf8ascii
  • 返回 Boolean 是否应该继续推入

Note: This function should be called by Readable implementors, NOT by consumers of Readable streams.

注意:这个函数应该被 Readable 实现者调用,【而不是】Readable 流的消费者。

The _read() function will not be called again until at least one push(chunk) call is made.

函数 _read() 不会被再次调用,直到至少调用了一次 push(chunk)

The Readable class works by putting data into a read queue to be pulled out later by calling the read() method when the 'readable' event fires.

Readable 类的工作方式是,将数据读入一个队列,当 'readable' 事件发生、调用 read() 方法时,数据会被从队列中取出。

The push() method will explicitly insert some data into the read queue. If it is called with null then it will signal the end of the data (EOF).

push() 方法会明确地向读取队列中插入一些数据。如果调用它时传入了 null 参数,那么它会触发数据结束信号(EOF)。

This API is designed to be as flexible as possible. For example, you may be wrapping a lower-level source which has some sort of pause/resume mechanism, and a data callback. In those cases, you could wrap the low-level source object by doing something like this:

这个 API 被设计成尽可能地灵活。比如说,您可以包装一个低级别的具备某种暂停/恢复机制和数据回调的数据源。这种情况下,您可以通过这种方式包装低级别来源对象:

// source 是一个带 readStop() 和 readStart() 方法的类,
// 以及一个当有数据时会被调用的 `ondata` 成员、一个
// 当数据结束时会被调用的 `onend` 成员。

util.inherits(SourceWrapper, Readable);

util.inherits(SourceWrapper, Readable);

function SourceWrapper(options) {
  Readable.call(this, options);

function SourceWrapper(options) {
  Readable.call(this, options);

  this._source = getLowlevelSourceObject();
  var self = this;

  this._source = getLowlevelSourceObject();
  var self = this;

  // Every time there's data, we push it into the internal buffer.
  this._source.ondata = function(chunk) {
    // if push() returns false, then we need to stop reading from source
    if (!self.push(chunk))
      self._source.readStop();
  };

  // 每当有数据时,我们将它推入到内部缓冲区中
  this._source.ondata = function(chunk) {
    // 如果 push() 返回 false,我们就需要暂停读取 source
    if (!self.push(chunk))
      self._source.readStop();
  };

  // When the source ends, we push the EOF-signalling `null` chunk
  this._source.onend = function() {
    self.push(null);
  };
}

  // 当来源结束时,我们 push 一个 `null` 块以表示 EOF
  this._source.onend = function() {
    self.push(null);
  };
}

// _read 会在流想要拉取更多数据时被调用
// 本例中忽略 size 参数
SourceWrapper.prototype._read = function(size) {
  this._source.readStart();
};

类: stream.Writable#

stream.Writable is an abstract class designed to be extended with an underlying implementation of the _write(chunk, encoding, callback) method.

stream.Writable 是一个可被扩充的、实现了底层方法 _write(chunk, encoding, callback) 的抽象类。

Please see above under API for Stream Consumers for how to consume writable streams in your programs. What follows is an explanation of how to implement Writable streams in your programs.

请阅读前文面向流消费者的 API 章节了解如何在您的程序中消费可读流。下文将解释如何在您的程序中自己实现 Writable 流。

new stream.Writable([options])#

  • options Object

    • highWaterMark Number Buffer level when write() starts returning false. Default=16kb, or 16 for objectMode streams
    • decodeStrings Boolean Whether or not to decode strings into Buffers before passing them to _write(). Default=true
  • options Object

    • highWaterMark Number write() 开始返回 false 的缓冲级别。缺省为 16kb,对于 objectMode 流则是 16
    • decodeStrings Boolean 是否在传递给 _write() 前将字符串解码成 Buffer。缺省为 true

In classes that extend the Writable class, make sure to call the constructor so that the buffering settings can be properly initialized.

请确保在扩充 Writable 类的类中调用构造函数以便缓冲设定能被正确初始化。

writable._write(chunk, encoding, callback)#

  • chunk Buffer | String The chunk to be written. Will always be a buffer unless the decodeStrings option was set to false.
  • encoding String If the chunk is a string, then this is the encoding type. Ignore if chunk is a buffer. Note that chunk will always be a buffer unless the decodeStrings option is explicitly set to false.
  • callback Function Call this function (optionally with an error argument) when you are done processing the supplied chunk.

  • chunk Buffer | String 要被写入的数据块。总会是一个 Buffer,除非 decodeStrings 选项被设定为 false

  • encoding String 如果数据块是字符串,则这里指定它的编码类型。如果数据块是 Buffer 则忽略此设定。请注意数据块总会是一个 Buffer,除非 decodeStrings 选项被明确设定为 false
  • callback Function 当您处理完所给数据块时调用此函数(可选地可附上一个错误参数)。

All Writable stream implementations must provide a _write() method to send data to the underlying resource.

所有 Writable 流的实现必须提供一个 _write() 方法来将数据发送到底层资源。

Note: This function MUST NOT be called directly. It should be implemented by child classes, and called by the internal Writable class methods only.

注意:该函数【禁止】被直接调用。它应该被子类所实现,并仅被 Writable 内部方法所调用。

Call the callback using the standard callback(error) pattern to signal that the write completed successfully or with an error.

使用标准的 callback(error) 形式来调用回调以表明写入成功完成或遇到错误。

If the decodeStrings flag is set in the constructor options, then chunk may be a string rather than a Buffer, and encoding will indicate the sort of string that it is. This is to support implementations that have an optimized handling for certain string data encodings. If you do not explicitly set the decodeStrings option to false, then you can safely ignore the encoding argument, and assume that chunk will always be a Buffer.

如果构造函数选项中设定了 decodeStrings 标志,则 chunk 可能会是字符串而不是 Buffer,并且 encoding 表明了字符串的格式。这种设计是为了支持对某些字符串数据编码提供优化处理的实现。如果您没有明确地将 decodeStrings 选项设定为 false,那么您可以安全地忽略 encoding 参数,并假定 chunk 总是一个 Buffer。

This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.

该方法以下划线开头是因为它对于定义它的类是内部的,并且不应该被用户程序直接调用。但是,你应当在您的扩充类中覆盖这个方法。

writable._writev(chunks, callback)#

  • chunks Array The chunks to be written. Each chunk has following format: <span class="type"> chunk: ..., encoding: ... </span>.
  • callback Function Call this function (optionally with an error argument) when you are done processing the supplied chunks.

  • chunks Array 要写入的块。每个块都遵循这种格式:{ chunk: ..., encoding: ... }

  • callback Function 当您处理完所给数据块时调用此函数(可选地可附上一个错误参数)。

Note: This function MUST NOT be called directly. It may be implemented by child classes, and called by the internal Writable class methods only.

注意:该函数【禁止】被直接调用。它应该被子类所实现,并仅被 Writable 内部方法所调用。

This function is completely optional to implement. In most cases it is unnecessary. If implemented, it will be called with all the chunks that are buffered in the write queue.

该函数的实现完全是可选的,在大多数情况下都是不必要的。如果实现,它会被以所有滞留在写入队列中的数据块调用。

类: stream.Duplex#

A "duplex" stream is one that is both Readable and Writable, such as a TCP socket connection.

“双工”(duplex)流同时兼具可读和可写特性,比如一个 TCP 嵌套字连接。

Note that stream.Duplex is an abstract class designed to be extended with an underlying implementation of the _read(size) and _write(chunk, encoding, callback) methods as you would with a Readable or Writable stream class.

值得注意的是,stream.Duplex 是一个可以像 Readable 或 Writable 一样被扩充、实现了底层方法 _read(sise)_write(chunk, encoding, callback) 的抽象类。

Since JavaScript doesn't have multiple prototypal inheritance, this class prototypally inherits from Readable, and then parasitically from Writable. It is thus up to the user to implement both the lowlevel _read(n) method as well as the lowlevel _write(chunk, encoding, callback) method on extension duplex classes.

由于 JavaScript 并不具备多原型继承能力,这个类实际上继承自 Readable,并寄生自 Writable,从而让用户在双工类的扩充中能同时实现低级别的 _read(n) 方法和 _write(chunk, encoding, callback) 方法。

new stream.Duplex(options)#

  • options Object Passed to both Writable and Readable constructors. Also has the following fields:

    • allowHalfOpen Boolean Default=true. If set to false, then the stream will automatically end the readable side when the writable side ends and vice versa.
  • options Object Passed to both Writable and Readable constructors. Also has the following fields:

    • allowHalfOpen Boolean Default=true. If set to false, then the stream will automatically end the readable side when the writable side ends and vice versa.

In classes that extend the Duplex class, make sure to call the constructor so that the buffering settings can be properly initialized.

请确保在扩充 Duplex 类的类中调用构造函数以便缓冲设定能被正确初始化。

类: stream.Transform#

A "transform" stream is a duplex stream where the output is causally connected in some way to the input, such as a zlib stream or a crypto stream.

“转换”(transform)流实际上是一个输出与输入存在因果关系的双工流,比如 zlib 流或 crypto 流。

There is no requirement that the output be the same size as the input, the same number of chunks, or arrive at the same time. For example, a Hash stream will only ever have a single chunk of output which is provided when the input is ended. A zlib stream will produce output that is either much smaller or much larger than its input.

输入和输出并无要求相同大小、相同块数或同时到达。举个例子,一个 Hash 流只会在输入结束时产生一个数据块的输出;一个 zlib 流会产生比输入小得多或大得多的输出。

Rather than implement the _read() and _write() methods, Transform classes must implement the _transform() method, and may optionally also implement the _flush() method. (See below.)

转换类必须实现 _transform() 方法,而不是 _read()_write() 方法。可选的,也可以实现 _flush() 方法。(详见下文。)

new stream.Transform([options])#

  • options Object Passed to both Writable and Readable constructors.

  • options Object 传递给 Writable 和 Readable 构造函数。

In classes that extend the Transform class, make sure to call the constructor so that the buffering settings can be properly initialized.

请确保在扩充 Transform 类的类中调用了构造函数,以使得缓冲设定能被正确初始化。

transform._transform(chunk, encoding, callback)#

  • chunk Buffer | String The chunk to be transformed. Will always be a buffer unless the decodeStrings option was set to false.
  • encoding String If the chunk is a string, then this is the encoding type. (Ignore if decodeStrings chunk is a buffer.)
  • callback Function Call this function (optionally with an error argument) when you are done processing the supplied chunk.

  • chunk Buffer | String 要被转换的数据块。总是 Buffer,除非 decodeStrings 选项被设定为 false

  • encoding String 如果数据块是一个字符串,那么这就是它的编码类型。(数据块是 Buffer 则会忽略此参数。)
  • callback Function 当您处理完所提供的数据块时调用此函数(可选地附上一个错误参数)。

Note: This function MUST NOT be called directly. It should be implemented by child classes, and called by the internal Transform class methods only.

注意:该函数【禁止】被直接调用。它应该被子类所实现,并仅被 Transform 内部方法所调用。

All Transform stream implementations must provide a _transform method to accept input and produce output.

所有转换流的实现都必须提供一个 _transform 方法来接受输入并产生输出。

_transform should do whatever has to be done in this specific Transform class, to handle the bytes being written, and pass them off to the readable portion of the interface. Do asynchronous I/O, process things, and so on.

_transform 应当承担特定 Transform 类中所有处理被写入的字节、并将它们丢给接口的可写端的职责,进行异步 I/O,处理其它事情等等。

Call transform.push(outputChunk) 0 or more times to generate output from this input chunk, depending on how much data you want to output as a result of this chunk.

调用 transform.push(outputChunk) 0 或多次来从输入块生成输出,取决于您想从这个数据块输出多少数据。

Call the callback function only when the current chunk is completely consumed. Note that there may or may not be output as a result of any particular input chunk.

仅当当前数据块被完全消费时调用回调函数。注意,任何特定的输入块都有可能或可能不会产生输出。

This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.

该方法以下划线开头是因为它对于定义它的类是内部的,并且不应该被用户程序直接调用。但是,你应当在您的扩充类中覆盖这个方法。

transform._flush(callback)#

  • callback Function Call this function (optionally with an error argument) when you are done flushing any remaining data.

  • callback Function 当您写入完毕剩下的数据后调用此函数(可选地可附上一个错误对象)。

Note: This function MUST NOT be called directly. It MAY be implemented by child classes, and if so, will be called by the internal Transform class methods only.

注意:该函数【禁止】被直接调用。它【可以】被子类所实现,并且如果实现,仅被 Transform 内部方法所调用。

In some cases, your transform operation may need to emit a bit more data at the end of the stream. For example, a Zlib compression stream will store up some internal state so that it can optimally compress the output. At the end, however, it needs to do the best it can with what is left, so that the data will be complete.

在一些情景中,您的转换操作可能需要在流的末尾多发生一点点数据。例如,一个 Zlib 压缩流会储存一些内部状态以便更好地压缩输出,但在最后它需要尽可能好地处理剩下的东西以使数据完整。

In those cases, you can implement a _flush method, which will be called at the very end, after all the written data is consumed, but before emitting end to signal the end of the readable side. Just like with _transform, call transform.push(chunk) zero or more times, as appropriate, and call callback when the flush operation is complete.

在这种情况中,您可以实现一个 _flush 方法,它会在最后被调用,在所有写入数据被消费、但在触发 end 表示可读端到达末尾之前。和 _transform 一样,只需在写入操作完成时适当地调用 transform.push(chunk) 零或多次。

This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you are expected to override this method in your own extension classes.

该方法以下划线开头是因为它对于定义它的类是内部的,并且不应该被用户程序直接调用。但是,你应当在您的扩充类中覆盖这个方法。

例子: SimpleProtocol 解析器 v2#

The example above of a simple protocol parser can be implemented simply by using the higher level Transform stream class, similar to the parseHeader and SimpleProtocol v1 examples above.

上文的简易协议解析器例子能够很简单地使用高级别 Transform 流类实现,类似于前文 parseHeaderSimpleProtocal v1 示例。

In this example, rather than providing the input as an argument, it would be piped into the parser, which is a more idiomatic Node stream approach.

在这个示例中,输入会被导流到解析器中,而不是作为参数提供。这种做法更符合 Node 流的惯例。

var util = require('util');
var Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // 检查数据块是否有 \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // 仍旧等待 \n\n
      // 暂存数据块并重试。
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // 并让它们知道我们完成了头部解析。
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(chunk.slice(split));
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(chunk);
  }
  done();
};

      // 现在,由于我们获得了一些额外的数据,先触发这个。
      this.push(chunk.slice(split));
    }
  } else {
    // 之后,仅需向我们的消费者原样提供数据。
    this.push(chunk);
  }
  done();
};

// 用法:
// var parser = new SimpleProtocol();
// source.pipe(parser)
// 现在 parser 是一个会触发 'header' 并带上解析后的
// 头部数据的可读流。

类: stream.PassThrough#

This is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. Its purpose is mainly for examples and testing, but there are occasionally use cases where it can come in handy as a building block for novel sorts of streams.

这是 Transform 流的一个简单实现,将输入的字节简单地传递给输出。它的主要用途是演示和测试,但偶尔要构建某种特殊流的时候也能派上用场。

流:内部细节#

缓冲#

Both Writable and Readable streams will buffer data on an internal object called _writableState.buffer or _readableState.buffer, respectively.

无论 Writable 或 Readable 流都会在内部分别叫做 _writableState.buffer_readableState.buffer 的对象中缓冲数据。

The amount of data that will potentially be buffered depends on the highWaterMark option which is passed into the constructor.

被缓冲的数据量取决于传递给构造函数的 highWaterMark(最高水位线)选项。

Buffering in Readable streams happens when the implementation calls stream.push(chunk). If the consumer of the Stream does not call stream.read(), then the data will sit in the internal queue until it is consumed.

Readable 流的滞留发生于当实现调用 stream.push(chunk) 的时候。如果流的消费者没有调用 stream.read(),那么数据将会一直待在内部队列,直到它被消费。

Buffering in Writable streams happens when the user calls stream.write(chunk) repeatedly, even when write() returns false.

Writable 流的滞留发生于当用户重复调用 stream.write(chunk) 即便此时 write() 返回 false 时。

The purpose of streams, especially with the pipe() method, is to limit the buffering of data to acceptable levels, so that sources and destinations of varying speed will not overwhelm the available memory.

流,尤其是 pipe() 方法的初衷,是将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存。

stream.read(0)#

There are some cases where you want to trigger a refresh of the underlying readable stream mechanisms, without actually consuming any data. In that case, you can call stream.read(0), which will always return null.

在某写情景中,您可能需要触发底层可读流机制的刷新,但不真正消费任何数据。在这中情况下,您可以调用 stream.read(0),它总会返回 null

If the internal read buffer is below the highWaterMark, and the stream is not currently reading, then calling read(0) will trigger a low-level _read call.

如果内部读取缓冲低于 highWaterMark 水位线,并且流当前不在读取状态,那么调用 read(0) 会触发一个低级 _read 调用。

There is almost never a need to do this. However, you will see some cases in Node's internals where this is done, particularly in the Readable stream class internals.

虽然几乎没有必要这么做,但您可以在 Node 内部的某些地方看到它确实这么做了,尤其是在 Readable 流类的内部。

stream.push('')#

Pushing a zero-byte string or Buffer (when not in Object mode) has an interesting side effect. Because it is a call to stream.push(), it will end the reading process. However, it does not add any data to the readable buffer, so there's nothing for a user to consume.

推入一个零字节字符串或 Buffer(当不在 对象模式 时)有一个有趣的副作用。因为它是一个对 stream.push() 的调用,它会结束 reading 进程。然而,它没有添加任何数据到可读缓冲中,所以没有东西可以被用户消费。

Very rarely, there are cases where you have no data to provide now, but the consumer of your stream (or, perhaps, another bit of your own code) will know when to check again, by calling stream.read(0). In those cases, you may call stream.push('').

在极少数情况下,您当时没有数据提供,但您的流的消费者(或您的代码的其它部分)会通过调用 stream.read(0) 得知何时再次检查。在这中情况下,您可以调用 stream.push('')

So far, the only use case for this functionality is in the tls.CryptoStream class, which is deprecated in Node v0.12. If you find that you have to use stream.push(''), please consider another approach, because it almost certainly indicates that something is horribly wrong.

到目前为止,这个功能唯一一个使用情景是在 tls.CryptoStream 类中,但它将在 Node v0.12 中被废弃。如果您发现您不得不使用 stream.push(''),请考虑另一种方式,因为几乎可以明确表明这是某种可怕的错误。

与 Node 早期版本的兼容性#

In versions of Node prior to v0.10, the Readable stream interface was simpler, but also less powerful and less useful.

在 v0.10 之前版本的 Node 中,Readable 流的接口较为简单,同时功能和实用性也较弱。

  • Rather than waiting for you to call the read() method, 'data' events would start emitting immediately. If you needed to do some I/O to decide how to handle data, then you had to store the chunks in some kind of buffer so that they would not be lost.
  • The pause() method was advisory, rather than guaranteed. This meant that you still had to be prepared to receive 'data' events even when the stream was in a paused state.

  • 'data' 事件会开始立即开始发生,而不会等待您调用 read() 方法。如果您需要进行某些 I/O 来决定如何处理数据,那么您只能将数据块储存到某种缓冲区中以防它们流失。

  • pause() 方法仅起提议作用,而不保证生效。这意味着,即便当流处于暂停状态时,您仍然需要准备接收 'data' 事件。

In Node v0.10, the Readable class described below was added. For backwards compatibility with older Node programs, Readable streams switch into "flowing mode" when a 'data' event handler is added, or when the resume() method is called. The effect is that, even if you are not using the new read() method and 'readable' event, you no longer have to worry about losing 'data' chunks.

在 Node v0.10 中,下文所述的 Readable 类被加入进来。为了向后兼容考虑,Readable 流会在添加了 'data' 事件监听器、或 resume() 方法被调用时切换至“流动模式”。其作用是,即便您不使用新的 read() 方法和 'readable' 事件,您也不必担心丢失 'data' 数据块。

Most programs will continue to function normally. However, this introduces an edge case in the following conditions:

大多数程序会维持正常功能,然而,这也会在下列条件下引入一种边界情况:

  • No 'data' event handler is added.
  • The resume() method is never called.
  • The stream is not piped to any writable destination.

  • 没有添加 'data' 事件处理器。

  • resume() 方法从未被调用。
  • 流未被导流到任何可写目标。

For example, consider the following code:

举个例子,请留意下面代码:

// 警告!不能用!
net.createServer(function(socket) {

  // we add an 'end' method, but never consume the data
  socket.on('end', function() {
    // It will never get here.
    socket.end('I got your message (but didnt read it)\n');
  });

  // 我们添加了一个 'end' 事件,但从未消费数据
  socket.on('end', function() {
    // 它永远不会到达这里
    socket.end('我收到了您的来信(但我没看它)\n');
  });

}).listen(1337);

In versions of node prior to v0.10, the incoming message data would be simply discarded. However, in Node v0.10 and beyond, the socket will remain paused forever.

在 Node v0.10 之前的版本中,传入消息数据会被简单地丢弃。然而在 Node v0.10 及之后,socket 会一直保持暂停。

The workaround in this situation is to call the resume() method to start the flow of data:

对于这种情形的妥协方式是调用 resume() 方法来开启数据流:

// 妥协
net.createServer(function(socket) {

  socket.on('end', function() {
    socket.end('I got your message (but didnt read it)\n');
  });

  socket.on('end', function() {
    socket.end('我收到了您的来信(但我没看它)\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

  // 开启数据流,并丢弃它们。
  socket.resume();

}).listen(1337);

In addition to new Readable streams switching into flowing mode, pre-v0.10 style streams can be wrapped in a Readable class using the wrap() method.

额外的,对于切换到流动模式的新 Readable 流,v0.10 之前风格的流可以通过 wrap() 方法被包装成 Readable 类。

对象模式#

Normally, Streams operate on Strings and Buffers exclusively.

通常情况下,流只操作字符串和 Buffer。

Streams that are in object mode can emit generic JavaScript values other than Buffers and Strings.

处于对象模式的流除了 Buffer 和字符串外还能读出普通的 JavaScript 值。

A Readable stream in object mode will always return a single item from a call to stream.read(size), regardless of what the size argument is.

一个处于对象模式的 Readable 流调用 stream.read(size) 时总会返回单个项目,无论传入什么 size 参数。

A Writable stream in object mode will always ignore the encoding argument to stream.write(data, encoding).

一个处于对象模式的 Writable 流总是会忽略传给 stream.write(data, encoding)encoding 参数。

The special value null still retains its special value for object mode streams. That is, for object mode readable streams, null as a return value from stream.read() indicates that there is no more data, and stream.push(null) will signal the end of stream data (EOF).

特殊值 null 在对象模式流中依旧保持它的特殊性。也就说,对于对象模式的可读流,stream.read() 返回 null 意味着没有更多数据,同时 stream.push(null) 会告知流数据到达末端(EOF)。

No streams in Node core are object mode streams. This pattern is only used by userland streaming libraries.

Node 核心不存在对象模式的流,这种设计只被某些用户态流式库所使用。

You should set objectMode in your stream child class constructor on the options object. Setting objectMode mid-stream is not safe.

您应该在您的流子类构造函数的选项对象中设置 objectMode。在流的过程中设置 objectMode 是不安全的。

状态对象#

Readable streams have a member object called _readableState. Writable streams have a member object called _writableState. Duplex streams have both.

Readable 流有一个成员对象叫作 _readableStateWritable 流有一个成员对象叫作 _writableStateDuplex 流二者兼备。

These objects should generally not be modified in child classes. However, if you have a Duplex or Transform stream that should be in objectMode on the readable side, and not in objectMode on the writable side, then you may do this in the constructor by setting the flag explicitly on the appropriate state object.

这些对象通常不应该被子类所更改。然而,如果您有一个 Duplex 或 Transform 流,它的可读端应该是 objectMode,但可写端却又不是 objectMode,那么您可以在构造函数里明确地设定合适的状态对象的标记来达到此目的。

var util = require('util');
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform);

// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONParseStream(options) {
  if (!(this instanceof JSONParseStream))
    return new JSONParseStream(options);

// 获取以 \n 分隔的 JSON 字符串数据,并丢出解析后的对象
function JSONParseStream(options) {
  if (!(this instanceof JSONParseStream))
    return new JSONParseStream(options);

  Transform.call(this, options);
  this._writableState.objectMode = false;
  this._readableState.objectMode = true;
  this._buffer = '';
  this._decoder = new StringDecoder('utf8');
}

  Transform.call(this, options);
  this._writableState.objectMode = false;
  this._readableState.objectMode = true;
  this._buffer = '';
  this._decoder = new StringDecoder('utf8');
}

JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
  this._buffer += this._decoder.write(chunk);
  // split on newlines
  var lines = this._buffer.split(/\r?\n/);
  // keep the last partial line buffered
  this._buffer = lines.pop();
  for (var l = 0; l < lines.length; l++) {
    var line = lines[l];
    try {
      var obj = JSON.parse(line);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // push the parsed object out to the readable consumer
    this.push(obj);
  }
  cb();
};

JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
  this._buffer += this._decoder.write(chunk);
  // 以新行分割
  var lines = this._buffer.split(/\r?\n/);
  // 保留最后一行被缓冲
  this._buffer = lines.pop();
  for (var l = 0; l < lines.length; l++) {
    var line = lines[l];
    try {
      var obj = JSON.parse(line);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // 推出解析后的对象到可读消费者
    this.push(obj);
  }
  cb();
};

JSONParseStream.prototype._flush = function(cb) {
  // 仅仅处理剩下的东西
  var rem = this._buffer.trim();
  if (rem) {
    try {
      var obj = JSON.parse(rem);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // 推出解析后的对象到可读消费者
    this.push(obj);
  }
  cb();
};

The state objects contain other useful information for debugging the state of streams in your programs. It is safe to look at them, but beyond setting option flags in the constructor, it is not safe to modify them.

状态对象包含了其它调试您的程序的流的状态时有用的信息。读取它们是可以的,但越过构造函数的选项来更改它们是不安全的