Node.js API详解之 stream

stream(流)在 Node.js 中是处理流数据的抽象接口(abstract interface)。
stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。
Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。
流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。
Node.js 中有四种基本的流类型:

Readable – 可读的流 (例如 fs.createReadStream()).
Writable – 可写的流 (例如 fs.createWriteStream()).
Duplex – 可读写的流 (例如 net.Socket).
Transform – 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

stream 模块可以通过以下方式引入:

const stream = require('stream');

尽管所有的 Node.js 用户都应该理解流的工作方式,这点很重要,
但是 stream 模块本身只对于那些需要创建新的流的实例的开发者最有用处。
对于主要是 消费 流的开发者来说,他们很少(如果有的话)需要直接使用 stream 模块。
文档分为三节。第一节阐述了在应用中使用流相关的 API 。 第二节阐述了 实现 新的流类型相关的 API 。第三节是一些额外的注意事项。

目录:

Buffering

说明:

Writable 和 Readable 流都会将数据存储到内部的缓存(buffer)中。
缓存的大小取决于传递给流构造函数的 highWaterMark 选项。
highWaterMark 选项指定了总的字节数或对象的总数。
当可读流调用 stream.push(chunk) 方法时,数据被放到缓存中。
在没有调用 stream.read() 方法之前, 这些数据会始终保存在于内部队列中,直到被使用。
当内部缓存的大小达到 highWaterMark 指定的阈值时,流会暂停读取数据,直到当前缓存的数据被使用。

可写流通过反复调用 writable.write(chunk) 方法将数据放到缓存。
当内部可写缓存的总大小小于 highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。
一旦内部缓存的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false 。

Duplex 和 Transform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓存用于读和写。
在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。
例如, net.Socket 就是 Duplex 的实例,它的可读端可以消费从套接字(socket)中接收的数据, 可写端则可以将数据写入到套接字。
由于数据写入到套接字中的速度可能比从套接字接收数据的速度快或者慢, 在读写两端使用独立缓存,并进行独立操作就显得很重要了。

提供给Stream应用的API

说明:

几乎所有的 Node.js 应用,不管多么简单,都在某种程度上使用了流。
对于只是简单写入数据到流和从流中消费数据的应用来说, 不要求直接实现流接口,通常也不需要调用 require(‘stream’)。
我们称这一部分为Stream应用API

Writable Streams

说明:

可写流是对数据写入’目的地’的一种抽象。
可写流的例子包括:
HTTP requests, on the client
HTTP responses, on the server
fs write streams
zlib streams
crypto streams
TCP sockets
child process stdin
process.stdout, process.stderr

stream.Writable 类

说明:

提供可写流的API,介绍 Writable 类,我们用 http模块来做demo的演示。
我们主要介绍request事件中的 res 参数。

demo:

const http = require('http');
 
const httpServer = http.createServer();
 
httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
});
 
httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

writable.write(chunk[, encoding][, callback])

说明:

writable.write() 方法向流中写入数据,并在数据处理完成后调用 callback 。
chunk:要写入的数据,对于非对象模式下的流, chunk 必须是字符串, Buffer 或者 Uint8Array。
对于对象模式下的流,chunk 可以是除 null 外的任意 JavaScript 值。
encoding:如果 chunk 是字符串,这里指定字符编码
callback:缓冲数据输出时的回调函数
如果流需要等待 ‘drain’ 事件触发才能继续写入数据,返回 false; 否则返回 true。
如果有错误发生, callback 不一定 以这个错误作为第一个参数并被调用。要确保可靠地检测到写入错误,应该监听 ‘error’ 事件。
在确认了 chunk 后,如果内部缓冲区的小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。
如果返回值为 false ,应该停止向流中写入数据,直到 ‘drain’ 事件被触发。

demo:

const http = require('http');
 
const httpServer = http.createServer();

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let isFull = res.write('Welcome to ', 'utf8', () =>{
  		console.log('change response');
  	});
  	console.log(isFull);
});  	
//  true
//  change response

httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

writable.end([chunk][, encoding][, callback])

