gpt4 book ai didi

c - POSIX sem_wait() SIGABRT

转载 作者:太空宇宙 更新时间:2023-11-04 07:36:57 26 4
gpt4 key购买 nike

我正在做一个学校项目,我们必须制作一个多线程网络服务器。当我在我的信号量上调用 sem_wait 时遇到问题(应该初始化为 0 但似乎已经被 sem_post() 编辑为 1)。我得到一个 SIGABRT。

我在下面附上我的代码,我在导致我的问题的行上发表了评论。我花了几个小时使用调试器,但运气不佳。

#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string>
#include <string.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <stdio.h>
#include <cstdlib>
#include <strings.h>

#define PORTNUM 7000
#define NUM_OF_THREADS 5
#define oops(msg) { perror(msg); exit(1);}
#define FCFS 0
#define SJF 1;

void bindAndListen();
void acceptConnection(int socket_file_descriptor);
void* dispatchJobs(void*);
void* replyToClient(void* pos);

//holds ids of worker threads
pthread_t threads[NUM_OF_THREADS];

//mutex variable for sleep_signal_cond
pthread_mutex_t sleep_signal_mutex[NUM_OF_THREADS];
//holds the condition variables to signal when the thread should be unblocked
pthread_cond_t sleep_signal_cond[NUM_OF_THREADS];

//mutex for accessing sleeping_thread_list
pthread_mutex_t sleeping_threads_mutex = PTHREAD_MUTEX_INITIALIZER;
//list of which threads are sleeping so they can be signaled and given a job
std::vector<bool> *sleeping_threads_list = new std::vector<bool>();

//number of threads ready for jobs
sem_t available_threads;
sem_t waiting_jobs;


//holds requests waiting to be given to one of the threads for execution
//request implemented as int[3] with int[0]== socket_descriptor int[1]== file_size int[2]== file_descriptor of requested file
//if file_size == 0 then HEAD request
std::vector<std::vector<int> >* jobs = new std::vector<std::vector<int> >();

pthread_mutex_t jobs_mutex = PTHREAD_MUTEX_INITIALIZER;


int main (int argc, char * const argv[]) {
//holds id for thread responsible for removing jobs from ready queue and assigning them to worker thread
pthread_t dispatcher_thread;

//initializes semaphores
if(sem_init(&available_threads, 0, NUM_OF_THREADS) != 0){
oops("Error Initializing Semaphore");
}

if(sem_init(&waiting_jobs, 0, 0) !=0){
oops("Error Initializing Semaphore");
}

//initializes condition variables and guarding mutexes
for(int i=0; i<NUM_OF_THREADS; i++){
pthread_cond_init(&sleep_signal_cond[i], NULL);
pthread_mutex_init(&sleep_signal_mutex[i], NULL);
}

if(pthread_create(&dispatcher_thread, NULL, dispatchJobs, (void*)NULL) !=0){
oops("Error Creating Distributer Thread");
}

for (int i=0; i<NUM_OF_THREADS; i++) {
pthread_mutex_lock(&sleeping_threads_mutex);
printf("before");
sleeping_threads_list->push_back(true);
printf("after");
pthread_mutex_unlock(&sleeping_threads_mutex);
}

printf("here");
for (int i=0; i<NUM_OF_THREADS; i++) {
//creates threads and stores ID in threads
if(pthread_create(&threads[i], NULL, replyToClient, (void*)i) !=0){
oops("Error Creating Thread");
}
}

/*
if(sem_init(&available_threads, 0, NUM_OF_THREADS) !=0){
oops("Error Initializing Semaphore");
}

if(sem_init(&waiting_jobs, 0, 0) !=0){ //this is the semaphore thats used in the sem_wait
oops("Error Initializing Semaphore");
}*/

bindAndListen();
}


//binds to socket and listens for connections
//being done by main thead
void bindAndListen(){
struct sockaddr_in saddr;
struct hostent *hp;
char hostname[256];
int sock_id, sock_fd;

gethostname(hostname, 256);
hp = gethostbyname(hostname);
bzero(&saddr, sizeof(saddr));

//errno = 0;

bcopy(hp->h_addr, &saddr.sin_addr, hp->h_length);

saddr.sin_family = AF_INET;
saddr.sin_port = htons(PORTNUM);
saddr.sin_addr.s_addr = INADDR_ANY;

sock_id = socket(AF_INET, SOCK_STREAM, 0);

if(sock_id == -1){
oops("socket");
printf("socket");
}

if(bind(sock_id, (const sockaddr*)&saddr, sizeof(saddr)) ==0){

if(listen(sock_id, 5) ==-1){
oops("listen");
}

//each time a new connection is accepted, get file info and push to ready queue
while(1){
int addrlen = sizeof(saddr);
sock_fd = accept(sock_id, (sockaddr*)&saddr, (socklen_t*)&addrlen);
if (sock_fd > 0) {
acceptConnection(sock_fd);
}else {
oops("Error Accepting Connection");
}
}
}else{
oops("there was an error binding to socket");
}
}// end of bindAndListen()


