前言

上一期我们讲了前端如何上传大文件,本期我们来讲一下后端如何处理大文件上传。

具体实现步骤

  1. 处理切片上传,根据请求中的 fileHash 参数创建名字为 fileHash 变量的文件夹,并将切片文件保存到该文件夹中。
  2. 处理合并上传,根据请求中的 fileHash 参数找到该文件夹下的所有切片文件,并按照顺序将其合并成一个文件。

这里步骤是不是很简单啊,实现起来是不是也很简单啊

哎,还真是

不过里面有很多细节要注意,这个后面讲代码的时候会提到

代码实现

这里使用 nodejs 进行后端开发,其实使用其他语言也是同理,只是语法有点区别而已

初始化项目

创建一个文件夹,然后在该文件夹下打开终端,输入以下命令初始化项目:

bash
1
2
npm init -y # 初始化项目
npm install express cors multiparty fs-extra # 安装依赖

然后在根目录下创建一个 index.js 文件和 uploads 文件夹,uploads 文件夹用来存放上传的文件

初始化服务

先修改一下 package.json 文件,添加一个 type 属性,值为 module,我比较喜欢使用模块化语法进行开发

json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"name": "backend",
"version": "1.0.0",
"main": "index.js",
"type": "module",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"description": "",
"dependencies": {
"cors": "^2.8.5",
"express": "^4.21.2",
"fs-extra": "^11.2.0",
"multiparty": "^4.2.3"
}
}

导入模块并初始化一个基本的服务

javascript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import express from "express";
import cors from "cors";
import multiparty from "multiparty";
import fse from "fs-extra";
import { fileURLToPath } from "node:url";
import path from "node:path";

const app = express();

// 获取上传路径
const UPLOAD_PATH = fileURLToPath(new URL("./uploads", import.meta.url));

// 跨域配置
const corsOption = {
origin: "*",
credentials: true,
allowedHeaders: ["Content-Type", "Authorization", "X-Requested-With"],
methods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
};

// 使用cors模块处理跨域
app.use(cors(corsOption));
// 解析请求体,不添加这个的话,接收不到post请求中的请求体参数
app.use(express.json());

// 监听3000端口
app.listen(3000, () => {
console.log("服务启动在3000端口");
});

ok 接下来就来处理上传切片请求了

创建一个上传切片的接口

使用 multiparty 模块解析请求体中的 formdata,并将切片文件保存到对应的文件夹中

javascript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 上传分片
*/
app.post("/chunk", (req, res) => {
try {
// 使用 multiparty 中间件,可以解析请求并访问表单数据和上传的文件
const form = new multiparty.Form();
// 解析请求体中的formdata
// 表单数据会存放到fields对象中,文件数据会存放到files对象中
form.parse(req, (err, fields, files) => {
if (err) {
return res.status(500).send({ err, message: "上传失败" });
}
// 获取formdata中的fileHash和chunkHash
const fileHash = fields["fileHash"][0];
const chunkHash = fields["chunkHash"][0];
// 切片要保存到的位置
const chunkPath = path.join(UPLOAD_PATH, fileHash);
// 保存切片
// files["chunk"][0]["path"]是multiparty解析出来的临时路径,我们需要将其保存到指定路径下
saveChunk(chunkPath, files["chunk"][0]["path"], chunkHash);
return res.send({ data: null, message: "上传成功" });
});
} catch (error) {
return res.status(500).send({ err: error, message: "上传失败" });
}
});

这里使用了一个 saveChunk 函数,使用 fs-extra 模块来保存切片文件,我们来编写它

javascript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 保存切片函数
* @param {*} savePath 要保存到的路径
* @param {*} oldPath 原来的临时路径
* @param {*} chunkHash 切片的hash值,也就是切片的文件名
* @returns
*/
const saveChunk = (savePath, oldPath, chunkHash) => {
// 切片要保存到的路径不存在则创建
if (!fse.existsSync(savePath)) {
fse.mkdirSync(savePath);
}
try {
// 将临时路径下的切片保存到指定路径下
return fse.moveSync(oldPath, path.join(savePath, chunkHash));
} catch (err) {
return err;
}
};

这样我们就实现了上传切片的功能,接下来我们来实现合并上传的功能

创建一个合并上传的接口

合并是这样的,先拿到所有的切片,然后要按照索引进行排序,要不然直接合并的话会文件会乱,之后遍历切片数组,创建切片的读取流和合并文件的写入流,读取切片内容,写入到要合并的文件的写入流中,写入完成后删除切片文件,最后合并完成后删除切片文件夹