说明:

调用 writable.end() 方法表明接下来没有数据要被写入 Writable。
通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据。
如果传入了可选的 callback 函数,它将作为 ‘finish’ 事件的回调函数。
在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误。

demo:

const http = require('http');
 
const httpServer = http.createServer();
 
httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let isFull = res.write('Welcome to ', 'utf8', () =>{
  		console.log('change response');
  	});
  	
  	res.end('isjs', 'utf8', () => {
  		console.log(isFull);
  	});
});
// change response
// true

httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

writable.setDefaultEncoding(encoding)

说明:

此方法用来为 Writable 设置 encoding。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	res.setDefaultEncoding('utf8')

  	let isFull = res.write('Welcome to ', 'utf8', () =>{
  		console.log('change response');
  	});
  	
  	res.end('isjs', 'utf8', () => {
  		console.log(isFull);
  	});
});

writable.destroy([error])

说明:

销毁这个流,并抛出传递的错误。当这个函数被调用后,这个写入流就结束了。
使用者不应该重写这个函数,而是执行 writable._destroy。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let isFull = res.write('Welcome to ', 'utf8', () =>{
  		console.log('change response');
  	});
  	
  	res.destroy();
  	console.log(isFull);
});
// true
// change response

writable.cork()

说明:

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。
直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出。
在向流中写入大量小块数据(small chunks of data)时,内部缓冲区(internal buffer)可能失效,从而导致性能下降。
writable.cork() 方法主要就是用来避免这种情况。 对于这种情况, 实现了 writable._writev() 方法的流可以对写入的数据进行缓冲,从而提高写入效率。

demo:

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

writable.uncork()

说明:

writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。
果使用 writable.cork() 和 writable.uncork() 来管理写入缓存,建议使用 process.nextTick() 来延迟调用 writable.uncork() 方法。
通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 方法进行批处理。
如果一个流多次调用了 writable.cork() 方法,那么也必须调用同样次数的 writable.uncork() 方法以输出缓冲区数据

demo:

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  	stream.uncork();
  	// 之前的数据只有在 uncork() 被二次调用后才会输出
  	stream.uncork();
});

‘drain’ 事件

说明:

如果调用 stream.write(chunk) 方法返回 false,流将在适当的时机触发 ‘drain’ 事件,这时才可以继续向流中写入数据。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let ok = true;

  	res.on('drain', () =>{
  		res.end();
  	});

  	while(ok){
  		ok = res.write('Welcome to isjs');
  	}
});

‘close’ 事件

说明:

‘close’ 事件将在流或其底层资源(比如一个文件)关闭后触发。’close’ 事件触发后,该流将不会再触发任何事件。
不是所有可写流都会触发 ‘close’ 事件。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let ok = true;

  	res.on('close', () =>{
  		console.log('closed')
  	});

  	res.write('Welcome to isjs');
  	res.destroy();

});
// closed

‘error’ 事件

说明:

‘error’ 事件在写入数据出错或者使用管道出错时触发。事件发生时,回调函数仅会接收到一个 Error 参数。
注意: ‘error’ 事件发生时,流并不会关闭。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let ok = true;

  	res.on('error', () =>{
  		console.log('You can\'t use write function after end function');
  	});

  	
  	res.write('Welcome to isjs');
  	res.end();
  	res.write('!');

});
// You can't use write function after end function

‘finish’ 事件

说明:

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统(underlying system)之后, ‘finish’ 事件将被触发。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let ok = true;

  	res.on('finish', () =>{
  		console.log('has been end');
  	});

  	
  	res.write('Welcome to isjs');
  	res.end();

});
// has been end

‘pipe’ 事件

说明:

在 Readable Stream 上调用 stream.pipe() 方法,并传递当前 Writable Stream 时,将会在 Writable Stream 触发 ‘pipe’ 事件。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let ok = true;

  	res.on('pipe', (src) =>{
  		//src: 输出到目标可写流(writable)的源流(source stream)
  		console.log('something is piping into the writer');
  	});

  	res.write('Welcome to isjs');

  	req.pipe(res);

});

