gpt4 book ai didi

c# - 如何在 .NET Core 上正确实现 kafka 消费者作为后台服务

转载 作者:行者123 更新时间:2023-12-03 14:55:48 27 4
gpt4 key购买 nike

我通过在 .NET Core 2.2 上使用 BackgroundService 将 Kafka 使用者实现为控制台应用程序。我使用 confluent-kafka-dotnet v1.0.1.1 作为 Apache Kafka 的客户端。我不太确定如何处理每条消息。

  • 由于处理每条消息可能需要一些时间(最多 24 小时),因此我为每条消息启动了一个新任务,这样我就不会阻止消费者使用新消息。我认为如果我的消息太多,每次创建一个新任务并不是正确的方法。那么处理每条消息的正确方法是什么?是否可以为每条消息创建某种动态后台服务?
  • 如果一条消息已经在处理中,但应用程序崩溃或发生重新平衡,我最终会多次使用和处理相同的消息。我应该自动提交偏移量(或在它被消耗后立即提交)并将消息(或任务)的状态存储在某个地方,比如在数据库中?

  • 我知道有 Hangfire,但我不确定是否需要使用它。如果我目前的方法完全错误,请给我一些建议。
    下面是 ConsumerService 的实现:
    public class ConsumerService : BackgroundService
    {
    private readonly IConfiguration _config;
    private readonly IElasticLogger _logger;
    private readonly ConsumerConfig _consumerConfig;
    private readonly string[] _topics;
    private readonly double _maxNumAttempts;
    private readonly double _retryIntervalInSec;

    public ConsumerService(IConfiguration config, IElasticLogger logger)
    {
    _config = config;
    _logger = logger;
    _consumerConfig = new ConsumerConfig
    {
    BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
    GroupId = _config.GetValue<string>("Kafka:GroupId"),
    EnableAutoCommit = _config.GetValue<bool>("Kafka:Consumer:EnableAutoCommit"),
    AutoOffsetReset = (AutoOffsetReset)_config.GetValue<int>("Kafka:Consumer:AutoOffsetReset")
    };
    _topics = _config.GetValue<string>("Kafka:Consumer:Topics").Split(',');
    _maxNumAttempts = _config.GetValue<double>("App:MaxNumAttempts");
    _retryIntervalInSec = _config.GetValue<double>("App:RetryIntervalInSec");
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
    Console.WriteLine("!!! CONSUMER STARTED !!!\n");

    // Starting a new Task here because Consume() method is synchronous
    var task = Task.Run(() => ProcessQueue(stoppingToken), stoppingToken);

    return task;
    }

    private void ProcessQueue(CancellationToken stoppingToken)
    {
    using (var consumer = new ConsumerBuilder<Ignore, Request>(_consumerConfig).SetValueDeserializer(new MessageDeserializer()).Build())
    {
    consumer.Subscribe(_topics);

    try
    {
    while (!stoppingToken.IsCancellationRequested)
    {
    try
    {
    var consumeResult = consumer.Consume(stoppingToken);

    // Don't want to block consume loop, so starting new Task for each message
    Task.Run(async () =>
    {
    var currentNumAttempts = 0;
    var committed = false;

    var response = new Response();

    while (currentNumAttempts < _maxNumAttempts)
    {
    currentNumAttempts++;

    // SendDataAsync is a method that sends http request to some end-points
    response = await Helper.SendDataAsync(consumeResult.Value, _config, _logger);

    if (response != null && response.Code >= 0)
    {
    try
    {
    consumer.Commit(consumeResult);
    committed = true;

    break;
    }
    catch (KafkaException ex)
    {
    // log
    }
    }
    else
    {
    // log
    }

    if (currentNumAttempts < _maxNumAttempts)
    {
    // Delay between tries
    await Task.Delay(TimeSpan.FromSeconds(_retryIntervalInSec));
    }
    }

    if (!committed)
    {
    try
    {
    consumer.Commit(consumeResult);
    }
    catch (KafkaException ex)
    {
    // log
    }
    }
    }, stoppingToken);
    }
    catch (ConsumeException ex)
    {
    // log
    }
    }
    }
    catch (OperationCanceledException ex)
    {
    // log
    consumer.Close();
    }
    }
    }
    }

    最佳答案

    同意法比奥的观点,你不应该Task.Run为了处理消息,因为您最终将有大量线程浪费资源并切换它们的执行,从而使性能受到影响。
    此外,可以在同一线程中处理消费的消息,因为 Kafka 使用拉模型,您的应用程序可以按自己的节奏处理消息。
    关于不止一次处理消息,我建议存储已处理消息的偏移量,以便跳过已处理的消息。由于 offset 是一个长基数,因此您可以轻松跳过偏移量小于之前提交的消息。当然,这仅在您有一个分区时才有效,因为 Kafka 在分区级别提供偏移计数器和顺序保证
    您可以在 my article 中找到 Kafka Consumer 的示例.如果您有任何问题,请随时提问,我很乐意为您提供帮助

    关于c# - 如何在 .NET Core 上正确实现 kafka 消费者作为后台服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56733810/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com