gpt4 book ai didi

java - KStream-KTable 使用 Exactly Once 配置内部连接丢失的消息

转载 作者:搜寻专家 更新时间:2023-11-01 00:53:53 24 4
gpt4 key购买 nike

当我不设置时加工保证这意味着流将以其默认值( at_least_once )启动,此代码可以成功记录并将加入的消息发送到相关主题。

恰好_once 配置是 启用 在同一个流应用程序上,某些数据无法成功通过 join。即使有第一个 peek 块的日志,我也看不到一些第二个 peek 日志和一些我需要的消息。

我确信 kstream 和 ktable 都需要不为空的值。并且双方都定期收到消息。

流配置:

  • processing.guarantee=exactly_once
  • replication.factor=3(这会增加内部主题的复制因子)

  • Kafka (with 3 broker) 详情:
  • 版本=2.2.0
  • log.roll.ms=3600000
  • offsets.topic.replication.factor=3
  • transaction.state.log.replication.factor=3
  • transaction.state.log.min.isr=3
  • message.max.bytes=2000024

  • 问题是,如何 恰好_once 加工保证设置会导致这种情况吗?
    final KStream<String, UserProfile> userProfileStream = builder.stream(TOPIC_USER_PROFILE);
    final KTable<String, Device> deviceKTable = builder.table(TOPIC_DEVICE);

    userProfileStream
    .peek((genericId, userProfile) ->
    log.debug("[{}] Processing user profile: {}", openUserId, userProfile)
    )
    .join(
    deviceKTable,
    (userProfile, device) -> {
    userProfile.setDevice(device);

    return userProfile;
    },
    Joined.with(Serdes.String(), userProfileSerde, deviceSerde)
    )
    .peek((genericId, userProfile) ->
    log.debug("[{}] Updated user profile: {}", genericId, userProfile)
    )
    .to(TOPIC_UPDATED_USER_PROFILE, Produced.with(Serdes.String(), userProfileSerde));

    最佳答案

    Confluent 邮件组中也正在讨论有关该问题的更多信息:https://groups.google.com/d/msg/confluent-platform/MRjz8MRBDCg/XbVlJI0hBAAJ

    关于java - KStream-KTable 使用 Exactly Once 配置内部连接丢失的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57989278/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com