gpt4 book ai didi

c - Stomp 1.2 ActiveMQ 5.15 - 收到 ACK 时没有确认 ack id

转载 作者:行者123 更新时间:2023-11-30 14:47:52 30 4
gpt4 key购买 nike

我已经构建了一个 c 函数,该函数应该通过 stomp 库从 ActiveMQ 读取。此代码正在读取收据之前的附加消息(这是我所期望的)。请告知为什么我的 RECEIPT 帧没有在第二个 MESSAGE 帧之前到来。另请注意,我用 BEGIN 和 COMMIT 框架包装此代码,这可能是也可能不是最佳实践。

提前致谢。

int receive_from_queue_ack( MQDEF *mqdef, int transaction ) {
apr_status_t rc;
stomp_frame frame_write;
stomp_frame* frame_read = NULL;
char transaction_str[37];
char* ack = NULL;
char* protocol = NULL;
char* receipt_id = NULL;
char* message_id = NULL;
char* message = NULL;
char* seq = NULL;
uuid_t uuid;
char receipt_id_begin_uuid[37];
char receipt_id_commit_uuid[37];
char receipt_id_ack_uuid[37];
char id_uuid[37];
int receipt_flag = 1;

uuid_generate_random(uuid);
uuid_unparse_lower(uuid, transaction_str);

uuid_generate_random(uuid);
uuid_unparse_lower(uuid, receipt_id_begin_uuid);

uuid_generate_random(uuid);
uuid_unparse_lower(uuid, id_uuid);

uuid_generate_random(uuid);
uuid_unparse_lower(uuid, receipt_id_commit_uuid);


uuid_generate_random(uuid);
uuid_unparse_lower(uuid, receipt_id_ack_uuid);

fprintf(stdout, "1) *** Transaction BEGIN - (receive_from_queue)\n");
frame_write.command = "BEGIN";
frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str);
frame_write.body_length = -1;
frame_write.body = NULL;
rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
if ( rc != APR_SUCCESS ) {
return FAILURE;
}

printf("3) Subscribe begin\n");
frame_write.command = "SUBSCRIBE";
frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
apr_hash_set(frame_write.headers, "destination", APR_HASH_KEY_STRING, mqdef->queueName);
apr_hash_set(frame_write.headers, "ack", APR_HASH_KEY_STRING, "client");
apr_hash_set(frame_write.headers, "id", APR_HASH_KEY_STRING, "client-123");
apr_hash_set(frame_write.headers, "activemq.prefetchSize", APR_HASH_KEY_STRING, "1");
frame_write.body_length = -1;
frame_write.body = NULL;
rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
if ( rc != APR_SUCCESS ) {
return FAILURE;
}
frame_read = NULL;
rc = stomp_read(mqdef->amqConnection, &frame_read, (apr_pool_t *)mqdef->pool);
if ( rc != APR_SUCCESS ) {
return FAILURE;
}
ack = NULL;
if( frame_read != NULL && strncmp(frame_read->command, "MESSAGE", 7) == 0 ) {
ack = (char*) apr_hash_get(frame_read->headers, "ack", APR_HASH_KEY_STRING);
printf("5) ack=<%s>\n", ack);
if( ack == NULL ) {
printf("ack being null is a problem\n");
exit(1);
}
message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
printf("5) message_id=<%s>\n", message_id);
printf("5) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
} else if( frame_read != NULL && strncmp(frame_read->command, "ERROR", 5) == 0 ) {
message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
printf("5) message_id=<%s>\n", message_id);
printf("5) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
exit(1);
} else {
message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
printf("5) message_id=<%s>\n", message_id);
printf("5) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
printf("5) should never get here.\n");
exit(1);
}
frame_write.command = "ACK";
frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str);
apr_hash_set(frame_write.headers, "id", APR_HASH_KEY_STRING, ack);
apr_hash_set(frame_write.headers, "receipt-id", APR_HASH_KEY_STRING, receipt_id_ack_uuid);
printf("set id =<%s>\n", ack);
frame_write.body_length = -1;
frame_write.body = NULL;
rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
if ( rc != APR_SUCCESS ) {
return FAILURE;
}
frame_read = NULL;
rc = stomp_read(mqdef->amqConnection, &frame_read, (apr_pool_t *)mqdef->pool);
if ( rc != APR_SUCCESS ) {
return FAILURE;
}
if( frame_read != NULL && strncmp(frame_read->command, "RECEIPT", 7) == 0 ) {
message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
receipt_id = (char*) apr_hash_get(frame_read->headers, "receipt-id", APR_HASH_KEY_STRING);
printf("7) receipt-id=<%s>\n", receipt_id);
printf("7) message-id=<%s>\n", message_id);
printf("7) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
} else if( frame_read != NULL && strncmp(frame_read->command, "ERROR", 5) == 0 ) {
message_id = (char*) apr_hash_get(frame_read->headers, "message_id", APR_HASH_KEY_STRING);
receipt_id = (char*) apr_hash_get(frame_read->headers, "receipt-id", APR_HASH_KEY_STRING);
printf("7) receipt-id=<%s>\n", receipt_id);
printf("7) message-id=<%s>\n", message_id);
printf("7) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
exit(1);
} else {
message_id = (char*) apr_hash_get(frame_read->headers, "message_id", APR_HASH_KEY_STRING);
printf("7) message-id=<%s>\n", message_id);
printf("7) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
ack = (char*) apr_hash_get(frame_read->headers, "ack", APR_HASH_KEY_STRING);
printf("5) ack=<%s>\n", ack);
printf("this is a failure\n");
frame_write.command = "ACK";
frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str);
apr_hash_set(frame_write.headers, "id", APR_HASH_KEY_STRING, ack);
apr_hash_set(frame_write.headers, "receipt-id", APR_HASH_KEY_STRING, receipt_id_ack_uuid);
rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
if ( rc != APR_SUCCESS ) {
return FAILURE;
}
}

frame_write.command = "COMMIT";
frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str);
frame_write.body_length = -1;
frame_write.body = NULL;
rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
if ( rc != APR_SUCCESS ) {
return FAILURE;
}
fprintf(stdout, "12) *** Transaction Complete - (receive_from_queue)\n\n");
}

最佳答案

您没有在确认中使用正确的 STOMP header 来指示 ACK ID,因此代理告诉您您做错了。 STOMP 规范是开始学习在每个执行中添加什么内容的好地方,在本例中, header 是“id”而不是“message-id”

与往常一样,C 代码很难阅读,因此使用一些注释或上下文重新修改上面的代码会很有帮助,但到目前为止,我还没有看到客户端代码从传入消息中获取“ack” header ,并且将其传输到实际的“ACK”帧,因此这可能是问题所在。

来自规范:

If the message is received from a subscription that requires explicit acknowledgment (either client or client-individual mode) then the MESSAGE frame MUST also contain an ack header with an arbitrary value. This header will be used to relate the message to a subsequent ACK or NACK frame.

请参阅STOMP specification了解更多

关于c - Stomp 1.2 ActiveMQ 5.15 - 收到 ACK 时没有确认 ack id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50929695/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com