‘unpipe’ 事件

说明:

在 Readable Stream 上调用 stream.unpipe() 方法,从目标流向中移除当前 Writable 时,将会触发 ‘unpipe’ 事件。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	let ok = true;

  	res.on('pipe', (src) =>{
  		//src: 输出到目标可写流(writable)的源流(source stream)
  		console.log('something is piping into the writer');
  	});

  	res.on('unpipe', (src) =>{
  		//src: 当前可写流的源流
  		console.log('Something has stopped piping into the writer.');
  	});

  	res.write('Welcome to isjs');

  	req.pipe(res);
  	req.unpipe(res);

});
// something is piping into the writer
// Something has stopped piping into the writer.

Readable Streams

说明:

Readable Streams(可读流) 是对提供数据的 源头 (source)的抽象
可读流的例子包括:
HTTP responses, on the client
HTTP requests, on the server
fs read streams
zlib streams
crypto streams
TCP sockets
child process stdout and stderr
process.stdin

两种模式:

可读流工作在下面两种模式之一:flowing 和 paused 。
在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。
在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。
所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:
1.监听 ‘data’ 事件。
2.调用 stream.resume() 方法。
3.调用 stream.pipe() 方法将数据发送到 Writable。
可读流可以通过下面途径切换到 paused 模式:
1.如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
2.如果存在管道目标,可以通过取消 ‘data’ 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。
注意: 如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。

三种状态:

可读流的“两种模式”是一种简单抽象。它抽象了在可读流实现(Readable stream implementation)内部发生的复杂的状态管理过程。
在任意时刻,任意可读流应确切处于下面三种状态之一:
1.readable._readableState.flowing = null
2.readable._readableState.flowing = false
3.readable._readableState.flowing = true
若 readable._readableState.flowing 为 null,由于不存在数据使用者,可读流将不会产生数据。
在这个状态下,监听 ‘data’ 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法,
readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件
调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure),
将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。
在这种情况下,为 ‘data’ 事件设置监听函数不会导致 readable._readableState.flowing 变为 true。
当 readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。

使用建议:

可读流 API 的演化贯穿了多个 Node.js 版本,提供了多种方法来消费流数据。通常开发者应该选择其中 一种 来消费数据,而 不应该 在单个流使用多种方法来消费数据。
对于大多数用户,建议使用 readable.pipe() 方法来消费流数据,因为它是最简单的一种实现。
开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitter 和 readable.pause()/readable.resume() 提供的 API

stream.Readable 类

说明:

提供可读流的API,介绍 Readable 类,我们用 http模块来做demo的演示。
我们主要介绍request事件中的 req 参数。

demo:

const http = require('http');
 
const httpServer = http.createServer();
 
httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
});
 
httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

‘data’ 事件

说明:

chunk:数据片段。对于非对象模式的可读流,这是一个字符串或者 Buffer。 对于对象模式的可读流,这可以是除 null 以外的任意类型 JavaScript 值。
data’ 事件会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。
调用 readable.pipe(), readable.resume() 方法,或为 ‘data’ 事件添加回调可以将流转换到 flowing 模式。
‘data’ 事件也会在调用 readable.read() 方法并有数据返回时触发。
在没有明确暂停的流上添加 ‘data’ 事件监听会将流转换为 flowing 模式。 数据会在可用时尽快传递给下个流程。
如果调用 readable.setEncoding() 方法明确为流指定了默认编码,回调函数将接收到一个字符串,否则接收到的数据将是一个 Buffer 实例。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
  	
  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

  	
  	res.write('Welcome to isjs');
  	res.end();
});
// requestData: {"name":"isjs"}

‘end’ 事件

说明:

注意: ‘end’ 事件只有在数据被完全消费后 才会触发 。
可以在数据被完全消费后,通过将流转换到 flowing 模式, 或反复调用 stream.read() 方法来实现这一点。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
  	
  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{
	 	 console.log('There will be no more data.');
	});

  	
  	res.write('Welcome to isjs');
  	res.end();
});
// requestData: {"name":"isjs"}
// There will be no more data.

