Skip to content

大文件上传

  • 文件切片:将大文件分割成多个较小的块。目的是支持并行按需上传,提高上传成功率。
  • 计算 Hash 值:对每个文件块计算哈希值。目的是支持断点续传、秒传、并行按需上传、保证文件完整性、避免重复上传、等。
  • 秒传: 客户端会将待上传的文件分割成多个小块,并为每个小块计算唯一的哈希值(例如MD5、SHA-1等)。这些哈希值就像是每个文件块的数字指纹。然后,客户端将这些哈希值发送给服务器进行比对。服务器会检查自己的数据库中是否已经存在相同哈希值的文件块。存在相同的说明已上传过,服务端会给客户端返回已存在的信息。
  • 断点续传:上传过程中意外中断,从上次中断的地方继续传输,而不需要重新开始整个文件的传输,上传分片前需先请求接口查询需要上传的分片,哪些块已存在,哪些块不存在,存在就秒传,不存在重新上传。
  • 并行上传:多个分片同时上传并控制并发量,避免一次性上传过多切片资源过载,导致崩溃,而且可以更灵活地处理任务失败及重试。
  • 发送合并请求:前端上传完毕之后主动通知服务端进行文件切片的合并。

文件切片及 Hash 计算

因为当文件体积较大时,如果对整个文件计算 md5 以及加载整个文件并进行分片都是很吃性能和耗时的操作,需要通过输入流对文件进行读取。这里 Hash 的计算使用 spark-md5 它优化了大文件的哈希计算性能。读取文件通过 FileReader 它用于异步读取文件内容,不会阻塞主线程,同时保持较低的内存使用率,支持多种读取方法,可以在读取文件的过程中进行其他操作,如进度更新、取消读取等,这里结合 spark-md5 在文件读取的过程中逐步计算哈希值。大文件切片和 Hash 的计算是个耗时的操作,可以考虑在 Worker 线程中执行。

js
public getChunkListAndFileMd5(chunkSize: number, file: File): Promise<{ md5: string; chunkList: Blob[] }> {
  return new Promise((resolve, reject) => {
    const chunks = Math.ceil(file.size / chunkSize); // 计算总分片数
    const spark = new SparkMD5.ArrayBuffer(); // 创建 SparkMD5 实例,用于计算 hash
    const fileReader = new FileReader(); // 创建 FileReader 实例,用于读取文件
    const chunkList: Blob[] = []; // 存储分片的数组
    let currentChunk = 0; // 当前处理的分片索引

    // 文件读取完成的回调
    fileReader.onload = (e) => {
      if (e?.target?.result instanceof ArrayBuffer) {
        spark.append(e.target.result); // 将读取到的 ArrayBuffer 追加到 SparkMD5 实例中
      }
      currentChunk++; // 增加当前分片索引

      if (currentChunk < chunks) {
        loadNextChunk(); // 如果还有未处理的分片,继续加载下一个分片
      } else {
        const computedHash = spark.end(); // 计算最终的 MD5 哈希值
        resolve({ md5: computedHash, chunkList }); // 返回结果
      }
    };

    // 文件读取出错的回调
    fileReader.onerror = (e) => {
      console.error('Error reading file:', e);
      reject(new Error('Failed to read file')); // 拒绝Promise,传递错误信息
    };

    // 加载下一个分片
    function loadNextChunk() {
      const start = currentChunk * chunkSize; // 计算当前分片的起始位置
      const end = Math.min(start + chunkSize, file.size); // 计算当前分片的结束位置

      const chunk = file.slice(start, end); // 从文件中切出当前分片
      chunkList.push(chunk); // 将分片添加到分片列表中
      fileReader.readAsArrayBuffer(chunk); // 读取分片为 ArrayBuffer
    }

    // 开始加载第一个分片
    loadNextChunk();
  });
}

大文件切片和 Hash 的计算是个耗时的操作,可以考虑在 Worker 线程中执行:

js
// fileProcessorWorker.ts

self.onmessage = async (event: MessageEvent<{ file: File; chunkSize: number }>) => {
  const { file, chunkSize } = event.data;
  const chunks = Math.ceil(file.size / chunkSize);
  const spark = new SparkMD5.ArrayBuffer();
  const fileReader = new FileReader();
  const chunkList: Blob[] = [];
  let currentChunk = 0;

  fileReader.onload = (e: ProgressEvent<FileReader>) => {
    if (e?.target?.result instanceof ArrayBuffer) {
      spark.append(e.target.result);
    }
    currentChunk++;

    if (currentChunk < chunks) {
      loadNextChunk();
    } else {
      const computedHash = spark.end();
      self.postMessage({ md5: computedHash, chunkList });
    }
  };

  fileReader.onerror = (e: ProgressEvent<FileReader>) => {
    console.error('Error reading file:', e);
    self.postMessage({ error: 'Failed to read file' });
  };

  function loadNextChunk() {
    const start = currentChunk * chunkSize;
    const end = Math.min(start + chunkSize, file.size);

    const chunk = file.slice(start, end);
    chunkList.push(chunk);
    fileReader.readAsArrayBuffer(chunk);
  }

  loadNextChunk();
};

