gpt4 book ai didi

c - 线程: Increasing program execution time with respect to number of threads

转载 作者:行者123 更新时间:2023-11-30 16:10:48 26 4
gpt4 key购买 nike

我正在尝试使用 pthreads、C 构建一个高效的并发 HashMap 。

以下是我的实现

#include <stdlib.h>
#include <stddef.h>
#include <pthread.h>
#include <stdint.h>
#include <limits.h>
#include <stdio.h>
#include <linux/limits.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>

#define ENTRIES_PER_BUCKET 3

struct Bucket
{
pthread_mutex_t mutex;
void **keys;
int *vals;
struct Bucket *next;
};

struct Concurrent_Map
{
struct Bucket *buckets;
map_keys_equality *keys_eq;
map_key_hash *khash;
int capacity;
};

int concurrent_map_allocate /*@ <t> @*/ (map_keys_equality *keq, map_key_hash *khash,
unsigned capacity,
struct Concurrent_Map **map_out)

{

struct Concurrent_Map *old_map_val = *map_out;
struct Concurrent_Map *map_alloc = malloc(sizeof(struct Concurrent_Map));
if (map_alloc == NULL)
{
return 0;
}
*map_out = (struct Concurrent_Map *)map_alloc;

struct Bucket *buckets_alloc = (struct Bucket *)malloc(sizeof(struct Bucket) * (int)capacity);

if (buckets_alloc == NULL)
{
free(map_alloc);
*map_out = old_map_val;
return 0;
}
(*map_out)->buckets = buckets_alloc;
(*map_out)->capacity = capacity;
(*map_out)->keys_eq = keq;
(*map_out)->khash = khash;

unsigned i;

for (i = 0; i < capacity; i++)
{
if (pthread_mutex_init(&((*map_out)->buckets[i].mutex), NULL) == 0)
{
void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));

if (key_alloc != NULL)
{
(*map_out)->buckets[i].keys = key_alloc;

int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{

(*map_out)->buckets[i].keys[k] = NULL;
}
}

int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));

if (vals_alloc != NULL)
{
(*map_out)->buckets[i].vals = vals_alloc;

int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
(*map_out)->buckets[i].vals[k] = -1;
}
}

(*map_out)->buckets[i].next = NULL;
}
}

// todo exceptions in allocation

return 1;
}

static unsigned loop(unsigned k, unsigned capacity)
{
unsigned g = k % capacity;

unsigned res = (g + capacity) % capacity;

return res;
}

int concurrent_map_get(struct Concurrent_Map *map, void *key, int *value_out)

{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);

unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);

if (bucket_index < map->capacity)
{

struct Bucket *bucket = &(map->buckets[bucket_index]);

pthread_mutex_t mutex = bucket->mutex;

pthread_mutex_lock(&mutex);

int j;
do
{
for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
int val = bucket->vals[j];
if (map->keys_eq(bucket->keys[j], key))
{
if (bucket->vals[j] == val)
{
*value_out = val;
return 1;
}
else
{
*value_out = -1;
return 0;
}
}
}
if (bucket->next != NULL)
{
bucket = (bucket->next);
}
else
{
break;
pthread_mutex_unlock(&mutex);
}

pthread_mutex_unlock(&mutex);

} while (1);
}
*value_out = -1;
return 0;
}

int concurrent_map_put(struct Concurrent_Map *map, void *key, int value)

{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);

unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);

struct Bucket *bucket = &(map->buckets[bucket_index]);

int j;

do
{

pthread_mutex_t mutex = bucket->mutex;

int j;

pthread_mutex_lock(&mutex);

for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
if (map->keys_eq(bucket->keys[j], key))
{
pthread_mutex_unlock(&mutex);
return 0;
}
else if (bucket->keys[j] == NULL)
{
bucket->vals[j] = value;
bucket->keys[j] = key;
pthread_mutex_unlock(&mutex);
return 1;
}
}
if (bucket->next == NULL)

{
// allocate a new bucket

struct Bucket *new_bucket = malloc(sizeof(struct Bucket));

if (pthread_mutex_init(&(new_bucket->mutex), NULL) == 0)
{
void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));

if (key_alloc != NULL)
{
new_bucket->keys = key_alloc;

int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
new_bucket->keys[k] = NULL;
}
}

int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));

if (vals_alloc != NULL)
{
new_bucket->vals = vals_alloc;

int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
new_bucket->vals[k] = -1;
}
}

bucket->next = new_bucket;
}
}

pthread_mutex_unlock(&mutex);
bucket = bucket->next;

} while (1);

return 0;
}

int concurrent_map_erase(struct Concurrent_Map *map, void *key, void **trash)

{

map_key_hash *khash = map->khash;
unsigned hash = khash(key);

unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);

struct Bucket *bucket = &(map->buckets[bucket_index]);

int j;

