- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值.
本文作者:UED 团队 。
现代操作系统都是「多任务」的,也就是操作系统可以「并发」处理多个任务,比如可以在浏览页面的时候同时播放音乐。但是,一般来说我们的 PC 只有一个物理 CPU ,那么它是如何做到在只有一个 CPU 的情况下,并发处理多个任务的呢?我们简单探究一下.
我们先简单熟悉一下 CPU 硬件相关的术语:
总的来说: Logical Processors = Sockets _ Cores _ SMT(HT) Multiple 逻辑处理器数量也就代表了操作系统认为能「并行」执行的任务的最高数量 。
我们对「并发」和「并行」先下个定义,「并发」指的是系统允许多个任务同时存在,「并行」则指的是系统支持多个任务同时执行,「并发」和「并行」的关键区别在于是否能同时执行。在只有单一逻辑处理器的情况下,我们的操作系统只能「并发」执行任务,比如早期的单核 CPU 电脑。但是我们仍然可以边听歌边浏览网页,这是因为 CPU 速度足够快,可以在系统的使用过程中快速切换任务,这样我们就感觉到多个任务同时存在。在单一逻辑处理器的情况下,虽然我们可以「并发」执行任务,但实际上我们同时也只能执行一个任务,对于 IO 密集类型的任务,我们用到 CPU 的时间不多,决定任务快慢的往往是硬盘以及网络等硬件,「并发」执行也未尝不可,但是对于计算密集型的任务,我们需要占用更多的 CPU 时间,如果「并发」执行,则往往会造成任务的卡顿(响应时间过长),因此我们需要「并行」的执行该任务,而逻辑处理器的数量代表了能「并行」执行任务的最高数量,这也是为什么现在的处理器大多是多核处理器的原因所在.
我们使用的一个个程序可以称为「进程」( process ),而 process 下可以开辟多个「线程」( thread ),这里引用一下 Microsoft 官方对于进程和线程的解释About Processes and Threads
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads. 。
A thread is the entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread's set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread's process. Threads can also have their own security context, which can be used for impersonating clients. 。
在操作系统层面,process 相互独立,拥有一块独立的虚拟地址空间(内存中),而同一 process 下的 thread 共享该虚拟地址空间,这也是 process 和 thread 最典型,最根本的区别 。
假如我们现在要开发一款浏览器,浏览器的基础功能包括 HTTP 请求,GUI 渲染等功能,如果我们采用单线程来开发,那么势必会遇到一个问题: 当需要网络请求的时候,我们的浏览器就会卡住,所有的用户操作如输入等都没有响应,等网络请求完成,我们才可以进行后续操作,非常影响用户体验,这也是为什么像浏览器这样的程序大多都是多线程的原因,我们需要任务同时进行。但是我们前面讲到的多进程也可以多任务同时进行,那么问题就来了,当我们需要实现多任务的时候,多进程和多线程该如何选择呢?
前面我们提到过,进程之间是相互独立的,每个进程有独立的虚址空间,那么当一个进程因为某些原因崩掉了,其他的进程也不会受到影响(主进程挂掉除外,但是主进程一般只负责调度,挂掉的几率较小),所以当我们需要较高的稳定性时,可以考虑多进程。但是创建进程的开销是比较大的,因此要考虑资源问题.
多线程可以共享虚址空间,而且创建一个线程的开销较小,这样我们就可以减少资源的占用。但是正是因为线程之间可以共享虚址空间,当一个线程挂掉了,整个进程会随之挂掉,所以多线程的稳定性相比多进程较差.
Node.js提供了多种方法来创建多进程,例如 child_process 提供的 child_process.spawn() 和 child_process.fork() ,那么什么是 spawn
Spawn in computing refers to a function that loads and executes a new child process. The current process may wait for the child to terminate or may continue to execute concurrent computing. 。
所以 child_process.spawn 的作用是创建了一个子进程,然后在子进程执行一些命令,但是 child_process.spawn() 有一个缺点,就是不能进行进程间通信(IPC: Inter Process Communication),那么当需要进程间通信的时候,就需要使用child_process.fork() .
涉及到现实中多进程的运用,我们往往不会只起一个子进程,当我们需要进程间共享一个端口时,这时候就可以使用Node.js提供的cluster,cluster创建子进程内部也是通过child_process.fork()实现的,支持IPC 。
当我们创建了一个子进程的时候,进程间的通信 Node.js 已经帮我们封装好了,使用 worker.send(message) 和 process.on('message', handle) 就可以实现进程间的通信,以 cluster 为例
if (cluster.isPrimary) {
const worker = cluster.fork();
worker.send('hi there');
} else if (cluster.isWorker) {
process.on('message', (msg) => {
process.send(msg);
});
}
但是需要注意一点,我们发送的 message 会被 structured clone 一份,然后传递给其他进程,因此我们需要注意如果传递了一个 Object 过去,Object 中定义的 Function 及其 prototype 等内容都不会被clone过去。这里发散一下,如果我们需要深拷贝一个对象,而且该对象满足Structured clone的相关算法要求,那么我们可以考虑使用structuredClone(caniuse)或者直接创建一个worker来拷贝(当然不推荐) 。
上述我们讲到进程间的资源是独立的,当我们想共享数据的时候,我们需要structured clone 对应的数据然后传递过去,这在共享数据量较小的时候还可以接受,但是当数据量较多时,克隆数据是一个比较大的开销,这是我们所不能接受的,因此我们需要多线程来共享内存(数据),Node.js 中也提供了相应的方法 worker_threads .
ko 是基于 webpack@5.x 的打包工具,其仓库采用了 Monorepo 的方式进行包管理.
在这里,ko 提供了 concurrency 模式,该模式下使用多线程执行 eslint 、prettier 或 stylelint ,这里简单介绍一下如何实现.
这里使用的是 fast-glob ,主要代码如下所示 factory/runner.ts:
import fg, { Pattern } from 'fast-glob';
protected async getEntries(
patterns: Pattern[],
ignoreFiles: string[]
): Promise<string[]> {
return fg(patterns, {
dot: true,
ignore: this.getIgnorePatterns(...ignoreFiles),
});
}
private getIgnorePatterns(...ignoreFiles: string[]) {
return ['.gitignore', ...ignoreFiles]
.map(fileName => {
const filePath = join(this.cwd, fileName);
if (existsSync(filePath)) {
return readFileSync(filePath, 'utf-8')
.split('\n')
.filter(str => str && !str.startsWith('#'));
}
return [];
})
.reduce((acc, current) => {
current.forEach(p => {
if (!acc.includes(p)) {
acc.push(p);
}
});
return acc;
}, []);
}
返回的是需要 lint 的所有文件路径 。
我们以 eslint 为例eslint/parser.ts
import { eslint } from 'ko-lint-config';
import LintParserFactory from '../factory/parser';
import { IParserOpts } from '../interfaces';
class ESLintParser extends LintParserFactory {
static readonly EXTENSIONS = ['ts', 'tsx', 'js', 'jsx'];
private eslintInstance: eslint.ESLint;
private opts: IParserOpts;
private config: Record<string, any>;
constructor(opts: IParserOpts) {
super();
this.opts = opts;
this.generateConfig();
this.initInstance();
}
private initInstance() {
const { write } = this.opts;
this.eslintInstance = new eslint.ESLint({
fix: write,
overrideConfig: this.config,
useEslintrc: false,
extensions: ESLintParser.EXTENSIONS,
});
}
public async format(file: string): Promise<string> {
const formatter = await this.eslintInstance.loadFormatter();
let resultText = '';
try {
const result = await this.eslintInstance.lintFiles(file);
if (result[0].errorCount) {
resultText = formatter.format(result) as string;
}
return resultText;
} catch (ex) {
console.log(ex);
process.exit(1);
}
}
public generateConfig() {
if (this.opts.configPath) {
this.config = this.getConfigFromFile(this.opts.configPath);
} else {
const localConfigPath = this.detectLocalRunnerConfig(this.opts.name);
if (localConfigPath) {
this.config = this.getConfigFromFile(localConfigPath);
}
}
}
}
export default ESLintParser;
所有的 parser 实现了 format() 方法,作用是输入一个文件的路径,然后进行 lint ,如果有相关的错误则返回错误结果.
创建一个线程的是有开销的,虽然相比创建进程而言消耗的较小,但是我们也并不能无休止创建线程。线程是需要调度的,如果我们创建了很多线程,那么系统花在线程调度的时间往往会更长,导致的结果是我们开了多个线程,但是执行程序的耗时反而更长了。为了更好的使用线程,我们引入线程池的概念 WikiPedia:
In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program 。
还是WikiPedia的示例图
简单来说,线程池创建了一定数量的线程,每个线程从任务队列中获取任务并执行,然后继续执行下一个任务直到结束。ko中也实现了相关的线程池 threads/Pool.ts.
import { join } from 'path';
import { Worker } from 'worker_threads';
import { IThreadOpts, IParserOpts } from '../interfaces';
class ThreadPool {
private readonly workers: Worker[] = [];
private readonly workerPList: Promise<boolean>[] = [];
private readonly opts: IThreadOpts;
private queue: string[];
private stdout: string[] = [];
constructor(opts: IThreadOpts) {
console.log('Using Multithreading...');
this.opts = opts;
this.queue = this.opts.entries;
this.format();
}
format() {
const { concurrentNumber, configPath, write, name } = this.opts;
if (this.workers.length < concurrentNumber) {
this.workerPList.push(
this.createWorker({
configPath,
write,
name,
})
);
this.format();
}
}
createWorker(opts: IParserOpts): Promise<boolean> {
const worker = new Worker(join(__dirname, './Worker.js'), {
workerData: {
opts,
},
});
return new Promise(resolve => {
worker.postMessage(this.queue.shift());
worker.on('message', (result: string) => {
this.stdout.push(result);
if (this.queue.length === 0) {
resolve(true);
} else {
const next = this.queue.shift();
worker.postMessage(next);
}
});
worker.on('error', err => {
console.log(err);
process.exit(1);
});
this.workers.push(worker);
});
}
async exec(): Promise<string[]> {
return Promise.all(this.workerPList).then(() => {
return this.stdout;
});
}
}
export default ThreadPool;
这里的 workers 维护了多个 worker ,相当于线程池的概念,而任务队列对应的则是 queue ,也就是传入的需要 lint 的所有文件,当一个 worker 执行完一个文件的 lint 之后,从 queue 中拿一个新的文件继续执行新的 lint 任务,当 queue 为空时,我们结束任务并返回最终结果.
需要注意的一点是关于 concurrentNumber 也就是我们启动的线程数量,这里我们默认是 Logical Processors 的数量.
那么我们来对比一下多线程和普通情况下的性能,以执行 eslint 为例:
硬件信息:
普通模式下的log为:
exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write
exec eslint with 704 files cost 31.71s
多线程模式下的log为:
exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write --concurrency
Using Multithreading...
exec eslint with 704 files cost 23.60s
可以看到性能有一定程度的提升,但是并没有我们想象中的性能提升多倍,这是为什么呢?我们简单分析一下:
但是可以肯定的是,随着需要 lint 的文件数量增多,两个模式下所用的时间差会增大.
在 ko 中, 我们针对 lint 进行了多线程的操作,性能上有了一定程度的提升,但是我们线程间总的来说是相互独立的,没有使用到共享内存的情况。那么当我们需要共享内存时,会遇到一个问题,我们启用了多个线程,线程之间针对共享内存可能存在竞争关系,也就是可能会同时操作共享内存中的数据,这个时候我们就不能保证数据的准确性,专业术语描述为不是线程安全的。遇到这种情况,我们一般会涉及到一个专业术语锁(Lock) 。
我们回到 work_threads ,看一下官方文档中是如何共享内存的:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);
// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);
// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);
注意一点,如果我们想共享内存,我们可以传递 ArrayBuffer 或者 SharedArrayBuffer ,那么这两种类型的数据有什么特殊性呢?
答案是 ArrayBuffer 和 SharedArrayBuffer 支持 Atomics 一起使用,可以实现 Lock 相关的概念 。
欢迎关注【袋鼠云数栈UED团队】~ 袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star 。
最后此篇关于多线程在打包工具中的运用的文章就讲到这里了,如果你想了解更多关于多线程在打包工具中的运用的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我是一名优秀的程序员,十分优秀!