gpt4 book ai didi

Java实现Kafka生产者和消费者的示例

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 25 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章Java实现Kafka生产者和消费者的示例由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

Kafka简介 。

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统1、高吞吐、低延迟的平台.

Java实现Kafka生产者和消费者的示例

方式一:kafka-clients 。

引入依赖 。

在pom.xml文件中,引入kafka-clients依赖:

?
1
2
3
4
5
< dependency >
   < groupId >org.apache.kafka</ groupId >
   < artifactId >kafka-clients</ artifactId >
   < version >2.3.1</ version >
</ dependency >

生产者 。

创建一个KafkaProducer的生产者实例:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class Config {
 
   public final static String bootstrapServers = "127.0.0.1:9092" ;
 
   @Bean (destroyMethod = "close" )
   public KafkaProducer<String, String> kafkaProducer() {
     Properties props = new Properties();
     //设置Kafka服务器地址
     props.put( "bootstrap.servers" , bootstrapServers);
     //设置数据key的序列化处理类
     props.put( "key.serializer" , StringSerializer. class .getName());
     //设置数据value的序列化处理类
     props.put( "value.serializer" , StringSerializer. class .getName());
     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
     return producer;
   }
}

在Controller中进行使用:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RestController
@Slf4j
public class Controller {
 
   @Autowired
   private KafkaProducer<String, String> kafkaProducer;
 
   @RequestMapping ( "/kafkaClientsSend" )
   public String send() {
     String uuid = UUID.randomUUID().toString();
     RecordMetadata recordMetadata = null ;
     try {
      //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
       recordMetadata = kafkaProducer.send( new ProducerRecord<>( "one-more-topic" , uuid)).get();
       log.info( "recordMetadata: {}" , recordMetadata);
       log.info( "uuid: {}" , uuid);
     } catch (Exception e) {
       log.error( "send fail, uuid: {}" , uuid, e);
     }
     return uuid;
   }
}

消费者 。

创建一个KafkaConsumer的消费者实例:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class Config {
 
   public final static String groupId = "kafka-clients-group" ;
   public final static String bootstrapServers = "127.0.0.1:9092" ;
 
   @Bean (destroyMethod = "close" )
   public KafkaConsumer<String, String> kafkaConsumer() {
     Properties props = new Properties();
     //设置Kafka服务器地址
     props.put( "bootstrap.servers" , bootstrapServers);
     //设置消费组
     props.put( "group.id" , groupId);
     //设置数据key的反序列化处理类
     props.put( "key.deserializer" , StringDeserializer. class .getName());
     //设置数据value的反序列化处理类
     props.put( "value.deserializer" , StringDeserializer. class .getName());
     props.put( "enable.auto.commit" , "true" );
     props.put( "auto.commit.interval.ms" , "1000" );
     props.put( "session.timeout.ms" , "30000" );
     KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
     //订阅名称为“one-more-topic”的Topic的消息
     kafkaConsumer.subscribe(Arrays.asList( "one-more-topic" ));
     return kafkaConsumer;
   }
}

在Controller中进行使用:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@Slf4j
public class Controller {
 
   @Autowired
   private KafkaConsumer<String, String> kafkaConsumer;
 
   @RequestMapping ( "/receive" )
   public List<String> receive() {
    从Kafka服务器中的名称为“one-more-topic”的Topic中消费消息
     ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds( 1 ));
     List<String> messages = new ArrayList<>(records.count());
     for (ConsumerRecord<String, String> record : records.records( "one-more-topic" )) {
       String message = record.value();
       log.info( "message: {}" , message);
       messages.add(message);
     }
     return messages;
   }
}

  。

方式二:spring-kafka 。

使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了.

引入依赖 。

在pom.xml文件中,引入spring-kafka依赖:

?
1
2
3
4
5
< dependency >
   < groupId >org.springframework.kafka</ groupId >
   < artifactId >spring-kafka</ artifactId >
   < version >2.3.12.RELEASE</ version >
</ dependency >

生产者 。

在application.yml文件中增加配置:

?
1
2
3
4
5
6
7
spring:
  kafka:
   #Kafka服务器地址
   bootstrap-servers: 127.0.0.1:9092
   producer:
    #设置数据value的序列化处理类
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

在Controller中注入KafkaTemplate就可以直接使用了,代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@Slf4j
public class Controller {
 
   @Autowired
   private KafkaTemplate<String, String> template;
 
   @RequestMapping ( "/springKafkaSend" )
   public String send() {
     String uuid = UUID.randomUUID().toString();
     //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
     this .template.send( "one-more-topic" , uuid);
     log.info( "uuid: {}" , uuid);
     return uuid;
   }
}

消费者 。

在application.yml文件中增加配置:

?
1
2
3
4
5
6
7
spring:
  kafka:
   #Kafka服务器地址
   bootstrap-servers: 127.0.0.1:9092
   consumer:
    #设置数据value的反序列化处理类
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class Receiver {
 
   @KafkaListener (topics = "one-more-topic" , groupId = "spring-kafka-group" )
   public void listen(ConsumerRecord<?, ?> record) {
     Optional<?> kafkaMessage = Optional.ofNullable(record.value());
     if (kafkaMessage.isPresent()) {
       String message = (String) kafkaMessage.get();
       log.info( "message: {}" , message);
     }
   }
}

到此这篇关于Java实现Kafka生产者和消费者的示例的文章就介绍到这了,更多相关Java Kafka生产者和消费者 内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。

原文链接:https://blog.csdn.net/heihaozi/article/details/111042472 。

最后此篇关于Java实现Kafka生产者和消费者的示例的文章就讲到这里了,如果你想了解更多关于Java实现Kafka生产者和消费者的示例的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com