gpt4 book ai didi

c# - 如何使用 ServiceBus EventData 偏移值

转载 作者:可可西里 更新时间:2023-11-01 08:49:57 25 4
gpt4 key购买 nike

我有一些使用 Service Bus Event Data 的代码,并且我怀疑我需要使用 offset 属性,因为目前我的程序正在(或似乎)一遍又一遍地重新运行相同的事件中心数据。

我的代码如下:

public class EventHubListener : IEventProcessor
{
private static EventHubClient _eventHubClient;
private const string EhConnectionStringNoPath = "Endpoint=...";
private const string EhConnectionString = EhConnectionStringNoPath + ";...";
private const string EhEntityPath = "...";

public void Start()
{
_eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);

foreach (string partitionId in eventHub.PartitionIds)
{
defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
{
PartitionId = partitionId
}, new EventProcessorCheckpointManager());

Console.WriteLine("Processing : " + partitionId);
}
}

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
MyData data = JsonConvert.DeserializeObject<MyData>(bytes);

当我一遍又一遍地收到相同的消息时,我怀疑我需要做这样的事情:

string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);

但是,Offset 是一个字符串,即使它看起来是一个数值(例如“12345”)。 context.CheckPointAsync() 的文档看起来这可能就是答案;然而,在循环结束时发出它似乎没有什么区别。

所以,我有一个由两部分组成的问题:

  1. 什么是偏移量?是我想象的那样吗(即流中某个点的数字标记),如果是,为什么它是一个字符串?
  2. 为什么我会再次收到相同的消息?据我了解事件中心,尽管它们保证至少一次,但一旦检查点出现问题,我不应该收到相同的消息。

编辑:

经过一段时间的摸索,我想出了一些办法可以避免这个问题;但是,我当然不会声称这是一个解决方案:

var filteredMessages =
messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
.OrderBy(a => a.EnqueuedTimeUtc);

使用EventProcessorHost似乎实际上使问题变得更糟;也就是说,历史事件不仅在重播,而且似乎是按随机顺序重播的。

编辑:

我遇到了this @Mikhail 的优秀文章,它似乎确实解决了我的确切问题。然而;大概是我的问题的根源(或者其中之一,假设这是正确的,那么我不确定为什么使用 EventProcessorHost 不能像 @Mikhail 自己在评论)。但是,ServiceBus 版本的 ICheckpointManager 只有一个接口(interface)方法:

namespace Microsoft.ServiceBus.Messaging
{

public interface ICheckpointManager
{
Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
}
}

最佳答案

您的标题应该是事件中心,而不是服务总线。对于您的问题:

  1. 虽然 Event Hub 的设计与 Kafka 类似,但一个很大的区别是您应该自己管理偏移量。事件中心代理完全不知道您的消费者组的偏移量。
  2. 因此Event Hub sdk提供了一些帮助类来存储存储帐户中的偏移量,但处理消息后您仍然需要手动调用检查点。

关于c# - 如何使用 ServiceBus EventData 偏移值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50694613/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com