gpt4 book ai didi

c - 消息在 RabbitMQ C master 中丢失

转载 作者:行者123 更新时间:2023-11-30 16:41:21 24 4
gpt4 key购买 nike

我正在使用alanxz的RabbitMQ C master。链接如下: https://github.com/alanxz/rabbitmq-c

在 amqp_sendstring.c 和 amqp_listen.c 示例中,当我关闭 amqp_listen.c 并仍然继续使用 amqp_sendstring.c 发送字符串时,消息会丢失。因此,当我重新启动 amqp_listen.c 时,它没有收到这些消息。我希望 amqp_listen.c 能够接收它处于非活动状态期间由 amqp_sendstring.c 发送的消息。我该怎么办?

amqp_sendstring.c:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

#include "utils.h"

/*
 * amqp_sendstring: publish a single text/plain message and exit.
 *
 * Usage: amqp_sendstring host port exchange routingkey messagebody
 *
 * Returns 1 on a usage error or an invalid port, 0 after the message has
 * been published and the connection torn down.  Failures of the AMQP calls
 * are routed through die()/die_on_error()/die_on_amqp_error() from utils.h;
 * NOTE(review): those helpers are presumed to terminate the process, since
 * the code following each check assumes success -- confirm in utils.h.
 */
int main(int argc, char const *const *argv)
{
  char const *hostname;
  int port, status;
  char const *exchange;
  char const *routingkey;
  char const *messagebody;
  amqp_socket_t *socket = NULL;
  amqp_connection_state_t conn;

  if (argc < 6) {
    fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n");
    return 1;
  }

  hostname = argv[1];
  exchange = argv[3];
  routingkey = argv[4];
  messagebody = argv[5];

  /* Parse the port strictly: atoi() would silently turn garbage into 0 and
   * gives no way to detect trailing junk or out-of-range values.  Overflow
   * saturates to LONG_MIN/LONG_MAX, which the range check rejects too. */
  {
    char *end = NULL;
    long parsed = strtol(argv[2], &end, 10);

    if (end == argv[2] || *end != '\0' || parsed < 1 || parsed > 65535) {
      fprintf(stderr, "Invalid port: %s\n", argv[2]);
      return 1;
    }
    port = (int) parsed;
  }

  conn = amqp_new_connection();

  socket = amqp_tcp_socket_new(conn);
  if (!socket) {
    die("creating TCP socket");
  }

  status = amqp_socket_open(socket, hostname, port);
  if (status) {
    die("opening TCP socket");
  }

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
                    "Logging in");

  /* The result of opening channel 1 is inspected via the RPC reply. */
  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

  {
    amqp_basic_properties_t props;

    /* Zero the whole struct so fields not selected by _flags still hold
     * defined values when the struct is handed to the library. */
    memset(&props, 0, sizeof props);
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; /* persistent delivery mode */

    die_on_error(amqp_basic_publish(conn,
                                    1,
                                    amqp_cstring_bytes(exchange),
                                    amqp_cstring_bytes(routingkey),
                                    0,
                                    0,
                                    &props,
                                    amqp_cstring_bytes(messagebody)),
                 "Publishing");
  }

  /* Tear down in reverse order of setup: channel, connection, then state. */
  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
  die_on_error(amqp_destroy_connection(conn), "Ending connection");
  return 0;
}

amqp_listen.c:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

#include <assert.h>

#include "utils.h"

/*
 * amqp_listen: bind a queue to an exchange and dump every message received.
 *
 * Usage: amqp_listen host port exchange bindingkey
 *
 * The queue is declared with an empty name (amqp_empty_bytes) and with the
 * last flag argument (auto_delete) set to 1; the name actually used for
 * binding and consuming is copied out of the declare reply.
 *
 * Returns 1 on a usage error, an invalid port, or when the queue name cannot
 * be copied; returns 0 once the consume loop has ended and the connection
 * has been closed.  Failures of the AMQP calls go through the die*() helpers
 * from utils.h.  NOTE(review): those helpers are presumed to terminate the
 * process, since the code after each check assumes success (e.g. `r` is
 * dereferenced right after the "Declaring queue" check) -- confirm.
 */