‘error’ 事件

说明:

‘error’ 事件可以在任何时候在可读流实现(Readable implementation)上触发。
通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。
回调函数将接收到一个 Error 对象。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
  	
  	req.on('error', (err) =>{
	 	console.log(`errorMsg: ${err} `);
	});

	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{
	 	 console.log('There will be no more data.');
	});
  	
  	res.write('Welcome to isjs');
  	res.end();
});

‘close’ 事件

说明:

‘close’ 事件将在流或其底层资源(比如一个文件)关闭后触发。
‘close’ 事件触发后,该流将不会再触发任何事件。

demo:

const http = require('http');
 
const httpServer = http.createServer();
 
httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
  	
  	req.on('readable', () =>{
       console.log('readable:', req.read());
  	});	

  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

  	req.on('error', (err) =>{
	 	console.log(`errorMsg: ${err} `);
	});

	req.on('close', () =>{
	 	console.log('closed');
	});

	req.on('end', (chunk) =>{
	 	 console.log('There will be no more data.');
	});
  	
  	req.destroy();
  	res.write('Welcome to isjs');
  	res.end();
});
// start httpServer   ->   服务启动
// requestData: {"name":"isjs"}    ->   触发date事件
// readable: null    ->   有新的数据传递,触发readable事件
// readable: null    ->   到了流的尾部,触发readable事件
// There will be no more data.    ->   数据传递完成后触发end事件
// closed    ->    流被销毁后触发close事件

httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

‘readable’ 事件

说明:

‘readable’ 事件将在流中有数据可供读取时触发。在某些情况下,为 ‘readable’ 事件添加回调将会导致一些数据被读取到内部缓存中
当到达流数据尾部时, ‘readable’ 事件也会触发。触发顺序在 ‘end’ 事件之前
‘readable’ 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。
对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。
注意: 通常情况下,readable.pipe() 方法和 ‘data’ 事件机制比 ‘readable’ 事件更容易理解。

readable.read([size])

说明:

readable.read()方法从内部缓冲区中抽出并返回一些数据。如果没有可读的数据,返回null。
readable.read()方法默认数据将作为“Buffer”对象返回 ,除非已经使用readable.setEncoding()方法设置编码或流运行在对象模式。
size:可选的size参数指定要读取的特定数量的字节。如果size字节不可读,将返回null除非流已经结束,在这种情况下所有保留在内部缓冲区的数据将被返回。
如果没有指定size参数,则内部缓冲区包含的所有数据将返回。
readable.read()方法只应该在 paused 模式下的可读流上运行。在 flowing 模式下,readable.read()自动调用直到内部缓冲区的数据完全耗尽。
一般来说,建议开发人员避免使用’readable’事件和readable.read()方法,使用readable.pipe()或’data’事件代替。
注意:如果readable.read()方法返回一个数据块,那么一个’data’事件也将被发送。
注意:在已经被发出的’end’事件后调用stream.read([size])事件将返回null。不会抛出运行时错误。

demo:

CLENT:
const http = require('http');

const options = {
  hostname: '127.0.0.1',
  port: 8061,
  path: '/',
  method: 'POST'
};

const req = http.request(options, (res) => {
  console.log('状态码:', res.statusCode);
  console.log('请求头:', res.headers);
  res.setEncoding('utf8');

  res.on('data', (chunk) => {
    console.log(`响应主体: ${chunk}`);
  });

  res.on('end', () => {
    console.log('响应中已无数据。');
  });
});


req.write(JSON.stringify({'name':'isjs'}));
req.end();

SERVER:
const http = require('http');
 
const httpServer = http.createServer();
 
httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
  	
  	req.on('readable', () =>{
  		 console.log('readable:', req.read());
	});
  	
  	res.write('Welcome to isjs');
  	res.end();
});

httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

//输出:
//start httpServer
//readable: 
//readable: null

readable.setEncoding(encoding)

说明:

