gpt4 book ai didi

c - 处理 *nix 中多路复用套接字上的读取超时

转载 作者:行者123 更新时间:2023-11-30 17:38:17 24 4
gpt4 key购买 nike

我正在尝试编写一个程序,每 10 分钟运行一次,轮询大约 3 到 4000 个服务器。

只需发送一个 HTTP 请求并获得响应,解析它并存储在数据库中。现在我有一个 C 源代码,实际上可以完美地完成这项工作,只是总有一些服务器表现不佳并阻止轮询器。我发现了一些有用的示例,展示了如何使用多路复用非阻塞套接字、选择和回调来完成我需要的操作。

那么,它的作用是:
1.以非阻塞模式打开套接字:

recstate sckt_open(int *socketfd, const char *address, unsigned int *port)
{
struct hostent *desthost;

*socketfd = -1;

desthost = gethostbyname(address); // get IP address of the destanation host by DNS name or IP
if (!desthost || desthost->h_length != sizeof(struct in_addr))
{
if (verbose >= 2)
fprintf(stderr, "sckt_open(): cannot resolve %s: unknown host\n",
address);

return (FAILED);
}

*socketfd = socket(AF_INET, SOCK_STREAM, 0); // create an AF_INET stream socket
if (*socketfd < 0)
{
if (verbose >= 2)
fprintf(stderr, "sckt_open(): failed to create socket for %s:%d", address,
*port);

return (INTERROR); // internal error occurred while processing record
}

int rc;
if ((rc = fcntl(*socketfd, F_GETFL)) < 0
|| fcntl(*socketfd, F_SETFL, rc | O_NONBLOCK) < 0)
{
if (verbose >= 2)
fprintf(stderr,
"sckt_open(): failed to set nonblocking mode for socket %d for %s:%d",
*socketfd, address, *port);

return (INTERROR); // internal error occurred while processing record
}
rc = 1;
if (setsockopt(*socketfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &rc, sizeof(rc))
< 0)
{
if (verbose >= 2)
fprintf(stderr,
"sckt_open(): failed set keepaliv mode for socket %d for %s:%d",
*socketfd, address, *port);

return (INTERROR); // internal error occurred while processing record
}

//FIXME
// struct timeval timeout;
// timeout.tv_sec = 10;
// timeout.tv_usec = 0;
// setsockopt(*socketfd, SOL_SOCKET, SO_RCVTIMEO, (struct timeval *)&timeout, sizeof(struct timeval));

struct sockaddr_in connaddr;
memset(&connaddr, 0, sizeof(connaddr));
connaddr.sin_family = AF_INET; // set proto family
connaddr.sin_addr = *(struct in_addr *) desthost->h_addr; // set real destination address
connaddr.sin_port = htons(*port); // set destination port

if (connect(*socketfd, (struct sockaddr *) &connaddr, sizeof(connaddr)) < 0
&& errno != EINPROGRESS)
{
if (verbose >= 2)
fprintf(stderr, "sckt_open(): socket %d: failed to connect to %s:%d",
*socketfd, address, *port);

return (FAILED);
}

if (verbose >= 2)
printf("sckt_open(): created socket endpoint %d for %s:%d\n", *socketfd,
address, *port);

return (ACTIVE);
}

如果是 ACTIVE,主循环将上传输出缓冲区,然后注册一个写入回调。写入回调函数将尝试实际将缓冲区写入套接字并测试结果:如果写入则返回 smth。小于 0 但不是 EAGAIN ,它将删除与此 fd 对应的所有回调,并将服务器标记为 FAILED,以防 write 返回 smth。大于 0,但小于在跳过写入回调事件(部分写入)时它将返回的输出缓冲区的长度。如果写入返回缓冲区长度:它现在将删除写入回调并注册读取回调并返回。

