- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想重置 AerospikeSink Kafka 连接器偏移量,我首先删除连接器消费者组 (connect-*
) 偏移量,然后重新创建它。当我使用 earliest
策略重新创建时,它会以正确的偏移量重新创建,但是随后,当任务状态从 tasks = []
更改为 RUNNING
任务从连接器的前一个实例到达的点开始继续处理,这会阻止从一开始就读取来自 kafka 的所有消息(我正在尝试再次读取来自 Kafka 的所有消息)。
注意:使用新名称创建新连接器并不能解决问题。
使用 tasks = []
重新创建连接器后
在 RUNNING
状态下使用 tasks
重新创建连接器后:
Kafka 连接日志:
直到任务移动到RUNNING
状态:
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-8 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-9 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,720 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-7 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,722 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-9 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-6 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,737 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-8 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,917 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2
任务初始化后:
2021-08-18 08:13:39,277 INFO WorkerSinkTask{id=recovery-connector-one-2} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-8=OffsetAndMetadata{offset=2015, leaderEpoch=null, metadata=''}, prism-bs-profile-services-9=OffsetAndMetadata{offset=1989, leaderEpoch=null, metadata=''}, prism-bs-profile-services-7=OffsetAndMetadata{offset=1938, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-2]
2021-08-18 08:13:39,281 INFO flushed 5964 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,281 INFO WorkerSinkTask{id=recovery-connector-one-1} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-4=OffsetAndMetadata{offset=1973, leaderEpoch=null, metadata=''}, prism-bs-profile-services-5=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}, prism-bs-profile-services-6=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,323 INFO flushed 7943 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO flushed 193577 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421647232, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:36,965 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Discovered group coordinator nycd-og-kafkacluster02.my-company.corp:9094 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,182 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Attempt to heartbeat with Generation{generationId=1, memberId='connector-consumer-recovery-connector-one-0-80393504-bb92-4d33-ac12-86e6259f8a8c', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [kafka-coordinator-heartbeat-thread | connect-recovery-connector-one]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Lost previously assigned partitions prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO flushed 175680 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets synchronously using sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 ERROR WorkerSinkTask{id=recovery-connector-one-0} Commit of offsets threw an unexpected exception for sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1452)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:362)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:439)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
at org.apache.kafka.clients.consumer.ConsumerRebalanceListener.onPartitionsLost(ConsumerRebalanceListener.java:198)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsLost(ConsumerCoordinator.java:331)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:694)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2021-08-18 08:14:37,212 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,772 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Finished assignment for group at generation 3: {connector-consumer-recovery-connector-one-0-7da6ea9b-28c5-40c4-9486-4c2d3d4c638f=Assignment(partitions=[profile-services-0, prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3]), connector-consumer-recovery-connector-one-2-49f84027-8b22-48ad-9e5b-c59bb82310bf=Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]), connector-consumer-recovery-connector-one-1-6aae273d-1d5f-42da-9c41-04dded122f1c=Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-8, prism-bs-profile-services-9, prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-4 to the committed offset FetchPosition{offset=1973, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=51}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-5 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=52}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-6 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3, profile-services-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-0 to the committed offset FetchPosition{offset=1986, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition profile-services-0 to the committed offset FetchPosition{offset=421647232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=154}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-1 to the committed offset FetchPosition{offset=1999, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-2 to the committed offset FetchPosition{offset=2048, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-3 to the committed offset FetchPosition{offset=1922, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-8 to the committed offset FetchPosition{offset=2015, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-9 to the committed offset FetchPosition{offset=1989, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=48}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-7 to the committed offset FetchPosition{offset=1938, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
我想重置任务内部元数据,它没有存储在 __consumers_offsets
主题中,因为当我重置偏移量时,我正在向所有相关分区写入 NULL
,它也不在 offset.storage.topic=aerospike-connect-zvi-connectors-offsets
上,它是空的(它的接收器连接器,仅供源连接器使用)
我尝试exec
kafka connect 但没有找到任何有用的内部文件来存储数据。有什么想法吗?
谢谢!
连接器创建:
private[services] def createConnectorAsync(prevConnector: KafkaConnector): Future[Unit] = Future {
logger.debug(s"createConnectorAsync(${prevConnector.getMetadata.getName}) Triggered")
// Creating new Connector based on prevConnector
val connector = new KafkaConnectorBuilder()
.withApiVersion(prevConnector.getApiVersion)
.withApiVersion(prevConnector.getApiVersion)
.withNewStatus().withTasksMax(prevConnector.getStatus.getTasksMax).and
.withMetadata(
// Required for resetting the ResourceVersion, UID, etc.
new ObjectMetaBuilder()
.withName(prevConnector.getMetadata.getName)
.withLabels(prevConnector.getMetadata.getLabels)
.withAnnotations(prevConnector.getMetadata.getAnnotations)
.withNamespace(prevConnector.getMetadata.getNamespace)
.build()
)
.withSpec(
new KafkaConnectorSpecBuilder(prevConnector.getSpec)
// Re-create the connector with earliest offsets
.addToConfig("consumer.override.auto.offset.reset", "earliest")
.build()
)
.build()
connector.getSpec.setPause(false)
for ((name, value) <- prevConnector.getSpec.getAdditionalProperties.asScala) {
connector.getSpec.setAdditionalProperty(name, value)
}
Crds.kafkaConnectorOperation(client).create(connector)
// Waiting until new Connector is Running
Crds.kafkaConnectorOperation(client)
.withName(connector.getMetadata.getName)
.waitUntilCondition(connector => {
connector != null &&
ConnectorTasks(connector).exists(xs => xs.nonEmpty && xs.forall(_.state.equalsIgnoreCase("Running"))) &&
connector.getStatus != null && connector.getStatus.getConditions.stream().anyMatch(c => c.getType.equalsIgnoreCase("Ready") && c.getStatus.equalsIgnoreCase("True"))
}, config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)
}
通过 strimzi-api
最佳答案
正如您对上一个问题的回答,它们不在每个连接器主题中
它们由 connect-$name
存储在消费者组中(因为接收器是从 Kafka 读取到外部系统的消费者),其中 name
是您在连接器属性或通过 Connect REST API 引用连接器。如果您列出所有消费者组,您将看到以 connect-
开头的消费者组。重置接收器应该像重置消费者组一样简单
并且之前提到过,某些连接器可以选择使用存储在其他地方的信息覆盖其组偏移量,而了解这一点的唯一方法是检查源代码或日志。在这种情况下,重置需要您了解它是如何完成的
作为一个最小的例子,使用 FileSink 连接器
创建主题
kafka-topics.sh --create --topic test --replication-factor=1 --partitions=1 --bootstrap-server $BOOTSTRAP_ADDRESS
填充数字 1..100
for i in {1..100}; do echo $i >> data.txt ; done
./kafka-console-producer.sh --topic test --broker-list $BOOTSTRAP_ADDRESS < data.txt
使用 name=console-sink
创建接收器
curl -XPOST $CONNECT_API/connectors -H 'Content-Type: application/json' -d '{
"name": "console-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"topics": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
...检查 Connect worker 日志以查看写入的值 100
检查 connect-*
消费者组
kafka-consumer-groups.sh --list --bootstrap-server $BOOTSTRAP_ADDRESS | grep -e '^connect-'
connect-console-sink
...
描述我想要的那个,并看到结束偏移量是 100,正如预期的那样。
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 - 100 - connector-consumer-console-sink-0-46181998-e548-4f7d-a17f-155421d9ad00 /172.25.0.4 connector-consumer-console-sink-0
然后删除连接器并重复描述
curl -XDELETE $CONNECT_API/connectors/console-sink/
curl $CONNECT_API/connectors/console-sink
{"error_code":404,"message":"Connector console-sink not found"}%
组仍然存在
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
如果我要重新发布连接器,消息说 no active members goes away
,但偏移量不会改变。此外,表示该组仍然存在。
现在,让我们再次删除以确保该组处于非事件状态
curl -XDELETE $CONNECT_API/connectors/console-sink/
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
现在我将 test
分区 0 的偏移量重置为 42
kafka-consumer-groups.sh --reset-offsets --topic test:0 --to-offset 42 --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS --execute
GROUP TOPIC PARTITION NEW-OFFSET
connect-console-sink test 0 42
再次描述,我们看到它被考虑在内
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 42 100 58 - - -
再次使用相同的名称发布连接器...并检查日志,我们看到消息从 42 之后开始打印。
关于apache-kafka - 重置 Kafka Connect 接收器连接器偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68774618/
我在 Windows 机器上启动 Kafka-Server 时出现以下错误。我已经从以下链接下载了 Scala 2.11 - kafka_2.11-2.1.0.tgz:https://kafka.ap
关于Apache-Kafka messaging queue . 我已经从 Kafka 下载页面下载了 Apache Kafka。我已将其提取到 /opt/apache/installed/kafka
假设我有 Kafka 主题 cars。 我还有一个消费者组 cars-consumers 订阅了 cars 主题。 cars-consumers 消费者组当前位于偏移量 89。 当我现在删除 cars
我想知道什么最适合我:Kafka 流或 Kafka 消费者 api 或 Kafka 连接? 我想从主题中读取数据,然后进行一些处理并写入数据库。所以我编写了消费者,但我觉得我可以编写 Kafka 流应
我曾研究过一些 Kafka 流应用程序和 Kafka 消费者应用程序。最后,Kafka流不过是消费来自Kafka的实时事件的消费者。因此,我无法弄清楚何时使用 Kafka 流或为什么我们应该使用
Kafka Acknowledgement 和 Kafka 消费者 commitSync() 有什么区别 两者都用于手动偏移管理,并希望两者同步工作。 请协助 最佳答案 使用 spring-kafka
如何在 Kafka 代理上代理 Apache Kafka 生产者请求,并重定向到单独的 Kafka 集群? 在我的特定情况下,无法更新写入此集群的客户端。这意味着,执行以下操作是不可行的: 更新客户端
我需要在 Kafka 10 中命名我的消费者,就像我在 Kafka 8 中所做的一样,因为我有脚本可以嗅出并进一步使用这些信息。 显然,consumer.id 的默认命名已更改(并且现在还单独显示了
1.概述 我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点 这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现
我正在使用以下命令: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.topic --property
我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者已将数据写入/已经将数据写入各种主题。我想与服务
我的场景是我使用了很多共享前缀的 Kafka 主题(例如 house.door, house.room ) 并使用 Kafka 流正则表达式主题模式 API 使用所有主题。 一切看起来都不错,我得到了
有没有办法以编程方式获取kafka集群的版本?例如,使用AdminClient应用程序接口(interface)。 我想在消费者/生产者应用程序中识别 kafka 集群的版本。 最佳答案 目前无法检索
每当我尝试重新启动 kafka 时,它都会出现以下错误。一旦我删除/tmp/kafka-logs 它就会得到解决,但它也会删除我的主题。 有办法解决吗? ERROR Error while
我是 Apache Kafka 的新用户,我仍在了解内部结构。 在我的用例中,我需要从 Kafka Producer 客户端动态增加主题的分区数。 我发现了其他类似的 questions关于增加分区大
正如 Kafka 文档所示,一种方法是通过 kafka.tools.MirrorMaker 来实现这一点。但是,我需要将一个主题(比如 测试 带 1 个分区)(其内容和元数据)从生产环境复制到没有连接
我已经在集群中配置了 3 个 kafka,我正在尝试与 spring-kafka 一起使用。 但是在我杀死 kafka 领导者之后,我无法将其他消息发送到队列中。 我将 spring.kafka.bo
我的 kafka sink 连接器从多个主题(配置了 10 个任务)读取,并处理来自所有主题的 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。 以下是触发器记录中键值对的示例: "
我有以下 kafka 流代码 public class KafkaStreamHandler implements Processor{ private ProcessorConte
当 kafka-streams 应用程序正在运行并且 Kafka 突然关闭时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当 Kafka 回来时,一切都应该(理论上)去恢复正常
我是一名优秀的程序员,十分优秀!