gpt4 book ai didi

java - Spring for Kafka 2.3 在运行时设置偏移量(如果消费者存在),否则创建新消费者

转载 作者:行者123 更新时间:2023-12-01 18:07:08 25 4
gpt4 key购买 nike

我必须使用 Spring boot Kafka 编写一个 Rest 服务,以动态创建具有特定消费者 ID 和特定起始偏移量的消费者。如果消费者已经存在,我必须将其倒回。但现在消费者已经创建了。

当我发送http://localhost:8080/rewindMessage?consumerID=时“system1”&newMessageOffset=2

我一直有:消费者信息:(memberId=consumer-1-4951c570-c53f-4a29-aa86-9aaecebdd037, clientId=consumer-1, host=/127.0.0.1, assignment=(topicPartitions=back-0))我无法理解为什么没有创建消费者以及为什么 clientId 不等于我写的内容。

    public class ReceiverController {
@PutMapping(value = "/rewindMessage")
public ResponseEntity<ObjectNode> rewindMessage(@RequestParam("consumerID") String consumerID,@RequestParam("newMessageOffset") long newMessageOffset) {
ObjectNode response=this.receiver.rewind(consumerID,newMessageOffset);
ResponseEntity<ObjectNode> entity=ResponseEntity.status(HttpStatus.valueOf(response.get("httpStatus").intValue())).body(response);

return entity;
}

public class Receiver {
public ObjectNode rewind(String consumerID, long newMessageOffset) {
LOG.info("rewindMessage.consumerID=>" + consumerID);
LOG.info("rewindMessage.newMessageOffset=>" + newMessageOffset);
ObjectMapper mapper = new ObjectMapper();

ObjectNode response = mapper.createObjectNode();

if (consumerID != null) {


AdminClient client = createAdminClient();

DescribeConsumerGroupsResult result = client.describeConsumerGroups(Collections.singleton(this.groupID));

Iterator<MemberDescription> it = result.describedGroups().get(this.groupID).get().members().stream().iterator();
while (it.hasNext()) {
System.out.println("consumer information: " + it.next().toString());
}

int n = (int) result.describedGroups().get(this.groupID).get().members().stream()
.filter(v -> v.clientId().equals(consumerID)).count();
if(n==0)
{

this.config = new ReceiverConfig();
ConcurrentMessageListenerContainer<String, String> container = this.config.createContainer(consumerID, "DMS",
newMessageID);
//this.config.createConsumer(consumerID, "DMS", newMessageID);
response.put("timestamp", new Timestamp(System.currentTimeMillis()).toString());
response.put("errorCode", "201");
response.put("httpStatus", 201);
response.put("message", "consumerID " + consumerID + " rewound to consumerID " + newMessageID + ".");

}
}

return response;

}


@EnableKafka
public class ReceiverConfig {


private String bootstrapServers = "localhost:9092";

private String topic = "back";
Map<String, Object> properties;


private ConcurrentKafkaListenerContainerFactory<String, String> factory;

ReceiverConfig() {
this.factory=new ConcurrentKafkaListenerContainerFactory<String, String>();
this.factory.getContainerProperties().setIdleEventInterval(5000L);
this.factory.setConsumerFactory(consumerFactory());


}

public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}


ConcurrentMessageListenerContainer<String, String> createContainer(String consumerId, String groupId, long messageId){


TopicPartitionOffset offset= new TopicPartitionOffset(topic,0,messageId,false);

ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(offset);
container.getContainerProperties().setGroupId(groupId);
container.getContainerProperties().setClientId(consumerId);
container.getContainerProperties().setMessageListener(new BackChannelListener());
return container;
}
}

最佳答案

consumerConfigs() 永远不会返回包含 client.idgroup.id 的映射

如果您想寻找/提交消费者,您还需要获取 KafkaConsumer 的实例

关于java - Spring for Kafka 2.3 在运行时设置偏移量(如果消费者存在),否则创建新消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60541959/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com