const getChunkListAndFileMd5 = async (file: File): Promise<{ md5: string; chunkList: Blob[] }> => {
  return new Promise((resolve, reject) => {
    const worker = new Worker();
    worker.postMessage({ file, chunkSize });

    worker.onmessage = (event: MessageEvent<{ md5: string; chunkList: Blob[]; error?: string }>) => {
      const { md5, chunkList, error } = event.data;
      if (error) {
        reject(new Error(error));
      } else {
        resolve({ md5, chunkList });
      }
    };

    worker.onerror = (e: ErrorEvent) => {
      console.error('Web Worker error:', e);
      reject(new Error('Web Worker error'));
    };
  });
};

断点续传及秒传

要实现断点续传和秒传还需要在上传前去服务器查询一下,是否已经存在当前文件。如果已存在,并且已经是上传成功的文件,则直接返回前端上传成功,即可实现 "秒传"。如果已存在,并且有一部分切片上传失败,则返回给前端已经上传成功的切片 name,前端拿到后,根据返回的切片,计算出未上传成功的剩余切片,然后把剩余的切片继续上传,即可实现 "断点续传"。

向服务端发送文件名和文件哈希,检查是否需要上传文件:

js
async verifyUpload(filename, fileHash) {
  const { data } = await this.request({
    url: "https://your-api-host/api/verify",
    headers: {
      "content-type": "application/json"
    },
    data: JSON.stringify({
      filename,
      fileHash
    })
  });
  return JSON.parse(data);
}

并行上传

文件切片完成后,构建上传任务并行上传,这里采用 PromisePool(e.g. p-queue) 管理任务队列和控制并发任务的数量,避免资源过载。首先实现 PromisePool 类用于管理和控制上传任务:

js
// src/utils/PromisePool.ts

class PromisePool {
    private concurrency: number; // 最大并发数
    private queue: Array<{ task: () => Promise<any>; priority: number; retries: number; timeout: number; onCompletion?: (result: any) => void }>; // 任务队列
    private running: number; // 当前正在运行的任务数
    private results: any[]; // 存储任务结果
    private errors: any[]; // 存储任务错误

    /**
     * 构造函数
     * @param { { concurrency?: number } } options - 配置选项
     * @param { number } options.concurrency - 最大并发数,默认为 5
     */
    constructor({ concurrency = 5 } = {}) {
        this.concurrency = concurrency;
        this.queue = [];
        this.running = 0;
        this.results = [];
        this.errors = [];
    }

    /**
     * 添加任务到队列
     * @param { (signal: AbortSignal) => Promise<any> } task - 任务函数
     * @param { { priority?: number, retries?: number, timeout?: number, onCompletion?: (result: any) => void } } options - 任务选项
     * @returns { Promise<any> } - 任务的 Promise
     */
    add(task: (signal: AbortSignal) => Promise<any>, { priority = 0, retries = 0, timeout = 0, onCompletion }: { priority?: number, retries?: number, timeout?: number, onCompletion?: (result: any) => void } = {}): Promise<any> {
        return new Promise((resolve, reject) => {
            const controller = new AbortController();
            const signal = controller.signal;

            const wrappedTask = async () => {
                let attempt = 0;
                while (attempt <= retries) {
                    try {
                        const result = await Promise.race([
                            task(signal), // 执行任务
                            this.createTimeout(timeout, controller) // 创建超时处理
                        ]);
                        this.results.push(result); // 记录任务结果
                        resolve(result); // 解决任务的 Promise
                        onCompletion?.(result); // 执行任务完成回调
                        return;
                    } catch (error) {
                        if (error.name === 'AbortError') {
                            this.errors.push(new Error('Task was aborted')); // 记录任务被取消的错误
                        } else {
                            this.errors.push(error); // 记录任务失败的错误
                        }
                        if (attempt < retries) {
                            console.warn(`Task failed, retrying... Attempt ${attempt + 1}/${retries}`); // 重试提示
                        } else {
                            reject(error); // 拒绝任务的 Promise
                            return;
                        }
                    }
                    attempt++;
                }
            };

            this.queue.push({ task: wrappedTask, priority, retries, timeout, onCompletion }); // 将任务添加到队列

            if (this.running < this.concurrency) {
                this.next(); // 如果当前运行的任务数小于最大并发数,则立即开始执行任务
            }
        });
    }

    /**
     * 从队列中取出下一个任务并执行
     */
    private next() {
        if (this.running < this.concurrency) {
            this.queue.sort((a, b) => b.priority - a.priority); // 按优先级排序任务队列
            if (this.queue.length > 0) {
                this.running++;
                const { task } = this.queue.shift()!; // 取出优先级最高的任务
                task().finally(() => {
                    this.running--; // 任务完成后减少正在运行的任务数
                    this.next(); // 继续执行下一个任务
                });
            }
        }
    }

