Node.js 流处理高效处理大数据的艺术什么是流在 Node.js 中流Stream是处理大量数据的抽象接口。它允许我们逐块读取或写入数据而不需要一次性将全部数据加载到内存中。为什么需要流想象一下处理一个 10GB 的日志文件如果使用fs.readFile会将整个文件加载到内存中可能导致内存溢出使用流可以逐块读取每处理完一块就释放内存流的四种类型1. Readable可读流用于读取数据例如从文件或网络读取。const fs require(fs); const readable fs.createReadStream(large-file.txt); readable.on(data, (chunk) { console.log(Received ${chunk.length} bytes); }); readable.on(end, () { console.log(Finished reading); });2. Writable可写流用于写入数据例如写入文件或发送到网络。const fs require(fs); const writable fs.createWriteStream(output.txt); writable.write(Hello, ); writable.write(World!); writable.end();3. Duplex双工流既可以读取也可以写入例如 TCP socket。const net require(net); const server net.createServer((socket) { socket.write(Hello from server); socket.on(data, (data) { console.log(Received: ${data}); }); });4. Transform转换流在读取和写入之间进行数据转换例如压缩、加密。const { Transform } require(stream); const upperCase new Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } });流的核心概念背压Backpressure当写入速度慢于读取速度时数据会在内存中堆积导致内存溢出。流自动处理背压问题。readable.on(data, (chunk) { if (!writable.write(chunk)) { readable.pause(); } }); writable.on(drain, () { readable.resume(); });Pipe管道使用pipe方法可以自动处理背压是推荐的数据传输方式。const fs require(fs); const zlib require(zlib); fs.createReadStream(input.txt) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(input.txt.gz));实战创建自定义流创建自定义可读流const { Readable } require(stream); class NumberStream extends Readable { constructor(max) { super({ objectMode: true }); this.max max; this.current 1; } _read() { if (this.current this.max) { this.push(this.current); } else { this.push(null); } } } const stream new NumberStream(5); stream.on(data, (num) console.log(num));创建自定义转换流const { Transform } require(stream); class JSONParser extends Transform { constructor() { super({ readableObjectMode: true }); this.buffer ; } _transform(chunk, encoding, callback) { this.buffer chunk; let index; while ((index this.buffer.indexOf(\n)) ! -1) { const line this.buffer.slice(0, index); this.buffer this.buffer.slice(index 1); try { this.push(JSON.parse(line)); } catch (e) { console.error(Invalid JSON:, line); } } callback(); } _flush(callback) { if (this.buffer) { try { this.push(JSON.parse(this.buffer)); } catch (e) { console.error(Invalid JSON:, this.buffer); } } callback(); } }流的高级用法并发流处理const { pipeline, Transform } require(stream); const fs require(fs); const processor new Transform({ transform(chunk, encoding, callback) { const result processChunk(chunk); callback(null, result); } }); pipeline( fs.createReadStream(input.txt), processor, fs.createWriteStream(output.txt), (err) { if (err) { console.error(Pipeline failed:, err); } else { console.log(Pipeline succeeded); } } );流与 Promise 结合const { pipeline } require(stream/promises); const fs require(fs); async function processFile() { try { await pipeline( fs.createReadStream(input.txt), fs.createWriteStream(output.txt) ); console.log(Processing complete); } catch (err) { console.error(Error:, err); } }流在实际项目中的应用场景一日志处理const fs require(fs); const { createInterface } require(readline); const rl createInterface({ input: fs.createReadStream(access.log), crlfDelay: Infinity }); rl.on(line, (line) { const log parseLog(line); if (log.statusCode 400) { console.log(Error:, line); } });场景二数据转换const csv require(csv-parser); const fs require(fs); fs.createReadStream(data.csv) .pipe(csv()) .on(data, (row) { const json transformRow(row); writeToDatabase(json); }) .on(end, () { console.log(CSV parsing complete); });场景三HTTP 响应流const http require(http); const fs require(fs); http.createServer((req, res) { const stream fs.createReadStream(large-file.zip); res.writeHead(200, { Content-Type: application/zip }); stream.pipe(res); }).listen(3000);性能优化建议1. 使用适当的 highWaterMarkconst stream fs.createReadStream(file.txt, { highWaterMark: 64 * 1024 // 64KB });2. 避免不必要的数据转换尽可能在流中直接处理数据避免多次转换。3. 使用对象模式对于非二进制数据使用objectMode: true可以提高可读性。总结Node.js 流是处理大数据的利器掌握流的使用能够显著降低内存占用提高处理速度实现高效的数据管道从日志分析到文件处理从数据转换到 HTTP 响应流的应用无处不在。深入理解流的原理和用法将使你成为更优秀的 Node.js 开发者。
Node.js 流处理:高效处理大数据的艺术
发布时间:2026/5/21 8:32:27
Node.js 流处理高效处理大数据的艺术什么是流在 Node.js 中流Stream是处理大量数据的抽象接口。它允许我们逐块读取或写入数据而不需要一次性将全部数据加载到内存中。为什么需要流想象一下处理一个 10GB 的日志文件如果使用fs.readFile会将整个文件加载到内存中可能导致内存溢出使用流可以逐块读取每处理完一块就释放内存流的四种类型1. Readable可读流用于读取数据例如从文件或网络读取。const fs require(fs); const readable fs.createReadStream(large-file.txt); readable.on(data, (chunk) { console.log(Received ${chunk.length} bytes); }); readable.on(end, () { console.log(Finished reading); });2. Writable可写流用于写入数据例如写入文件或发送到网络。const fs require(fs); const writable fs.createWriteStream(output.txt); writable.write(Hello, ); writable.write(World!); writable.end();3. Duplex双工流既可以读取也可以写入例如 TCP socket。const net require(net); const server net.createServer((socket) { socket.write(Hello from server); socket.on(data, (data) { console.log(Received: ${data}); }); });4. Transform转换流在读取和写入之间进行数据转换例如压缩、加密。const { Transform } require(stream); const upperCase new Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } });流的核心概念背压Backpressure当写入速度慢于读取速度时数据会在内存中堆积导致内存溢出。流自动处理背压问题。readable.on(data, (chunk) { if (!writable.write(chunk)) { readable.pause(); } }); writable.on(drain, () { readable.resume(); });Pipe管道使用pipe方法可以自动处理背压是推荐的数据传输方式。const fs require(fs); const zlib require(zlib); fs.createReadStream(input.txt) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(input.txt.gz));实战创建自定义流创建自定义可读流const { Readable } require(stream); class NumberStream extends Readable { constructor(max) { super({ objectMode: true }); this.max max; this.current 1; } _read() { if (this.current this.max) { this.push(this.current); } else { this.push(null); } } } const stream new NumberStream(5); stream.on(data, (num) console.log(num));创建自定义转换流const { Transform } require(stream); class JSONParser extends Transform { constructor() { super({ readableObjectMode: true }); this.buffer ; } _transform(chunk, encoding, callback) { this.buffer chunk; let index; while ((index this.buffer.indexOf(\n)) ! -1) { const line this.buffer.slice(0, index); this.buffer this.buffer.slice(index 1); try { this.push(JSON.parse(line)); } catch (e) { console.error(Invalid JSON:, line); } } callback(); } _flush(callback) { if (this.buffer) { try { this.push(JSON.parse(this.buffer)); } catch (e) { console.error(Invalid JSON:, this.buffer); } } callback(); } }流的高级用法并发流处理const { pipeline, Transform } require(stream); const fs require(fs); const processor new Transform({ transform(chunk, encoding, callback) { const result processChunk(chunk); callback(null, result); } }); pipeline( fs.createReadStream(input.txt), processor, fs.createWriteStream(output.txt), (err) { if (err) { console.error(Pipeline failed:, err); } else { console.log(Pipeline succeeded); } } );流与 Promise 结合const { pipeline } require(stream/promises); const fs require(fs); async function processFile() { try { await pipeline( fs.createReadStream(input.txt), fs.createWriteStream(output.txt) ); console.log(Processing complete); } catch (err) { console.error(Error:, err); } }流在实际项目中的应用场景一日志处理const fs require(fs); const { createInterface } require(readline); const rl createInterface({ input: fs.createReadStream(access.log), crlfDelay: Infinity }); rl.on(line, (line) { const log parseLog(line); if (log.statusCode 400) { console.log(Error:, line); } });场景二数据转换const csv require(csv-parser); const fs require(fs); fs.createReadStream(data.csv) .pipe(csv()) .on(data, (row) { const json transformRow(row); writeToDatabase(json); }) .on(end, () { console.log(CSV parsing complete); });场景三HTTP 响应流const http require(http); const fs require(fs); http.createServer((req, res) { const stream fs.createReadStream(large-file.zip); res.writeHead(200, { Content-Type: application/zip }); stream.pipe(res); }).listen(3000);性能优化建议1. 使用适当的 highWaterMarkconst stream fs.createReadStream(file.txt, { highWaterMark: 64 * 1024 // 64KB });2. 避免不必要的数据转换尽可能在流中直接处理数据避免多次转换。3. 使用对象模式对于非二进制数据使用objectMode: true可以提高可读性。总结Node.js 流是处理大数据的利器掌握流的使用能够显著降低内存占用提高处理速度实现高效的数据管道从日志分析到文件处理从数据转换到 HTTP 响应流的应用无处不在。深入理解流的原理和用法将使你成为更优秀的 Node.js 开发者。