nodejs流
什么是流
流是nodejs中很重要的一个概念,是用于在 Node.js 中处理流数据的抽象接口,它能以一种以高效的方式处理读/写文件、网络通信、或任何类型的端到端的信息交换。
在传统的方式中,处理文件时会将文件的所有内容读取到内存中,然后再处理,假如遇到大文件,这种处理方式不仅低效还可能影响服务器性能,而流能解决这类问题,流每次都读取一部分内容到内存中处理,而不是一次性读取所有内容,这在处理大文件时很有用。
流的类型
Node.js 中有四种基本的流类型:
Writable: 可以写入数据的流(例如,fs.createWriteStream())。
Readable: 可以从中读取数据的流(例如,fs.createReadStream())。
Duplex:Readable和Writable的流(例如,net.Socket)。
Transform: 可以在写入和读取数据时修改或转换数据的Duplex流(例如,zlib.createDeflate())。
许多 Node.js 核心模块提供了原生的流处理功能,最值得注意的有:
process.stdin返回连接到 stdin 的流。
process.stdout返回连接到 stdout 的流。
process.stderr返回连接到 stderr 的流。
fs.createReadStream()创建文件的可读流。
fs.createWriteStream()创建到文件的可写流。
net.connect()启动基于流的连接。
http.request()返回 http.ClientRequest 类的实例,该实例是可写流。
zlib.createGzip()使用 gzip(压缩算法)将数据压缩到流中。
zlib.createGunzip()解压缩 gzip 流。
zlib.createDeflate()使用 deflate(压缩算法)将数据压缩到流中。
zlib.createInflate()解压缩 deflate 流。
可读流
读取大文件
以读取文件为例
const reader = fs.createReadStream("./data.txt");
这段代码通过
fs.createReadStream()创建了一个可读流,然后将data.txt的内容片段化地读取出来。两种模式
可读流有两种模式:流动和暂停。
当创建reader时,可读流处于暂停
pause模式,此时不会进行读取操作,但可以通过以下方式之一切换到流动模式:- 添加
'data'事件句柄。
- 调用
stream.resume()方法。
- 调用
stream.pipe()方法将数据发送到Writable。
当可读流处于流动模式时,可以通过
reader.pause()或者reader.unpipe()来转换成暂停模式。// 自动转成流动模式,此时会将内容打印到屏幕上 reader.pipe(process.stdout); // 如果执行下面命令,则会使reader流转成暂停模式 reader.pause(); // 或者 reader.unpipe(process.stdout);
常用事件
可读流提供一些事件,其中最值得注意的有
- error
- data
reader.on("data", (chunk) => { console.log(chunk.toString()); } // 类似于 // reader.pipe(process.stdout);
可写流
写入大文件
下面我们实现一个大文件的写入操作
const reader = fs.createReadStream("./data.txt"); const writer = fs.createWriteStream("./data2.txt"); reader.pipe(writer);
这段代码首先创造可读流读取了
data.txt(假设是一个超大的文件),然后将可读流通过管道流入可写流中。这种是将可读流的内容流入到可写流中,当然你也可以直接写入可写流。
const writer = fs.createWriteStream("./data2.txt"); writer.write("hello world\n"); writer.write("hello node\n"); writer.end("end");
事件
可写流最值得关心的事件有
- error 在写入或管道数据时发生错误时触发
- drain
'drain'事件将在适合继续将数据写入流时触发。
- finish 在调用
stream.end()方法之后,并且所有数据都已刷新到底层系统时触发
const reader = fs.createReadStream(data1); const writer = fs.createWriteStream("./data2.txt"); writer.on("drain", () => { // 每次写入chunk时触发 // 如果内容体积太小,可以一次性写入,则不会触发该事件 console.log("drain"); }); writer.on("finish", () => { // 最后一次写入chunk时触发 console.log("finish"); }); reader.pipe(writer);
双工流和转换流
双工流是实现Readable和Writable的流,实际上就是实现
_read和_write的类,既能当做可读流又能当做可写流。转换流则是在双工流的基础上增加了transform方法,转换流会将输入经过transform后转换成输出。
Nodejs中的的压缩流和加密流都载内部实现了转换流。下面将以压缩流来演示转换流的工作方式。
const zlib = require("zlib"); const fs = require("fs"); fs.createReadStream("./data.txt") .pipe(zlib.createGzip()) .pipe(fs.createWriteStream("./test.txt.gz"));
通过fs来创建了可读流和可写流,目的是将
data.txt内容读取出来然后保存到test.txt.gz文件中,而在流的流动过程中会经历转换流zlib.createGzip(),它的作用是将可读流的内容进行转换,并提供给后面的可写流。流的API
pipe()
readable.pipe() 方法将 Writable 流绑定到 readable,使其自动切换到流动模式并将其所有数据推送到绑定的 Writable。 数据流将被自动管理,以便目标 Writable 流不会被更快的 Readable 流漫过。注意事项如果Readable流在处理过程中触发错误,则Writable目标不会自动关闭。 如果发生错误,则需要手动关闭每个流以防止内存泄漏(通过this.destroy())。
unpipe()
reader.pipe(writer); reader.unpipe(writer);
destroy()
在流中触发的异常,并不会传播到
error事件中,也不会自动关闭释放资源,但出现异常时,需要手动执行this.destory(err)来销毁流。销毁流 可选地触发
'error' 事件,并且触发 'close' 事件(除非 emitClose 设置为 false)。 在此调用之后,可读流将释放任何内部资源,随后对 push() 的调用将被忽略。所有流中出现的异常都必须调用
this.destroy进行捕获,否则可能会导致内存泄露const myReadable = new Readable({ read(size) { fs.open("./xxx", (err) => { if (err) { this.destroy(err); } }); }, }); myReadable.on("error", (err) => { console.log("err", err); }); myReadable.on("close", () => { console.log("close"); }); myReadable.pipe(process.stdout);
_construct()
_construct是流的一个内部方法,是可选的。_construct方法接收一个参数callback,当实现了_construct方法时,_write()、_final()、_read() 和 _destroy() 等操作的调用都会被延迟,直到执行了callback。callback可以传递一个Error类型的参数。const myWritable = new Writable({ construct(callback) { try { this.fd = fs.openSync("./input.txt", "a+"); callback(); } catch (err) { callback(err); } }, write(chunk, encoding, callback) { fs.write(this.fd, chunk, (err) => { if (err) this.destroy(err); callback(); }); }, }); fs.createReadStream("./data.txt").pipe(myWritable);
push()
Readable上的方法,添加chunk到可读流中,可选指定encoding。
const myReadable = new Readable({ read(size) {}, }); myReadable.push("123\n"); myReadable.push("123\n"); myReadable.push(null); myReadable.push("123"); // 报错,不能在EOF后push新的数据 myReadable.pipe(process.stdout);
当
chunk 作为 null 传递时,表示流结束 (EOF),之后不能再写入数据。实现自定义流
除了使用nodejs内置的流外,我们还可以自己实现流。
实现流的方式
实现流有三种方式,分别是通过类、构造函数或对象模式来实现流。
const {Readable} = require('stream') // 类 class MyReadable extends Readable{ constructor(options){ super(options) } _read(){} } // 构造函数 const myReadable = new Readable() myReadable._read = function(){} // 对象模式 const myReadable = new Readable({ read(){} })
下面将统一采用对象模式或类的形式实现流。
实现可读流
实现可读流必须要实现
_read()方法,以从底层资源中获取数据const myReadable = new Readable({ read(size){} })
调用
readable.read() 时,如果资源中的数据可用,则实现应开始使用 this.push(dataChunk) 方法将该数据推送到读取队列中。 一旦流准备好接受更多数据,则 _read() 将在每次调用 this.push(dataChunk) 后再次调用。 _read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。 只有当 _read() 停止后再次被调用时,它才能继续将额外的数据推入队列。size参数是可选的,它的作用是确定每次push的数据量。实现可写流
和可读流类似,可写流必须实现
_write()方法 或/和 writev()。const myWritable = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); }, }); process.stdin.pipe(myWritable);
writable会持续传入片段数据(chunk)并执行writable.write()。 如果流实现能够同时处理多个数据块,则应实现 writable._writev() 方法。encoding表示chunk的编码,如果块是字符串,则
encoding 是该字符串的字符编码。 如果块是 Buffer,或者如果流在对象模式下运行,则可以忽略 encoding。如果需要监听
'drain' 事件,则必须执行callback函数,按照nodejs风格,第一个参数传Error,如果没有异常则传null。实现类似
fs.createWriteStream的效果。const myWritable = new Writable({ write(chunk, encoding, callback) { fs.writeFile("./input.txt", chunk, (err) => { callback(err, chunk); }); }, }); process.stdin.pipe(myWritable);
实现双工流
双工流实际上就是实现
readabale和writable的流,因此我们需要实现duplex._read和duplex._write。const { Duplex } = require("stream"); class MyDuplex extends Duplex { _read(chunk) {} _write(chunk) { this.push(chunk); } } const myDuplex = new MyDuplex(); process.stdin.pipe(myDuplex).pipe(process.stdout);
process.stdin获取输入流,然后通过pipe流到myDuplex中,此时myDuplex是作为可写流,在这个过程中MyDuplex._write会被多次调用来片段化地写入数据,我们在这里通过this.push()将片段数据chunk添加,然后myDuplex就可以作为可读流来讲内容流入到可写流process.stdout,即屏幕上。实现转换流
转换流在双工流的基础上实现了transform,它能够转换内容,例如前面讲过的压缩流。
const zlib = require("zlib"); const fs = require("fs"); fs.createReadStream("./data.txt") .pipe(zlib.createGzip()) // 转换 .pipe(fs.createWriteStream("./test.txt.gz"));
我们可以实现一个简单的转换流,将文本每一行内容的前面都加上行号
let count = 0; const myTransform = new Transform({ transform(chunk) { this.push(`[${count}] ` + chunk); }, }); fs.createReadStream("./data.txt").pipe(myTransform).pipe(process.stdout);