gpt4 book ai didi

多线程在打包工具中的运用

转载 作者:撒哈拉 更新时间:2024-10-31 16:10:36 58 4
gpt4 key购买 nike

我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值.

本文作者:UED 团队 。

现代操作系统都是「多任务」的,也就是操作系统可以「并发」处理多个任务,比如可以在浏览页面的时候同时播放音乐。但是,一般来说我们的 PC 只有一个物理 CPU ,那么它是如何做到在只有一个 CPU 的情况下,并发处理多个任务的呢?我们简单探究一下.

前置知识

我们先简单熟悉一下 CPU 硬件相关的术语:

  • Sockets(physical CPU): 物理CPU,指我们主板上实际插入的CPU,一般来说 PC 只有一个,服务器可能会有多个
  • Cores: CPU物理核心,CPU商品上宣传的一共几核指代的就是这个
  • Logical Processors: 逻辑处理器,如果采用超线程(多线程)技术的话,会比物理核心数多

总的来说: Logical Processors = Sockets _ Cores _ SMT(HT) Multiple 逻辑处理器数量也就代表了操作系统认为能「并行」执行的任务的最高数量 。

并发 VS 并行

我们对「并发」和「并行」先下个定义,「并发」指的是系统允许多个任务同时存在,「并行」则指的是系统支持多个任务同时执行,「并发」和「并行」的关键区别在于是否能同时执行。在只有单一逻辑处理器的情况下,我们的操作系统只能「并发」执行任务,比如早期的单核 CPU 电脑。但是我们仍然可以边听歌边浏览网页,这是因为 CPU 速度足够快,可以在系统的使用过程中快速切换任务,这样我们就感觉到多个任务同时存在。在单一逻辑处理器的情况下,虽然我们可以「并发」执行任务,但实际上我们同时也只能执行一个任务,对于 IO 密集类型的任务,我们用到 CPU 的时间不多,决定任务快慢的往往是硬盘以及网络等硬件,「并发」执行也未尝不可,但是对于计算密集型的任务,我们需要占用更多的 CPU 时间,如果「并发」执行,则往往会造成任务的卡顿(响应时间过长),因此我们需要「并行」的执行该任务,而逻辑处理器的数量代表了能「并行」执行任务的最高数量,这也是为什么现在的处理器大多是多核处理器的原因所在.

进程 VS 线程

我们使用的一个个程序可以称为「进程」( process ),而 process 下可以开辟多个「线程」( thread ),这里引用一下 Microsoft 官方对于进程和线程的解释About Processes and Threads

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads. 。

A thread is the entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread's set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread's process. Threads can also have their own security context, which can be used for impersonating clients. 。

在操作系统层面,process 相互独立,拥有一块独立的虚拟地址空间(内存中),而同一 process 下的 thread 共享该虚拟地址空间,这也是 process 和 thread 最典型,最根本的区别 。

多进程 VS 多线程

假如我们现在要开发一款浏览器,浏览器的基础功能包括 HTTP 请求,GUI 渲染等功能,如果我们采用单线程来开发,那么势必会遇到一个问题: 当需要网络请求的时候,我们的浏览器就会卡住,所有的用户操作如输入等都没有响应,等网络请求完成,我们才可以进行后续操作,非常影响用户体验,这也是为什么像浏览器这样的程序大多都是多线程的原因,我们需要任务同时进行。但是我们前面讲到的多进程也可以多任务同时进行,那么问题就来了,当我们需要实现多任务的时候,多进程和多线程该如何选择呢?

多进程

前面我们提到过,进程之间是相互独立的,每个进程有独立的虚址空间,那么当一个进程因为某些原因崩掉了,其他的进程也不会受到影响(主进程挂掉除外,但是主进程一般只负责调度,挂掉的几率较小),所以当我们需要较高的稳定性时,可以考虑多进程。但是创建进程的开销是比较大的,因此要考虑资源问题.

多线程

多线程可以共享虚址空间,而且创建一个线程的开销较小,这样我们就可以减少资源的占用。但是正是因为线程之间可以共享虚址空间,当一个线程挂掉了,整个进程会随之挂掉,所以多线程的稳定性相比多进程较差.

Node.js 中的多线程与多进程

child_process & cluster

Node.js提供了多种方法来创建多进程,例如 child_process 提供的 child_process.spawn() 和 child_process.fork() ,那么什么是 spawn

Spawn in computing refers to a function that loads and executes a new child process. The current process may wait for the child to terminate or may continue to execute concurrent computing. 。

所以 child_process.spawn 的作用是创建了一个子进程,然后在子进程执行一些命令,但是 child_process.spawn() 有一个缺点,就是不能进行进程间通信(IPC: Inter Process Communication),那么当需要进程间通信的时候,就需要使用child_process.fork() .

