- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章SpringBoot+Nacos+Kafka微服务流编排的简单实现由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
。
最近一直在做微服务开发,涉及了一些数据处理模块的开发,每个处理业务都会开发独立的微服务,便于后面拓展和流编排,学习了SpringCloud Data Flow等框架,感觉这个框架对于我们来说太重了,维护起来也比较麻烦,于是根据流编排的思想,基于我们目前的技术栈实现简单的流编排功能.
简单的说,我们希望自己的流编排就是微服务可插拔,微服务数据入口及输出可不停机修改.
。
。
自己学习的话推荐使用docker安装,命令如下 。
然后在浏览器输入 ip:8848/nacos 账号nacos 密码nacos 。
docker能够帮助我们快速安装服务,减少再环境准备花的时间 。
。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.0.RELEASE</version></parent><dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency><dependency> <groupId>com.alibaba.boot</groupId> <artifactId>nacos-config-spring-boot-starter</artifactId> <version>0.2.1</version></dependency>
配置文件 。
spring: kafka: bootstrap-servers: kafka-server:9092 producer: acks: all consumer: group-id: node1-group #三个服务分别为node1 node2 node3 enable-auto-commit: false# 部署的nacos服务nacos: config: server-addr: nacos-server:8848
建议配置本机host 就可以填写xxx-server 不用填写服务ip 。
。
我们现在需要对三个服务进行编排,保障每个服务可以插拔,也可以调整服务的位子示意图如下:
。
。
通常流编排里面每个服务都有一个输入及输出,分别为input及sink,所以每个服务我们需要配置两个topic,分别是input-topic output-topic,我们就在nacos里面添加输入输出配置 。
nacos配置项需要配置groupId,dataId,通常我们用服务名称作为groupId,配置项的名称作为dataId,如node1-server服务有一个input配置项,配置如下
完成其中一个服务的配置,其它服务参考下图配置即可 。
。
@Configuration@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)// autoRefreshed=true指的是nacos中配置发生改变后会刷新,false代表只会使用服务启动时候读取到的值@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)public class NacosConfig { @NacosValue(value = "${input:}", autoRefreshed = true) private String input; @NacosValue(value = "${sink:}", autoRefreshed = true) private String sink; public String getInput() { return input; } public String getSink() { return sink; }}
。
服务的输入需要在服务启动时候创建消费者,在topic发生改变时候重新创建消费者,移除旧topic的消费者,输出是业务驱动的,无需监听改变,在每次发送时候读取到的都是最新配置的topic,因为在上面的配置类中autoRefreshed = true,这个只会刷新nacosConfig中的配置值,服务需要知道配置改变去驱动消费的创建业务,需要创建nacos配置监听 。
/** * 监听Nacos配置改变,创建消费者,更新消费 */@Componentpublic class ConsumerManager { @Value("${spring.kafka.bootstrap-servers}") private String servers; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${spring.kafka.consumer.group-id}") private boolean groupId; @Autowired private NacosConfig nacosConfig; @Autowired private KafkaTemplate kafkaTemplate; // 用于存放当前消费者使用的topic private String topic; // 用于执行消费者线程 private ExecutorService executorService; /** * 监听input */ @NacosConfigListener(dataId = "node1-server", groupId = "input") public void inputListener(String input) { // 这个监听触发的时候 实际NacosConfig中input的值已经是最新的值了 我们只是需要这个监听触发我们更新消费者的业务 String inputTopic = nacosConfig.getInput(); // 我使用nacosConfig中读取的原因是因为监听到内容是input=xxxx而不是xxxx,如果使用需要自己截取一下,nacosConfig中的内容框架会处理好,大家看一下第一张图的配置内容就明白了 // 先检查当前局部变量topic是否有值,有值代表是更新消费者,没有值只需要创建即可 if(topic != null) { // 停止旧的消费者线程 executorService.shutdownNow(); executorService == null; } // 根据为新的topic创建消费者 topic = inputTopic; ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build(); executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2), threadFactory); // 执行消费业务 executorService.execute(() -> consumer(topic)); } /** * 创建消费者 */ public void consumer(String topic) { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); properties.put("enable.auto.commit", enableAutoCommit); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", groupId); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList(topic)); try { while (!Thread.currentThread().isInterrupted()) { Duration duration = Duration.ofSeconds(1L); ConsumerRecords<String, String> records = consumer.poll(duration); for (ConsumerRecord<String, String> record : records) { String message = record.value(); // 执行数据处理业务 省略业务实现 String handleMessage = handle(message); // 处理完成后发送到下一个节点 kafkaTemplate.send(nacosConfig.getSink(), handleMessage); } } consumer.commitAsync(); } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { try { consumer.commitSync(); } finally { consumer.close(); } } }}
。
流编排的思路整体来说就是数据流方向可调,我们以此为需求,根据一些主流框架提供的api实现自己的动态调整方案,可以帮助自己更好的理解流编码思想及原理,在实际业务中,还有许多业务问题需要去突破,我们这样处理更多是因为服务可插拔,便于流处理微服务在项目灵活搭配,因为我现在工作是在传统公司,由于一些原因很难去推动新框架的使用,经常会用一些现有技术栈组合搞一些sao操作,供大家参考,希望大家多多指教.
到此这篇关于SpringBoot+Nacos+Kafka微服务流编排的简单实现的文章就介绍到这了,更多相关SpringBoot+Nacos+Kafka流编排内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://juejin.cn/post/6997704312835047438 。
最后此篇关于SpringBoot+Nacos+Kafka微服务流编排的简单实现的文章就讲到这里了,如果你想了解更多关于SpringBoot+Nacos+Kafka微服务流编排的简单实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
SpringBoot-Admin 服务监控 简单介绍 Spring Boot Actuator 是 Spring Boot 自带的一个功能模块, 提供了一组已经开箱即用的生产环境下常用
我想查找通过关键字匹配字段 nameEnglish 或 nameChinese 的模型列表。我花了一个多小时谷歌搜索但我做不到。请帮忙。 Springboot Mongo 入门示例 https://s
(请注意:在调查 this issue 时,我更好地发现了我在此处介绍的问题根源) 我对 Hibernate 和 SpringBoot 非常陌生。我的项目涉及一个搜索引擎,其中索引(javafx 客户
我最近有一个 Web 应用程序从 springboot 升级到 springboot 2。当我将其部署到 Tomcat 8 时,它似乎启动了,但没有完全启动。 在 localhost.2019-09-
我是 Spring boot 的新手...我在运行 Controller 时遇到问题, Description: Field todoService in com.springboot.todoCon
我有一个SpringBoot应用程序,它使用以下配置与PostgreSQL通信,通过AWS Beanstrik部署:。在我将AWS Aurora证书更新为rds-ca-ecc384-g1之前,一切都很
实在是不知道标题写什么了 可以在评论区给个建议哈哈哈哈 先用这个作为标题吧 尝试使用 国内给出的 AI 大模型做出一个 可以和 AI 对话的 网站出来 使用 智普AI 只能 在控制
一、介绍 在实际的软件系统开发过程中,由于业务的需求,在代码层面实现数据的脱敏还是远远不够的,往往还需要在数据库层面针对某些关键性的敏感信息,例如:身份证号、银行卡号、手机号、工资等信息进行加密存储
Selenium Selenium是一个用于Web应用程序自动化测试的开源工具套件。它主要用于以下目的: 浏览器自动化:Selenium能够模拟真实用户在不同浏览器(如Chrome、Fire
一、简介 在实际的项目开发过程中,经常需要用到邮件通知功能。例如,通过邮箱注册,邮箱找回密码,邮箱推送报表等等,实际的应用场景非常的多。 早期的时候,为了能实现邮件的自动发送功能,通常会使用 Ja
SpringBoot:基于redis自定义注解实现后端接口防重复提交校验 一、添加依赖 org.springframework.boot spring
SpringBoot:使用Jackson完成全局序列化配置 一、测试准备 com.fasterxml.jackson.core jackson-cor
springboot:整合rocketmq 一、简易消息操作 生产者整合mq 导入依赖 org.springframework.boot
springboot:常用注解 一、spring常用注解 包扫描+组件标注注解 @Component:泛指各种组件 @Controller、@Service、@Repository都可以称为@Comp
我们经常需要在两个系统之间进行一些数据的交互,这时候我们就需要开发数据交互接口。 一般来说,遇到比较多的接口有HTTP接口、WebService接口、FTP文件传输。今天我要来学习一下在SpringB
背景 近期项目上线,甲方要求通过安全检测才能进行验收,故针对扫描结果对系统进行了一系列的安全加固,本文对一些常见的安全问题及防护策略进行介绍,提供对应的解决方案 跨站脚本攻击 XSS常发生于论坛评论等
1.排除 Spring-boot-starter 默认的日志配置 将原本的 spring-boot-starter 改为 org.springframework.boot
springboot:解决跨域问题 一、跨域简介 URL的组成: // 协议 + 域名(子域名 + 主域名) + 端口号 + 资源地址 http://www.baidu.com:8080/ 只要协
一、自定义Starter 的思路: 创建一个Maven工程,创建三个模块 一个模块为demo-app,一个模块为demo-module,一个模块为demo-module-springboot-star
1.pom.xml 4.0.0 org.springframework.boot spring-boot-starter-parent
我是一名优秀的程序员,十分优秀!