- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章C和Java没那么香了,Serverless时代Rust即将称王?由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
在这个高并发时代最重要的设计模式无疑是生产者、消费者模式,比如著名的消息队列kafka其实就是一个生产者消费者模式的典型实现。其实生产者消费者问题,也就是有限缓冲问题,可以用以下场景进行简要描述,生产者生成一定量的产品放到库房,并不断重复此过程;与此同时,消费者也在缓冲区消耗这些数据,但由于库房大小有限,所以生产者和消费者之间步调协调,生产者不会在库房满的情况放入端口,消费者也不会在库房空时消耗数据。详见下图:
而如果在生产者与消费者之间完美协调并保持高效,这就是高并发要解决的本质问题.
笔者在前文曾经介绍过TDEngine的相关代码,其中Sheduler模块的相关调度算法就使用了生产、消费者模式进行消息传递功能的实现,也就是有多个生产者(producer)生成并不断向队列中传递消息,也有多个消费者(consumer)不断从队列中取消息.
后面我们也会说明类型功能在Go、Java等高级语言中类似的功能已经被封装好了,但是在C语言中你就必须要用好互斥体( mutex)和信号量(semaphore)并协调他们之间的关系。由于C语言的实现是最复杂的,先来看结构体设计和他的注释:
1
2
3
4
5
6
7
8
9
10
11
12
|
typedef
struct
{
char
label[16];
//消息内容
sem_t emptySem;
//此信号量代表队列的可写状态
sem_t fullSem;
//此信号量代表队列的可读状态
pthread_mutex_t queueMutex;
//此互斥体为保证消息不会被误修改,保证线程程安全
int
fullSlot;
//队尾位置
int
emptySlot;
//队头位置
int
queueSize;#队列长度
int
numOfThreads;
//同时操作的线程数量
pthread_t * qthread;
//线程指针
SSchedMsg * queue;
//队列指针
} SSchedQueue;
|
再来看Shceduler初始化函数,这里需要特别说明的是,两个信号量的创建,其中emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度,fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读。具体代码及我的注释如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
void
*taosInitScheduler(
int
queueSize,
int
numOfThreads,
char
*label) {
pthread_attr_t attr;
SSchedQueue * pSched = (SSchedQueue *)
malloc
(
sizeof
(SSchedQueue));
memset
(pSched, 0,
sizeof
(SSchedQueue));
pSched->queueSize = queueSize;
pSched->numOfThreads = numOfThreads;
strcpy
(pSched->label, label);
if
(pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
pError(
"init %s:queueMutex failed, reason:%s"
, pSched->label,
strerror
(
errno
));
goto
_error;
}
//emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度。
if
(sem_init(&pSched->emptySem, 0, (unsigned
int
)pSched->queueSize) != 0) {
pError(
"init %s:empty semaphore failed, reason:%s"
, pSched->label,
strerror
(
errno
));
goto
_error;
}
//fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读
if
(sem_init(&pSched->fullSem, 0, 0) != 0) {
pError(
"init %s:full semaphore failed, reason:%s"
, pSched->label,
strerror
(
errno
));
goto
_error;
}
if
((pSched->queue = (SSchedMsg *)
malloc
((
size_t
)pSched->queueSize *
sizeof
(SSchedMsg))) == NULL) {
pError(
"%s: no enough memory for queue, reason:%s"
, pSched->label,
strerror
(
errno
));
goto
_error;
}
memset
(pSched->queue, 0, (
size_t
)pSched->queueSize *
sizeof
(SSchedMsg));
pSched->fullSlot = 0;
//实始化时队列为空,故队头和队尾的位置都是0
pSched->emptySlot = 0;
//实始化时队列为空,故队头和队尾的位置都是0
pSched->qthread =
malloc
(
sizeof
(pthread_t) * (
size_t
)pSched->numOfThreads);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for
(
int
i = 0; i < pSched->numOfThreads; ++i) {
if
(pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (
void
*)pSched) != 0) {
pError(
"%s: failed to create rpc thread, reason:%s"
, pSched->label,
strerror
(
errno
));
goto
_error;
}
}
pTrace(
"%s scheduler is initialized, numOfThreads:%d"
, pSched->label, pSched->numOfThreads);
return
(
void
*)pSched;
_error:
taosCleanUpScheduler(pSched);
return
NULL;
}
|
再来看读消息的taosProcessSchedQueue函数这其实是消费者一方的实现,这个函数的主要逻辑是 。
1.使用无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续向下处理 。
2.在操作msg前,加入互斥体防止msg被误用.
3.读操作完毕后修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。同时退出互斥体.
4.对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6 。
具体代码及注释如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
void
*taosProcessSchedQueue(
void
*param) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
//注意这里是个无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续处理
while
(1) {
if
(sem_wait(&pSched->fullSem) != 0) {
pError(
"wait %s fullSem failed, errno:%d, reason:%s"
, pSched->label,
errno
,
strerror
(
errno
));
if
(
errno
== EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
continue
;
}
}
//加入互斥体防止msg被误用。
if
(pthread_mutex_lock(&pSched->queueMutex) != 0)
pError(
"lock %s queueMutex failed, reason:%s"
, pSched->label,
strerror
(
errno
));
msg = pSched->queue[pSched->fullSlot];
memset
(pSched->queue + pSched->fullSlot, 0,
sizeof
(SSchedMsg));
//读取完毕修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
//读取完毕修改退出互斥体
if
(pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError(
"unlock %s queueMutex failed, reason:%s\n"
, pSched->label,
strerror
(
errno
));
//读取完毕对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6
if
(sem_post(&pSched->emptySem) != 0)
pError(
"post %s emptySem failed, reason:%s\n"
, pSched->label,
strerror
(
errno
));
if
(msg.fp)
(*(msg.fp))(&msg);
else
if
(msg.tfp)
(*(msg.tfp))(msg.ahandle, msg.thandle);
}
}
|
最后写消息的taosScheduleTask函数也就是生产的实现,其基本逻辑是 。
1.写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态.
2.加入互斥体防止msg被误操作,写入完成后退出互斥体 。
3.写队列完成后对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就继续向下.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
int
taosScheduleTask(
void
*qhandle, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
if
(pSched == NULL) {
pError(
"sched is not ready, msg:%p is dropped"
, pMsg);
return
0;
}
//在写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。
if
(sem_wait(&pSched->emptySem) != 0) pError(
"wait %s emptySem failed, reason:%s"
, pSched->label,
strerror
(
errno
));
//加入互斥体防止msg被误操作
if
(pthread_mutex_lock(&pSched->queueMutex) != 0)
pError(
"lock %s queueMutex failed, reason:%s"
, pSched->label,
strerror
(
errno
));
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if
(pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError(
"unlock %s queueMutex failed, reason:%s"
, pSched->label,
strerror
(
errno
));
//在写队列前先对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取函数可以进行处理。
if
(sem_post(&pSched->fullSem) != 0) pError(
"post %s fullSem failed, reason:%s"
, pSched->label,
strerror
(
errno
));
return
0;
}
|
从并发模型来看,Go和Rust都有channel这个概念,也都是通过Channel来实现线(协)程间的同步,由于channel带有读写状态且保证数据顺序,而且channel的封装程度和效率明显可以做的更高,因此Go和Rust官方都会建议使用channel(通信)来共享内存,而不是使用共享内存来通信.
为了让帮助大家找到区别,我们先以Java为例来,看一下没有channel的高级语言Java,生产者消费者该如何实现,代码及注释如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
public
class
Storage {
// 仓库最大存储量
private
final
int
MAX_SIZE =
10
;
// 仓库存储的载体
private
LinkedList<Object> list =
new
LinkedList<Object>();
// 锁
private
final
Lock lock =
new
ReentrantLock();
// 仓库满的信号量
private
final
Condition full = lock.newCondition();
// 仓库空的信号量
private
final
Condition empty = lock.newCondition();
public
void
produce()
{
// 获得锁
lock.lock();
while
(list.size() +
1
> MAX_SIZE) {
System.out.println(
"【生产者"
+ Thread.currentThread().getName()
+
"】仓库已满"
);
try
{
full.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
list.add(
new
Object());
System.out.println(
"【生产者"
+ Thread.currentThread().getName()
+
"】生产一个产品,现库存"
+ list.size());
empty.signalAll();
lock.unlock();
}
public
void
consume()
{
// 获得锁
lock.lock();
while
(list.size() ==
0
) {
System.out.println(
"【消费者"
+ Thread.currentThread().getName()
+
"】仓库为空"
);
try
{
empty.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println(
"【消费者"
+ Thread.currentThread().getName()
+
"】消费一个产品,现库存"
+ list.size());
full.signalAll();
lock.unlock();
}
}
|
在Java、C#这种面向对象,但是没有channel语言中,生产者、消费者模式至少要借助一个lock和两个信号量共同完成。其中锁的作用是保证同是时间,仓库中只有一个用户进行数据的修改,而还需要表示仓库满的信号量,一旦达到仓库满的情况则将此信号量置为阻塞状态,从而阻止其它生产者再向仓库运商品了,反之仓库空的信号量也是一样,一旦仓库空了,也要阻其它消费者再前来消费了.
我们刚刚也介绍过了Go语言中官方推荐使用channel来实现协程间通信,所以不需要再添加lock和信号量就能实现模式了,以下代码中我们通过子goroutine完成了生产者的功能,在在另一个子goroutine中实现了消费者的功能,注意要阻塞主goroutine以确保子goroutine能够执行,从而轻而易举的就这完成了生产者消费者模式。下面我们就通过具体实践中来看一下生产者消费者模型的实现.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package main
import (
"fmt"
"time"
)
func Product(ch chan<- int) { //生产者
for i := 0; i < 3; i++ {
fmt.Println("Product produceed", i)
ch <- i //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.
}
}
func Consumer(ch <-chan int) {
for i := 0; i < 3; i++ {
j := <-ch //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.
fmt.Println("Consmuer consumed ", j)
}
}
func main() {
ch := make(chan int)
go Product(ch)//注意生产者与消费者放在不同goroutine中
go Consumer(ch)//注意生产者与消费者放在不同goroutine中
time.Sleep(time.Second * 1)//防止主goroutine退出
/*运行结果并不确定,可能为
Product produceed 0
Product produceed 1
Consmuer consumed 0
Consmuer consumed 1
Product produceed 2
Consmuer consumed 2
*/
}
|
可以看到和Java比起来使用GO来实现并发式的生产者消费者模式的确是更为清爽了.
不得不说Rust的难度实在太高了,虽然笔者之前在汇编、C、Java等方面的经验可以帮助我快速掌握Go语言。但是假期看了两天Rust真想大呼告辞,这尼玛也太劝退了。在Rust官方提供的功能中,其实并不包括多生产者、多消费者的channel,std:sync空间下只有一个多生产者单消费者(mpsc)的channel。其样例实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
use std::sync::mpsc;
use std::
thread
;
use std::
time
::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
let tx2 = mpsc::Sender::clone(&tx);
thread
::spawn(move || {
let vals = vec![
String::from(
"1"
),
String::from(
"3"
),
String::from(
"5"
),
String::from(
"7"
),
];
for
val in vals {
tx1.send(val).unwrap();
thread
::sleep(Duration::from_secs(1));
}
});
thread
::spawn(move || {
let vals = vec![
String::from(
"11"
),
String::from(
"13"
),
String::from(
"15"
),
String::from(
"17"
),
];
for
val in vals {
tx.send(val).unwrap();
thread
::sleep(Duration::from_secs(1));
}
});
thread
::spawn(move || {
let vals = vec![
String::from(
"21"
),
String::from(
"23"
),
String::from(
"25"
),
String::from(
"27"
),
];
for
val in vals {
tx2.send(val).unwrap();
thread
::sleep(Duration::from_secs(1));
}
});
for
rec in rx {
println!(
"Got: {}"
, rec);
}
}
|
可以看到在Rust下实现生产者消费者是不难的,但是生产者可以clone多个,不过消费者却只能有一个,究其原因是因为Rust下没有GC也就是垃圾回收功能,而想保证安全Rust就必须要对于变更使用权限进行严格管理。在Rust下使用move关键字进行变更的所有权转移,但是按照Rust对于变更生产周期的管理规定,线程间权限转移的所有权接收者在同一时间只能有一个,这也是Rust官方只提供MPSC的原因, 。
1
2
3
4
5
6
7
8
|
use std::
thread
;
fn main() {
let s =
"hello"
;
let handle =
thread
::spawn(move || {
println!(
"{}"
, s);
});
handle.join().unwrap();
}
|
当然Rust下有一个API比较贴心就是join,他可以所有子线程都执行结束再退出主线程,这比Go中要手工阻塞还是要有一定的提高。而如果你想用多生产者、多消费者的功能,就要入手crossbeam模块了,这个模块掌握起来难度也真的不低.
通过上面的比较我们可以用一张表格来说明几种主流语言的情况对比:
语言 | 安全性 | 运行速度 | 进程启动速度 | 学习难度 |
C | 低 | 极快 | 极快 | 困难 |
Java | 高 | 一般 | 一般 | 一般 |
Go | 高 | 较快 | 较快 | 一般 |
Rust | 高 | 极快(基本比肩C) | 极快(基本比肩C) | 极困难 |
可以看到Rust以其高安全性、基本比肩C的运行及启动速度必将在Serverless的时代独占鳌头,Go基本也能紧随其后,而C语言程序中难以避免的野指针,Java相对较低的运行及启动速度,可能都不太适用于函数式运算的场景,Java在企业级开发的时代打败各种C#之类的对手,但是在云时代好像还真没有之前统治力那么强了,真可谓是打败你的往往不是你的对手,而是其它空间的降维打击.
这篇文章的内容就到这了,希望能给你带来帮助,也希望您可以多多关注我的更多内容! 。
原文链接:https://blog.csdn.net/BEYONDMA/article/details/117868448 。
最后此篇关于C和Java没那么香了,Serverless时代Rust即将称王?的文章就讲到这里了,如果你想了解更多关于C和Java没那么香了,Serverless时代Rust即将称王?的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
在我的 serverless.yml 中,我使用 serverless-tscpaths 插件来解析 tsconfig 中定义的路径,并使用 serverless-plugin-optimize 来缩
我阅读了这个文档:https://serverless.com/framework/docs/providers/google/guide/services/ users/ serverless.
使用 Serverless framework ,我希望能够从环境变量更改 AWS 区域。 provider: name: aws region: ${env:AWS_REGION} 然后,A
我想开始使用无服务器框架来管理我公司的 lambda 部署,但我们处理 PHI 的安全性非常严格。我们的合规总监和 CTO 担心将我们的 AWS key 和 secret 传递给另一家公司。 当做 s
这是我的 serverless.yml 文件中的一个片段: Resources: LogGroupInfo: Type: 'AWS::Logs::LogGroup' Propert
我正在使用 AWS Aurora serverless 设置一个新数据库,并且需要启用 binlog。我想我已经按原样遵循了文档,但无法使其正常工作。我该如何设置? 按照文档,以下是我尝试启用 bin
我不想在 serverless.yml 中手动定义服务名称,而是想从 package.json 中读取 name 最佳答案 为了实现基于 package.json 应用程序名称的动态服务名称,我利用了
我将以下内容用作自定义 serverless-dotenv-plugin 插件配置: 风俗: dotenv: 路径:.env-${opt:stage, 'local'} 但我真正想得到的是当我不提供任
我正在使用无服务器模板并且一切正常,直到突然我的所有部署开始忽略 .env文件。 我搜索了documentation它说如果我想使用 .env 文件中的环境变量,我现在必须添加 useDotenv:
我正在使用新的无服务器 TypeScript monorepo 启动一个新项目!用过 aws-nodejs-typescript模板,它给出了 serverless.ts配置文件。几周后,我现在在命令
https://serverless.com/framework/docs/providers/aws/guide/serverless.yml/ provider: stackTags:
各位 我正在尝试设置我的第一个 NestJS 应用程序。它由 AWS 上的无服务器提供支持。 我创建了一个简单的 Controller ,它有一个服务作为依赖项。当我使用 HTTP 客户端访问端点时,
AWS::Serverless::Api 和 AWS::Serverless::HttpApi 之间有什么区别? 据我了解,AWS::Serverless::HttpApi 似乎配置了 HTTP AP
我正在尝试使用 serverless.yml 替换 @vendia/serverless-express v2 示例中的默认 sam-template,以便通过无服务器部署进行部署 https://g
I am new to using serverless framework ,I wanted to create three different environments dev,Qa,pr
我希望向本地运行的无服务器框架 Node 应用程序添加状态。我遇到了官方的 DynamoDb docker 镜像,我想使用无服务器框架以及在 localhost:8000 公开的 docker 上运行
我有一个 AWS CodeBuild 项目,它尝试安装无服务器框架,但在标题中返回错误,但它说框架已成功安装。我的理解是这无法安装 snappy 模块。为什么会这样?我该如何解决? 我在 builds
无服务器是云原生的子集或属性吗?或者是另一种方式——云原生是无服务器的子集或属性吗? Nathan Aw(新加坡) 最佳答案 云原生 是一种更通用的方法来构建和运行利用云计算的应用程序。 无服务器 更
Serverless 架构演进 Serverless架构风格挑战了软件设计和软件部署基础的现状,以实现最佳开发、最优运营和最优的管理开销。虽然它继承了微服务架构MSA的基本概念,但它已被赋予了新的
前言 当您第一次接触 Serverless 的时候,有一个不那么明显的新使用方式:与传统的基于服务器的方法相比,Serverless 服务平台可以使您的应用快速水平扩展,并行处理的工作更加有效。这
我是一名优秀的程序员,十分优秀!