javascript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 合并切片
*/
app.post("/merge", async (req, res) => {
try {
// 获取请求体中的参数
const { fileHash, fileName, chunkSize } = req.body;
// 切片所在的文件夹路径
const mergePath = path.join(UPLOAD_PATH, fileHash);
// 合并后的文件路径
const targerFilePath = path.join(
UPLOAD_PATH,
`${fileHash}${path.extname(fileName)}`
);
// 如果切片路径不存在,则合并失败
if (!fse.existsSync(mergePath)) {
return res.send({ message: "合并失败" });
}
// 读取切片文件夹下的所有切片
const chunkFiles = await fse.readdir(mergePath);
// 按照切片的chunkHash的最后一部分,也就是索引部分进行排序
chunkFiles.sort((a, b) => {
return a.split("-")[1] - b.split("-")[1];
});
// 遍历切片
const list = chunkFiles.map((chunkName, index) => {
return new Promise((resolve, _reject) => {
// 切片所在的文件路径
const chunkPath = path.join(mergePath, chunkName);
// 创建写入流,根据请求中的chunkSize参数从指定字节位置开始写入内容
const writeStream = fse.createWriteStream(targerFilePath, {
start: index * chunkSize,
end: (index + 1) * chunkSize,
});
// 创建读取流,读取切片文件的内容
const readStream = fse.createReadStream(chunkPath);
// 读取流结束后,删除切片文件
readStream.on("end", async () => {
fse.unlinkSync(chunkPath);
// 更改Promise状态
resolve(true);
});
// 将读取流的内容写入到写入流中
readStream.pipe(writeStream);
});
});
// 等待所有切片写入完成
await Promise.all(list);
// 删除切片文件夹
await fse.rm(mergePath, { recursive: true });
return res.send({ message: "合并成功" });
} catch (error) {
return res.send({ error, message: "合并失败" });
}
});

到这里我们的后端处理大文件上传就完成了

最终代码

javascript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import express from "express";
import cors from "cors";
import multiparty from "multiparty";
import fse from "fs-extra";
import { fileURLToPath } from "node:url";
import path from "node:path";

const app = express();

// 获取上传路径
const UPLOAD_PATH = fileURLToPath(new URL("./uploads", import.meta.url));

// 跨域配置
const corsOption = {
origin: "*",
credentials: true,
allowedHeaders: ["Content-Type", "Authorization", "X-Requested-With"],
methods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
};

// 使用cors模块处理跨域
app.use(cors(corsOption));
app.use(express.json());

/**
* 保存切片函数
* @param {*} savePath 要保存到的路径
* @param {*} oldPath 原来的临时路径
* @param {*} chunkHash 切片的hash值,也就是切片的文件名
* @returns
*/
const saveChunk = (savePath, oldPath, chunkHash) => {
// 切片要保存到的路径不存在则创建
if (!fse.existsSync(savePath)) {
fse.mkdirSync(savePath);
}
try {
// 将临时路径下的切片保存到指定路径下
return fse.moveSync(oldPath, path.join(savePath, chunkHash));
} catch (err) {
return err;
}
};

/**
* 上传分片
*/
app.post("/chunk", (req, res) => {
try {
// 使用 multiparty 中间件,可以解析请求并访问表单数据和上传的文件
const form = new multiparty.Form();
// 解析请求体中的formdata
// 表单数据会存放到fields对象中,文件数据会存放到files对象中
form.parse(req, (err, fields, files) => {
if (err) {
return res.status(500).send({ err, message: "上传失败" });
}
// 获取formdata中的fileHash和chunkHash
const fileHash = fields["fileHash"][0];
const chunkHash = fields["chunkHash"][0];
// 切片要保存到的位置
const chunkPath = path.join(UPLOAD_PATH, fileHash);
// 保存切片
// files["chunk"][0]["path"]是multiparty解析出来的临时路径,我们需要将其保存到指定路径下
saveChunk(chunkPath, files["chunk"][0]["path"], chunkHash);
return res.send({ data: null, message: "上传成功" });
});
} catch (error) {
return res.status(500).send({ err: error, message: "上传失败" });
}
});

/**
* 合并切片
*/
app.post("/merge", async (req, res) => {
try {
// 获取请求体中的参数
const { fileHash, fileName, chunkSize } = req.body;
// 切片所在的文件夹路径
const mergePath = path.join(UPLOAD_PATH, fileHash);
// 合并后的文件路径
const targerFilePath = path.join(
UPLOAD_PATH,
`${fileHash}${path.extname(fileName)}`
);
// 如果切片路径不存在,则合并失败
if (!fse.existsSync(mergePath)) {
return res.send({ message: "合并失败" });
}
// 读取切片文件夹下的所有切片
const chunkFiles = await fse.readdir(mergePath);
// 按照切片的chunkHash的最后一部分,也就是索引部分进行排序
chunkFiles.sort((a, b) => {
return a.split("-")[1] - b.split("-")[1];
});
// 遍历切片
const list = chunkFiles.map((chunkName, index) => {
return new Promise((resolve, _reject) => {
// 切片所在的文件路径
const chunkPath = path.join(mergePath, chunkName);
// 创建写入流,根据请求中的chunkSize参数从指定字节位置开始写入内容
const writeStream = fse.createWriteStream(targerFilePath, {
start: index * chunkSize,
end: (index + 1) * chunkSize,
});
// 创建读取流,读取切片文件的内容
const readStream = fse.createReadStream(chunkPath);
// 读取流结束后,删除切片文件
readStream.on("end", async () => {
fse.unlinkSync(chunkPath);
// 更改Promise状态
resolve(true);
});
// 将读取流的内容写入到写入流中
readStream.pipe(writeStream);
});
});
// 等待所有切片写入完成
await Promise.all(list);
// 删除切片文件夹
await fse.rm(mergePath, { recursive: true });
return res.send({ message: "合并成功" });
} catch (error) {
return res.send({ error, message: "合并失败" });
}
});

// 监听3000端口
app.listen(3000, () => {
console.log("服务启动在3000端口");
});

补充

这里只是简单的实现了大文件上传,其实大文件上传还有很多功能和需要注意的点,比如文件秒传和断点续传等等,这些可以读者根据秒传、断点续传的定义跟文件 hash 值的关系,自己思考并实现,其实实现起来也不难