gpt4 book ai didi

c# - 将消息定向给消费者

转载 作者:行者123 更新时间:2023-11-30 15:13:24 24 4
gpt4 key购买 nike

我的客户端正在尝试向接收者发送消息。但是我注意到接收者有时没有收到客户端发送的所有消息,因此丢失了一些消息(不确定问题出在哪里?客户端还是接收者)。关于为什么会发生这种情况的任何建议。这就是我目前正在做的事情

在接收方,这就是我正在做的事情。

这是事件处理器

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
}
}

这是客户端连接到事件中心的方式

var StrBuilder = new EventHubsConnectionStringBuilder(eventHubConnectionString)
{
EntityPath = eventHubName,
};
this.eventHubClient = EventHubClient.CreateFromConnectionString(StrBuilder.ToString());

如何将消息定向给特定消费者

最佳答案

我正在使用来自 eventhub 官方文档的示例代码,用于 sendingreceiving .

我有 2 个消费者组:$Defaultnewcg。假设您有 2 个客户端,client_1 使用默认消费组($Default),client_2 使用另一个消费组(newcg)

首先,创建发送客户端后,在 SendMessagesToEventHub 方法中,我们需要添加一个带有值的属性。该值应该是消费者组名称。示例代码如下:

    private static async Task SendMessagesToEventHub(int numMessagesToSend)
{
for (var i = 0; i < numMessagesToSend; i++)
{
try
{
var message = "444 Message";
Console.WriteLine($"Sending message: {message}");
EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

//here, we add a property named "cg", it's value is the consumer group. By setting this property, then we can read this message via this specified consumer group.
mydata.Properties.Add("cg", "newcg");

await eventHubClient.SendAsync(mydata);

}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
}

await Task.Delay(10);
}

Console.WriteLine($"{numMessagesToSend} messages sent.");
}

然后在client_1中,创建接收器项目后,它使用默认消费者组($Default)-> 在SimpleEventProcessor类中->ProcessEventsAsync方法,我们可以过滤掉不需要的事件数据。 ProcessEventsAsync 方法的示例代码:

        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
//filter the data here
if (eventData.Properties["cg"].ToString() == "$Default")
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
Console.WriteLine(context.ConsumerGroupName);
}
}

return context.CheckpointAsync();
}

在另一个客户端中,例如 client_2,它使用另一个消费者组,例如其名称为 newcg,我们可以按照 client_1 中的步骤进行操作,只需在 ProcessEventsAsync 中进行一些更改> 方法如下:

            public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
//filter the data here, using another consumer group name
if (eventData.Properties["cg"].ToString() == "newcg")
{
//other code
}
}

return context.CheckpointAsync();
}

关于c# - 将消息定向给消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59014664/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com