readble.setEncoding() 方法会为从可读流读入的数据设置字符编码
默认返回Buffer对象。设置编码会使得该流数据返回指定编码的字符串而不是Buffer对象。
例如,调用readable.setEncoding(‘utf-8’)会使得输出数据作为UTF-8数据解析,并作为字符串返回。
调用readable.setEncoding(‘hex’)使得数据被编码成16进制字符串格式。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	req.on('data', (chunk) =>{
	 	console.log('requestData:', chunk);
	});

  	res.write('Welcome to isjs');
  	res.end();
});
// requestData: 

readable.destroy([error])

说明:

销毁流,并且触发error事件。然后,可读流将释放所有的内部资源。
开发者不应该覆盖这个方法,应该使用readable._destroy方法。

readable.pause()

说明:

readable.pause() 方法将会使 flowing 模式的流停止触发 ‘data’ 事件, 进而切出 flowing 模式。
任何可用的数据都将保存在内部缓存中。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	req.pause();

  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{
	 	 console.log('There will be no more data.');
	});
  	
  	res.write('Welcome to isjs');
  	res.end();
});
// start httpServer
// There will be no more data.

readable.isPaused()

说明:

readable.isPaused() 方法返回可读流的当前操作状态。
该方法主要是在 readable.pipe() 方法的底层机制中用到。
大多数情况下,没有必要直接使用该方法。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	req.pause();

  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{

 		console.log('There will be no more data.');
	});
	
	console.log(req.isPaused());

  	res.write('Welcome to isjs');
  	res.end();
});
// start httpServer
// true
// There will be no more data.

readable.resume()

说明:

readable.resume() 方法会重新触发 ‘data’ 事件, 将暂停模式切换到流动模式。
readable.resume() 方法可以用来充分使用流中的数据,而不用实际处理任何数据

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  	req.pause();

  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{

 		console.log('There will be no more data.');
	});

	req.isPaused() && req.resume();

  	res.write('Welcome to isjs');
  	res.end();
});
// requestData: {"name":"isjs"}
// There will be no more data.

readable.pipe(destination[, options])

说明:

readable.pipe() 绑定一个 Writable 到 readable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 Writable。
其实就是把readable的数据传递给destination。
数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)。
destination: 数据写入目标
options.end: 在 reader 结束时结束 writer 。默认为 true。
readable.pipe() 方法返回 目标流 的引用,这样就可以对流进行链式地管道操作:
当源可读流(the source Readable stream)触发 ‘end’ 事件时,目标流也会调用 stream.end() 方法从而结束写入。
要禁用这一默认行为, end 选项应该指定为 false。

demo:

const http = require('http');
 
const httpServer = http.createServer();
 
httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
	
	req.pipe(res,{end:false});

  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{
 		console.log('There will be no more data.');
	});

  	res.write('Welcome to isjs');
  	setTimeout(() => { res.end(); },1000);
});

httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

// 请求头: { date: 'Wed, 10 Jan 2018 03:15:03 GMT',
//   connection: 'close',
//   'transfer-encoding': 'chunked' }
// 响应主体: Welcome to isjs
// 响应主体: {"name":"isjs"}
// 响应中已无数据。

readable.unpipe([destination])

说明:

readable.unpipe() 方法将之前通过stream.pipe()方法绑定的流分离
如果 destination 没有传入, 则所有绑定的流都会被分离.
如果传入 destination, 但它没有被pipe()绑定过,则该方法不作为.
destination: 可选的,指定需要分离的目标流

demo:

const http = require('http');
 
const httpServer = http.createServer();
 
httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
	
	req.pipe(res,{end:false});
	req.unpipe(res);

  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{
 		console.log('There will be no more data.');
	});

  	res.write('Welcome to isjs');
  	setTimeout(() => { res.end(); },1000);
});

httpServer.listen(8061, '127.0.0.1', () => {
	console.log('start httpServer');
});

// 请求头: { date: 'Wed, 10 Jan 2018 03:15:03 GMT',
//   connection: 'close',
//   'transfer-encoding': 'chunked' }
// 响应主体: Welcome to isjs
// 响应中已无数据。