读取回调将通过使用相应套接字的 FIONREAD 调用 ioctl 来计算操作系统输入缓冲区大小,然后尝试从套接字读取该字节数到本地缓冲区。如果没有分配本地缓冲区并且 read 返回 0,则 read 回调将关闭套接字,报告“0 字节响应”并将服务器标记为 FAILED,如果读请求返回 0 更大,则 read 回调将加载数据到本地缓冲区并返回同时保持读取回调事件。如果读取请求返回 <0,但 errno 为 EAGAIN,它将返回,否则将删除回调,关闭套接字并将服务器标记为 FAILED。如果返回 0 并且本地缓冲区非空,读取回调会将服务器标记为已完成,删除读取回调,关闭套接字并返回。

还有另一个重要的事情:我检查套接字是否就绪以及需要触发回调的方式:

void sckt_cb_check(void)
{
fd_set tread_fds, twrite_fds;
int counter, ready_fds;

tread_fds = read_fds;
twrite_fds = write_fds;

ready_fds = select(FD_MAX, &tread_fds, &twrite_fds, NULL, NULL); // check for how many file descriptors are ready

if (ready_fds < 0)
{
fprintf(stderr, "sckt_cb_check(): select returned an error: %s\n",
strerror(errno));

return;
}

for (counter = 0; ready_fds && counter < FD_MAX; counter++)
{
if (FD_ISSET(counter, &tread_fds))
{
ready_fds--;

if (FD_ISSET(counter, &read_fds))
read_callback[counter].callback_func(
read_callback[counter].callback_arg);
}

if (FD_ISSET(counter, &twrite_fds))
{
ready_fds--;

if (FD_ISSET(counter, &write_fds))
write_callback[counter].callback_func(
write_callback[counter].callback_arg);
}
}
}

这就是注册和删除回调的方式:

void sckt_cb_add(int socketfd, scktop operation, void (*func), void *arg)
{
struct callback *curr_callback;

if (socketfd < 0 || socketfd > FD_MAX)
{
fprintf(stderr,
"sckt_cb_add(): invalid file descriptor, failed to add new callback\n");

return;
}

curr_callback =
&(operation == WRITE ? write_callback : read_callback)[socketfd];
curr_callback->callback_func = func;
curr_callback->callback_arg = arg;
FD_SET(socketfd, operation == WRITE ? &write_fds : &read_fds);

if (verbose >= 2)
printf("sckt_cb_add(): registered %s callback for socket fd %d\n",
operation == WRITE ? "write" : "read", socketfd);
}

void sckt_cb_free(int socketfd, scktop operation)
{
if (socketfd <= 0 || socketfd > FD_MAX)
{
fprintf(stderr,
"sckt_cb_free(): invalid file descriptor, failed to free callback\n");

return;
}

FD_CLR(socketfd, operation == WRITE ? &write_fds : &read_fds);

if (verbose >= 2)
printf("sckt_cb_free(): removed %s callback for socket fd: %d\n",
operation == WRITE ? "write" : "read", socketfd);
}

此外,还有一个事件连接计数器,每次打开套接字时,我都会递增它,每次套接字因任何原因关闭时,我都会递减它。程序将循环直到该计数器为 0,这意味着所有连接都已关闭。因此,当这个机制尝试轮询这个虚拟服务器时,它永远不会关闭套接字,我测试了设置套接字超时和选择超时。他们都没有完成这项工作,套接字保持打开状态,并且无法确定超时条件(没有读取,也没有选择返回错误)。我确实明白有一个解决方案。最后,我可以将计时器添加到轮询循环中,在其中执行 sckt_cb_check() 并在一段时间后从该点删除所有这些服务器,但我认为这不是一个好主意。

所以,我发现有一种方法可以使用 pselect、设置超时并指示 EINTR errno,但我不知道如何将它们连接在一起。

UPD:实际上,如果发现虚拟服务器,它将永远保留在 sckt_cd_check() 中。这使得我在主循环中使用计时器的解决方案毫无用处。

最佳答案

实际上,通过添加单独的时间戳字段和用于检查所有事件轮询队列的 select() 超时来实现每个服务器的超时。

关于c - 处理 *nix 中多路复用套接字上的读取超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22173200/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com