死去的记忆突然攻击我,一份来自四年前使用递归写的遍历目录的方案发生了内存溢出,研究了一下旧代码,发现晦涩难懂,索性重写遍历方法
先简单说一下我们的需求,我们要实现一个简单的目录迭代器,这个迭代器支持 在调用 .next
后返回一个新路径,并要求性能够高,内存占用尽量少,同时可以支持多线程调用(Promise)
常见的方案
1. 递归遍历(同步)
这是最简单且常见的方案之一。通过递归地读取目录和子目录,处理每个文件或目录。在这种方案中,常用的 fs.readdirSync 会阻塞线程,直到完成读取。
优点:
缺点:
- 阻塞 I/O,性能差,尤其在文件数量较多时。
- 无法处理大规模文件系统遍历。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import * as fs from "fs"; import path from "path";
const traverseSync = (dir: string) => { const items = fs.readdirSync(dir); for (const item of items) { const fullPath = path.join(dir, item); const stats = fs.statSync(fullPath); if (stats.isDirectory()) { traverseSync(fullPath); } else { console.log(fullPath); } } };
|
2. 递归遍历(异步)
为了避免阻塞主线程,可以使用异步 API(如 fs.promises.readdir)来异步读取目录内容。这种方法通过事件循环处理每一个目录,适合处理大规模文件系统。
优点:
缺点:
- 需要处理并发问题。
- 编写代码时需要考虑异步错误处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import * as fs from "fs"; import path from "path";
const traverseAsync = async (dir: string) => { const items = await fs.promises.readdir(dir, { withFileTypes: true }); for (const item of items) { const fullPath = path.join(dir, item.name); if (item.isDirectory()) { await traverseAsync(fullPath); } else { console.log(fullPath); } } };
|
3. 使用异步生成器(我的实现)
使用异步生成器 (AsyncGenerator) 是一种非常高效且灵活的遍历方案。它允许你在遍历目录时逐步返回每个文件或目录,并能够在需要时暂停或继续遍历。
优点:
- 高效并且具有良好的内存控制。
- 可以与
for-await-of
循环配合使用,易于管理。 - 可以轻松暂停或控制遍历的进度(例如通过
yield
控制)。
缺点:
- 需要理解生成器和异步操作。
- 需要手动处理递归的异步调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import * as fs from "fs"; import path from "path";
interface Context { filePath: string; }
export class FileTraverser { private num = 0;
private sleep(time: number) { return new Promise((resolve) => { setTimeout(() => { resolve(true); }, time); }); }
async *traverse(dirname: string): AsyncGenerator<Context | null> { try { const dirents = fs.readdirSync(dirname, { withFileTypes: true }); this.num++; if (this.num % 500 === 0) { await this.sleep(0); } for (const dirent of dirents) { const filepath = path.join(dirname, dirent.name); try { if (dirent.isDirectory()) { for await (const item of this.traverse(filepath)) { yield item; } } else if (dirent.isFile()) { yield { filePath: filepath }; } } catch (e) { console.log(e); } } } catch (e) {} } }
|
调用方式
单线程调用统计文件数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| const traverser = new FileTraverser().traverse(dir);
for await (const item of traverser) { console.log(item); }
while (!canceled) { const { done, value } = await traverser.next(); if (done) { return; } try { console.log(value.filePath); } catch (e) { this.logger.error("出错", e); } }
|
4. 广度优先遍历(BFS)
在广度优先遍历中,目录会按照层级逐层遍历,而不是递归进入深层目录。这种方法适合需要按层级处理文件的场景。这种方式需要记下所有待遍历的目录和文件,然后逐个遍历。可以简单理解为将根目录所有文件入栈,再进行出栈
优点:
- 可以避免深递归带来的栈溢出问题。
- 在某些应用场景下,广度优先遍历是更合理的选择(例如,当需要优先处理浅层目录时)。
缺点:
- 需要使用队列或其他数据结构来存储待遍历的目录和文件。
- 对内存的需求相对较高,因为需要存储所有待处理的目录。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import * as fs from "fs"; import path from "path";
const traverseBFS = async (dir: string) => { const queue = [dir]; while (queue.length > 0) { const currentDir = queue.shift(); if (currentDir) { const items = await fs.promises.readdir(currentDir, { withFileTypes: true, }); for (const item of items) { const fullPath = path.join(currentDir, item.name); if (item.isDirectory()) { queue.push(fullPath); } else { console.log(fullPath); } } } } };
|
实现优化
在上面的代码中,FileTraverser
类通过递归方式读取目录中的文件。在每处理完 500 个文件后,通过 sleep(0)
使线程短暂休眠,避免阻塞主线程。然而,使用 Promise
实现的 sleep
会导致多线程调用时的不安全。
删除 FileTraverser
的 sleep
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import * as fs from "fs"; import path from "path";
interface Context { filePath: string; }
export class FileTraverser { async *traverse(dirname: string): AsyncGenerator<Context | null> { try { const dirents = fs.readdirSync(dirname, { withFileTypes: true }); for (const dirent of dirents) { const filepath = path.join(dirname, dirent.name); try { if (dirent.isDirectory()) { for await (const item of this.traverse(filepath)) { yield item; } } else if (dirent.isFile()) { yield { filePath: filepath }; } } catch (e) { console.log(e); } } } catch (e) {} } }
|
这时会面临一个很严重的问题,这种方案当调用方处理过快时会阻塞主线程,需要在调用方中进行 sleep
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| let cnt = 0; while (!canceled) { cnt++; if (cnt % 500 === 0) { await this.sleep(0); } const { done, value } = await traverser.next(); if (done) { return; } try { } catch (e) { this.logger.error("出错", e); } }
|
测试及示例
最终测试统计 C 盘文件数,用时 1 分半
多线程调用示例,外部 rxjs 每发送一个 next 信号,抛出一个文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
private walkDir(dir: string, next$: Observable<unknown>): Observable<WalkDirItem> { return new Observable<WalkDirItem>(subscriber => { const traverser = new FileTraverser().traverse(dir)
const processNextItem = async (traverser: AsyncIterableIterator<any>, subscriber: Subscriber<WalkDirItem>): Promise<void> => { while (true) { try { const { done, value } = await traverser.next() if (!done) { subscriber.next({ filepath: value.filePath, stats: fs.statSync(value.filePath) }) break } else { subscriber.complete() break } } catch (e) { this.logger.error("遍历目录出错,正在重试...", e) } } } let cnt = 0 next$.subscribe({ next: async () => { cnt++ if (cnt % 500 === 0) { setTimeout(() => { processNextItem(traverser, subscriber).then() }, 0) } else { processNextItem(traverser, subscriber).then() } } }) }) }
|
代码整理较混乱,见谅