readable.unshift(chunk)

说明:

readable.unshift() 方法会把一块数据压回到Buffer内部。
这在如下特定情形下有用: 代码正在消费一个数据流,已经”乐观地”拉取了数据。 又需要”反悔-消费”一些数据,以便这些数据可以传给其他人用。
注意: ‘end’ 事件已经触发或者运行时错误抛出后,stream.unshift(chunk) 方法不能被调用。

demo:

httpServer.on('request', (req, res) => {
  	// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  	// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
	
	req.unshift('{"name":"isjs.cn"}')

  	req.on('data', (chunk) =>{
	 	console.log(`requestData: ${chunk} `);
	});

	req.on('end', (chunk) =>{
 		console.log('There will be no more data.');
	});

  	res.write('Welcome to isjs');
  	setTimeout(() => { res.end(); },1000);
});
// start httpServer
// requestData: {"name":"isjs.cn"}
// requestData: {"name":"isjs"}
// There will be no more data.

readable.wrap(stream)

说明:

stream:老式可读流
在0.10版本之前,没有实现流的模块,以及对应的API
wrap方法可以使用老式的数据流来创建一个可读流。
一般很少需要用到wrap方法,他只对老的Node版本提供便利。

demo:

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});

Duplex流与Transform流

说明:

Duplex流是同时实现了Readable和 Writable接口的流。
Duplex 流的实例包括了:
TCP sockets
zlib streams
crypto streams

stream.Transform类

说明:

变换流(Transform streams)是一种Duplex流。
它的输出与输入是通过某种方式关联的。和所有Duplex流一样,变换流同时实现了Readable和Writable接口。
变换流的实例包括:
zlib streams
crypto streams

transform.destroy([error])

说明:

销毁这个流,发射’error’事件。调用这个方法之后,变换流会释放全部内部资源实现者不应该重载此方法,
而应该实现readable._destroy。 Transform的默认_destroy实现也发射’close’事件。

Stream的底层API

说明:

stream模块API的设计是为了让JavaScript的原型继承模式可以简单的实现流。
首先,一个流开发者可能声明了一个JavaScript类并且继承四个基本流类中的一个
(stream.Weiteable,stream.Readable,stream.Duplex,或者stream.Transform),
确保他们调用合适的父类构造函数:

注意:实现流的代码里面不应该出现调用“public”方法的地方,因为这些方法是给使用者使用的(流使用者部分的API所述)。
这样做可能会导致使用流的应用程序代码产生不利的副作用。

创建一个简单流

说明:

对于许多简单的案例,它是有可能在不依赖继承的情况下创建流。
通过流基础类stream.Writable,stream.Readable,stream.Duplex,或者stream.Transform传入对象直接创建流实例,
对象包含合适的方法作为构造函数选项。

demo:

const { Writable } = require('stream');
 
const myWritable = new Writable({
  	write(chunk, encoding, callback) {
   		console.log('my Writable');
   		//do some thing
  	}
});

实现一个 Writable Stream

说明:

这个stream.Writable类被用于实现可写流。
自定义可写流必须调用new stream.Writable([options])构造函数并且实现writable._write()方法。
writable._writev()方法也是可以实现的。

new stream.Writable([options])

说明:

使用 stream 模块的 Writable 构造方法可以创建一个自定义可写流
options:
highWaterMark: 缓冲大小当开始调用stream.write() 返回 false。默认16384 (16kb), 对于 objectMode 流为默认为16。
decodeStrings: 是否解码字符串在调用 stream._write() 传递到缓冲区之前。默认为 true
objectMode: 是否是一个有效的操作. 一旦设置,可以写字符串以外的值,例如Buffer 或者 Uint8Array 只要流支持。默认为false。
write: 实现stream._write() 方法。
writev: 实现stream._writev() 方法。
destroy: 实现stream._destroy() 方法。
final: 实现stream._final() 方法。

demo:

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

writable._write(chunk, encoding, callback)

说明:

