暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【译】Node.js Stream API:理解与运用

背井 2021-03-03
1137

原文标题:Working with Node.js Stream API

原文作者:Darko Milosevic

原文链接:https://medium.com/florence-development/working-with-node-js-stream-api-60c12437a1be




注: 本文中 Stream 和 流 是一个意思,可互换。


介绍


Stream一词在计算机科学中用于描述分块数据集,这种数据集不是一次性可得的,而是跨时间获取的。一个Stream仅仅是一组值,类似于一个数组,但从空间向反转到了时间向(即Stream的值是随着时间不断产生的)。


在Node.js中,Stream 是实现用于处理流数据的API模块的名称。


自2009年首次引入Node.js Stream API 以来,它已经有了很大的发展。由于存在不同的实现方式,接口也混合了不同的风格,这个不断发展的API给用户造成了很大的困惑。


本文将关注最新的“Stream 3”实现,以及Node v10+附带的新的有用的api。





Stream Basics


所有的streams都是EventEmitter的实例,这意味着它们会发出用于读写数据的事件(events)。


Stream Types 流的类型


Node.js中有四种基本流类型。


Readable (可读流):

  • 对可读取和可消费的数据源的抽象

  • 示例:客户端上的HTTP响应、服务器上的HTTP请求、fs read streams、process.stdin等。


Writable (可写流):

  • 对可以写入数据的目标的抽象

  • 示例:服务器上的HTTP响应、客户端上的HTTP请求、fs write streams、process.stout、process.stderr等。


Duplex (双向流):

  • 同时实现可读(Readable)和可写(Writable)接口的流

  • 示例:TCP套接字(net.Socket)。


Transform (转换流):

  • 类似于双向流的流,具有在读写数据时修改或转换数据的能力

  • 示例:压缩流(zlib.createGzip)。


Stream Modes 流的模式


Node.js中的Stream有两种操作模式:


Standard mode (标准模式):

  • 默认设置的模式

  • 操作的数据类型是“STRING”和“BUFFER”(或“UInt8Array”)

  • Node.js的内部stream实现中使用的类型


Object mode(对象模式):

  • 创建流对象时要设定“objectMode”标识参数

  • 内部缓冲算法计算的是对象(objects)而不是字节(bytes)


Buffering(缓冲)


每个流都有一个用于存储数据的内部缓冲区。可读流和可写流各有一个,可以使用 Readable.readableBuffer 和 writeable.writeablebuffer 访问。


双向流 和 转换流 有两个独立的缓冲区,允许每侧独立操作。


缓冲区的大小由 highWatermarkOption 参数决定。对于在标准模式下运行的流,它指定缓冲区大小;对于在对象模式下运行的流,它指定对象数


Backpressure(背压)


背压这个概念对于刚开始使用Stream API的人来说很难理解,这也使得它成为一个常见的引发bug的源头。没有背压,流就不会这么有效率,背压是Stream最重要的特性之一。


背压是 writable stream 回传给 readable stream 的信号。当 readable stream 读取数据过快并且 writable 的内部缓冲区(由 highWatermarkOption 设置)的填充速度快于它可以被处理的速度时,信号就会发送出去。


该信号提醒 readable stream 先暂停发送更多的数据。Backpressure允许在 readable stream 和 writable stream 之间进行可靠的、基于pull的数据传输。


如果在传输数据时不考虑背压系统,可能会发生一些情况:

  • 系统内存耗尽

  • 当前进程减缓

  • 垃圾收集器压力过大


Backpressure支持可靠、无损和内存高效的数据传输,这是Node.js提供流API的首要目的。




API for Stream Consumers 可用的Stream API


许多Node.js应用程序会使用到流。熟悉 Stream API 可以让你正确地使用和消费「流」。


Consuming Writable Streams 消费可写流


每个 writable stream 的实例都有如下几个方法:


writable.write(chunk[, encoding][, callback])

  • 向流中写入一些数据

  • 如果内部缓冲区满了,方法返回false;否则返回true


writable.end([chunk][, encoding][, callback])

  • 指示写入流不再写入额外的数据。一个可选的「chunk」参数用于在关闭流之前写入最后一块数据。


writable.cork()

  • 强制让所有写入的数据缓冲在内存中。


writable.uncork()

  • 将自上次调用cork()之后的缓冲区的所有数据flush到目标源中。


writable.destroy()

  • 销毁这个stream。


下面的一段代码展示了可写入流实例的简单使用。它没有处理backpressure,这很可能不是你想要的:



writable stream 可以发出下面这些事件:


drain

  • 在“writeable.write()”由于背压而返回“false”且该压力已被释放后,如果适合继续向流中写入数据,则会生成此事件。


error

  • 如果在写入或通过管道(pipe)传输数据时发生错误,则生成此事件(发出此事件时流还未关闭)。


finish

  • 在调用了“writable.end()”方法并刷新(flush)了所有数据后发出。


close

  • 当流及其任何底层资源关闭时发出。


pipe

  • 在可读流上调用“.pipe()”方法时发出。


unpipe

  • 在可读流上调用“.unpipe()”方法时发出。


下面这个简单的例子,使用了手工的方法将数据适当地写入到 writable stream (没使用 readable.pipe() ),并将背压考虑在内



上面是一个简单的例子,因为它只在一个循环中写同一个句子。最重要的是管理背压


Backpressure,以及其它的一些事项,可以通过“readable.pipe()”方法方便地处理,该方法如下所示:



我们将在下文中进一步详细介绍 readable.pipe() 方法。


在手工写数据到writable stream时,不仅要关注背压,监听错误也很重要。


下面是一个完整的手动写数据到到 writable stream 的示例,其中考虑了背压、正确的错误处理和写入后操作(我们给的例子中是记录日志):



Consuming Readable Streams 使用可读流


可读流可以在两种模式下工作:


  • flowing:从底层系统自动读取数据并尽快提供给应用程序

  • paused:必须显式调用“readable.read()”方法才能读取数据块


(文档中也提到了对象模式 object mode,但它是一个单独的特性,flowing 和 paused 可读流都可以处于或不处于对象模式)


所有可读流都以 paused 模式启动。要从 paused模式 切换到 flowing模式,必须执行以下操作之一,这将在下一节中详细介绍:


  • 添加 data 事件侦听器

  • 调用 readable.resume() 方法

  • 使用 readable.pipe() 将可读流定向到可写流


要切换回paused模式,必须执行以下操作之一:


  • 如果没有 pipe() 目标,则调用 readable.pause()

  • 如果存在 pipe() 目标,则删除这些目标(readable.unpipe()可以帮上忙)




有四种使用可读流的方法。开发人员应该选择其中一种。对于单个可读流,混合使用这些API会导致意想不到的行为。


1. 使用 readable.pause()readable.resume() 和 data 事件:


`data` 事件

  • 当该stream传送数据块时发出(添加监听器后自动切换到flowing模式)


`readable.pause()`

  • 暂停当前流,将其切换到paused模式


`readable.resume()`

  • 将当前流切换为flowing模式


下面一个可读流的例子,它被消耗,数据被写入标准输出。不是特别有用,但它可以作为示范:



2. 使用 readable.read()readable 事件


`readable` event

  • 在有某些底层数据需要被读取时触发(当添加“readable”监听器后,流切换到paused模式)


`readable.read([size])`

  • 从内部缓冲区中提取一些数据并返回。如果没有要读取的数据,则返回“null”。默认情况下,如果未指定编码,则数据将作为 “Buffer” 返回。


这是一个类似于上面的例子,但是使用了第二种方式来使用可读流:



3. 使用 readable.pipe() 


`readable.pipe(writable[, options])`

  • 将可写流附加到可读流,可读流自动切换到flowing模式并使可读流将其所有数据传递到附加的可写流。数据流(即背压)将自动处理。


这是使用可读流的几种方式中最方便的一种,因为它并不冗长,会自动处理背压以及在结束后自动关闭流。



从之前的代码片段中复制的一个简单示例:



有一样东西不是自动管理的,那就是错误处理和传播。例如,如果要在发生错误时关闭每个流,则必须监听「error」事件。


带有error事件处理的pipe示例:



4. 使用 Async Iteration Async Generator


  • 可读流实现了「Symbol.asyncIterator」方法,因此可以使用'for await of'对它们进行迭代`。


异步生成器(Async Generator)在Node v10+中正式可用。异步生成器是异步函数和生成器函数的混合体。它们实现了Symbol.asyncIterator方法,并可用于异步迭代。通常,流是跨时间的数据块集合,因此异步生成器非常适合。下面是一个例子:



Consuming Duplex and Transform Streams 使用双向流和转换流


双向流同时实现了 Readable 和 Writable 接口。PassThrough流 就是一种双向流。当某些API期望可读流作为参数,并且你还希望手动写入一些数据时,就使用这种类型的流。


要同时满足两种需要:

  • 创建 PassThrough流 的实例

  • 将流传给API(API将使用这个流的 Readable 接口)

  • 向流中添加一些数据(使用流的 Writable 接口)


过程如下所示(误: 示例中的body应该是uploadStream)



转换流也是双向流。这些流具有可读和可写接口,但它们的主要目的是转换传递的数据。


最常见的示例是使用来自“zlib”模块的 内置转换流 压缩数据:





Useful class methods 有用的类方法(Node v10+)


Stream.finished(stream, callback)

  • 允许我们在流不再可读、不可写或遇到错误或过早关闭时收到通知。


此方法对于错误处理或在流被使用后执行进一步操作时非常有用。例如:



Stream.pipeline(…streams[, callback])

  • 该方法允许在多个流之间进行pipe操作,并提供了完整的错误处理、资源清理和结束时回调。


该方法是建造pipeline最干净和最简洁的方式。与 readable.pipe() 不同,所有内容都是自动处理的,包括错误传播和进程结束后资源的清理。例如:





API for Stream Implementers 给Stream实现者的API


Stream API是可扩展的,它为开发人员提供了一个接口来创建他们自己的扩展。有两种方法可以实现你自己的Stream。


1. 扩展适当的父类



新的stream类必须实现一个或多个特定的方法,具体取决于所创建的流的类型(我们在对每种类型的流进行实现时,会列出这些方法)。


这些方法都以下划线作为前缀,它们只在实现新流时使用。如果用户在使用时调用它们,将导致意外行为(即,这些方法是给实现者用的,不是给用户端用的)


2. 通过直接创建实例并提供适当的方法作为构造函数的参数,以简化的方式扩展流:



要特别记住的是,在这种情况下,所需方法不需要下划线前缀。


实现 Writable Stream


为了实现可写流,我们需要提供一个 writable._write() 方法来向底层资源发送数据:


`writable._write(chunk, encoding, callback)`

  • chunk:要写入的chunk 数据块

  • encoding:如果块是 String 类型,则需要指定

  • callback:在写入已完成或失败时必须调用此方法


一个简单的writable stream的实现:



此流将标准输入打印到标准输出,当输入「/」时,流抛出异常。这个例子是出于演示的目的而写。


实现一个Readable Stream


要实现自定义的可读流,我们必须调用其构造函数并实现 readable._read() 方法(其他方法是可选的),且read方法内部必须调用 readable.push()


`readable._read(size)`

  • 调用此方法时,如果源中有数据可用,则实现者应调用 this.push(dataChunk) 方法将该数据推入读取队列

  • size:要异步读取的字节数


`readable.push()`

  • 仅由实现者调用,并且只能从 readable._read() 方法中调用。调用时,数据块将添加到内部队列中,供流的用户使用(“null”是一个终止字符)。


下面这个可读流的实现,是在一分钟内每秒生成1到10的随机整数,然后结束数据生成并关闭自身。



实现一个Duplex Stream


双向流实现了可读和可写接口,它们彼此独立。Duplex类的prototype继承自stream.Readable,寄生于stream.Writable(JavaScript不支持多重继承)。


要创建双向流的自定义实现,必须实现可写和可读流的每个必需方法,即 readable._read()writeable._write()


下面的流将 stdin(可写端)的内容打印出来,并将一些随机表情写出到stdout(可读端),直到sad smiley(:()出现,随即终止可读流。



实现一个Transform Stream


转换流类似于双向流(其实它是双向流的一种),但具有更简单的接口。输出是根据输入计算出来的。不要求输出与输入的长度相同,不要求数据块的数量相同,也不要求数据同时到达。


实现转换流只需要一个方法,即 transform._transform() 方法(“transform._flush()”是可选的)。


`transform._transform(chunk, encoding, callback)`

  • 处理正在写入的字节,计算输出,然后使用“readable.push()”方法将输出传递给可读部分。可以为一个接收的块多次调用来生成输出,或者根本不生成输出

  • chunk:要写入的数据块

  • encoding:String类型的块需要指定此值

  • callback:(err,transformedChunk)


`transform._flush(callback)` — 可选.

  • 在某些情况下,当一些计算完成时,转换操作可能需要在流的末尾发出额外的数据。在流结束之前,此方法刷新数据。





总结


通过本文,我们学习了如何使用Node.js的所有Stream类型。我们还学习了如何实现自己的Stream并利用好它们强大的功能。


Node.js的Stream以难以使用而闻名,但是伴随对它们独特的API的理解,它们也会成为开发者非常宝贵的工具。

文章转载自背井,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论