涉及到现实中多进程的运用,我们往往不会只起一个子进程,当我们需要进程间共享一个端口时,这时候就可以使用Node.js提供的cluster,cluster创建子进程内部也是通过child_process.fork()实现的,支持IPC 。

structured clone

当我们创建了一个子进程的时候,进程间的通信 Node.js 已经帮我们封装好了,使用 worker.send(message) 和 process.on('message', handle) 就可以实现进程间的通信,以 cluster 为例

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', (msg) => {
    process.send(msg);
  });
}

但是需要注意一点,我们发送的 message 会被 structured clone 一份,然后传递给其他进程,因此我们需要注意如果传递了一个 Object 过去,Object 中定义的 Function 及其 prototype 等内容都不会被clone过去。这里发散一下,如果我们需要深拷贝一个对象,而且该对象满足Structured clone的相关算法要求,那么我们可以考虑使用structuredClone(caniuse)或者直接创建一个worker来拷贝(当然不推荐) 。

worker_threads

上述我们讲到进程间的资源是独立的,当我们想共享数据的时候,我们需要structured clone 对应的数据然后传递过去,这在共享数据量较小的时候还可以接受,但是当数据量较多时,克隆数据是一个比较大的开销,这是我们所不能接受的,因此我们需要多线程来共享内存(数据),Node.js 中也提供了相应的方法 worker_threads .

多线程在 ko 中的实践

ko

ko 是基于 webpack@5.x 的打包工具,其仓库采用了 Monorepo 的方式进行包管理.

在这里,ko 提供了 concurrency 模式,该模式下使用多线程执行 eslint 、prettier 或 stylelint ,这里简单介绍一下如何实现.

获取需要 lint 的所有文件

这里使用的是 fast-glob ,主要代码如下所示 factory/runner.ts:

import fg, { Pattern } from 'fast-glob';

protected async getEntries(
  patterns: Pattern[],
  ignoreFiles: string[]
): Promise<string[]> {
  return fg(patterns, {
    dot: true,
    ignore: this.getIgnorePatterns(...ignoreFiles),
  });
}

private getIgnorePatterns(...ignoreFiles: string[]) {
    return ['.gitignore', ...ignoreFiles]
      .map(fileName => {
        const filePath = join(this.cwd, fileName);
        if (existsSync(filePath)) {
          return readFileSync(filePath, 'utf-8')
            .split('\n')
            .filter(str => str && !str.startsWith('#'));
        }
        return [];
      })
      .reduce((acc, current) => {
        current.forEach(p => {
          if (!acc.includes(p)) {
            acc.push(p);
          }
        });
        return acc;
      }, []);
  }

返回的是需要 lint 的所有文件路径 。

lint 相关的 Parser

我们以 eslint 为例eslint/parser.ts

import { eslint } from 'ko-lint-config';
import LintParserFactory from '../factory/parser';
import { IParserOpts } from '../interfaces';

class ESLintParser extends LintParserFactory {
  static readonly EXTENSIONS = ['ts', 'tsx', 'js', 'jsx'];
  private eslintInstance: eslint.ESLint;
  private opts: IParserOpts;
  private config: Record<string, any>;

  constructor(opts: IParserOpts) {
    super();
    this.opts = opts;
    this.generateConfig();
    this.initInstance();
  }

  private initInstance() {
    const { write } = this.opts;
    this.eslintInstance = new eslint.ESLint({
      fix: write,
      overrideConfig: this.config,
      useEslintrc: false,
      extensions: ESLintParser.EXTENSIONS,
    });
  }

  public async format(file: string): Promise<string> {
    const formatter = await this.eslintInstance.loadFormatter();
    let resultText = '';
    try {
      const result = await this.eslintInstance.lintFiles(file);
      if (result[0].errorCount) {
        resultText = formatter.format(result) as string;
      }
      return resultText;
    } catch (ex) {
      console.log(ex);
      process.exit(1);
    }
  }

  public generateConfig() {
    if (this.opts.configPath) {
      this.config = this.getConfigFromFile(this.opts.configPath);
    } else {
      const localConfigPath = this.detectLocalRunnerConfig(this.opts.name);
      if (localConfigPath) {
        this.config = this.getConfigFromFile(localConfigPath);
      }
    }
  }
}

export default ESLintParser;

所有的 parser 实现了 format() 方法,作用是输入一个文件的路径,然后进行 lint ,如果有相关的错误则返回错误结果.

Thread Pool

创建一个线程的是有开销的,虽然相比创建进程而言消耗的较小,但是我们也并不能无休止创建线程。线程是需要调度的,如果我们创建了很多线程,那么系统花在线程调度的时间往往会更长,导致的结果是我们开了多个线程,但是执行程序的耗时反而更长了。为了更好的使用线程,我们引入线程池的概念 WikiPedia:

