一个简单的 nodejs 目录迭代器

tsvico Lv5

死去的记忆突然攻击我,一份来自四年前使用递归写的遍历目录的方案发生了内存溢出,研究了一下旧代码,发现晦涩难懂,索性重写遍历方法

先简单说一下我们的需求,我们要实现一个简单的目录迭代器,这个迭代器支持 在调用 .next 后返回一个新路径,并要求性能够高,内存占用尽量少,同时可以支持多线程调用(Promise)

常见的方案

1. 递归遍历(同步)

这是最简单且常见的方案之一。通过递归地读取目录和子目录,处理每个文件或目录。在这种方案中,常用的 fs.readdirSync 会阻塞线程,直到完成读取。

优点:

  • 简单易懂。
  • 不需要复杂的控制结构。

缺点:

  • 阻塞 I/O,性能差,尤其在文件数量较多时。
  • 无法处理大规模文件系统遍历。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import * as fs from "fs";
import path from "path";

const traverseSync = (dir: string) => {
const items = fs.readdirSync(dir);
for (const item of items) {
const fullPath = path.join(dir, item);
const stats = fs.statSync(fullPath);
if (stats.isDirectory()) {
traverseSync(fullPath); // 递归遍历子目录
} else {
console.log(fullPath); // 处理文件
}
}
};

2. 递归遍历(异步)

为了避免阻塞主线程,可以使用异步 API(如 fs.promises.readdir)来异步读取目录内容。这种方法通过事件循环处理每一个目录,适合处理大规模文件系统。
优点:

  • 非阻塞,适合处理大规模的文件系统。
  • 性能相对较高。

缺点:

  • 需要处理并发问题。
  • 编写代码时需要考虑异步错误处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import * as fs from "fs";
import path from "path";

const traverseAsync = async (dir: string) => {
const items = await fs.promises.readdir(dir, { withFileTypes: true });
for (const item of items) {
const fullPath = path.join(dir, item.name);
if (item.isDirectory()) {
await traverseAsync(fullPath); // 递归遍历子目录
} else {
console.log(fullPath); // 处理文件
}
}
};

3. 使用异步生成器(我的实现)

使用异步生成器 (AsyncGenerator) 是一种非常高效且灵活的遍历方案。它允许你在遍历目录时逐步返回每个文件或目录,并能够在需要时暂停或继续遍历。
优点:

  • 高效并且具有良好的内存控制。
  • 可以与 for-await-of 循环配合使用,易于管理。
  • 可以轻松暂停或控制遍历的进度(例如通过 yield 控制)。

缺点:

  • 需要理解生成器和异步操作。
  • 需要手动处理递归的异步调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import * as fs from "fs";
import path from "path";

interface Context {
filePath: string;
}

export class FileTraverser {
private num = 0;

private sleep(time: number) {
return new Promise((resolve) => {
setTimeout(() => {
resolve(true);
}, time);
});
}

// 改成异步生成器,使用 yield 来返回每个文件或目录
async *traverse(dirname: string): AsyncGenerator<Context | null> {
try {
const dirents = fs.readdirSync(dirname, { withFileTypes: true });
this.num++;
if (this.num % 500 === 0) {
await this.sleep(0);
}
for (const dirent of dirents) {
const filepath = path.join(dirname, dirent.name);
try {
if (dirent.isDirectory()) {
for await (const item of this.traverse(filepath)) {
yield item;
}
} else if (dirent.isFile()) {
yield { filePath: filepath };
}
} catch (e) {
console.log(e);
}
}
} catch (e) {}
}
}

调用方式

单线程调用统计文件数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const traverser = new FileTraverser().traverse(dir);

for await (const item of traverser) {
console.log(item);
}

// 或者
while (!canceled) {
const { done, value } = await traverser.next();
if (done) {
return;
}
// 处理当前 item
try {
console.log(value.filePath);
// 文件数++
} catch (e) {
this.logger.error("出错", e);
}
}

4. 广度优先遍历(BFS)