chunk:要写的数据块。会一直作为缓冲区,除非decodeStrings选项设置为false或者流以对象模式运行。
encoding:如果块是字符串,那么encoding就是该字符串的字符编码。 如果块是Buffer,或者是流在对象模式下运行,encoding可能被忽略。
callback:调用此函数(err参数可选)当块处理完成时。
所有可写流实现必须提供一个 writable._write() 方法将数据发送到底层资源。
注意:Transform 流提供自己实现的writable._write()
注意:此函数不得直接由应用程序代码调用。 它应该由子类实现,并由内部的Writable类方法调用。
必须调用callback方法来表示写完成成功或失败,出现错误。callback第一个参数必须是Error对象如果调用失败,成功时为null。
需要重点注意的是,所有writable._write()被调用并且callback被调用将导致要缓冲的写入数据。
一旦调用callback,流将会执行’drain’事件。 如果想让流实现一次能够处理多个数据块,writable._writev()方法应该被实现。
如果在构造函数选项中设置decodeStrings属性,这是为了支持对某些字符串具有优化处理的实现数据编码。
如果decodeStrings属性显式设置为false,encoding参数可以安全地忽略,chunk将保持不变传递给.write()的对象。
writable._write()方法前缀为下划线,因为它是在定义它的类的内部,不应该直接调用用户程序

writable._writev(chunks, callback)

说明:

chunks:要写的块 每个块都有以下格式:{chunk:…,encoding:…}。
callback:一个回调函数(可选地带有一个错误参数)在提供的块的处理完成时被调用。
注:此函数不得直接通过应用程序代码调用。 它应由子类实现,并由内部Writable进行调用类方法。
writable._writev()方法能够一次处理多个数据块的流,该方法将缓存的所有数据块写入队列。
writable._writev()方法前缀为下划线,因为它是定义它的类的内部,不应该由用户程序直接调用。

writable._destroy(err, callback)

说明:

err: 错误
callback: 回调函数,err参数可选
通过 writable.destroy() 方法调用_destroy()。它可以被子类覆盖,但不能直接调用。

writable._final(callback)

说明:

callback:在完成写入所有剩余数据时调用该函数(err参数可选)
_final()方法不能直接调用。 应该由子类负责实现,如果是,将仅可以由内部的Writable类方法进行调用。
这个可选的函数将在流关闭之前被调用, 直到callback回调函数执行完成才触发finish事件。
这对于关闭资源或在流结束之前写入缓冲数据很有用。

一个可写流的例子:

说明:

下面说明了一个相当简单(有点无意义)的可写流实现。虽然这个具体的可写流实例没有任何真正的特殊用途,但该示例说明了一个自定义流实例所需要的元素:

demo:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }

  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

实现一个 Readable Stream

说明:

用户实现的自定义可读流 必须 调用new stream.Readable([options]) 构造函数并且实现readable._read()方法。

new stream.Readable([options])

说明:

使用 stream 模块的 Readable 构造方法可以创建一个自定义可读流
options:
highWaterMark:从底层资源读取数据并存储在内部缓冲区中的最大字节数。默认16384 (16kb), 或者 16对应objectMode流模式。
encoding:指定解析的字符编码格式. 默认 为null
objectMode:一个对象的流。 这意味着 stream.read(n) 返回的是一个单一的对象而不是n个字节的缓冲区。默认 false
read: 对 stream._read()方法的实现
destroy: 对 stream._destroy() 方法的实现。

demo:

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

readable._read(size)

说明:

所有实现可读流的实例必须实现readable._read() 方法去获得底层的数据资源。
当 readable._read() 被调用,如果读取的数据是可用的,应该在最开始的时候使用this.push(dataChunk)方法将该数据推入读取队列。
_read() 应该一直读取资源直到推送数据方法readable.push()返回false的时候停止。想再次调用_read()方法,需要再次往可读流里面push数据。
注意:一旦readable._read()方法被调用,只有在 readable.push()方法被调用之后,才能再次被调用。
size 可选参数。_read()方法是一个实现读取数据的单操作,设置size参数来确定要读取数据的大小。
其他的实现可能会忽略这个参数,只要数据可用就提供数据。 不需要等到stream.push(chunk)方法推入一定size的数据后才能调用。
readable._read()方法的前缀是一个下划线,因为它是在定义它的类的内部,不应该被直接调用用户程序。