do
{

pthread_mutex_t mutex = bucket->mutex;

int j;

pthread_mutex_lock(&mutex);

for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
if (map->keys_eq(bucket->keys[j], key))
{
bucket->vals[j] = -1;
bucket->keys[j] = NULL;
pthread_mutex_unlock(&mutex);
return 1;
}
}

pthread_mutex_unlock(&mutex);
if (bucket->next != NULL)
{
bucket = (bucket->next);
}
else
{
break;
}

} while (1);
return 0;
}

int concurrent_map_size(struct Concurrent_Map *map)

{
int num_buckets = 0;

struct Bucket *buckets = map->buckets;
unsigned i;

for (i = 0; i < map->capacity; i++)
{
struct Bucket bucket = buckets[i];
do
{
num_buckets++;
if (bucket.next != NULL)
{
bucket = *(bucket.next);
}
else
{
break;
}

} while (1);
}
return num_buckets * ENTRIES_PER_BUCKET;
}
struct FlowId
{
int src_port;
int dst_port;
int src_ip;
int dst_ip;
int internal_device;
int protocol;
};

bool FlowId_eq(void *a, void *b)

{
if (a == NULL || b == NULL)
{
return false;
}
struct FlowId *id1 = a;
struct FlowId *id2 = b;

return (id1->src_port == id2->src_port) && (id1->dst_port == id2->dst_port) && (id1->src_ip == id2->src_ip) && (id1->dst_ip == id2->dst_ip) && (id1->internal_device == id2->internal_device) && (id1->protocol == id2->protocol);
}

unsigned FlowId_hash(void *obj)

{
struct FlowId *id = obj;
unsigned hash = 0;
hash = __builtin_ia32_crc32si(hash, id->src_port);
hash = __builtin_ia32_crc32si(hash, id->dst_port);
hash = __builtin_ia32_crc32si(hash, id->src_ip);
hash = __builtin_ia32_crc32si(hash, id->dst_ip);
hash = __builtin_ia32_crc32si(hash, id->internal_device);
hash = __builtin_ia32_crc32si(hash, id->protocol);
return hash;
}

struct Concurrent_Map *concurrent_map;

#define NUM_THREADS 2
#define NUM_PACKETS 10000000

void *expirator(void *arg)
{
// printf("Thread started executing\n");
unsigned i = 0;
int error = 0;
unsigned packet_count = NUM_PACKETS / NUM_THREADS;
while (i < packet_count)
{
i++;
struct FlowId *id = malloc(sizeof(struct FlowId));
struct FlowId *id1 = malloc(sizeof(struct FlowId));
id->dst_ip = 1;
id->src_ip = 1;
id->internal_device = 1;
id->protocol = 1;
id->src_port = 1;
id->dst_port = rand() % 65536;

id1->dst_ip = 1;
id1->src_ip = 1;
id1->internal_device = 1;
id1->protocol = 1;
id1->src_port = 1;
id1->dst_port = rand() % 65536;

int external_port = rand() % 65536;
int external;

concurrent_map_erase(concurrent_map, id, NULL);

concurrent_map_put(concurrent_map, id, external_port);
concurrent_map_get(concurrent_map, id, &external);

if (external_port != external)
{
error++;
}
else
{
}
}
return NULL;
}

int main()
{

clock_t begin = clock();

concurrent_map_allocate(FlowId_eq, FlowId_hash, 65536, &(concurrent_map));

pthread_t *threads = malloc(sizeof(pthread_t) * NUM_THREADS);
int i;
for (i = 0; i < NUM_THREADS; i++)
{
if (pthread_create(&threads[i], NULL, expirator, NULL) != 0)
{
printf("Error creating threads");
exit(0);
}
}
for (i = 0; i < NUM_THREADS; i++)
{
if (pthread_join(threads[i], NULL) != 0)
{
printf("Error joining threads");
exit(0);
}
}
clock_t end = clock();
double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
printf("%lf\n", time_spent);
return 0;
}

以下是运行该程序的方法。

gcc  concurrent_map.c  -o test-concurrent-new -lpthread -msse4.2 -O3

然后我测量固定工作负载的执行时间,以下是我观察到的时间值。

1:3.29

2:6.687811

3:5.88

4:6.23

5:6.38

6:6.52

7:6.74

8:6.82

看起来,当线程数量增加时,执行时间会增加,但几乎保持不变。

我使用 Mutrace 分析了这段代码,它查找互斥锁争用。事实证明

No mutex contended according to filtering parameters.

我检查了缓存未命中数,发现修改线程数后,缓存未命中数大致相等。

为什么线程数增加执行时间没有减少?

我在 32 核机器上运行它

最佳答案

rand() 通常不适合多线程执行。而是使用 rand_r()。

还使用Linux时间工具对应用程序进行计时。

您的工作负载生成会带来巨大的开销,我认为这是这里的瓶颈,而不是并发 HashMap

关于c - 线程: Increasing program execution time with respect to number of threads,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58799910/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com