- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
之前通过看书、看视频和博客拼凑了一个webserver,然后有一段时间没有继续整这个项目 现在在去看之前的代码,真的是相当之简陋,而且代码设计得很混乱,我认为没有必要继续在屎堆上修改了,于是开始阅读别人的较为规范的开源实现 目的是尝试理解一个可用级别的webserver需要具备哪些特性,以及在具体实现过程中要掌握的设计方法 下面是阅读源码时的记录,个人理解,仅供参考 。
或者按之前的理解,也可以叫做"I/O处理单元" 。
这部分在原来的代码中,我是混在一起写的,这样不好,很混乱,可维护性差 。
在TinyWeb中,这部分被分为 eventListen 和 eventLoop 两部分 。
该函数应该使用socketAPI来创建一系列负责监听的文件描述符 。
流程大概是:创建socket文件描述符->使用setsockopt设置套接字(优雅关闭)->创建一个address结构体用于存放socket的地址->bind绑定端口->监听所创建的文件描述符->用epoll创建内核事件表->将监听的fd加入epoll对象中 。
上述流程是Linux下经典的网络编程,用代码写出来大概就是:
void WebServer::eventListen(){
m_listenfd = socket(PF_INET, SOCK_STREAM, 0);//创建一个套接字用于监听
assert(m_listenfd >= 0);
//优雅关闭连接(就是在关闭套接字前等待一会)
if (0 == m_OPT_LINGER){//"Yoda 表达式"
struct linger tmp = {0, 1};
//设置 SO_LINGER 选项,用于控制关闭套接字时的行为,包括处理未发送完的数据。
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}
else if (1 == m_OPT_LINGER){
struct linger tmp = {1, 1};
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}
int ret = 0;//检查这两个函数是否成功执行
struct sockaddr_in address;//创建一个address结构体
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(m_port);
int flag = 1;
setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address));//绑定端口
assert(ret >= 0);
ret = listen(m_listenfd, 5);//监听
assert(ret >= 0);
utils.init(TIMESLOT);//初始化定时器,在头文件中定义,最小超时时间为5秒
//epoll创建内核事件表
epoll_event events[MAX_EVENT_NUMBER];
m_epollfd = epoll_create(5);//创建epoll对象
assert(m_epollfd != -1);
//将监听的文件描述符添加到epoll对象中
utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode);
http_conn::m_epollfd = m_epollfd;//赋值
}
在之前版本的代码中,监听部分就到此结束了,然而一个比较完备的webserver还需要一些额外的东西.
上述代码中,我们还在该函数内初始化了一个定时器,这主要是为了避免在使用管道处理各种信号时发生竞态现象 。
因此在该函数中,我们还需要创建一个管道用于处理信号 。
使用socketpair创建一个无名管道(因为是在同一台机器上进行信号通信,由操作系统内核维护)m_pipefd 。
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd);
assert(ret != -1);
对m_pipefd进行一些设置 。
utils.setnonblocking(m_pipefd[1]);//将m_pipefd[1](写端)设置为非阻塞模式,以确保在读写操作时不会被阻塞。
utils.addfd(m_epollfd, m_pipefd[0], false, 0);//将m_pipefd[0](读端)添加到m_epollfd所代表的epoll事件监听集合中,用于监听该文件描述符上的读事件。
utils.addsig(SIGPIPE, SIG_IGN);//忽略SIGPIPE信号,这样当向一个已关闭的socket发送数据时,不会产生SIGPIPE信号导致进程异常终止。
//注册信号处理函数utils.sig_handler来处理SIGALRM和SIGTERM信号。
utils.addsig(SIGALRM, utils.sig_handler, false);
utils.addsig(SIGTERM, utils.sig_handler, false);
Utils是在lst_timer.h中声明的一个工具类; 。
addsig是Utils的成员函数,用于设置信号函数; 。
然后,设置一个alarm,当时间达到TIMESLOT就触发告警,最后将经由管道接收的m_pipefd和m_epollfd赋值给静态成员变量 。
alarm(TIMESLOT);//5秒后,进程将收到 SIGALRM
//工具类,信号和描述符基础操作
//通过管道,将接收到的m_pipefd和m_epollfd赋值给静态成员变量
Utils::u_pipefd = m_pipefd;
Utils::u_epollfd = m_epollfd;
自此,"事件监听"函数完成了 。
回顾一下,如果要使用 同步 I/O (以 epoll_wait 为例)实现 Reactor 模式,那么工作流程是:
- 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
- 主线程调用 epoll_wait 等待 socket 上有数据可读。
- 当 socket 上有数据可读时, epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。
- 睡眠在请求队列上的某个工作线程被唤醒,它从 socket 读取数据,并处理客户请求,然后往 epoll
内核事件表中注册该 socket 上的写就绪事件。- 当主线程调用 epoll_wait 等待 socket 可写。
- 当 socket 可写时,epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列。
- 睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户请求的结果。
在事件监听函数中,已经实现了第一点。我们创建了套接字,为其设置地址并绑定,然后监听它,把它扔到epoll对象中,由此添加进内核事件表.
此后,由创建了一个无名管道用于接收某个东西传过来的信号,管道的写端设置为非阻塞,读端也被仍到epoll对象 。
管道接收的是谁的信号呢?从"工作流程"中可以推出,应该是epoll_wait.
这里的管道是在等待epoll_wait发出的信号,即事件触发。这部分则由"事件循环"函数负责 。
这下轮到epoll_wait出场了! 。
在之前代码的讨论中,我有提到"Reactor组件",该组件是Reactor模式的核心 。
那么事件循环函数就是Reactor组件.
循环嘛,顾名思义,这玩意主要就是由一个while循环构成(大概率是死循环),不断循环检测有无事件发生 。
void WebServer::eventLoop(){
bool timeout = false;
bool stop_server = false;
while (!stop_server){
}
}
在while循环中,主角是epoll_wait。epoll_wait()会等待事件的发生,一旦事件发生,会将该事件的相关信息存储到events数组(头文件中声明并定义)中,主线程会遍历该数组并处理所有发生的事件.
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
epoll_wait()的返回值是可用于 I/O 操作的文件描述符(套接字)的数量。(非阻塞调用) 。
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR){
LOG_ERROR("%s", "epoll failure");
break;
}
拿到当前触发的事件后,遍历这些事件,一件一件将其处理掉。这些储存在事件数组event中的所谓的"事件",其实就是一些fd,因此先从event中取出这些fd.
for (int i = 0; i < number; i++){
int sockfd = events[i].data.fd;//这里把它们命名为sockfd
}
还记得事件监听函数中创建的socket(m_listenfd)吗?该socket用于监听指定的网络端口(就是你绑定的那个) 。
如果监听socket触发了可读事件,那么我们需要在事件循环中调用相应的回调函数进行处理.
需要处理的事情挺多的,大概有: 处理新的客户连接 、 对方异常断开或错误异常 、 处理信号(管道读事件) 、 可读事件 、 可写事件 ,一个个看 。
如果我们从event拿到的是一个和m_listenfd相等的玩意,那就表明有新的客户连接.
if (sockfd == m_listenfd){//处理新到的客户连接
bool flag = dealclinetdata();
if (false == flag)
continue;
}
...
此时需要调用客户端处理函数 bool WebServer::dealclinetdata() 。
这里为了考虑到服务器的功能性,提供m_LISTENTrigmode来设置处理客户端连接的模式(一般采用非阻塞即可,在之前版本代码中默认非阻塞) 。
bool WebServer::dealclinetdata(){
struct sockaddr_in client_address;//创建一个sockaddr_in结构体
socklen_t client_addrlength = sizeof(client_address);//用于保存客户端地址长度
if (0 == m_LISTENTrigmode){//非阻塞模式监听
...
}
else{//阻塞模式监听
...
return false;
}
return true;
}
非阻塞模式下,当新的连接产生时服务器会使用accept函数创建一个新的连接socket(connfd),这个新的socket会与客户端的socket建立起通信连接.
然后,通过调用 void WebServer::timer(int connfd, struct sockaddr_in client_address) 来设置连接的定时器.
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0){
LOG_ERROR("%s:errno is:%d", "accept error", errno);
return false;
}
if (http_conn::m_user_count >= MAX_FD){//目前连接满了
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
return false;
}
timer(connfd, client_address);
来看一下定时器又干了什么,定时器函数接收一个与客户端连接socket(即connfd)和客户端地址信息client_address.
简单来说:调用定时器函数,将socket(connfd)的 连接状态 和 信息 加入定时器,进行管理.
详细的关于定时器分析见: 定时器 。本小节还是以梳理处理新客户端的连接流程为主 。
总而言之,dealclinetdata()对新连接的客户端做的处理就是:
1、帮它创了个新的fd;(conndfd) 。
2、然后把这个fd加入定时器; 。
3、该定时器为fd创建了http_conn对象用于管理当前新建连接上客户端的所有操作; 。
我们从events数组中获取一个fd,并获取该fd的events也就是当前fd发生的事件,如果事件信息中包含EPOLLRDHUP | EPOLLHUP | EPOLLERR其中之一,那么意味着这个fd出现了异常断开或错误异常。此时要通过fd获取到该连接的定时器,并将其删除 。
连接就不用删了,应为可能已经断开不存在了 。
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//服务器端关闭连接,移除对应的定时器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
这里的删除操作调用的是 WebServer::deal_timer(util_timer *timer, int sockfd) ,其实现如下:
void WebServer::deal_timer(util_timer *timer, int sockfd){
timer->cb_func(&users_timer[sockfd]);
if (timer){
utils.m_timer_lst.del_timer(timer);
}
LOG_INFO("close fd %d", users_timer[sockfd].sockfd);
}
该函数实现很简单,就是直接去触发定时器的回调函数,并且调用定时器中的del_timer将定时器删除 。
补充一下各个信号的意思:
EPOLLRDHUP
:表示套接字连接被对方关闭。它是对等方(graceful shutdown)关闭连接的一种方式。EPOLLHUP
:表示发生了挂起事件。它指示套接字上发生了一些异常情况,例如连接重置或对端关闭连接。EPOLLERR
:表示发生了错误事件。它指示套接字上发生了一些错误条件,例如连接错误或非阻塞操作产生的错误。
在事件监听中,我们创建了一个无名管道用来获取epoll_wait监听到的信息 。
m_pipefd[0] 表示管道的读取端文件描述符,当 sockfd 等于 m_pipefd[0] 时,表示有数据可供从管道中读取.
同时,我们检查当前fd的事件标志,EPOLLIN表示读事件。两者结合表示当前管道的读取端存在可读数据,于是调用dealwithsignal来处理读事件.
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)){
bool flag = dealwithsignal(timeout, stop_server);
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
以下是dealwithsignal的定义 。
bool WebServer::dealwithsignal(bool &timeout, bool &stop_server){
int ret = 0;
int sig;
char signals[1024];
ret = recv(m_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1){
return false;
}
else if (ret == 0){
return false;
}
else{
for (int i = 0; i < ret; ++i){
switch (signals[i]){
case SIGALRM:
{
timeout = true;
break;
}
case SIGTERM:
{
stop_server = true;
break;
}
}
}
}
return true;
}
该函数从管道中接收读取到的数据,当没收到或者收到数据为0时均返回false 。
当正常接收到数据后,遍历,获取数据中的signals,根据不同值进行不同处理 。
该事件只是负责处理读数据过程中出现上述信号时的情况,如果读数据时还没读完是不会出现上述超时或者停止事件循环的信号的 。
下面要介绍的才是真正用于读取数据的事件 。
与上面情况一样,EPOLLIN表示读事件发生,只不过我们这次处理的是从读事件中拿到的数据 。
else if (events[i].events & EPOLLIN){
dealwithread(sockfd);
}
调用的处理函数是 WebServer::dealwithread(int sockfd) 。
void WebServer::dealwithread(int sockfd){
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel){//为什么这样就代表了reactor
if (timer){//如果当前fd绑定了定时器,调用一下adjust_timer将其调整到正确位置
adjust_timer(timer);
}
//若监测到读事件,将该事件放入请求队列
m_pool->append(users + sockfd, 0);
while (true){//为什么用while
if (1 == users[sockfd].improv){
if (1 == users[sockfd].timer_flag){
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else{
//proactor
if (users[sockfd].read_once()){
LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
//若监测到读事件,将该事件放入请求队列
m_pool->append_p(users + sockfd);
if (timer){
adjust_timer(timer);
}
}
else{
deal_timer(timer, sockfd);
}
}
}
在Reactor模式中,首先会检测到有读事件发生,即有客户端发送数据到达。如果使用的是 m_actormodel 为1(即Reactor模式),那么会进行以下操作:
先检查与该 sockfd 关联的定时器 timer ,并调整定时器( adjust_timer(timer) )。然后将该读事件放入线程池的请求队列中,线程池中的run()函数对事件进行处理。然后会进入;一个死循环等待请求处理完成,在该循环中会不断检查 users[sockfd] 的状态标志,如果请求处理完成( users[sockfd].improv == 1 ,该标志由线程池中的run函数修改),则退出循环,继续处理其他事件.
当events信号为EPOLLOUT代表发生写事件 。
else if (events[i].events & EPOLLOUT){
dealwithwrite(sockfd);
}
此时调用 WebServer::dealwithwrite(int sockfd) 函数 。
void WebServer::dealwithwrite(int sockfd){
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel){//同疑问,为什么这就是reactor了
if (timer){//如果当前fd绑定了定时器,调用一下adjust_timer将其调整到正确位置
adjust_timer(timer);
}
m_pool->append(users + sockfd, 1);//将该fd加入到线程池的正确位置
while (true){//没理解这个循环的作用
if (1 == users[sockfd].improv){
if (1 == users[sockfd].timer_flag){
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else{
//proactor
if (users[sockfd].write()){
LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
if (timer){
adjust_timer(timer);
}
}
else{
deal_timer(timer, sockfd);
}
}
}
与读事件的处理逻辑类似 。
void WebServer::eventLoop()
{
bool timeout = false;
bool stop_server = false;
while (!stop_server)
{
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR)
{
LOG_ERROR("%s", "epoll failure");
break;
}
for (int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
//处理新到的客户连接
if (sockfd == m_listenfd)
{
bool flag = dealclinetdata();
if (false == flag)
continue;
}
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
{
//服务器端关闭连接,移除对应的定时器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
//处理信号
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN))
{
bool flag = dealwithsignal(timeout, stop_server);
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
//处理客户连接上接收到的数据
else if (events[i].events & EPOLLIN)
{
dealwithread(sockfd);
}
else if (events[i].events & EPOLLOUT)
{
dealwithwrite(sockfd);
}
}
if (timeout)
{
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
}
}
以上是一个webserver中的事件循环函数,用于处理监听到的各类事件,下面是负责处理管道读事件和写事件的函数 。
void WebServer::dealwithread(int sockfd)
{
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel)
{
if (timer)
{
adjust_timer(timer);
}
//若监测到读事件,将该事件放入请求队列
m_pool->append(users + sockfd, 0);
while (true)
{
if (1 == users[sockfd].improv)
{
if (1 == users[sockfd].timer_flag)
{
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else
{
//proactor
if (users[sockfd].read_once())
{
LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
//若监测到读事件,将该事件放入请求队列
m_pool->append_p(users + sockfd);
if (timer)
{
adjust_timer(timer);
}
}
else
{
deal_timer(timer, sockfd);
}
}
}
void WebServer::dealwithwrite(int sockfd)
{
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel)
{
if (timer)
{
adjust_timer(timer);
}
m_pool->append(users + sockfd, 1);
while (true)
{
if (1 == users[sockfd].improv)
{
if (1 == users[sockfd].timer_flag)
{
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else
{
//proactor
if (users[sockfd].write())
{
LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
if (timer)
{
adjust_timer(timer);
}
}
else
{
deal_timer(timer, sockfd);
}
}
}
请结合所给的代码,详细说明dealwithread和dealwithwrite函数分别都是怎么实现reactor模式的,为什么?
最后,这里还需要不断对定时器进行重新定时,以防止其触发超时信号导致连接被杀掉(?理解存疑) 。
if (timeout){
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
timer_handler()函数的定义如下 。
//定时处理任务,重新定时以不断触发SIGALRM信号
void Utils::timer_handler(){
m_timer_lst.tick();//详见补充说明:定时器链表实现
alarm(m_TIMESLOT);
}
至此,事件循环结束 。
(总结一下所有流程,然后概括reactor究竟是怎么实现的,与代码怎么对应上的) 。
因为在事件循环中要处理事件,而处理时要使用定时器对每个连接fd进行管理,所以都用到了定时器,单独抽出来细说 。
定时器的作用是 管理连接 和 超时处理 ,其接收一个与客户端连接的socket(即connfd)和客户端地址信息client_address作为参数,然后进行users数组的初始化操作 。
void WebServer::timer(int connfd, struct sockaddr_in client_address){
users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);
...
}
users是在头文件webser.h中声明的一个数组,该数组中的数据类型是 http_conn* ,也就是指向一个http_conn的指针,每个http_conn对象代表一个客户端连接.
http_conn 类封装了处理客户端 HTTP 请求的功能和操作(在http_conn.h中声明)。它包含了处理请求报文、解析请求、生成响应等一系列与 HTTP 协议相关的操作。先不展开说明.
现在我们为connfd初始化好了一个 http_conn 对象,这玩意就保存在 users 数组的对应索引下。使用传入的connfd作为索引从该数组中取一个http_conn对象(?↓), 该对象就代表着新建立的连接中进行的一系列HTTP协议下的操作 .
然后,对该对象进行初始化(调用 http_conn 类的初始函数) 。
ps:看到这里的时候,我其实有一个问题: 谁最早创建了http_conn对象并把它加到users数组中? 是定时器↓ 。
connfd是我们通过事件循环中epoll_wait函数监听捕获信号后创建的socket,其代表着有新的客户端连接到webserver,因此此时会通过dealclinetdata()去处理该socket,处理的方式就是调用定时器函数并初始化定时器。(因为要用定时器管理连接) 。
在 timer() 函数中,通过 users 数组的索引 connfd 创建一个 http_conn 对象,该对象负责保存connfd与webserver交互过程中的所有操作.
梳理清楚之后继续 。
定时器中还创建一个新的 util_timer 对象,并为其设置相关属性.
将客户端的 地址信息 存储在 users_timer 数组(webser.h)中与 connfd 对应的位置.
将 当前连接的文件描述符 connfd 存储在 users_timer 数组中与 connfd 对应的位置.
...
//初始化client_data数据
//创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中
users_timer[connfd].address = client_address;
users_timer[connfd].sockfd = connfd;
util_timer *timer = new util_timer;
...
users_timer 数组的作用是为每个连接存储相关的定时器信息和客户端地址信息。这样可以方便地通过文件描述符查找对应的定时器或者客户端地址信息,并进行相关的操作,例如处理定时事件或关闭连接.
继续 。
前面我们创建了一个新的 util_timer 对象,timer,现在将 users_timer[connfd] 的地址赋值给 timer 的 user_data 成员变量,users_timer是一个数组,里面存储着每个连接的计时器信息.
将回调函数 cb_func (该函数在lst_timer.cpp中定义)赋值给 timer 的 cb_func 成员变量。这里的回调函数指的是在定时器到期时要执行的函数(详见: 定时器链表的实现 ).
...
timer->user_data = &users_timer[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);//获取当前时间
timer->expire = cur + 3 * TIMESLOT;
users_timer[connfd].timer = timer;
utils.m_timer_lst.add_timer(timer);
}
然后就是获取当前时间并根据TIMESLOT参数计算定时器的超时时间,将创建的定时器对象 timer 赋值给对应连接(connfd)的计时器信息( users_timer[connfd].timer ).
当上述一切操作处理完后,新建立的连接fd得到了它的定时器信息,但是我们要管理这些定时器对象,如何做?用 链表 呗 。
在定时器函数的最后一行代码中,将定时器对象添加到了定时器链表 m_timer_lst 中,该链表的定义位于lst_timer.cpp(详见: 定时器链表的实现 ) 。
至此,定时器函数说明完毕。其创建一个了定时器对象,并为该对象设置相关参数,然后将定时器添加到定时器链表中,以便在事件循环中进行定时器的管理和触发.
从上面的梳理也能看到,Web服务器中的定时器通常需要同时管理多个定时任务,为了提供高效的插入、删除和排序操作,节省内存空间,并具有灵活性和可扩展性,我们需要通过链表来实现定时器 。
定时器链表与定时器函数的实现代码是分开的, 定时器链表管理类sort_timer_lst 是以一个类的形式在lst_timer.h中声明 。
class sort_timer_lst{
public:
sort_timer_lst();
~sort_timer_lst();
void add_timer(util_timer *timer);//用于向定时器链表中添加定时器
void adjust_timer(util_timer *timer);//用于调整定时器的位置,当定时器的到期时间延后时需要调用该函数
void del_timer(util_timer *timer);//用于从链表中删除定时器
void tick();//用于处理到期的定时器
private:
void add_timer(util_timer *timer, util_timer *lst_head);//辅助函数,用于插入定时器到指定节点之后
util_timer *head;//分别指向链表的头部和尾部
util_timer *tail;
};
严格来说,sort_timer_lst也不是真正"定时器链表",这个类只是去使用了定时器链表,并提供一系列配套的成员函数用以管理定时器链表.
下面将分别介绍.
在定时器函数timer()中,我们创建的是定时器( util_timer )对象,这些就是我们需要管理的节点,即 链表节点 .
以下是"节点类"util_timer的定义,该类真正给出了"定时器链表"的定义,由此可知定时器链表被定义为一个双向链表 。
class util_timer
{
public:
util_timer() : prev(NULL), next(NULL) {}
public:
time_t expire;
void (* cb_func)(client_data *);
client_data* user_data;
util_timer* prev;
util_timer* next;
};
从util_timer提供的构造函数看,这个类充当的是一个双向链表的节点,其与普通的链表有有点不同,该节点中还保存了定时器超时时间expire以及客户端的相关数据user_data,并且还提供一个指针指向回调函数.
expire是一个时间变量,其数据类型为time_t 。
平时刷题时定义的ListNode通常有一个val用来存放节点值,这里的user_data就是节点值。 client_data 是一个结构体,其中存放了客户端的相关数据 。
struct client_data{
sockaddr_in address;//存储客户端的地址信息
int sockfd;//表示客户端的套接字描述符
util_timer *timer;//关联客户端和定时器
};
因此,user_data变量保存了客户端连接的地址信息和fd,以及与客户端绑定的定时器(从定时器函数的代码中可知,该定时器由add_timer函数添加到client_data结构体中) 。
然后来看回调函数.
所谓回调函数(Callback Function)是指将某种可以作为参数传递给另一个函数的函数。这种函数可以作为参数传递给另一个函数,当特定的事件发生后,调用传入的"参数函数"进行特定的操作。(常用于异步编程、事件处理) 。
class util_timer{
...
public:
...
void (* cb_func)(client_data *);
...
};
在util_timer中,有一个用于指向回调函数cb_func的指针,该回调函数的定义位于lst_timer.cpp 。
cb_func用于处理定时器到期时的操作。回调函数的参数是一个指向 client_data 结构体的指针.
void cb_func(client_data *user_data){
//使用epoll_ctl函数从 epoll 实例中删除文件描述符(socket)。
epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);//确保user_data指针不为空。
close(user_data->sockfd);//关闭之前处理的文件描述符(socket),释放资源
http_conn::m_user_count--;//将http_conn类中的静态成员变量m_user_count减少1
}
说白了就是,定时器链表中管理的某个连接fd的定时器超时后,该定时器对象自身会调用一个回调函数,通过回调函数释放当前连接的fd所占用的资源.
看完了定时器链表节点的定义,现在来看看负责实际管理链表的一些成员函数.
由这些功能函数构建的定时器容器为带头尾结点的升序双向链表。sort_timer_lst为每个连接创建一个定时器,将其添加到链表中,并按照超时时间升序排列。执行定时任务时,将到期的定时器从链表中删除.
class sort_timer_lst{
public:
sort_timer_lst();
~sort_timer_lst();
void add_timer(util_timer *timer);//用于向定时器链表中添加定时器
void adjust_timer(util_timer *timer);//用于调整定时器的位置,当定时器的到期时间延后时需要调用该函数
void del_timer(util_timer *timer);//用于从链表中删除定时器
void tick();//用于处理到期的定时器
private:
void add_timer(util_timer *timer, util_timer *lst_head);//辅助函数,用于插入定时器到指定节点之后
...
};
add_timer 函数就是 用于构造链表的函数 ,该函数将目标定时器添加到链表中,添加时按照升序添加.
如果链表为空,则直接将定时器作为首节点插入。如果定时器的到期时间小于链表头部定时器的到期时间,将定时器作为新的首节点插入。否则,调用辅助函数 add_timer(timer, head) 插入定时器.
void sort_timer_lst::add_timer(util_timer* timer){
if (!timer) return;
if (!head){//当前链表为空,头节点和尾节点是同一个
head = tail = timer;
return;
}
if (timer->expire < head->expire){//如果当前节点的超时时间小于头节点,令其为新的头节点
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head);
}
adjust_timer 函数用于调整定时器的位置,当定时器的到期时间延后时需要调用该函数.
首先找到定时器的下一个节点 tmp , 。
如果 tmp 为空或者定时器的到期时间小于 tmp 的到期时间,说明定时器无需调整位置,直接返回.
如果待插入的定时器timer节点的过期时间比链表中其他节点的过期时间都要小,那么该节点要称为链表的头部节点,因此要修改头部指针并重新调用 add_timer 函数插入定时器.
否则,修改定时器的前后指针,并调用 add_timer(timer, timer->next) 插入定时器.
void sort_timer_lst::adjust_timer(util_timer* timer){
if (!timer) return;
util_timer* tmp = timer->next;
if (!tmp || (timer->expire < tmp->expire)){//定时器无需调整位置
return;
}
if (timer == head){//当前节点timer如果超时时间和头节点一样,那就要把它插到头节点后面
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
}
else{//先删除当前节点timer,然后再使用add_timer将其插入到正确的位置
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}
解释一下else的情况,else意味着当前节点的超时时间变大了,没有变小,因此不会将其插到head后面 。
什么意思呢?简单来说,如果满足else,那么现在这个timer不是新的timer,而是一个之前就存在于链表中的timer 。
这个timer随着时间的推移,其超时时间 expire 值已经发生了变化(肯定变大了),所以要更新这个timer的位置(往链表尾部移动) 。
在调整节点位置之前,我们必须从链表中将其删除。这是因为节点的 expire 值已经改变,如果我们不将其删除,它可能会位于错误的位置.
删完之后,使用add_timer再将其插入正确的位置(根据 expire 值找到正确的位置) 。
总结起来, adjust_timer 函数的目的是重新调整定时器链表中节点的位置,以使链表仍然保持按照时间顺序排序。为了达到这个目的,我们需要先将要调整的节点从链表中删除,然后根据其新的 expire 值重新插入到正确的位置.
del_timer 函数用于从链表中删除定时器。首先判断定时器是否是链表中唯一一个节点,如果是,则直接删除并将头尾指针置空。否则,根据定时器是否是头部或尾部节点进行不同的处理,然后修改前后节点的指针,最后删除定时器对象.
void sort_timer_lst::del_timer(util_timer *timer){
if (!timer) return;
if ((timer == head) && (timer == tail)){//如果链表中只有当前一个定时器
delete timer;//删除后将头尾指针置空
head = NULL;
tail = NULL;
return;
}
//当前节点位于链表头尾处时,分别处理
if (timer == head){//如果当前定时器是头节点,将head指针指向下一个节点
head = head->next;
head->prev = NULL;//其prev再指向空便完成删除
delete timer;//删除当前节点
return;
}
if (timer == tail){//如果当前定时器位于链表尾部,将tail指针指向 前一个节点
tail = tail->prev;
tail->next = NULL;//next指针指向空
delete timer;//删除当前节点
return;
}//在其他位置就直接删除就行
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
tick 函数用于处理到期的定时器.
void sort_timer_lst::tick(){
if (!head) return;//如果链表中没有定时器,那么就直接返回
time_t cur = time(NULL);//获取当前系统时间
util_timer* tmp = head;//将一个用于遍历的指针指向head
while (tmp){
if (cur < tmp->expire){//此时还没有超时
break;//所以退出循环
}//如果超时了,调用回调函数把连接对象删除了,即删除当前tmp指向的节点
tmp->cb_func(tmp->user_data);
head = tmp->next;//删除tmp
if (head){//如果删的是头节点的话,还要改一下prev,因为删除时,头节点已经变为原头节点的下一个节点了
head->prev = NULL;
}
delete tmp;
tmp = head;//更新tmp指针
}
}
解释一下 cur < tmp->expire ,也就是定时器设定时间的机制 。
整个流程是这样的:当有客户端连接时,我们为其创建一个socket,该fd同时会被一个定时器绑定,整个定时器在初始化时获取的时间是通过系统函数time得到的系统时间,然后因为我们人为的设定了一个超时时间参数TIMESLOT(15秒),因此定时器对象中的超时时间expire便是: 当前系统时间+15秒 。在检查定时器时间时,我们也是通过time获取系统时间cur,显然cur会逐渐接近超时时间expire,当 cur < tmp->expire 时,定时器还没有超时,大于等于就超时了,此时启动对该定时器的删除工作.
主要的成员函数介绍完了,回顾一下: add_timer 函数构造双向链表、 adjust_timer 函数调整定时器在链表中的位置、 del_timer 函数删除某个定时器、 tick 函数处理超时定时器.
接下来要介绍一下辅助函数 add_timer(util_timer *timer, util_timer *lst_head) ,用于插入定时器到指定节点之后 。
void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head){
util_timer *prev = lst_head;//双指针,第一个指向head
util_timer *tmp = prev->next;
while (tmp){//遍历链表
if (timer->expire < tmp->expire){//若当前遍历定时器的超时时间晚于输入定时器的超时时间
//那么输入的定时器timer就应该在这里插入
prev->next = timer;//将timer插入到两个指针之间
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}//不满足插入条件即timer的过期时间大于或等于tmp的过期时间,就移动双指针
prev = tmp;
tmp = tmp->next;
}//遍历结束,还没有找到插入位置,就把节点插到链表尾部
if (!tmp){//将timer插入到prev和链表末尾之间
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}
至此,定时器链表的实现说明完毕 。
回顾一下流程:不论是什么处理任务(客户端产生新连接或者别的也好),只要调用了定时器函数,该定时器函数就会与输入的一个fd进行绑定。在初始化时,定时器会调用当前的系统时间,并在此基础上加上超时时间,形成当前定时器的超时时间.
之后就不断调用系统时间与超时时间比较,同时在链表中也不断比较定时器节点之间的时间间隔,由此移动定时器链表.
当系统时间也到达超时时间后,定时器超时,使用成员函数将其从定时器链表中删除.
主函数main.cpp调用了log_write(),该函数的定义位于webserver.cpp中 。
void WebServer::log_write(){
if (0 == m_close_log){
//初始化日志
if (1 == m_log_write)
Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 800);
else
Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 0);
}
}
可以看到这里面主要是调用Log类中的方法来实现功能,那么Log类应该就是要分析的日志系统了 。
不出意外,日志系统有两个文件:log.cpp和log.h,先看看声明 。
class Log{
public:
//C++11以后,使用局部变量懒汉不用加锁
static Log *get_instance(){
static Log instance;
return &instance;
}
static void *flush_log_thread(void *args){
Log::get_instance()->async_write_log();
}
//可选择的参数有日志文件、日志缓冲区大小、最大行数以及最长日志条队列
bool init(const char *file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0);
void write_log(int level, const char *format, ...);
void flush(void);
private:
Log();
virtual ~Log();
void *async_write_log(){
string single_log;
//从阻塞队列中取出一个日志string,写入文件
while (m_log_queue->pop(single_log))
{
m_mutex.lock();
fputs(single_log.c_str(), m_fp);
m_mutex.unlock();
}
}
private:
char dir_name[128]; //路径名
char log_name[128]; //log文件名
int m_split_lines; //日志最大行数
int m_log_buf_size; //日志缓冲区大小
long long m_count; //日志行数记录
int m_today; //因为按天分类,记录当前时间是那一天
FILE *m_fp; //打开log的文件指针
char *m_buf;
block_queue<string> *m_log_queue; //阻塞队列
bool m_is_async; //是否同步标志位
locker m_mutex;
int m_close_log; //关闭日志
};
有点复杂啊我靠 。
其实仔细看的话,主要的函数就只有:初始化函数init、写日志函数write_log以及flush函数,除此之外还有一个flush_log_thread函数(有点怪,应该是负责一步任务的) 。
先从初始化函数开始吧,在这之前,还需要了解一下 单例模式 。
什么是单例模式?单例模式(Singleton Pattern)是一种创建和使用对象的设计模式。它确保类只有一个实例,并提供全局访问点以便其他对象可以使用该实例.
单例模式通常用于需要全局访问点且只能有一个实例的情况,例如 日志记录器 、 数据库连接池 等。其主要特点是:
GetInstance
或 Instance
),该函数负责返回类的唯一实例。 单例模式的实现通常遵循以下步骤:
为什么一定要使用静态成员变量?
主要是为了保证数据的一致性和共享性 。
静态成员变量是属于类而不是实例的。这意味着 无论创建多少个类的实例,静态成员变量只有一份拷贝 。这样就可以确保所有实例都共享同一个变量.
在某些情况下,需要在不同的实例之间共享数据。例如,在一个多线程的环境中,如果多个实例需要访问和修改同一个数据,将数据定义为静态成员变量可以避免数据不一致的问题.
下面给一个例子来说明单例模式的代码实现 。
在饿汉式中,实例在类加载时就被创建 ,并且在整个程序生命周期内存在。这种方式 确保了线程安全 ,但可能会 增加程序启动时间和内存消耗 .
class Singleton {//饿汉式
private:
static Singleton* instance;
Singleton() {} // 私有构造函数
public:
static Singleton* getInstance() {
return instance;
}
};
Singleton* Singleton::instance = new Singleton(); // 在静态成员变量初始化时创建实例
int main() {
Singleton* singletonObj = Singleton::getInstance();
return 0;
}
在懒汉式中,实例在首次调用 getInstance() 方法时才会被创建 。这种方式延迟了实例的创建, 节省了内存资源 。然而, 懒汉式在多线程环境下需要进行额外的线程安全处理 ,以避免多个线程同时创建实例的问题.
class Singleton {
private:
static Singleton* instance;
Singleton() {} // 私有构造函数
public:
static Singleton* getInstance() {
if (instance == nullptr) {
instance = new Singleton();
}
return instance;
}
};
Singleton* Singleton::instance = nullptr; // 初始化为nullptr
int main() {
Singleton* singletonObj = Singleton::getInstance();
return 0;
}
单例模式是为了满足一些需要保证数据一致性的开发场景而设计的,简单来说就是通过一些处理让某一个功能类只能产生一个实例,且外部不能创建该类的实例.
为了实现上述目的,我们需要将类的构造函数私有化,同时创建一个静态成员变量来保存类的唯一实例。因为类的实例终究还是要提供给外界使用的,所以我们还要需要定义一个公共的静态成员函数,负责返回单例类的唯一实例 。
然后,在单例类的代码实现中,有饿汉式和懒汉式两种方式。饿汉式就是单例类在加载时其实例就会被创建,懒汉式则是需要首次调用公共静态成员函数(请求返回唯一实例)时才会创建.
有了上面的前置知识,现在可以来研究日志类是如何被设计为一个单例类的了 。
在Log类的声明中(Log.h),其构造和析构函数被声明为私有,以防止外部直接创建Log类的对象.
class Log{
private:
Log();
virtual ~Log();
};
按照单例模式的流程,现在我们需要创建一个静态成员变量来保存唯一实例并提供一个公共的静态成员函数供外界获取唯一实例 。
在该日志类中,使用公共的静态成员函数 get_instance() 来完成上述两步 。
class Log{
public:
static Log *get_instance(){
static Log instance;
return &instance;
}
};
在 get_instance() 中,创建一个静态的Log类指针变量 instance ,并将其初始化为一个Log类的唯一实例,调用该函数即可返回唯一实例 instance 。
为什么这里不用将static Log instance;(静态成员变量)声明为私有的?
因为在C++11之后,对于局部静态变量的初始化具备线程安全性 .
将其定义为局部静态变量即可,在作用域(包含它的函数或代码块)之外该变量是不可见的.
ps:日志类这里使用的是懒汉式 。
除了单例模式需要特别说明一下外,日志类Log本质上还是一个类,该怎么使用还是怎么使用就行 。
在创建唯一实例的时候也需要调用初始化函数 bool Log::init 。
bool Log::init(const char *file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size){
//file_name表示日志文件的路径和名称,close_log表示是否关闭日志,log_buf_size表示日志缓冲区的大小,split_lines表示日志文件达到的最大行数时进行切割,max_queue_size表示异步模式下阻塞队列的长度。
}
初始化时,先判断是否要以异步模式运行,一般来说肯定是异步启动的 。
...
//如果设置了max_queue_size,则设置为异步
if (max_queue_size >= 1){
m_is_async = true;
m_log_queue = new block_queue<string>(max_queue_size);
pthread_t tid;
//flush_log_thread为回调函数,这里表示创建线程异步写日志
pthread_create(&tid, NULL, flush_log_thread, NULL);
}
...
如果异步启动,那么将 m_is_async 标志设置为 true ,然后创建一个阻塞队列 m_log_queue ,指定该队列的最大长度为 max_queue_size .
然后用 pthread_create 起一个新线程,该线程的目的是从队列中获取日志条目,并以异步方式将它们写入日志文件.
flush_log_thread ( 详见 )作为回调函数传入新线程中(注意不是pthread_create),该函数负责从队列中获取日志条目并将其写入日志文件.
然后是一些参数的设置 。
m_close_log = close_log;//将close_log参数的值赋给成员变量m_close_log。该变量指示是否关闭日志功能。
m_log_buf_size = log_buf_size;//确定了日志缓冲区的大小,即缓冲区中可以存储的最大字符数
m_buf = new char[m_log_buf_size];//动态分配了一个大小为m_log_buf_size的字符数组(缓冲区)。指向这个分配内存的指针存储在成员变量m_buf中,代表日志缓冲区。
memset(m_buf, '\0', m_log_buf_size);//使用空字符('\0')对日志缓冲区进行初始化。确保缓冲区最初为空,准备存储日志消息。
m_split_lines = split_lines;//指定每个日志文件中的最大行数,在超过此限制后会创建一个新的日志文件。
上述代码设置了Log类的各种配置参数,如日志缓冲区大小、每个日志文件的最大行数以及是否关闭日志功能.
因为我们是要生成日志嘛,日志最重要的信息就是时间,因此我们在初始化时需要把当前系统时间保存到一个结构体struct tm中,以便后续生成时间戳的时候使用.
time_t t = time(NULL);//获取当前时间的秒数
struct tm* sys_tm = localtime(&t);//使用localtime()函数将时间转换为本地时间
struct tm my_tm = *sys_tm;//通过解引用sys_tm指针,将其中存储的struct tm结构体的内容复制到另一个名为my_tm的结构体中。这样可以在后续代码中使用my_tm来访问年、月、日等日期和时间信息。
然后就是要真正开始写日志文件,定义了一个 const char* 类型的指针 p ,并通过调用 strrchr(file_name, '/') 函数来查找 file_name 字符串中最后一个出现的斜杠字符('/')的位置。如果找不到斜杠字符, p 将被赋值为 NULL .
const char *p = strrchr(file_name, '/');
char log_full_name[256] = {0};//定义一个大小为256的字符数组log_full_name,并初始化为全零。
然后,使用条件语句检查 p 是否为 NULL 。如果 p 是 NULL ,表示 file_name 字符串中没有斜杠字符,即该字符串只包含文件名而不包含路径信息。在这种情况下,使用 snprintf 函数将日期和文件名格式化为新的字符串,并存储在 log_full_name 中.
如果 p 不为 NULL ,表示 file_name 字符串中存在斜杠字符,即该字符串包含路径信息。在这种情况下,使用 strcpy 函数将 p + 1 处开始的子字符串(即去除斜杠字符)复制到 log_name 字符数组中。同时,使用 strncpy 函数将从 file_name 的开头到 p - file_name + 1 个字符(包括斜杠字符)的子字符串复制到 dir_name 字符数组中。最后,使用 snprintf 函数将路径、日期和文件名格式化为新的字符串,并存储在 log_full_name 中.
if (p == NULL){
snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name);
}
else{
strcpy(log_name, p + 1);
strncpy(dir_name, file_name, p - file_name + 1);
snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name);
}
接下来,将当前日期的日部分( my_tm.tm_mday )赋值给成员变量 m_today .
最后,使用 fopen(log_full_name, "a") 函数以追加模式("a")打开 log_full_name 指定的日志文件。如果文件打开失败(返回 NULL ),则返回 false .
m_today = my_tm.tm_mday;
m_fp = fopen(log_full_name, "a");
if (m_fp == NULL) return false;
至此,日志类初始化完成 。
我们确定类该实例的运行模式(异步),然后为该实例起了一个新线程,在该线程中维护一个阻塞队列,该队列采用生产者-消费者模式设计,使用循环数组实现。里面保存的是字符串类型的日志数据(例如: string single_log; ) 。
然后我们还会获取系统时间并对日志缓冲区的大小等参数进行设置,最后打开一个日志文件准备记录日志信息.
该函数没有定义,只有在头文件中的一个声明, void * 表示返回类型为无类型指针。(使用了C++中的多线程编程和指针语法) 。
static void* flush_log_thread(void *args){
Log::get_instance()->async_write_log();
}
flush_log_thread 去调用了一个日志类实例中的私有方法async_write_log()来异步地写入日志,该函数的定义如下:
void* async_write_log(){
string single_log;
//从阻塞队列中取出一个日志string,写入文件
while (m_log_queue->pop(single_log))
{
m_mutex.lock();
fputs(single_log.c_str(), m_fp);
m_mutex.unlock();
}
}
在async_write_log()中,通过 m_log_queue->pop(single_log) 从阻塞队列( 详见 )中取出一个日志字符串 single_log .
使用 fputs(single_log.c_str(), m_fp) 将日志字符串写入文件。注意,这里还使用了互斥锁 m_mutex 来保护对文件指针 m_fp 的访问.
fputs 函数是C和C++标准库中的一个函数,用于将字符串写入文件.
block_queue顾名思义其实现了一个阻塞队列,以循环数组的方式 。
该阻塞队列中的元素是通过 void *async_write_log() 。
先来看该队列的初始化部分 。
#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H
#include <iostream>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include "../lock/locker.h"
using namespace std;
template <class T>
class block_queue{
public:
block_queue(int max_size = 1000){
if (max_size <= 0){
exit(-1);
}
m_max_size = max_size;
m_array = new T[max_size];
m_size = 0;
m_front = -1;
m_back = -1;
}
...
private:
locker m_mutex;
cond m_cond;
T *m_array;
int m_size;
int m_max_size;
int m_front;
int m_back;
};
#endif
该阻塞队列以类的形式存在,在该类的构造函数中,通过 new T[max_size] 创建了一个大小为 max_size 的数组 m_array ,用于存储元素。因为要实现的是一个“队列”,所以该数组要通过 头尾指针更新 来管理数据存放的位置 。
在说明为何使用头尾指针更新的策略之前需要先了解队列的基本概念 。
首先,队列是一种 先进先出 (FIFO)的数据结构,其中元素按照插入的顺序进行访问和移除。因此队列有两个关键操作: 入队 (enqueue)将元素添加到队列的尾部, 出队 (dequeue)将队列的头部元素移除并返回.
在代码实现中就是push和pop 。
队列通常使用 头指针 (front)和 尾指针 (rear)来管理元素的位置。这两个指针用于确定队列的起始点和结束点,从而允许我们在队列的两端进行插入和删除操作.
初始化一个空队列时,头指针和尾指针都指向同一个位置 (例如,初始值为0)。当我们执行入队操作时,尾指针会递增,并将新元素放在尾指针所指向的位置。而在出队操作时,头指针会递增,并移动到下一个元素所在的位置.
头尾指针更新的一般过程:
1、初始化队列时,头指针和尾指针均指向同一个位置.
2、执行 入队操作 时, 尾指针递增 ,并将 新元素放在尾指针所指向的位置 .
3、执行 出队操作 时, 头指针递增 ,并移动到下一个元素所在的位置。注意,在出队操作之前,我们需要检查队列是否为空.
ok回到代码 。
前面我们说到,为了实现阻塞队列,代码中使用了循环数组,通过头指针和尾指针来管理数据的插入操作 。
队列的头指针 m_front 和尾指针 m_back 都是通过取模运算 (m_back + 1) % m_max_size 来实现循环的。(后面会有解释) 。
这意味着当队列的最后一个位置被占用时,下一个元素会从数组的起始位置重新开始存放。这样就形成了循环的效果.
该阻塞队列实现了 线程安全的生产者-消费者模型 。这种模型是多线程编程中常见的一种设计模式,用于解决生产者线程和消费者线程之间的数据同步和通信问题.
简单来说,这个类实现的所谓"阻塞队列"中维护着一个数据结构,外部可以将数据输入该数据结构也可以从中取出数据.
在多线程的背景下, 。
调用 push() 函数 将 日志字符串添加到阻塞队列中 的线程就是 生产者线程 ;【在这里就是 void Log::write_log 】 。
通过 调用 pop() 函数 从阻塞队列中 取出日志字符串并写入文件 的线程就是 消费者线程 ;【在这里就是 void *async_write_log() 】 。
前面说过,这两个函数分别实现阻塞队列的入队和出队操作.
push(const T &item) : 入队操作.
//往队列添加元素,需要将所有使用队列的线程先唤醒
//当有元素push进队列,相当于生产者生产了一个元素
//若当前没有线程等待条件变量,则唤醒无意义
bool push(const T &item){
m_mutex.lock();//锁
if (m_size >= m_max_size){//当前队列大小超过上限
m_cond.broadcast();//broadcast()是对pthread_cond_broadcast的一个封装
m_mutex.unlock();//解锁
return false;
}
m_back = (m_back + 1) % m_max_size;//计算元素push到队列之后的位置
m_array[m_back] = item;//将该元素放到指定位置
m_size++;//队列长度增加
m_cond.broadcast();//唤醒所有等待在条件变量 m_cond 上的线程,当调用 m_cond.broadcast() 时,所有正等待在 m_cond 上的线程都将被唤醒,并且它们将重新竞争获取相关的资源或执行特定的操作。这种广播机制确保没有线程会永久地阻塞在条件变量上,因为即使其中一个线程通过信号或其他方式唤醒,其他线程仍然可以继续执行。
m_mutex.unlock();
return true;
}
首先获取互斥锁 m_mutex ,然后检查队列是否已满.
如果队列已满,就唤醒所有等待条件变量 m_cond 的线程,并返回false表示入队失败.
如果队列未满,则将元素插入队尾,并更新队列的大小。接着唤醒所有等待条件变量的线程,并释放互斥锁,最后返回true表示入队成功.
m_back = (m_back + 1) % m_max_size; 的作用是将 m_back 后移一位,并且通过取模运算确保 m_back 在有效索引范围内循环更新,从而 实现队列的添加操作 。【 作用于队尾 】 。
举个例子:
假设当前队列的长度为 m_max_size ,则 m_back 的范围是从 0 到 m_max_size-1 。当插入一个新的元素时,我们需要将 m_back 后移一位来指向新的队尾.
如果 m_back 已经指向了队列的最后一个位置(即 m_back == m_max_size - 1 ),则 (m_back + 1) % m_max_size 的结果就是 0,即将 m_back 更新为 0,重新回到数组的开头.
这种循环更新索引的方式使得整个数组成为一个环形结构,实现了循环队列的特性.
pop(T &item) : 出队操作.
//pop时,如果当前队列没有元素,将会等待条件变量
bool pop(T &item){
m_mutex.lock();
while (m_size <= 0){
if (!m_cond.wait(m_mutex.get())){//如果没有其他线程push进元素,那就没东西可pop,返回true,解锁
m_mutex.unlock();
return false;
}
}//有东西可以弹出,计算队头要移动的位置
m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];//提供给用户一个接口,让他们能够获得从队列中弹出的元素(如果想获取的话)
m_size--;
m_mutex.unlock();
return true;
}
首先获取互斥锁 m_mutex ,然后检查队列是否为空.
如果队列为空,就进入循环等待条件变量 m_cond ,直到有新元素被加入队列.
m_cond.wait(m_mutex.get()) 是一个条件变量(Condition Variable)的等待操作,条件变量通常与互斥锁一起使用,以实现线程间的同步.
m_cond 是一个条件变量对象, m_mutex.get() 获取互斥锁对象.
在阻塞队列中,当队列为空时,调用 pop 函数会进入等待状态,直到有元素可供弹出或者超时.
如果等待失败,即没有其他线程通过 push 操作插入新的元素,那么 !m_cond.wait(m_mutex.get()) 返回 true ,即等待失败。在这种情况下,函数将立即返回并返回 false ,表示弹出操作未成功.
一旦有新元素加入队列,或者超时时间达到,就从队头取出一个元素并更新队列的大小。最后释放互斥锁并返回true表示出队成功.
与push函数中的类似, m_front = (m_front + 1) % m_max_size; 的作用是将 m_front 后移一位,并且通过取模运算确保 m_front 在有效索引范围内循环更新,从而 实现队列的弹出操作 。【 作用于队头 】 。
这里有一个问题,如果观察的话会发现,在pop函数中,我们只是获取了队列头部的元素赋值给item, 并没有直接删除m_array中对应位置的元素 ,那这也能算pop吗?
是的 ,因为这里使用的是循环数组, 在下一次push操作时,新的元素将会覆盖掉之前m_front所指向的位置,相当于间接删除了该元素 .
循环数组的索引m_front和m_back被用来追踪队列的头部和尾部。当调用pop函数时,我们通过更新m_front索引和减小队列大小(m_size)来模拟弹出元素的操作。这样做的好处是避免了频繁地移动数组中的元素,从而提高了性能.
上面介绍的是实现阻塞队列的核心函数,除此之外,还需要提供一些方便的功能函数来辅助用户完成某些功能 。
例如:full()函数可以判断队列是否满了、empty()判断队列是否为空、front()可以直接返回队首元素(其实就是直接返回m_front处的元素),同理还有back()等 。
初始化完毕后肯定要开始写日志, void Log::write_log 负责这部分的工作 。
同时,由于该函数与要写入日志信息,因此要往阻塞队列中push数据,所以该类就是阻塞队列的消费者(准确的说是调用日志写入函数的某个线程是消费者) 。
void Log::write_log(int level, const char *format, ...){
//level表示日志级别,format是一个格式化字符串,类似于printf函数的格式化字符串,用于指定日志消息的内容。
}
日志最重要的是时间,首先我们获取时间 。
void Log::write_log(int level, const char *format, ...){
struct timeval now = {0, 0};
gettimeofday(&now, NULL);//使用 gettimeofday 函数获取当前的时间,并将其存储在 now 变量中
time_t t = now.tv_sec;//将now结构体中的tv_sec字段(表示从Unix纪元以来的秒数)赋值给time_t类型的变量t。
struct tm *sys_tm = localtime(&t);
struct tm my_tm = *sys_tm;
...
}
然后要根据传入的日志级别 level 的不同,将相应的日志级别字符串复制到字符数组 s 中.
...
char s[16] = {0};
switch (level)
{
case 0:
strcpy(s, "[debug]:");
break;
case 1:
strcpy(s, "[info]:");
break;
case 2:
strcpy(s, "[warn]:");
break;
case 3:
strcpy(s, "[erro]:");
break;
default:
strcpy(s, "[info]:");
break;
}
...
下面开始写入一个log,先获取锁,然后m_count++(日志数量加一) 。
...
m_mutex.lock();
m_count++;
...
创建日志前,检查是否需要创建,如果当前日期已经进入新的一天或者当前日志文件达到最大行数限制,则需要创建新的日志文件 。
if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0){//新一天||最大行数
char new_log[256] = {0};//创建一个新的日志文件名缓冲区。
fflush(m_fp);//刷新文件流,主要作用是确保缓冲区中的数据被立即写入到文件,而不是等待缓冲区满或者文件关闭时才进行写入。
fclose(m_fp);//关闭当前的日志文件//m_fp是一个用于打开log的文件指针,定义在头文件中
char tail[16] = {0};//创建一个后缀字符串缓冲区,用于表示日期部分的文件名后缀。
//使用日期信息将后缀字符串格式化为 年_月_日_ 的形式。
snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday);
if (m_today != my_tm.tm_mday){//检查当前日期是否与上次写入日志的日期不同。
//将目录名、日期后缀和日志文件名合并成一个完整的新日志文件名。
snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name);
m_today = my_tm.tm_mday;//如果是新的一天,则需要重置计数器和创建新的文件名。
m_count = 0;
}
else{//如果不是新的一天,根据计数器值创建带有序号的新日志文件名。
snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines);
}
m_fp = fopen(new_log, "a");//打开新的日志文件以进行追加写入操作。
}
m_mutex.unlock();//释放互斥锁
在写入每条日志时检查是否需要创建新的日志文件。如果已经进入新的一天或者当前日志文件的行数达到最大限制,就会创建一个新的日志文件,并更新相关的计数器和日期信息.
然后将日志的时间、级别和具体内容格式化为字符串,并存储在 log_str 变量中.
...
va_list valst;//大概就是提供了一种处理可变参数的机制,允许函数在运行时根据传入的参数数量和类型来进行相应的操作。
va_start(valst, format);
string log_str;
m_mutex.lock();//拿锁
//写入的具体时间内容格式
int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ",
my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday,
my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s);
//将可变参数列表中的参数和格式字符串 format 进行格式化
int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst);
m_buf[n + m] = '\n';
m_buf[n + m + 1] = '\0';
log_str = m_buf;
m_mutex.unlock();
...
至此,我们将格式化后的日志字符串信息保存到了 log_str 变量中 。
以下将对日志变量 log_str 进行使用 。
if (m_is_async && !m_log_queue->full()){
m_log_queue->push(log_str);
}
else{
m_mutex.lock();
fputs(log_str.c_str(), m_fp);//log_str的内容使用fputs()函数写入到文件指针m_fp所代表的文件中。
m_mutex.unlock();
}
va_end(valst);
}
如果日志模块 是异步 写入模式,并且日志队列( m_log_queue )没有满,则将 log_str 推送到日志队列中(通过调用 m_log_queue->push(log_str) )。这意味着日志内容将被放入队列中以供后续处理.
如果日志模块 不是异步 模式,或者 日志队列已满 ,则直接将日志内容写入文件.
无论是将日志内容推送到队列还是直接写入文件,最终都可以将日志内容记录下来.
还记得之前的 阻塞队列 吗?如果是异步写入模式, log_str 就已经被push到阻塞队列中了,此时,调用 void Log::write_log 的线程就在充当一个生产者.
到这里,算是把日志类的核心流程走完了。日志类维护着一个阻塞队列,与该队列进行数据交互时遵循生产者-消费者模式,生产者也就是这里的日志写入函数,会将标准化后的日志字符串作为元素push到队列中,等到被外界线程pop获取.
ps:疑问,到底是谁最后向阻塞队列请求日志数据?
flush 函数用于强制刷新写入流缓冲区,确保所有的日志内容都被写入到文件中 。
void Log::flush(void){
m_mutex.lock();
//强制刷新写入流缓冲区
fflush(m_fp);
m_mutex.unlock();
}
该函数首先获取互斥锁 m_mutex ,以确保在执行刷新操作时不会与其他线程产生竞争条件。然后调用 fflush(m_fp) 函数,该函数用于将流(在此处为日志文件 m_fp )的缓冲区内容立即写入到文件,并清空缓冲区。最后释放互斥锁,完成刷新操作.
通过调用 flush 函数,可以确保在需要立即将日志内容写入磁盘的情况下, 不必等待缓冲区满或文件关闭时才进行写入,从而避免丢失重要的日志信息.
基于之前手写时的思路来看就行,大部分思路是一致的( 详见 ) 。
该项目中,线程池文件存放在TinyWeb/threadpool中。使用模板来实现了线程池,在threadpool.h中实现了该模板类 。
写法上基本上与之前的一致,但是理解上有不同。之前因为没有在一个完整项目的角度来理解线程池,多少会有一些偏差,因此这里重新对线程池进行一个梳理,加深在 Reactor 模式下对其的理解.
与上一版代码一致,这里的线程池类也被声明为一个模板类 。
template <typename T>
class threadpool{
public:
/*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
bool append(T *request, int state);
bool append_p(T *request);
private:
/*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
static void *worker(void *arg);
void run();
private:
int m_thread_number; //线程池中的线程数
int m_max_requests; //请求队列中允许的最大请求数
pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_number
std::list<T *> m_workqueue; //请求队列
locker m_queuelocker; //保护请求队列的互斥锁
sem m_queuestat; //是否有任务需要处理
connection_pool *m_connPool; //数据库
int m_actor_model; //模型切换
};
template <typename T>//通过参数列表进行初始化
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool){
if (thread_number <= 0 || max_requests <= 0)//异常判断,线程数和最大请求数小于0,报错
throw std::exception();
m_threads = new pthread_t[m_thread_number];//创建线程池数组
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i){
printf("创建第 %d 个线程\n", i);
if (pthread_create(m_threads + i, NULL, worker, this) != 0){
delete[] m_threads;
throw std::exception();
}//在调用pthread_detach()函数之后,线程将进入“分离”状态,这意味着它不能再被其他线程或主线程等待和加入。
if (pthread_detach(m_threads[i])){
delete[] m_threads;
throw std::exception();
}
}
}
总体来说,该构造函数创建了一组指定数量的线程,并将它们设置为可分离状态。这些线程将用于处理任务队列中的请求,实现了线程池的基本功能.
其他函数就先不看了,直接看入口函数worker 。
worker函数会在线程池初始化时被构造函数调用(作为参数输入pthread_create, 具体见 ) 。
void *threadpool<T>::worker(void *arg){
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
而该函数调用了run函数(这个run函数明显比之前写的要处理更多的事情) 。
template <typename T>
void threadpool<T>::run(){
while (true){
m_queuestat.wait();//阻塞等待捕获sem信号量
m_queuelocker.lock();//拿到信号量之后上锁
if (m_workqueue.empty()){//若队列为空就解锁
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();//取出队列头部的请求
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)//没有东西就继续循环
continue;
}
首先,先阻塞等待,拿锁。然后判断队列里面有没有东西,有就取出来。判断一下取出来的大小里面有没有请求,没有就继续循环 。
从代码上看,线程池中维护的队列 m_workqueue 中,接收一个模板元素作为输入,该元素便是request 。
结合 webserver.cpp 中对线程池的使用来看, request应该是http_conn类的实例化对象 .
也就是说,线程池中维护着工作队列 m_workqueue ,该队列中的元素则为http_conn对象 。
好了,如果获取到http_conn对象,接下来要对其进行相应的处理了 。
...
if (1 == m_actor_model){
if (0 == request->m_state){//读取http
if (request->read_once()){
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}else{
request->improv = 1;
request->timer_flag = 1;
}
}
else{//写
if (request->write()){
request->improv = 1;
}
else{
request->improv = 1;
request->timer_flag = 1;
}
}
}else{
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif
这里需要确定使用的模式,来进行对应的处理 。
在 1 == m_actor_model 的条件下,使用了主从模式,往后面看就会知道,这里所谓的request(其实就是http_conn对象)中的process()函数使用了process_read()函数,而后者采用主从状态机模式进行设计实现,能够根据请求的状态进行相应的处理操作.
获取http_conn对象(request)中的成员属性m_state,从http_conn.h中可知 。
int m_state; //读为0, 写为1
首先看一下写的时候的操作 。
...
if (request->read_once()){
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);//数据库部分再说
request->process();
}else{
request->improv = 1;
request->timer_flag = 1;
}
...
调用http_conn对象中的 bool http_conn::read_once() 函数,该函数用于循环读取客户数据,直到无数据可读或对方关闭连接。此外,该函数还分别支持非阻塞ET工作模式和LT工作模式( 详见 ) 。
读取完数据之后,read_once()返回true,然后将http_conn中的improv属性置为1 。
然后创建一个connectionRAII对象mysqlcon(这里后面介绍连接池会说: 跳转 )获取连接池中的一个连接资源,将当前http_conn对象的信息给到这个连接,以便查询数据库中的相关信息(登录信息) 。
最后就是调用http_conn自身的处理函数process()来处理接受到的数据;( 详见 ) 。
如果读取数据后没有返回true,那么说明读取过程出现了错误,此时也要将improv置为1,同时还要把timer_flag也置为1.
然后到 写的操作 。
...
else{
if (request->write()){
request->improv = 1;
}
else{
request->improv = 1;
request->timer_flag = 1;
}
}
...
读操作就比较简单,直接调用http_conn对象中的write()函数即可 。
bool http_conn::write(){
int temp = 0;
if (bytes_to_send == 0){
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
init();
return true;
}
...
}
该函数首先定义了一个临时变量 temp ,用于 保存每次写操作发送的字节数 .
接下来,它检查是否需要发送的字节数为零。如果是,则调用 modfd() 函数修改文件描述符 m_sockfd 在 m_epollfd 中的事件,将其设置为监听读事件( EPOLLIN ),并调用 init() 函数重置HTTP连接的状态。然后返回 true 表示写操作完成.
...
while (1){
temp = writev(m_sockfd, m_iv, m_iv_count);
if (temp < 0){
if (errno == EAGAIN){//表示写缓冲区已满,无法继续发送数据
//修改文件描述符`m_sockfd`在`m_epollfd`中的事件,将其设置为监听写事件(`EPOLLOUT`)
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);
return true;
}//表示写操作发生错误
unmap();//调用`unmap()`函数取消映射文件,并返回`false`表示写操作失败。
return false;
}
...
如果需要发送的字节数不为零,则进入一个循环.
在循环中,调用 writev() 函数将写缓冲区中的数据发送到套接字 m_sockfd 。 writev() 函数可以一次性发送多个缓冲区的数据。如果发送成功, writev() 函数返回发送的字节数。如果发送失败,会根据错误类型进行相应的处理.
...
bytes_have_send += temp;
bytes_to_send -= temp;//如果已发送的字节数bytes_have_send大于等于m_iv[0].iov_len
if (bytes_have_send >= m_iv[0].iov_len){//,表示当前的写缓冲区数据已经发送完毕。
m_iv[0].iov_len = 0;//将m_iv[0].iov_len设为0,表示不再发送写缓冲区的数据
//文件地址加上已发送字节数与m_write_idx的差值,表示下一次发送的数据是文件内容的剩余部分
m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx);
m_iv[1].iov_len = bytes_to_send;//更新m_iv[1].iov_len为剩余待发送的字节数
}
else{//已发送的字节数bytes_have_send小于m_iv[0].iov_len,表示当前的写缓冲区数据还未完全发送。
m_iv[0].iov_base = m_write_buf + bytes_have_send;//更新为写缓冲区中剩余数据的起始地址
m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send;//更新为剩余待发送的字节数。
}
...
在成功发送数据后,更新已发送的字节数 bytes_have_send 和剩余待发送的字节数 bytes_to_send 。然后根据当前的发送情况,更新 m_iv 数组中的数据.
...
if (bytes_to_send <= 0){//检查剩余待发送的字节数是否小于等于0。满足则表示所有数据都已发送完毕。
unmap();//取消映射文件//↓修改文件描述符m_sockfd在m_epollfd中的事件,将其设置为监听读事件(EPOLLIN)。
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
if (m_linger){//设置了长连接
init();//调用init()函数重置HTTP连接的状态,并返回true表示写操作完成。
return true;
}//未设置长连接,则直接返回false表示写操作完成。
else return false;
}
}
}
回到run函数主体,如果 0 == m_actor_model ,则表示线程池采用同步模式工作 。
在同步模式中,线程池中的线程按顺序依次处理请求,每个线程处理完一个请求后再处理下一个请求.
当有新的请求到达时,线程池中的线程会依次处理这些请求,直到所有请求都得到处理.
...
else{
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif
1 == m_actor_model
时是主从模式? TBD 。
该函数使用recv函数从socketfd接收数据 。
bool http_conn::read_once(){//先判断一下当前要接收的数据是否已经超出缓冲区大小
if (m_read_idx >= READ_BUFFER_SIZE){
return false;//读取失败
}
int bytes_read = 0;
提供两种读取模式:边缘触发和水平触发,两种都是常用的事件触发机制,用于处理非阻塞I/O事件。分别来看 。
...
//LT读取数据
if (0 == m_TRIGMode){
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
m_read_idx += bytes_read;
if (bytes_read <= 0) return false;
return true;
}
...
水平触发模式(Level Triggered Mode)下,只调用一次 recv 函数来读取数据。这是因为在LT模式下,当套接字可读时,会一直触发可读事件(由事件循环实现),直到读取缓冲区中的数据为空。(没有读取完数据,函数会返回 true ,表示已经读取了一部分数据。下次可读事件到达时,会再次调用 read_once 函数来继续读取剩余的数据,直到没有更多数据可读为止。) 。
如果在LT模式下没有读取完数据,那么在下一次可读事件到达时,会再次调用 read_once 函数来读取剩余的数据。这样可以确保在每个可读事件中尽可能地读取更多的数据.
水平触发模式的优点: 简单可靠 。缺点是:频繁的事件通知增大开销,无法及时处理事件导致阻塞.
水平触发模式相对较简单,只要应用程序处理完整个事件,系统就会持续通知,不需要过于细致的处理逻辑。且该模式持续通知直到事件处理完成,确保事件不会被丢失,应用程序有足够的时间处理事件.
再来看边沿触发模式(Edge Triggered Mode) 。
...
else{//ET读数据
while (true){
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
if (bytes_read == -1){
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
return false;
}
else if (bytes_read == 0) return false;
m_read_idx += bytes_read;
}
return true;
}
}
该模式下,使用recv函数从对应文件描述符(即m_sockfd)上读取数据,一次性 读取尽可能多的数据 ,并将读取到的数据保存到m_read_buf中,返回值表示是否成功读取数据。(recv函数的输入参数介绍: 详见 ) 。
在ET模式下,当套接字的接收缓冲区状态发生变化时, 操作系统只触发一次可读事件 。也就是说,只有当接收缓冲区由空变为非空时,才会触发一次可读事件。通过循环调用 recv 函数,直到返回值为-1(表示没有更多数据可读)或返回值为0(表示对方关闭连接)为止.
如果 recv 函数返回-1且错误码为EAGAIN或EWOULDBLOCK,表示当前没有更多数据可读,此时退出循环。否则,继续读取数据.
ET模式相比LT模式更加高效,因为它只在接收缓冲区状态发生变化时触发一次事件,减少了事件的触发次数.
缺点就是更复杂,需要消耗更多的资源 。
该函数用于处理获取到的数据 。
void http_conn::process(){
HTTP_CODE read_ret = process_read();
if (read_ret == NO_REQUEST){
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
return;
}
bool write_ret = process_write(read_ret);
if (!write_ret) close_conn();
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);
}
可以看到,这里实际上就是调用了 process_read() 和 process_write() 两个函数来处理数据 。
工作流程如下:
process_read()
函数进行读取和解析请求( 详见 ):
process_read()
函数负责从套接字中读取数据,并解析HTTP请求。 NO_REQUEST
:表示没有完整的HTTP请求,需要继续等待数据到达。 GET_REQUEST
:表示成功解析出一个完整的GET请求。 BAD_REQUEST
:表示解析请求出现错误,请求格式不正确。 read_ret
的值进行处理:
read_ret
为 NO_REQUEST
,表示没有完整的HTTP请求,将套接字的事件设置为可读,并返回等待下一次可读事件的到达。 read_ret
为其他值,表示成功解析出一个完整的HTTP请求或出现错误,需要进行下一步的处理。 process_write()
函数进行响应处理():
process_write()
函数负责根据 read_ret
的值生成HTTP响应,并将响应数据写入套接字。 write_ret
表示写入套接字的结果,为 true
表示写入成功,为 false
表示写入失败。 write_ret
的值进行处理:
write_ret
为 false
,表示写入套接字失败,需要关闭连接。 write_ret
为 true
,表示写入套接字成功,将套接字的事件设置为可写,并等待下一次可写事件的到达。 虽然该函数也被process()调用,但是其没有使用主从状态机模式去设计,该函数的作用是 根据传入的 HTTP_CODE 参数生成HTTP响应,并将生成的响应内容添加到写缓冲区中 .
process_write() 函数根据传入的 HTTP_CODE 参数, 针对不同的状态码生成不同的响应内容 。它通过调用一系列辅助函数(如 add_status_line() 、 add_headers() 和 add_content() )将响应的状态行、响应头和响应体添加到写缓冲区中.
(不贴代码了,有点长) 。
所谓的" 主从状态机 "其实就是指http_conn::HTTP_CODE http_conn::process_read()遵循的设计模式,该函数根据主从状态机模式进行设计,用于处理不同状态。 详见 。
因为之前有详细写过这部分的介绍,这里就概括一下就行 。
主状态机 : http_conn::process_read() 函数是主状态机。它负责解析HTTP请求的不同部分,并根据当前状态执行相应的操作。主状态机在循环中不断解析一行数据,并根据解析的结果进行状态切换和处理。主状态机的状态包括 CHECK_STATE_REQUESTLINE 、 CHECK_STATE_HEADER 和 CHECK_STATE_CONTENT .
从状态机 : parse_line() 函数是从状态机。它在主状态机中被调用,用于解析一行数据的状态。从状态机的任务是根据当前解析的数据判断是否解析完成一行,并返回相应的状态。从状态机的状态包括 LINE_OK 、 LINE_BAD 和 LINE_OPEN .
主状态机和从状态机的交互 :主状态机在循环中不断调用从状态机的 parse_line() 函数来解析一行数据的状态。如果从状态机返回的状态为 LINE_OK ,表示成功解析一行数据,主状态机根据当前状态进行相应的处理。如果从状态机返回的状态不是 LINE_OK ,则继续循环解析下一行数据.
主状态机根据从状态机的返回结果进行不同的处理,包括解析请求行、解析请求头、解析请求数据等。根据不同的解析结果,主状态机会返回不同的 HTTP_CODE ,用于后续的处理和生成HTTP响应.
一旦从状态机解析完整个HTTP请求,主状态机就会调用 do_request() 函数来处理具体的请求信息。该函数根据解析的请求信息生成HTTP响应。它会根据请求类型和URL构建实际的文件路径,并进行相应的处理。例如,如果是CGI请求,它会处理登录和注册等操作;如果是静态文件请求,它会检查文件的权限和类型,并将文件映射到内存中.
总体而言,这个主从状态机的作用是实现了对HTTP请求的解析和处理,以及生成相应的HTTP响应。它通过合理的状态切换和处理逻辑,使得Web服务器能够正确地响应客户端的请求,并处理各种错误情况.
总结:
- 主状态机是
http_conn::process_read()
函数,负责解析HTTP请求的不同部分。- 从状态机是
parse_line()
函数,用于解析一行数据的状态。- 主状态机通过循环调用从状态机来解析数据,并根据解析结果进行状态切换和处理。
- 主状态机和从状态机的交互通过从状态机返回的状态来完成。
简单来说就是:线程池是一种为服务器引入并发性的多线程技术 。
详细介绍: 见 。
在线程池的run函数中,不管是主从模式还是同步模式,都有以下一段代码 。
connectionRAII mysqlcon(&request->mysql, m_connPool);
干嘛的?现在来看 。
所谓的"池"实际上就是 一组资源的集合 ,任何资源如果有需要都可以以池的形式组织,比如 线程池 。
简单来说, 池是资源的容器 ,本质上是对资源的复用。连接池也不例外 。
连接池中的资源为一组 数据库连接 ,由程序动态地对池中的连接进行使用,释放.
数据库访问的流程一般是:当系统需要访问数据库时,先系统创建数据库连接,完成数据库操作,然后系统断开数据库连接.
按照上面的流程,如果要频繁地访问数据库,那就得不断创建和断开数据库连接,这个过程很耗时且存在数据安全隐患.
所以,在程序初始化时我们就提前创建一些数据库连接,将它们用池管理起来,等用的时候再给程序,这样既能保证较快的数据库访问速度,又能确保数据安全.
上代码 。
主函数 main.cpp 中通过调用webserver对象的 sql_pool() 函数来创建一个连接池 。
server.sql_pool();
void WebServer::sql_pool() 首先创建一个连接池对象,然后进行初始化操作 。
void WebServer::sql_pool(){
//初始化数据库连接池
m_connPool = connection_pool::GetInstance();
m_connPool->init("localhost", m_user, m_passWord, m_databaseName, 3306, m_sql_num, m_close_log);
//初始化数据库读取表
users->initmysql_result(m_connPool);
}
看到这个"GetInstance()"想到什么?没错,单例模式 。
这个数据库连接池的设计也使用到了单例模式,前面日志处理部分的时候我们见识过单例模式了其实 。
connection_pool *connection_pool::GetInstance(){
static connection_pool connPool;
return &connPool;
}//GetInstance()被调用之后返回一个唯一的连接池实例
并且这里很明显使用的也是懒汉式,GetInstance()被调用才会创建或返回唯一实例 。
(区分懒汉还是饿汉,最好就是看该类在头文件中的定义) 。
//构造初始化
void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log){
m_url = url;
m_Port = Port;
m_User = User;
m_PassWord = PassWord;
m_DatabaseName = DBName;
m_close_log = close_log;
...
传入的参数赋值给连接池的成员变量,包括主机地址( m_url )、端口号( m_Port )、用户名( m_User )、密码( m_PassWord )、数据库名( m_DatabaseName )和日志开关( m_close_log ).
然后使用循环创建指定数量( MaxConn )的数据库连接对象,并将其添加到连接池的 connList 列表中.
...
for (int i = 0; i < MaxConn; i++){
MYSQL *con = NULL;
con = mysql_init(con);
if (con == NULL){
LOG_ERROR("MySQL Error");
exit(1);
}
con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);
if (con == NULL){
LOG_ERROR("MySQL Error");
exit(1);
}
connList.push_back(con);
++m_FreeConn;//每创建一个连接对象,空闲连接数(m_FreeConn)加1
}//信号量reserve初始化,将信号量的初始值设置为空闲连接数(m_FreeConn),用于控制连接的获取。
reserve = sem(m_FreeConn);
m_MaxConn = m_FreeConn;//将最大连接数(m_MaxConn)设置为当前空闲连接数(m_FreeConn)。
}
这里首先使用 mysql_init() (MySQL C API 提供的函数)初始化一个 MYSQL 结构体对象.
con 是一个 MYSQL 结构体指针,传递给 mysql_init() 函数后,函数会初始化 con 指向的MYSQL结构体对象,使其成为一个有效的、用于表示数据库连接的对象。该对象可进行后续的数据库操作,如连接数据库、执行 SQL 查询等.
然后,我们需要使用 mysql_real_connect() 给 con 提供用于连接数据库的信息。如果连接成功, mysql_real_connect() 函数返回一个非空的MYSQL结构体指针,表示连接成功的连接对象。如果连接失败,返回 NULL.
在给定的代码中,连接成功后,将连接对象添加到连接池的 connList 列表( list<MYSQL *> connList; )中,并增加空闲连接数.
注意,在使用完连接对象后,需要通过 mysql_close() 函数关闭连接,并将连接对象从连接池中移除。这部分逻辑在 ReleaseConnection() 和 DestroyPool() ( 详见 )函数中实现.
初始化完数据库连接,并将其加入连接池后,当然得用这个连接去访问数据库啦 。
void WebServer::sql_pool(){
//初始化数据库连接池
...
//初始化数据库读取表
users->initmysql_result(m_connPool);
}
这里调用的是http_conn对象users中的initmysql_result()函数来与数据库进行交互 。
在该函数中,创建 connectionRAII 对象 mysqlcon ,并传递 &mysql 和 connPool 作为参数。这样, mysqlcon 对象的构造函数会获取一个数据库连接并将其赋值给 mysql .
map<string, string> users;
void http_conn::initmysql_result(connection_pool *connPool){
//先从连接池中取一个连接
MYSQL *mysql = NULL;
connectionRAII mysqlcon(&mysql, connPool);
...
connectionRAII 是一个自定义的类,用于管理数据库连接的生命周期。它的构造函数接受两个参数:一个 MYSQL** 类型的指针 con 和一个 connection_pool* 类型的指针 connPool .
简单来说,我们通过connectionRAII类来获取连接池中保存的"连接"资源,然后供后续使用.
注意,这里创建的connectionRAII类负责管理一个连接的整个使用周期,包括其取用到销毁 。
详见 connectionRAII类介绍 。
从连接池拿到连接后,开始检索数据库 。
...
//在user表中检索username,passwd数据,浏览器端输入
if (mysql_query(mysql, "SELECT username,passwd FROM user")){
LOG_ERROR("SELECT error:%s\n", mysql_error(mysql));
}
首先查询的是用户从浏览器输入的账户名和密码 。
然后查询剩余的信息并返回 。
...
//从表中检索完整的结果集
MYSQL_RES *result = mysql_store_result(mysql);
//返回结果集中的列数
int num_fields = mysql_num_fields(result);
//返回所有字段结构的数组
MYSQL_FIELD *fields = mysql_fetch_fields(result);
//从结果集中获取下一行,将对应的用户名和密码,存入map中
while (MYSQL_ROW row = mysql_fetch_row(result))
{
string temp1(row[0]);
string temp2(row[1]);
users[temp1] = temp2;
}
}
从mysql中可以得到所有数据的返回值,此时选取row[0]和row[1]对应着用户名和密码,将其保存到users(http_conn对象)中相应的成员属性中.
OK,现在我们完成了以下流程:
用户在浏览器输入用户名、密码进行注册->用户名密码入库,注册完成->用户输入用户名密码登录->使用输入数据在数据库查询 。
实际上到这里,连接池类的工作就已经完成了 。
字面意思,连接池类使用单例模式进行设计,创建了一个并维护一个连接池.
但是在初始化之后,维护连接池的工作更多的是由connectionRAII类来完成,该类调用连接池类提供的唯一实例,对外部提供了获取和使用连接池中连接的方法.
读完代码之后可以发现,实际上真正的连接池类 connection_pool 好像并没有被"直接"使用,就连连接池的管理都是"外包"出去的 。
这个就是RAII机制 。
RAII(Resource Acquisition Is Initialization)是一种资源获取即初始化的编程技术,用于管理资源的生命周期。它是一种 C++ 的编程范式,通过在对象的构造函数中获取资源,并在对象的析构函数中释放资源,以确保资源在对象的生命周期内得到正确的管理和释放.
RAII的基本原则是:在对象的构造函数中获取资源,并在析构函数中释放资源。通过这种方式,无论是正常执行还是异常情况下的退出,都可以保证资源的正确释放,避免资源泄漏.
对应到连接池的设计中就是:
连接池是保存"连接"这种资源的一个容器,而我们在创建一个连接池对象后,不直接通过对象提供的函数来使用池中的连接.
我们定义一个类connectionRAII, 。
该类的 构造函数 中调用连接池类提供的GetConnection()函数来获取连接; 。
该类的 析构函数 中调用连接池类提供的ReleaseConnection()函数来获取连接; 。
这样,当你想要获取一个连接时,你会去创建一个connectionRAII对象并输入一些必要的参数,当对象创建完成时你就已经得到了一个连接,这时候你可以开始用了.
然后等不用的时候,只需要将connectionRAII对象析构掉即可,过程中不用关系连接池背后做的一些内存回收的操作,避免产生错误 。
简单来说, RAII就是给某些资源类再次进行了封装,让使用者专注于资源的使用逻辑而不需要考虑资源的管理细节,降低资源泄漏的概率 。
在 Web 服务器中,线程池用于管理并发处理客户端请求的线程.
通常情况下, 线程池中的线程需要保持活动状态,以便随时处理新的请求 。如果使用 RAII 来管理线程池中的线程,那么在 线程对象的析构函数中释放线程资源将导致线程被终止,从而无法继续处理新的请求 .
为了保持线程池的正常工作和线程的重用,一般不使用 RAII 来管理线程池中的线程。相反,通常会使用其他手段来管理线程的生命周期,例如使用条件变量或标志位来控制线程的启动和停止,或者使用特定的线程池管理类来管理线程的创建、启动和销毁.
这就是为什么线程池不用RAII的原因 。
该函数被连接池的析构函数调用,用于销毁数据库连接池,释放内存 。
void connection_pool::DestroyPool(){
lock.lock();
if (connList.size() > 0){
list<MYSQL *>::iterator it;
for (it = connList.begin(); it != connList.end(); ++it){
MYSQL *con = *it;
mysql_close(con);
}
m_CurConn = 0;
m_FreeConn = 0;
connList.clear();
}
lock.unlock();
}
还是经典的线程安全操作,拿锁 。
然后遍历整个 connList ,取出每一个之前创建的连接,使用 mysql_close() 逐一关闭 。
最后将连接计数变量清空。然后删除 connList ,解锁.
前面也提到了,这个类是用于管理连接池中"连接"的取用的.
该类的声明如下:
class connectionRAII{//也位于连接池的头文件中
public:
connectionRAII(MYSQL **con, connection_pool *connPool);
~connectionRAII();
private:
MYSQL *conRAII;
connection_pool *poolRAII;
};
connectionRAII类中的connectionRAII函数使用传入的数据库连接对象来接收连接池中的连接,具体来说是使用GetConnection()从connPool中获取一个连接 。
connectionRAII::connectionRAII(MYSQL **SQL, connection_pool *connPool){
*SQL = connPool->GetConnection();
conRAII = *SQL;
poolRAII = connPool;
}
GetConnection()是连接池中的一个功能函数。当有请求时,从数据库连接池中返回一个可用连接并更新使用和空闲连接数 。
MYSQL *connection_pool::GetConnection()
{
MYSQL *con = NULL;
if (0 == connList.size()) return NULL;
reserve.wait();
lock.lock();
con = connList.front();
connList.pop_front();
--m_FreeConn;
++m_CurConn;
lock.unlock();
return con;
}
在GetConnection()中,首先检查 connList 是否为空,如果为空则返回NULL.
不为空则改变reserve状态为请求,然后拿锁,从 connList 的首部获取一个连接并从池中pop掉 。
完成后解锁 。
能够从池子拿连接那肯定可以释放连接,因为connectionRAII是一个类,所以释放连接的操作是在该类析构时进行的 。
connectionRAII::~connectionRAII(){
poolRAII->ReleaseConnection(conRAII);
}
析构函数中会调用连接池对象poolRAII中的ReleaseConnection函数对连接进行释放 。
bool connection_pool::ReleaseConnection(MYSQL *con){
if (NULL == con) return false;
lock.lock();
connList.push_back(con);
++m_FreeConn;
--m_CurConn;
lock.unlock();
reserve.post();
return true;
}
还是先检查获取到的连接对象指针con是否为空,然后就是经典的线程安全操作,在拿到锁之后,程序把被释放的连接从新加入 connList 的尾部,此时该连接被视为空闲连接.
然后解锁,将reserve信号量的状态改为post(增加信号量) 。
操作完成返回true 。
至此,connectionRAII类通过调用连接池实例中的成员函数,对外提供了连接池中连接的取用与管理.
最后此篇关于【从0开始编写webserver·基础篇#03】TinyWeb源码阅读,还是得看看靠谱的项目的文章就讲到这里了,如果你想了解更多关于【从0开始编写webserver·基础篇#03】TinyWeb源码阅读,还是得看看靠谱的项目的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在使用 NetBeans 开发 Java 中的 WebService,并使用 gradle 作为依赖管理。 我找到了this article关于使用 gradle 开发 Web 项目。它使用 Gr
我正在将旧项目从 ant 迁移到 gradle(以使用其依赖项管理和构建功能),并且在生成 时遇到问题>eclipse 项目。今天的大问题是因为该项目有一些子项目被拆分成 war 和 jar 包部署到
我已经为这个错误苦苦挣扎了很长时间。如果有帮助的话,我会提供一些问题的快照。请指导我该怎么办????在我看来,它看起来一团糟。 *** glibc detected *** /home/shivam/
我在 Ubuntu 12.10 上运行 NetBeans 7.3。我正在学习 Java Web 开发类(class),因此我有一个名为 jsage8 的项目,其中包含我为该类(class)所做的工作。
我想知道 Codeplex、GitHub 等中是否有任何突出的项目是 C# 和 ASP.NET,甚至只是 C# API 与功能测试 (NUnit) 和模拟(RhinoMocks、NMock 等)。 重
我创建了一个 Maven 项目,包装类型为“jar”,名为“Y”我已经完成了“Maven 安装”,并且可以在我的本地存储库中找到它.. 然后,我创建了另一个项目,包装类型为“war”,称为“X”。在这
我一直在关注the instructions用于将 facebook SDK 集成到我的应用程序中。除了“helloFacebookSample”之外,我已经成功地编译并运行了所有给定的示例应用程序。
我想知道,为什么我们(Java 社区)需要 Apache Harmony 项目,而已经有了 OpenJDK 项目。两者不是都是在开源许可下发布的吗? 最佳答案 事实恰恰相反。 Harmony 的成立是
我正在尝试使用 Jsoup HTML Parser 从网站获取缩略图 URL我需要提取所有以 60x60.jpg(或 png)结尾的 URL(所有缩略图 URL 都以此 URL 结尾) 问题是我让它在
我无法构建 gradle 项目,即使我编辑 gradle 属性,我也会收到以下错误: Error:(22, 1) A problem occurred evaluating root project
我有这个代码: var NToDel:NSArray = [] var addInNToDelArray = "Test1 \ Test2" 如何在 NToDel:NSArray 中添加 addInN
如何在单击显示更多(按钮)后将主题列表限制为 5 个(项目)。 还有 3(项目),依此类推到列表末尾,然后它会显示显示更少(按钮)。 例如:在 Udemy 过滤器选项中,当您点击查看更多按钮时,它仅显
如何将现有的 Flutter 项目导入为 gradle 项目? “导入项目”向导要求 Gradle 主路径。 我有 gradle,安装在我的系统中。但是这里需要设置什么(哪条路径)。 这是我正在尝试的
我有一个关于 Bitbucket 的项目。只有源被提交。为了将项目检索到新机器上,我在 IntelliJ 中使用了 Version Control > Checkout from Ve
所以,我想更改我公司的一个项目,以使用一些与 IDE 无关的设置。我在使用 Tomcat 设置 Java 应用程序方面有非常少的经验(我几乎不记得它是如何工作的)。 因此,为了帮助制作独立于 IDE
我有 2 个独立的项目,一个在 Cocos2dx v3.6 中,一个在 Swift 中。我想从 Swift 项目开始游戏。我该怎么做? 我已经将整个 cocos2dx 项目复制到我的 Swift 项目
Cordova 绝对是新手。这些是我完成的步骤: checkout 现有项目 运行cordova build ios 以上生成此构建错误: (node:10242) UnhandledPromiseR
我正在使用 JQuery 隐藏/显示 li。我的要求是,当我点击任何 li 时,它应该显示但隐藏所有其他 li 项目。当我将鼠标悬停在文本上时 'show all list item but don
我想将我所有的java 项目(223 个项目)迁移到gradle 项目。我正在使用由 SpringSource STS 团队开发的 Gradle Eclipse 插件。 目前,我所有的 java 项目
我下载this Eclipse Luna ,对于 Java EE 开发人员,如描述中所见,它支持 Web 应用程序。我找不到 file -> new -> other -> web projects
我是一名优秀的程序员,十分优秀!