- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想重置 AerospikeSink Kafka 连接器偏移量,我首先删除连接器消费者组 (connect-*
) 偏移量,然后重新创建它。当我使用 earliest
策略重新创建时,它会以正确的偏移量重新创建,但是随后,当任务状态从 tasks = []
更改为 RUNNING
任务从连接器的前一个实例到达的点开始继续处理,这会阻止从一开始就读取来自 kafka 的所有消息(我正在尝试再次读取来自 Kafka 的所有消息)。
注意:使用新名称创建新连接器并不能解决问题。
使用 tasks = []
重新创建连接器后
在 RUNNING
状态下使用 tasks
重新创建连接器后:
Kafka 连接日志:
直到任务移动到RUNNING
状态:
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-8 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-9 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,720 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-7 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,722 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-9 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-6 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,737 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-8 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,917 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2
任务初始化后:
2021-08-18 08:13:39,277 INFO WorkerSinkTask{id=recovery-connector-one-2} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-8=OffsetAndMetadata{offset=2015, leaderEpoch=null, metadata=''}, prism-bs-profile-services-9=OffsetAndMetadata{offset=1989, leaderEpoch=null, metadata=''}, prism-bs-profile-services-7=OffsetAndMetadata{offset=1938, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-2]
2021-08-18 08:13:39,281 INFO flushed 5964 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,281 INFO WorkerSinkTask{id=recovery-connector-one-1} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-4=OffsetAndMetadata{offset=1973, leaderEpoch=null, metadata=''}, prism-bs-profile-services-5=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}, prism-bs-profile-services-6=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,323 INFO flushed 7943 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO flushed 193577 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421647232, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:36,965 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Discovered group coordinator nycd-og-kafkacluster02.my-company.corp:9094 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,182 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Attempt to heartbeat with Generation{generationId=1, memberId='connector-consumer-recovery-connector-one-0-80393504-bb92-4d33-ac12-86e6259f8a8c', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [kafka-coordinator-heartbeat-thread | connect-recovery-connector-one]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Lost previously assigned partitions prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO flushed 175680 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets synchronously using sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 ERROR WorkerSinkTask{id=recovery-connector-one-0} Commit of offsets threw an unexpected exception for sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1452)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:362)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:439)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
at org.apache.kafka.clients.consumer.ConsumerRebalanceListener.onPartitionsLost(ConsumerRebalanceListener.java:198)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsLost(ConsumerCoordinator.java:331)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:694)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2021-08-18 08:14:37,212 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,772 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Finished assignment for group at generation 3: {connector-consumer-recovery-connector-one-0-7da6ea9b-28c5-40c4-9486-4c2d3d4c638f=Assignment(partitions=[profile-services-0, prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3]), connector-consumer-recovery-connector-one-2-49f84027-8b22-48ad-9e5b-c59bb82310bf=Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]), connector-consumer-recovery-connector-one-1-6aae273d-1d5f-42da-9c41-04dded122f1c=Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-8, prism-bs-profile-services-9, prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-4 to the committed offset FetchPosition{offset=1973, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=51}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-5 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=52}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-6 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3, profile-services-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-0 to the committed offset FetchPosition{offset=1986, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition profile-services-0 to the committed offset FetchPosition{offset=421647232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=154}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-1 to the committed offset FetchPosition{offset=1999, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-2 to the committed offset FetchPosition{offset=2048, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-3 to the committed offset FetchPosition{offset=1922, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-8 to the committed offset FetchPosition{offset=2015, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-9 to the committed offset FetchPosition{offset=1989, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=48}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-7 to the committed offset FetchPosition{offset=1938, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
我想重置任务内部元数据,它没有存储在 __consumers_offsets
主题中,因为当我重置偏移量时,我正在向所有相关分区写入 NULL
,它也不在 offset.storage.topic=aerospike-connect-zvi-connectors-offsets
上,它是空的(它的接收器连接器,仅供源连接器使用)
我尝试exec
kafka connect 但没有找到任何有用的内部文件来存储数据。有什么想法吗?
谢谢!
连接器创建:
private[services] def createConnectorAsync(prevConnector: KafkaConnector): Future[Unit] = Future {
logger.debug(s"createConnectorAsync(${prevConnector.getMetadata.getName}) Triggered")
// Creating new Connector based on prevConnector
val connector = new KafkaConnectorBuilder()
.withApiVersion(prevConnector.getApiVersion)
.withApiVersion(prevConnector.getApiVersion)
.withNewStatus().withTasksMax(prevConnector.getStatus.getTasksMax).and
.withMetadata(
// Required for resetting the ResourceVersion, UID, etc.
new ObjectMetaBuilder()
.withName(prevConnector.getMetadata.getName)
.withLabels(prevConnector.getMetadata.getLabels)
.withAnnotations(prevConnector.getMetadata.getAnnotations)
.withNamespace(prevConnector.getMetadata.getNamespace)
.build()
)
.withSpec(
new KafkaConnectorSpecBuilder(prevConnector.getSpec)
// Re-create the connector with earliest offsets
.addToConfig("consumer.override.auto.offset.reset", "earliest")
.build()
)
.build()
connector.getSpec.setPause(false)
for ((name, value) <- prevConnector.getSpec.getAdditionalProperties.asScala) {
connector.getSpec.setAdditionalProperty(name, value)
}
Crds.kafkaConnectorOperation(client).create(connector)
// Waiting until new Connector is Running
Crds.kafkaConnectorOperation(client)
.withName(connector.getMetadata.getName)
.waitUntilCondition(connector => {
connector != null &&
ConnectorTasks(connector).exists(xs => xs.nonEmpty && xs.forall(_.state.equalsIgnoreCase("Running"))) &&
connector.getStatus != null && connector.getStatus.getConditions.stream().anyMatch(c => c.getType.equalsIgnoreCase("Ready") && c.getStatus.equalsIgnoreCase("True"))
}, config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)
}
通过 strimzi-api
最佳答案
正如您对上一个问题的回答,它们不在每个连接器主题中
它们由 connect-$name
存储在消费者组中(因为接收器是从 Kafka 读取到外部系统的消费者),其中 name
是您在连接器属性或通过 Connect REST API 引用连接器。如果您列出所有消费者组,您将看到以 connect-
开头的消费者组。重置接收器应该像重置消费者组一样简单
并且之前提到过,某些连接器可以选择使用存储在其他地方的信息覆盖其组偏移量,而了解这一点的唯一方法是检查源代码或日志。在这种情况下,重置需要您了解它是如何完成的
作为一个最小的例子,使用 FileSink 连接器
创建主题
kafka-topics.sh --create --topic test --replication-factor=1 --partitions=1 --bootstrap-server $BOOTSTRAP_ADDRESS
填充数字 1..100
for i in {1..100}; do echo $i >> data.txt ; done
./kafka-console-producer.sh --topic test --broker-list $BOOTSTRAP_ADDRESS < data.txt
使用 name=console-sink
创建接收器
curl -XPOST $CONNECT_API/connectors -H 'Content-Type: application/json' -d '{
"name": "console-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"topics": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
...检查 Connect worker 日志以查看写入的值 100
检查 connect-*
消费者组
kafka-consumer-groups.sh --list --bootstrap-server $BOOTSTRAP_ADDRESS | grep -e '^connect-'
connect-console-sink
...
描述我想要的那个,并看到结束偏移量是 100,正如预期的那样。
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 - 100 - connector-consumer-console-sink-0-46181998-e548-4f7d-a17f-155421d9ad00 /172.25.0.4 connector-consumer-console-sink-0
然后删除连接器并重复描述
curl -XDELETE $CONNECT_API/connectors/console-sink/
curl $CONNECT_API/connectors/console-sink
{"error_code":404,"message":"Connector console-sink not found"}%
组仍然存在
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
如果我要重新发布连接器,消息说 no active members goes away
,但偏移量不会改变。此外,表示该组仍然存在。
现在,让我们再次删除以确保该组处于非事件状态
curl -XDELETE $CONNECT_API/connectors/console-sink/
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
现在我将 test
分区 0 的偏移量重置为 42
kafka-consumer-groups.sh --reset-offsets --topic test:0 --to-offset 42 --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS --execute
GROUP TOPIC PARTITION NEW-OFFSET
connect-console-sink test 0 42
再次描述,我们看到它被考虑在内
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 42 100 58 - - -
再次使用相同的名称发布连接器...并检查日志,我们看到消息从 42 之后开始打印。
关于apache-kafka - 重置 Kafka Connect 接收器连接器偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68774618/
我的一个 friend 在一次求职面试中被要求编写一个程序来测量可用 RAM 的数量。预期的答案是以二进制搜索方式使用 malloc():分配越来越大的内存部分,直到收到失败消息,减少部分大小,然后对
我正在通过任务管理器检查 Chrome 中特定选项卡的内存消耗情况。它显示了我使用的 RAM 量相当大: 但是,当我在开发人员工具中拍摄堆快照时,其显示的大小要小几倍: 怎么会这样呢? 最佳答案 并非
是否有一种可移植的方式,可以在各种支持的操作系统上同时在 .Net 和 Mono 上运行,让程序知道它运行的机器上有多少 RAM(即物理内存而不是虚拟内存)可用? 上下文是一个程序,其内存要求是“请尽
有谁知道是否有办法查看 android studio 项目中的所有 View 、LinearLayout、TextView 等? 我正在使用 android 设备监视器中的层次结构查看器使用 xml
很简单,我想从 Python 脚本中运行外部命令/程序,完成后我还想知道它消耗了多少 CPU 时间。 困难模式:并行运行多个命令不会导致 CPU 消耗结果不准确。 最佳答案 在 UNIX 上: (a)
我需要在给定数组索引和范围的情况下,在返回新索引的数组中向前循环 X 量并向后循环 X 量。 如果循环向前到达数组的末尾,它将在数组的开头继续。如果循环在向后时到达开头,它会在数组末尾继续。 例如,数
Android 应用程序中是否有类似最大 Activity 的内容?我想知道,因为我正在考虑创建具有铃声功能的声音应用程序。它将有大约 40 个 Activity 。但只有 1 个会持续运行。那太多了
有什么方法可以限制这种演示文稿的 curl 量吗?我知道系统会根据我们以 taht 方式模态呈现的 viewcontroller View 内的内容自动 curl 。 但 thta 在我的 iPad
我正在编写一个 Java 应用程序,它需要检查系统中可用的最大 RAM 量(不是 VM 可用的 RAM)。有没有可移植的方式来做到这一点? 非常感谢:-) 最佳答案 JMX 您可以访问 java.la
我发现它使用了 600 MB 的 RAM,甚至超过了 Visual Studio(当它达到 400 MB 的 RAM 时我将其关闭)。 最佳答案 dart 编辑器基于 Eclipse,而 Eclips
这个问题已经有答案了: Java get available memory (10 个回答) 已关闭 7 年前。 假设我有一个专门运行一个程序的 JVM,我如何获得分配给 JVM 的 RAM 量? 假
我刚刚使用 Eclipse 编写了一个程序,该程序需要很长时间才能执行。它花费的时间甚至更长,因为它只将我的 CPU 加载到 25%(我假设这是因为我使用的是四核,而程序只使用一个核心)。有没有办法让
我编写了一个 2x2x2 魔方求解器,它使用广度优先搜索算法求解用户输入的立方体位置。该程序确实解决了立方体。然而,当我进入一个很难解决的问题时,我会在搜索的深处发现这个问题,我用完了堆空间。我的电脑
我正在尝试同步运行多个 fio 线程,但随着线程数量的增加,我的计算机内存不足。似乎每个 fio 线程占用大约 200MB 的 RAM。话虽这么说,有没有办法让每个线程都有一个固定的最大内存使用量?设
我使用“fitctree”函数(链接:https://de.mathworks.com/help/stats/classificationtree-class.html)在 Matlab 中开发了一个
我有一个 .NET 进程,由于我不会深入探讨的原因,它消耗了大量 RAM。我想要做的是对该进程可以使用的 RAM 量实现上限。有办法做到这一点吗? 我找到的最接近的是 Process.GetCurre
您可能已经看到许多“系统信息”应用程序,它们显示诸如剩余电池生命周期之类的信息,甚至显示内存等系统信息。 以类似的方式,是否有任何方法可以从我的应用中检索当前可用 RAM 量,以便我可以更好地决定何时
我从来都不是 MFC 的忠实粉丝,但这并不是重点。我读到微软将在 2010 年发布新版本的 MFC,这让我感到很奇怪 - 我以为 MFC 已经死了(不是恶意,我真的这样做了)。 MFC 是否用于新开发
我在一台安装了 8 GB 内存的机器上工作,我试图以编程方式确定机器中安装了多少内存。我已经尝试使用 sysctlbyname() 来获取安装的内存量,但它似乎仅限于返回带符号的 32 位整数。 ui
基本上,我想要一个由大小相同的 div(例如 100x100)和类似 200x100 的变体构建的页面。它们都 float :向左调整以相应地调整窗口大小。问题是,我不知道如何让它们在那种情况下居中,
我是一名优秀的程序员,十分优秀!