gpt4 book ai didi

java - Confluent Cloud - Spring Boot Consumer REST Endpoint?

转载 作者:行者123 更新时间:2023-12-03 11:20:14 31 4
gpt4 key购买 nike

我正在尝试构建一个 Java Spring Boot 应用程序,该应用程序将发布并从 Confluent Cloud Kafka 获取消息。
我关注了article用于将 Kafka 消息发布到 Confluent Cloud 并且它可以工作。
下面是实现
KafkaController.java

package com.seroter.confluentboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.seroter.confluentboot.dto.Product;
import com.seroter.confluentboot.engine.Producer;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

private final Producer producer;

private final com.seroter.confluentboot.engine.Consumer consumer;

@Autowired
KafkaController(Producer producer,com.seroter.confluentboot.engine.Consumer consumer) {
this.producer = producer;
this.consumer=consumer;
}

@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.sendMessage(message);
}


@PostMapping(value="/publishJson")
public ResponseEntity<Product> publishJsonMessage(@RequestBody Product product) {
producer.sendJsonMessage(product);
ResponseEntity<Product> responseEntity=new ResponseEntity<>(product,HttpStatus.CREATED);
return responseEntity;
}


}
产品.java
package com.seroter.confluentboot.dto;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonPropertyOrder(value = {"product_id","product_name","quantity","price"})
public class Product {

@JsonProperty(value = "product_id")
private int productId;
@JsonProperty(value="product_name")
private String productName;

private int quantity;

private double price;

}
Producer.java
package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;

import com.seroter.confluentboot.dto.Product;

@Service
@EnableBinding(Source.class)
public class Producer {

private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "users";

@Autowired
private Source source;

public void sendMessage(String message) {
logger.info(String.format("#### -> Producing message -> %s", message));
this.source.output().send(new GenericMessage<>(message));
}


public void sendJsonMessage(Product product)
{
logger.info(String.format("#### -> Producing message -> %s",product.toString()));
this.source.output().send(new GenericMessage<>(product));
}

}
ConfluentBootApplication.java
package com.seroter.confluentboot;

import org.apache.tomcat.util.net.WriteBuffer.Sink;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.seroter.confluentboot.dto.Product;


@SpringBootApplication
@EnableBinding(Source.class)
@RestController
@RequestMapping(value = "/confluent")

public class ConfluentBootApplication {
@Autowired
private com.seroter.confluentboot.engine.Consumer consumer;

public static void main(String[] args) {
SpringApplication.run(ConfluentBootApplication.class, args);
}


}
应用程序属性
spring.cloud.stream.kafka.binder.brokers=pkc-epwny.eastus.azure.confluent.cloud:9092
spring.cloud.stream.bindings.output.destination=test

spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

server.port=9000
有用
enter image description here
我可以验证
enter image description here
我想构建一个 Spring Boot Consumer REST Endpoint?我该怎么做?
更新:
ConfluentConsumer.java
package com.seroter.confluentboot.controller;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import com.seroter.confluentboot.dto.Product;

//@RestController
@EnableBinding(Sink.class)
public class ConfluentConsumer {

@StreamListener(Sink.INPUT)
public void consumeMessage(Product product)
{
System.out.println("******************************");
System.out.println("============= "+product.getProductId()+" ================");
System.out.println("******************************");
}

}
消费者.java
package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;

@Service
@PropertySource("classpath:application.properties")
public class Consumer {

private final Logger logger = LoggerFactory.getLogger(Producer.class);
}

最佳答案

我相信您在这里尝试做的是,从 Kafka consumer 中选择最新消息通过 REST端点,即您想要手动轮询 Kafka话题。通过 REST 发布消息端点是合乎逻辑的,但是通过端点使用消息听起来不是一个好主意。如果你想要一个队列行为,你应该使用 RabbitMQ而不是 Kafka .
但是,如果你想使用 Kafka并手动轮询消息。您可以使用以下两种方法之一。
方法一:创建 ConsumerFactory并获得 Consumer从工厂,然后使用 Consumer 轮询 Kafka

@Configuration
class KafkaConsumerConfig {

private static final String TOPIC_NAME = "test";
private final String userName = "username";
private final String password = "password";

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"pkc-epwny.eastus.azure.confluent.cloud:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"conumer-gp-1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + userName + " password=" + password);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public Consumer<String, String> createConsumer(ConsumerFactory consumerFactory) {
Consumer consumer = consumerFactory.createConsumer("consumer-group-1", "client-1");
consumer.subscribe(List.of(TOPIC_NAME));
return consumer;
}
}
您可以从 application.properties 中读取主题名称、组 ID、引导服务器、SSL 配置等
现在你可以通过在 RestController 中注入(inject)消费者来消费消息。
private final Consumer<String, String> consumer;

@Autowired
ConsumerController(Consumer<String, String> consumer) {
this.consumer = consumer;
}

@GetMapping("retrieveMessage")
public String getMessage() {
// Kafka might return more than 1 events so be careful
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if (!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
String value = iterator.next().value();
consumer.commitSync();
return value;
} else {
return "no message";
}
}
方法二:将消息存储在内存队列中,然后轮询内存队列
spring.cloud.stream.bindings.input.destination=test
然后将消息存储在 Queue并通过 REST 检索它端点
@RestController
@EnableBinding(Sink.class)
class ConsumerController {

private final Queue<String> queue;

ConsumerController() {
this.queue = new ConcurrentLinkedQueue<>();
}


@StreamListener(target = Sink.INPUT)
public void consume(String message) {
this.queue.add(message);
}

@GetMapping("getMessage")
public String retrieveMessage() {
return this.queue.poll();
}
}
缺点:如果您的应用程序重新启动,您将丢失所有内存中的消息。因此,将消息存储在分布式缓存中,例如 Redis将是一个更好的解决方案。

关于java - Confluent Cloud - Spring Boot Consumer REST Endpoint?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64913582/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com