int main(int argc, char const *const *argv)
{
  char const *hostname;
  int port, status;
  char const *exchange;
  char const *bindingkey;
  amqp_socket_t *socket = NULL;
  amqp_connection_state_t conn;

  amqp_bytes_t queuename;

  if (argc < 5) {
    fprintf(stderr, "Usage: amqp_listen host port exchange bindingkey\n");
    return 1;
  }

  hostname = argv[1];
  exchange = argv[3];
  bindingkey = argv[4];

  /* Parse the port strictly: atoi() would silently turn garbage into 0 and
   * gives no way to detect trailing junk or out-of-range values.  Overflow
   * saturates to LONG_MIN/LONG_MAX, which the range check rejects too. */
  {
    char *end = NULL;
    long parsed = strtol(argv[2], &end, 10);

    if (end == argv[2] || *end != '\0' || parsed < 1 || parsed > 65535) {
      fprintf(stderr, "Invalid port: %s\n", argv[2]);
      return 1;
    }
    port = (int) parsed;
  }

  conn = amqp_new_connection();

  socket = amqp_tcp_socket_new(conn);
  if (!socket) {
    die("creating TCP socket");
  }

  status = amqp_socket_open(socket, hostname, port);
  if (status) {
    die("opening TCP socket");
  }

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
                    "Logging in");

  /* The result of opening channel 1 is inspected via the RPC reply. */
  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

  {
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
                                                    amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

    /* Take a private copy of the queue name from the reply; the loop below
     * calls amqp_maybe_release_buffers(), so do not keep pointing into r. */
    queuename = amqp_bytes_malloc_dup(r->queue);
    if (queuename.bytes == NULL) {
      fprintf(stderr, "Out of memory while copying queue name\n");
      return 1;
    }
  }

  amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
                  amqp_empty_table);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

  amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

  /* Receive until amqp_consume_message() reports anything but a normal
   * response; each envelope is printed and then destroyed. */
  for (;;) {
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;

    amqp_maybe_release_buffers(conn);

    res = amqp_consume_message(conn, &envelope, NULL, 0);

    if (AMQP_RESPONSE_NORMAL != res.reply_type) {
      break;
    }

    printf("Delivery %u, exchange %.*s routingkey %.*s\n",
           (unsigned) envelope.delivery_tag,
           (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
           (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);

    if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
      printf("Content-type: %.*s\n",
             (int) envelope.message.properties.content_type.len,
             (char *) envelope.message.properties.content_type.bytes);
    }
    printf("----\n");

    amqp_dump(envelope.message.body.bytes, envelope.message.body.len);

    amqp_destroy_envelope(&envelope);
  }

  /* Release the copy made with amqp_bytes_malloc_dup(); it is not used
   * after the consume loop. */
  amqp_bytes_free(queuename);

  /* Tear down in reverse order of setup: channel, connection, then state. */
  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
  die_on_error(amqp_destroy_connection(conn), "Ending connection");

  return 0;
}

请建议我如何以正确的顺序检索丢失的消息。

最佳答案

amqp_listen.c 在声明队列时设置了 auto_delete 标志。这意味着当队列上的消费者终止时(通常是因为底层连接被关闭),队列会被删除。如果您希望队列持续存在,请将 amqp_queue_declare 中的 auto_delete 参数设置为 0。

此外,当未指定队列名称时(例如,使用 amqp_empty_bytes 作为 queue 参数),代理会在您每次调用 amqp_queue_declare 时生成一个新的唯一队列名称。请修改对 amqp_queue_declare 的调用,使其每次都使用相同的队列名称。

另请参阅:https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare

关于c - 消息在 RabbitMQ C master 中丢失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46275788/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com