在广度优先遍历中,目录会按照层级逐层遍历,而不是递归进入深层目录。这种方法适合需要按层级处理文件的场景。这种方式需要记下所有待遍历的目录和文件,然后逐个遍历。可以简单理解为将根目录所有文件入栈,再进行出栈
优点:

  • 可以避免深递归带来的栈溢出问题。
  • 在某些应用场景下,广度优先遍历是更合理的选择(例如,当需要优先处理浅层目录时)。

缺点:

  • 需要使用队列或其他数据结构来存储待遍历的目录和文件。
  • 对内存的需求相对较高,因为需要存储所有待处理的目录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import * as fs from "fs";
import path from "path";

const traverseBFS = async (dir: string) => {
const queue = [dir]; // 队列存储待遍历的目录
while (queue.length > 0) {
const currentDir = queue.shift();
if (currentDir) {
const items = await fs.promises.readdir(currentDir, {
withFileTypes: true,
});
for (const item of items) {
const fullPath = path.join(currentDir, item.name);
if (item.isDirectory()) {
queue.push(fullPath); // 将目录添加到队列
} else {
console.log(fullPath); // 处理文件
}
}
}
}
};

实现优化

在上面的代码中,FileTraverser 类通过递归方式读取目录中的文件。在每处理完 500 个文件后,通过 sleep(0) 使线程短暂休眠,避免阻塞主线程。然而,使用 Promise 实现的 sleep 会导致多线程调用时的不安全。

删除 FileTraversersleep 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import * as fs from "fs";
import path from "path";

interface Context {
filePath: string;
}

export class FileTraverser {
async *traverse(dirname: string): AsyncGenerator<Context | null> {
try {
const dirents = fs.readdirSync(dirname, { withFileTypes: true });
for (const dirent of dirents) {
const filepath = path.join(dirname, dirent.name);
try {
if (dirent.isDirectory()) {
for await (const item of this.traverse(filepath)) {
yield item;
}
} else if (dirent.isFile()) {
yield { filePath: filepath };
}
} catch (e) {
console.log(e);
}
}
} catch (e) {}
}
}

这时会面临一个很严重的问题,这种方案当调用方处理过快时会阻塞主线程,需要在调用方中进行 sleep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 单线程调用统计文件数
let cnt = 0;
while (!canceled) {
cnt++;
if (cnt % 500 === 0) {
await this.sleep(0);
}
const { done, value } = await traverser.next();
if (done) {
return;
}
// 处理当前 item
try {
// do something
} catch (e) {
this.logger.error("出错", e);
}
}

测试及示例

最终测试统计 C 盘文件数,用时 1 分半

多线程调用示例,外部 rxjs 每发送一个 next 信号,抛出一个文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 遍历目录
* @param dir 将遍历的目录
* @param next$ 外部rxjs,在需要新的文件时发送一个next
* @return 返回遍历的结果,每次抛出一个文件
*/
private walkDir(dir: string, next$: Observable<unknown>): Observable<WalkDirItem> {
return new Observable<WalkDirItem>(subscriber => {
const traverser = new FileTraverser().traverse(dir)

const processNextItem = async (traverser: AsyncIterableIterator<any>, subscriber: Subscriber<WalkDirItem>): Promise<void> => {
while (true) {
try {
const { done, value } = await traverser.next()
if (!done) {
subscriber.next({
filepath: value.filePath,
stats: fs.statSync(value.filePath)
})
break
} else {
subscriber.complete()
break
}
} catch (e) {
this.logger.error("遍历目录出错,正在重试...", e)
}
}
}
let cnt = 0
next$.subscribe({
next: async () => {
cnt++
if (cnt % 500 === 0) {
setTimeout(() => {
processNextItem(traverser, subscriber).then()
}, 0)
} else {
processNextItem(traverser, subscriber).then()
}
}
})
})
}

代码整理较混乱,见谅

  • 标题: 一个简单的 nodejs 目录迭代器
  • 作者: tsvico
  • 创建于 : 2024-11-23 11:12:57
  • 更新于 : 2024-11-23 16:46:43
  • 链接: https://blog.tbox.fun/2024/320884568.html
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论