//accepts connection and gets file info of requested file
//being done by main thread
void acceptConnection(int sock_fd){
printf("**Server: A new client connected!");

//only using loop so on error we can break out on error
while(true){
//used to hold input from client
char* inputBuff = new char[BUFSIZ];
int slen = read(sock_fd, inputBuff, BUFSIZ);

//will sit on space between HEAD/GET and path
int pos1 = 0;
//will sit on space between path and HTTP version
int pos2 = 0;

//need duplicate ptr so we can manipulate one in the loop
char* buffPtr = inputBuff;
//parses client input breaks up query by spaces
for(int i=0; i<slen; i++){
if(*buffPtr == ' '){
if (pos1 == 0) {
pos1 = i;
}else {
pos2 = i;
break;
}
}
buffPtr++;
}

if((pos1 - pos2) >=0){
std::string str = "Invalid Query";
write(sock_fd, str.c_str(), strlen(str.c_str()));
break;
}

printf("slen length %d\n", slen);

std::string* method = new std::string(inputBuff, pos1);

printf("method length %lu\n",method->length());

//increment the ptr for buff to the starting pos of the path
inputBuff += ++pos1;

printf("pos2 - pos1 %d\n", (pos2 - pos1));

printf("pos1 = %d pos2 = %d\n", pos1, pos2);

std::string* path = new std::string(inputBuff, (pos2 - pos1));

printf("path length %lu\n", path->length());

printf("part1 %s\n", method->c_str());

printf("part2 %s\n", path->c_str());

//opens file requested by client
int fd = open(path->c_str(), O_RDONLY);
if(fd < 0){
std::string* error = new std::string("Error Opening File");
*error += *path + std::string(strerror(errno), strlen(strerror(errno)));
write(sock_fd, error->c_str(), strlen(error->c_str()));
break;
}

int file_size;
if(method->compare("GET") == 0){
//gets file info and puts the resulting struct in file_info
struct stat file_info;
if(fstat(fd, &file_info) !=0){
oops("Error getting file info");
}
file_size = file_info.st_size;
}else if(method->compare("HEAD")){
file_size = 0;
}else{
write(sock_fd, "Invalid Query", strlen("Invalid Query"));
break;
}

//job to be pushed to ready queue
std::vector<int> job;
job.push_back(sock_fd);
job.push_back(file_size);
job.push_back(fd);

//check mutex guarding the ready queue
pthread_mutex_lock(&jobs_mutex);
//push job to back of ready queue
jobs->push_back(job);
//unlock mutex guarding the ready queue
pthread_mutex_unlock(&jobs_mutex);

//increment number of jobs in ready queue
sem_post(&waiting_jobs);

} //end of while(true)
// we only end up here if there was an error
fflush(stdout);
close(sock_fd);
}// end of acceptConnection()


//routine run by dispather thread
void *dispatchJobs(void*){
while(true){
//wait for a thread to be available to execute a job
sem_wait(&available_threads);
//wait for a job to be waiting in the ready queue
sem_wait(&waiting_jobs); //this is the line thats crashing
//aquire lock to check which threads are waiting
pthread_mutex_lock(&sleeping_threads_mutex);
//go through list of threads to see which is waiting
for(int i=0; i<sleeping_threads_list->size(); i++){
if(sleeping_threads_list->at(i)){
//unlocks lock for access to list of waiting threads
pthread_mutex_unlock(&sleeping_threads_mutex);
//allows us access to the list of condition variables to signal the thread to resume execution
pthread_mutex_lock(&sleep_signal_mutex[i]);
pthread_cond_signal(&sleep_signal_cond[i]);
pthread_mutex_unlock(&sleep_signal_mutex[i]);
}
}

}//end of while(true)
}//end of dispatchJobs()


//sends file or metadata to client
//run by worker thread
//pos is position of condition variable that it waits to be signaled in the sleep_signal_cond[] array
void* replyToClient(void* pos){
int position = (long)pos;
while(true){
//waits for dispather thread to signal it
pthread_mutex_lock(&sleep_signal_mutex[position]);
pthread_cond_wait(&sleep_signal_cond[position], &sleep_signal_mutex[position]);
pthread_mutex_unlock(&sleep_signal_mutex[position]);


//lock mutex to get job to be executed
pthread_mutex_lock(&jobs_mutex);
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(&jobs_mutex);

//socket file descriptor, used for writing to socket
int sock_fd =job[0];
int file_size = job[1];
//file descriptor for requested job
int fd = job[2];

//holds output to be written to socket
char* outputBuffer = new char[BUFSIZ];

//GET request, send file
if(file_size !=0){
int readResult = 0;
while ((readResult = read(fd, outputBuffer, BUFSIZ)) > 0) {
if(write(sock_fd, outputBuffer, readResult) != readResult){
printf("We may have a write error");
}
}
if(readResult < 0){
oops("Error Reading File");
}
if(readResult == 0){
printf("finished sending file");
}
}else{ // HEAD request

}
//increment number of available threads
sem_post(&available_threads);
}
}// end of replyToClient()

最佳答案

再次检查代码的整个逻辑——可以到达这里:

pthread_mutex_lock(&jobs_mutex);
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(&jobs_mutex);

jobs->size () == 0,在这种情况下,front()erase() 调用未定义的行为,这很可能会导致您观察到的效果。

检查您的程序在进行以下更改后是否仍然崩溃:

//lock mutex to get job to be executed
pthread_mutex_lock(&jobs_mutex);
if (jobs->size () == 0)
{
pthread_mutex_unlock (&jobs_mutex);
continue;
}
std::vector<int> job = jobs->front();
//removes job from front of vector
jobs->erase(jobs->begin());
//releases mutex
pthread_mutex_unlock(&jobs_mutex);

关于c - POSIX sem_wait() SIGABRT,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7925582/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com