readable._destroy(err, callback)

说明:

_destroy()需通过readable.destroy()方法调用。它可以被子类覆盖,但不能直接调用。
err:错误。
callback:回调函数,第一个参数为err参数

readable.push(chunk[, encoding])

说明:

chunk: 压入读队列的数据块。
encoding:字符串数据块的编码方式. 必须是可用的Buffer编码方式,例如’utf8′ 或 ‘ascii’
如果多余的数据块可以继续压入,那么返回true; 否则返回 false

一个可读流例子:

说明:

以下是可读流的一个基本例子,触发数字1到1,000,000升序,然后结束

demo:

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = '' + i;
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

实现一个 Duplex Stream

说明:

双工流(可读可写流)是可读流和可写流的实现,例如TCP套接字连接。
因为JavaScript不支持多重继承,所以stream.Duplex类被扩展以实现双工流(而不是扩展stream.Readable和stream.Writable类)。
注意。stream.Duplex类原型继承来自stream.Readable和寄生的stream.Writable,但是instanceof将会在这两个基础类上正确工作,
由于stream.Writable覆盖了 Symbol.hasInstance方法。
自定义双工流必须通过new stream.Duplex([options])构造函数并实现readable._read()和writable._write()

new stream.Duplex(options)

说明:

使用 stream 模块的 Duplex 构造方法可以创建一个自定义双工流。
options:
allowHalfOpen:默认是true. 如果设置为false, 那么当读端停止时,写端自动停止。
readableObjectMode: 默认是 false。 会为流的读端设置objectMode。 如果 objectMode是 true,那就没有任何用。
writableObjectMode: 默认是 false。 会为流的读端设置objectMode。 如果 objectMode是 true,那就没有任何用。
readableHighWaterMark: 会为流的读端设置HighWaterMark
writableHighWaterMark: 会为流的读端设置HighWaterMark

demo:

const { Duplex } = require('stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});

一个双工流的例子:

说明:

面是一个可读可写流包装了一个假定的可读可写的底层源对象, 尽管用了一个与Node.js流不兼容的API。
下面是一个简单的例子, 在一个可读可写流中,来的buffers通过Writable 接口写入数据,再通过Readable接口读回数据。
尽管在一个对象实例中共存,读端和写端却是相互独立于彼此,这是可读可写流最为重要的一点。

demo:

const { Duplex } = require('stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

实现一个Transform Stream

说明:

一个Transform流是一种Duplex流,输入经过Transform流,做某种计算然后输出。 比如 zlib流和crypto流会做压缩,加密和解密数据。
注意: 输出流的大小,有多少数据包,到达时间都不一定非要和输入流一样。
比如,一个哈希流再输入结束时永远只会输出单个数据块; 而一个zlib流的输出,可能比输入大得多或小得多。
stream.Transform 类被扩展了,实现了一个Transform流。
stream.Transform类最初继承自stream.Duplex,并且实现了它自己版本的writable._write()和readable._read()方法。
一般地,变换流必须实现transform._transform() 方法; 而transform._flush() 方法是非必须的。
注意: 用变换流时要注意,如果读端输出没有被消费,那么往写数据可能会引起写端暂停。

new stream.Transform([options])

说明:

使用 stream 模块的 Transform 构造方法可以创建一个自定义变换流。
options:
transform:对stream._transform()方法的实现。
flush:对stream._flush()方法的实现。

demo:

const { Transform } = require('stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  }
});

‘finish’事件 和 ‘end’事件

说明:

‘finish’事件来自stream.Writable;’end’事件来自stream.Readable类。
在调用了stream.end()并且stream._transform()处理了全部数据块之后, ‘finish’事件触发。
transform._flush()中的回调函数被调用之后,所有数据已经输出,此时,’end’事件触发