gpt4 book ai didi

apache-kafka - Kafka MirrorMaker2 - 不镜像消费者组偏移量

转载 作者:行者123 更新时间:2023-12-04 11:42:54 27 4
gpt4 key购买 nike

我已经设置了 MirrorMaker2 来在 2 个 DC 之间复制数据。

我的 mm2.properties,

# mm2.properties
name=source->dest
clusters=source, dest

source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092

source->dest.enabled=true

offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1

在MM2启动时看到以下内容。
[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
admin.timeout.ms = 60000
checkpoints.topic.replication.factor = 3
config.action.reload = restart
config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
consumer.poll.timeout.ms = 1000
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
enabled = true
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
groups = [.*]
groups.blacklist = [console-consumer-.*, connect-.*, __.*]
header.converter = null
heartbeats.topic.replication.factor = 3
key.converter = null
metric.reporters = null
name = source->dest
offset-syncs.topic.replication.factor = 3
offset.lag.max = 100
refresh.groups.enabled = true
refresh.groups.interval.seconds = 600
refresh.topics.enabled = true
refresh.topics.interval.seconds = 600
replication.factor = 2
replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
source.cluster.alias = source
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
sync.topic.acls.enabled = true
sync.topic.acls.interval.seconds = 600
sync.topic.configs.enabled = true
sync.topic.configs.interval.seconds = 600
target.cluster.alias = dest
task.assigned.groups = null
task.assigned.partitions = null
tasks.max = 1
topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
topics = [.*]
topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
transforms = []
value.converter = null
(org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

我的数据正在按预期复制。源主题在目标集群中作为源创建。但是,消费者组偏移量没有被复制。

在源集群中启动了一个消费者组。
./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

消耗了几条消息并停止了它。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。

我尝试使用来自目标集群的消息,如下所示。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

因为,我使用相同的消费者组,我希望我的偏移量也被同步并且不会消耗我在 cluster1 中消耗的相同消息。但是,仍然消耗所有消息。有什么我在这里想念的吗。

最佳答案

复制偏移量之所以重要,有几个基本原因:

  • kafka 是一个至少一次的系统(忽略炒作)。这意味着镜像制造商,因为它建立在 kafka 消费者和生产者之上,可以每次超时/断开连接,将导致某种程度的重复记录被传递到目的地。这意味着偏移量不会在源和目标之间映射 1:1。即使您尝试使用“恰好一次”支持(MM2 KIP 明确表示未使用),它所做的只是跳过部分交付的批次,但这些批次仍会占用目标
  • 处的偏移量
  • 如果在源主题开始过期记录后很长时间设置镜像,则目标主题将从偏移量 0 开始,而源将具有更高的“最旧”偏移量。曾尝试解决此问题(请参阅 KIP-391 ),但从未被接受
  • 通常无法保证您的镜像拓扑从单个源镜像到单个目标。例如, the linkedin topology ,从多个源集群镜像到“聚合”层集群。映射偏移对于此类拓扑毫无意义

  • 查看 MM2 KIP,其中提到了“偏移同步主题”。
    在您的代码中,您可以使用类 RemoteClusterUtils 来转换集群之间的检查点:
    Map<TopicPartition, Long> newOffsets = RemoteClusterUtils.translateOffsets(
    newClusterProperties, oldClusterName, consumerGroupId
    );
    consumer.seek(newOffsets);

    这是从以下演示文稿中取出的 - https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019

    或者,您可以使用按时间间隔 API 查找在目标上启动您的消费者组,直到将数据交付到目标(或交付到源,如果日志附加时间戳的代理设置在目标上没有覆盖那些)的粗略时间次)。为了安全,你需要倒带一点。

    关于apache-kafka - Kafka MirrorMaker2 - 不镜像消费者组偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60250330/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com