- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 pthreads、C 构建一个高效的并发 HashMap 。
以下是我的实现
#include <stdlib.h>
#include <stddef.h>
#include <pthread.h>
#include <stdint.h>
#include <limits.h>
#include <stdio.h>
#include <linux/limits.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#define ENTRIES_PER_BUCKET 3
struct Bucket
{
pthread_mutex_t mutex;
void **keys;
int *vals;
struct Bucket *next;
};
struct Concurrent_Map
{
struct Bucket *buckets;
map_keys_equality *keys_eq;
map_key_hash *khash;
int capacity;
};
int concurrent_map_allocate /*@ <t> @*/ (map_keys_equality *keq, map_key_hash *khash,
unsigned capacity,
struct Concurrent_Map **map_out)
{
struct Concurrent_Map *old_map_val = *map_out;
struct Concurrent_Map *map_alloc = malloc(sizeof(struct Concurrent_Map));
if (map_alloc == NULL)
{
return 0;
}
*map_out = (struct Concurrent_Map *)map_alloc;
struct Bucket *buckets_alloc = (struct Bucket *)malloc(sizeof(struct Bucket) * (int)capacity);
if (buckets_alloc == NULL)
{
free(map_alloc);
*map_out = old_map_val;
return 0;
}
(*map_out)->buckets = buckets_alloc;
(*map_out)->capacity = capacity;
(*map_out)->keys_eq = keq;
(*map_out)->khash = khash;
unsigned i;
for (i = 0; i < capacity; i++)
{
if (pthread_mutex_init(&((*map_out)->buckets[i].mutex), NULL) == 0)
{
void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));
if (key_alloc != NULL)
{
(*map_out)->buckets[i].keys = key_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
(*map_out)->buckets[i].keys[k] = NULL;
}
}
int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));
if (vals_alloc != NULL)
{
(*map_out)->buckets[i].vals = vals_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
(*map_out)->buckets[i].vals[k] = -1;
}
}
(*map_out)->buckets[i].next = NULL;
}
}
// todo exceptions in allocation
return 1;
}
static unsigned loop(unsigned k, unsigned capacity)
{
unsigned g = k % capacity;
unsigned res = (g + capacity) % capacity;
return res;
}
int concurrent_map_get(struct Concurrent_Map *map, void *key, int *value_out)
{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);
unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);
if (bucket_index < map->capacity)
{
struct Bucket *bucket = &(map->buckets[bucket_index]);
pthread_mutex_t mutex = bucket->mutex;
pthread_mutex_lock(&mutex);
int j;
do
{
for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
int val = bucket->vals[j];
if (map->keys_eq(bucket->keys[j], key))
{
if (bucket->vals[j] == val)
{
*value_out = val;
return 1;
}
else
{
*value_out = -1;
return 0;
}
}
}
if (bucket->next != NULL)
{
bucket = (bucket->next);
}
else
{
break;
pthread_mutex_unlock(&mutex);
}
pthread_mutex_unlock(&mutex);
} while (1);
}
*value_out = -1;
return 0;
}
int concurrent_map_put(struct Concurrent_Map *map, void *key, int value)
{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);
unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);
struct Bucket *bucket = &(map->buckets[bucket_index]);
int j;
do
{
pthread_mutex_t mutex = bucket->mutex;
int j;
pthread_mutex_lock(&mutex);
for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
if (map->keys_eq(bucket->keys[j], key))
{
pthread_mutex_unlock(&mutex);
return 0;
}
else if (bucket->keys[j] == NULL)
{
bucket->vals[j] = value;
bucket->keys[j] = key;
pthread_mutex_unlock(&mutex);
return 1;
}
}
if (bucket->next == NULL)
{
// allocate a new bucket
struct Bucket *new_bucket = malloc(sizeof(struct Bucket));
if (pthread_mutex_init(&(new_bucket->mutex), NULL) == 0)
{
void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));
if (key_alloc != NULL)
{
new_bucket->keys = key_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
new_bucket->keys[k] = NULL;
}
}
int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));
if (vals_alloc != NULL)
{
new_bucket->vals = vals_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
new_bucket->vals[k] = -1;
}
}
bucket->next = new_bucket;
}
}
pthread_mutex_unlock(&mutex);
bucket = bucket->next;
} while (1);
return 0;
}
int concurrent_map_erase(struct Concurrent_Map *map, void *key, void **trash)
{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);
unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);
struct Bucket *bucket = &(map->buckets[bucket_index]);
int j;
do
{
pthread_mutex_t mutex = bucket->mutex;
int j;
pthread_mutex_lock(&mutex);
for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
if (map->keys_eq(bucket->keys[j], key))
{
bucket->vals[j] = -1;
bucket->keys[j] = NULL;
pthread_mutex_unlock(&mutex);
return 1;
}
}
pthread_mutex_unlock(&mutex);
if (bucket->next != NULL)
{
bucket = (bucket->next);
}
else
{
break;
}
} while (1);
return 0;
}
int concurrent_map_size(struct Concurrent_Map *map)
{
int num_buckets = 0;
struct Bucket *buckets = map->buckets;
unsigned i;
for (i = 0; i < map->capacity; i++)
{
struct Bucket bucket = buckets[i];
do
{
num_buckets++;
if (bucket.next != NULL)
{
bucket = *(bucket.next);
}
else
{
break;
}
} while (1);
}
return num_buckets * ENTRIES_PER_BUCKET;
}
struct FlowId
{
int src_port;
int dst_port;
int src_ip;
int dst_ip;
int internal_device;
int protocol;
};
bool FlowId_eq(void *a, void *b)
{
if (a == NULL || b == NULL)
{
return false;
}
struct FlowId *id1 = a;
struct FlowId *id2 = b;
return (id1->src_port == id2->src_port) && (id1->dst_port == id2->dst_port) && (id1->src_ip == id2->src_ip) && (id1->dst_ip == id2->dst_ip) && (id1->internal_device == id2->internal_device) && (id1->protocol == id2->protocol);
}
unsigned FlowId_hash(void *obj)
{
struct FlowId *id = obj;
unsigned hash = 0;
hash = __builtin_ia32_crc32si(hash, id->src_port);
hash = __builtin_ia32_crc32si(hash, id->dst_port);
hash = __builtin_ia32_crc32si(hash, id->src_ip);
hash = __builtin_ia32_crc32si(hash, id->dst_ip);
hash = __builtin_ia32_crc32si(hash, id->internal_device);
hash = __builtin_ia32_crc32si(hash, id->protocol);
return hash;
}
struct Concurrent_Map *concurrent_map;
#define NUM_THREADS 2
#define NUM_PACKETS 10000000
void *expirator(void *arg)
{
// printf("Thread started executing\n");
unsigned i = 0;
int error = 0;
unsigned packet_count = NUM_PACKETS / NUM_THREADS;
while (i < packet_count)
{
i++;
struct FlowId *id = malloc(sizeof(struct FlowId));
struct FlowId *id1 = malloc(sizeof(struct FlowId));
id->dst_ip = 1;
id->src_ip = 1;
id->internal_device = 1;
id->protocol = 1;
id->src_port = 1;
id->dst_port = rand() % 65536;
id1->dst_ip = 1;
id1->src_ip = 1;
id1->internal_device = 1;
id1->protocol = 1;
id1->src_port = 1;
id1->dst_port = rand() % 65536;
int external_port = rand() % 65536;
int external;
concurrent_map_erase(concurrent_map, id, NULL);
concurrent_map_put(concurrent_map, id, external_port);
concurrent_map_get(concurrent_map, id, &external);
if (external_port != external)
{
error++;
}
else
{
}
}
return NULL;
}
int main()
{
clock_t begin = clock();
concurrent_map_allocate(FlowId_eq, FlowId_hash, 65536, &(concurrent_map));
pthread_t *threads = malloc(sizeof(pthread_t) * NUM_THREADS);
int i;
for (i = 0; i < NUM_THREADS; i++)
{
if (pthread_create(&threads[i], NULL, expirator, NULL) != 0)
{
printf("Error creating threads");
exit(0);
}
}
for (i = 0; i < NUM_THREADS; i++)
{
if (pthread_join(threads[i], NULL) != 0)
{
printf("Error joining threads");
exit(0);
}
}
clock_t end = clock();
double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
printf("%lf\n", time_spent);
return 0;
}
以下是运行该程序的方法。
gcc concurrent_map.c -o test-concurrent-new -lpthread -msse4.2 -O3
然后我测量固定工作负载的执行时间,以下是我观察到的时间值。
1:3.29
2:6.687811
3:5.88
4:6.23
5:6.38
6:6.52
7:6.74
8:6.82
看起来,当线程数量增加时,执行时间会增加,但几乎保持不变。
我使用 Mutrace 分析了这段代码,它查找互斥锁争用。事实证明
No mutex contended according to filtering parameters.
我检查了缓存未命中数,发现修改线程数后,缓存未命中数大致相等。
为什么线程数增加执行时间没有减少?
我在 32 核机器上运行它
最佳答案
rand() 通常不适合多线程执行。而是使用 rand_r()。
还使用Linux时间工具对应用程序进行计时。
您的工作负载生成会带来巨大的开销,我认为这是这里的瓶颈,而不是并发 HashMap
关于c - 线程: Increasing program execution time with respect to number of threads,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58799910/
从 angular 5.1 更新到 6.1 后,我开始从我的代码中收到一些错误,如下所示: Error: ngc compilation failed: components/forms/utils.
我正在学习 Typescript 并尝试了解类型和接口(interface)的最佳实践。我正在玩一个使用 GPS 坐标的示例,想知道一种方法是否比另一种更好。 let gps1 : number[];
type padding = [number, number, number, number] interface IPaddingProps { defaultValue?: padding
这两种格式在内存中保存结果的顺序上有什么区别吗? number = number + 10; number += 10; 我记得一种格式会立即保存结果,因此下一行代码可以使用新值,而对于另一种格式,
在 Python 匹配模式中,如何匹配像 1 这样的文字数字在按数字反向引用后 \1 ? 我尝试了 \g用于此目的的替换模式中可用的语法,但它在我的匹配模式中不起作用。 我有一个更大的问题,我想使用一
我的源文件here包含 HTML 代码,我想将电话号码更改为可在我的应用程序中单击。我正在寻找一个正则表达式来转换字符串 >numbernumber(\d+)$1numbernumber<",我们在S
我们有一个包含 2 个字段和一个按钮的表单。我们想要点击按钮来输出位于 int A 和 int B 之间的随机整数(比如 3、5 或 33)? (不需要使用 jQuery 或类似的东西) 最佳答案 你
我收到以下类型错误(TypeScript - 3.7.5)。 error TS2345: Argument of type '(priority1: number, priority2: number
只想创建简单的填充器以在其他功能中使用它: function fillLine(row, column, length, bgcolor) { var sheet = SpreadsheetApp
我有一个问题。当我保存程序输出的 *.txt 时,我得到以下信息:0.021111111111111112a118d0 以及更多的东西。 问题是: 这个数字中的“d0”和“a”是什么意思? 我不知道“
首先:抱歉标题太长了,但我发现很难用一句话来解释这个问题;)。是的,我也四处搜索(这里和谷歌),但找不到合适的答案。 所以,问题是这样的: 数字 1-15 将像这样放在金字塔中(由数组表示):
我想从字符串中提取血压。数据可能如下所示: text <- c("at 10.00 seated 132/69", "99/49", "176/109", "10.12 I 128/51, II 1
当尝试执行一个简单的 bash 脚本以将前面带有 0 的数字递增 1 时,原始数字被错误地解释。 #!/bin/bash number=0026 echo $number echo $((number
我有一个类型为 [number, number] 的字段,TypeScript 编译器(strict 设置为 true)出现问题,提示初始值值(value)。我尝试了以下方法: public shee
你能帮我表达数组吗:["232","2323","233"] 我试试这个:/^\[("\d{1,7}")|(,"\d{1,7}")\]$/ 但是这个表达式不能正常工作。 我使用 ruby(rail
这个问题在这里已经有了答案: meaning of (number) & (-number) (4 个回答) 关闭6年前. 例如: int get(int i) { int res = 0;
我正在考虑使用 Berkeley DB作为高度并发的移动应用程序后端的一部分。对于我的应用程序,使用 Queue对于他们的记录级别锁定将是理想的。但是,如标题中所述,我需要查询和更新概念建模的数据,如
我正在尝试解决涉及重复数字的特定 JavaScript 练习,为此我需要将重复数字处理到大量小数位。 目前我正在使用: function divide(numerator, denominator){
我有这个数组类型: interface Details { Name: string; URL: string; Year: number; } interface AppState {
我们正在使用 Spring 3.x.x 和 Quartz 2.x.x 实现 Web 应用程序。 Web 服务器是 Tomcat 7.x.x。我们有 3 台服务器。 Quartz 是集群式的,因此所有这
我是一名优秀的程序员,十分优秀!