In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program 。

还是WikiPedia的示例图

简单来说,线程池创建了一定数量的线程,每个线程从任务队列中获取任务并执行,然后继续执行下一个任务直到结束。ko中也实现了相关的线程池 threads/Pool.ts.

import { join } from 'path';
import { Worker } from 'worker_threads';
import { IThreadOpts, IParserOpts } from '../interfaces';

class ThreadPool {
  private readonly workers: Worker[] = [];
  private readonly workerPList: Promise<boolean>[] = [];
  private readonly opts: IThreadOpts;
  private queue: string[];
  private stdout: string[] = [];

  constructor(opts: IThreadOpts) {
    console.log('Using Multithreading...');
    this.opts = opts;
    this.queue = this.opts.entries;
    this.format();
  }

  format() {
    const { concurrentNumber, configPath, write, name } = this.opts;
    if (this.workers.length < concurrentNumber) {
      this.workerPList.push(
        this.createWorker({
          configPath,
          write,
          name,
        })
      );
      this.format();
    }
  }

  createWorker(opts: IParserOpts): Promise<boolean> {
    const worker = new Worker(join(__dirname, './Worker.js'), {
      workerData: {
        opts,
      },
    });
    return new Promise(resolve => {
      worker.postMessage(this.queue.shift());
      worker.on('message', (result: string) => {
        this.stdout.push(result);
        if (this.queue.length === 0) {
          resolve(true);
        } else {
          const next = this.queue.shift();
          worker.postMessage(next);
        }
      });
      worker.on('error', err => {
        console.log(err);
        process.exit(1);
      });
      this.workers.push(worker);
    });
  }

  async exec(): Promise<string[]> {
    return Promise.all(this.workerPList).then(() => {
      return this.stdout;
    });
  }
}

export default ThreadPool;

这里的 workers 维护了多个 worker ,相当于线程池的概念,而任务队列对应的则是 queue ,也就是传入的需要 lint 的所有文件,当一个 worker 执行完一个文件的 lint 之后,从 queue 中拿一个新的文件继续执行新的 lint 任务,当 queue 为空时,我们结束任务并返回最终结果.

需要注意的一点是关于 concurrentNumber 也就是我们启动的线程数量,这里我们默认是 Logical Processors 的数量.

结果

那么我们来对比一下多线程和普通情况下的性能,以执行 eslint 为例:

硬件信息:

  • CPU: Apple M1
  • Memory: 8 GB LPDDR4

普通模式下的log为:

exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write
exec eslint with 704 files cost 31.71s

多线程模式下的log为:

exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write --concurrency
Using Multithreading...
exec eslint with 704 files cost 23.60s

可以看到性能有一定程度的提升,但是并没有我们想象中的性能提升多倍,这是为什么呢?我们简单分析一下:

  • 线程启动及其调度消耗了一定的时间
  • 线程内部涉及到了IO操作,而不是单纯的运算

但是可以肯定的是,随着需要 lint 的文件数量增多,两个模式下所用的时间差会增大.

线程安全

在 ko 中, 我们针对 lint 进行了多线程的操作,性能上有了一定程度的提升,但是我们线程间总的来说是相互独立的,没有使用到共享内存的情况。那么当我们需要共享内存时,会遇到一个问题,我们启用了多个线程,线程之间针对共享内存可能存在竞争关系,也就是可能会同时操作共享内存中的数据,这个时候我们就不能保证数据的准确性,专业术语描述为不是线程安全的。遇到这种情况,我们一般会涉及到一个专业术语锁(Lock) 。

我们回到 work_threads ,看一下官方文档中是如何共享内存的:

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);

注意一点,如果我们想共享内存,我们可以传递 ArrayBuffer 或者 SharedArrayBuffer ,那么这两种类型的数据有什么特殊性呢?

答案是 ArrayBuffer 和 SharedArrayBuffer 支持 Atomics 一起使用,可以实现 Lock 相关的概念 。

最后

欢迎关注【袋鼠云数栈UED团队】~ 袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star 。

  • 大数据分布式任务调度系统——Taier
  • 轻量级的 Web IDE UI 框架——Molecule
  • 针对大数据领域的 SQL Parser 项目——dt-sql-parser
  • 袋鼠云数栈前端团队代码评审工程实践文档——code-review-practices
  • 一个速度更快、配置更灵活、使用更简单的模块打包器——ko
  • 一个针对 antd 的组件测试工具库——ant-design-testing

最后此篇关于多线程在打包工具中的运用的文章就讲到这里了,如果你想了解更多关于多线程在打包工具中的运用的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

58 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com