    /**
     * 开始执行所有任务
     */
    start() {
        while (this.running < this.concurrency && this.queue.length > 0) {
            this.next(); // 开始执行任务直到达到最大并发数
        }
    }

    /**
     * 等待所有任务完成
     * @returns { Promise<{ results: any[], errors: any[] }> } - 任务结果和错误
     */
    async wait(): Promise<{ results: any[], errors: any[] }> {
        return new Promise((resolve) => {
            const checkCompletion = () => {
                if (this.running === 0 && this.queue.length === 0) {
                    resolve({
                        results: this.results, // 返回所有任务的结果
                        errors: this.errors // 返回所有任务的错误
                    });
                } else {
                    setTimeout(checkCompletion, 100); // 每隔100毫秒检查一次任务是否完成
                }
            };
            checkCompletion();
        });
    }

    /**
     * 创建超时处理
     * @param { number } timeout - 超时时间(毫秒)
     * @param { AbortController } controller - AbortController 实例
     * @returns { Promise<never> } - 超时的 Promise
     */
    private createTimeout(timeout: number, controller: AbortController): Promise<never> {
        return new Promise((_, reject) => {
            if (timeout > 0) {
                setTimeout(() => {
                    controller.abort(); // 超时后取消任务
                    reject(new Error('Task timed out')); // 拒绝任务的 Promise
                }, timeout);
            }
        });
    }

    /**
     * 取消所有任务
     */
    cancelAll() {
        this.queue.forEach(({ task }) => {
            task.signal?.abort(); // 取消所有任务
        });
        this.queue = []; // 清空任务队列
        this.running = 0; // 重置正在运行的任务数
    }
}

export default PromisePool;

基于 getChunkListAndFileMd5 和 PromisePool 实现文件上传,在上传之前需先验证是否需要上传文件,如果不需要则直接返回。如果需要上传,则继续处理文件切片和上传:

js
async handleUpload() {
  if (!this.file) return;

  try {
    // 获取文件切片列表
    const { md5, chunkList } = await this.getChunkListAndFileMd5(this.file);

    // 验证是否需要上传文件
    const { shouldUpload, uploadedChunks } = await this.verifyUpload(this.file.name, md5);
    if (!shouldUpload) {
      console.log("文件已秒传");
      return;
    }

    // 初始化并发控制器
    const pool = new PromisePool({ concurrency: this.maxConcurrency });

    // 添加上传任务到池中
    chunkList.forEach((chunk, index) => {
      if (!uploadedChunks.includes(index)) {
        pool.add(async (signal: AbortSignal) => {
          await this.uploadPartFileFunc(chunk, index, md5, signal);
        }, {
          priority: 0,
          retries: 3, // 每个任务最多重试 3 次
          timeout: 10000, // 每个任务超时时间为 10 秒
          onCompletion: (result: any) => {
            console.log(`分片 ${index} 上传完成`, result);
          }
        });
      }
    });

    // 等待所有任务完成
    const { results, errors } = await pool.wait();

    if (errors.length > 0) {
      console.error(`文件上传失败: ${errors.map((error) => error.message).join(', ')}`);
    } else {
      console.log('文件上传成功');
      // 所有分片上传完成后,通知服务端进行文件切片的合并
      try {
        await this.mergeFileOnServer(md5, this.file.name);
        console.log('文件切片合并成功');
      } catch (mergeError) {
        console.error(`文件切片合并失败: ${mergeError.message}`);
      }
    }
  } catch (error) {
    console.error(`文件上传失败: ${error.message}`);
  }
}

const uploadPartFileFunc = async (chunk: Blob, index: number, uploadId: string) => {
  const formData = new FormData();
  formData.append('uploadId', uploadId);
  formData.append('partIndex', index.toString());
  formData.append('partFile', chunk);

  try {
    await axios.post(`https://your-api-host/api/uploadPart`, formData, {
      headers: { 'Content-Type': 'multipart/form-data' },
    });
    console.log(`上传分片 ${index} 完成`);
    // 所以分片上传完成后请求服务端合并
    this.mergeFileOnServer()
  } catch (error) {
    console.error(`上传分片 ${index} 失败: ${error.message}`);
    throw new Error(`上传分片 ${index} 失败`);
  }
};

合并切片

前端上传完毕之后主动通知服务端进行文件切片的合并:

js
const mergeFileOnServer = async () => {
  try {
    await axios.post(`https://your-api-host/api/mergeFile`, {
      fileName: (fileInput.current as any).files[0].name
    }, {
      headers: { 'Content-Type': 'application/json' }
    });
  } catch (error) {
    console.error(`合并文件请求失败: ${error.message}`);
    throw new Error(`合并文件请求失败: ${error.message}`);
  }
};

实现一个大文件上传和断点续传

超详细的大文件分片上传

简单实现大文件上传