- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章SpringBoot整合分布式消息平台Pulsar由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
大家好,我是君哥.
作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端.
Pulsar 的部署方式主要有 3 种,本地安装二进制文件、docker 部署、在 Kubernetes 上部署.
本文采用 docker 部署一个单节点的 Pulsar 集群。实验环境是 2 核 CPU 和 4G 内存.
部署命令如下:
安装过程可能会出现下面的错误:
这是因为 docker 版本低,不支持 mount 参数,把 docker 版本升级到 17.06 以上就可以了.
部署过程中可能会因为网络的原因失败,多试几次就可以成功了。如果看到下面的日志,就说明启动成功了.
本地单节点集群启动后,会创建一个 namespace,名字叫 public/default 。
目前 Pulsar 支持多种语言的客户端,包括:
Java 客户端Go 客户端Python 客户端C++ 客户端Node.js 客户端WebSocket 客户端C# 客户端 。
使用 SpringBoot 整合 Pulsar 客户端,首先引入 Pulsar 客户端依赖,代码如下:
然后在 properties 文件中添加配置
创建客户端非常简单,代码如下:
上面的 url 就是 properties 文件中定义的 pulsar.url .
创建 Client 时,即使集群没有启成功,程序也不会报错,因为这时还没有真正地去连接集群.
创建 Producer,会真正的连接集群,这时如果集群有问题,就会报连接错误.
下面解释一下创建 Producer 的参数:
topic:Producer 要写入的 topic.
compressionType:压缩策略,目前支持 4 种策略 (NONE、LZ4、ZLIB、ZSTD),从 Pulsar2.3 开始,只有 Consumer 的版本在 2.3 以上,这个策略才会生效.
sendTimeout:超时时间,如果 Producer 在超时时间为收到 ACK,会进行重新发送.
enableBatching:是否开启消息批量处理,这里默认 true,这个参数只有在异步发送 (sendAsync) 时才能生效,选择同步发送会失效.
batchingMaxPublishDelay:批量发送消息的时间段,这里定义的是 10ms,需要注意的是,设置了批量时间,就不会受消息数量的影响。批量发送会把要发送的批量消息放在一个网络包里发送出去,减少网络 IO 次数,大大提高网卡的发送效率.
batchingMaxMessages:批量发送消息的最大数量.
maxPendingMessages:等待从 broker 接收 ACK 的消息队列最大长度。如果这个队列满了,producer 所有的 sendAsync 和 send 都会失败,除非设置了 blockIfQueueFull 值是 true.
blockIfQueueFull:Producer 发送消息时会把消息先放入本地 Queue 缓存,如果缓存满了,就会阻塞消息发送.
roundRobinRouterBatchingPartition-SwitchFrequency:如果发送消息时没有指定 key,那默认采用 round robin 的方式发送消息,使用 round robin 的方式,切换 partition 的周期是 (frequency * batchingMaxPublishDelay).
Pulsar 的消费模型如下图:
从图中可以看到,Consumer 要绑定一个 subscription 才能进行消费.
下面解释一下创建 Consumer 的参数:
topic:Consumer 要订阅的 topic.
subscriptionName:consumer 要关联的 subscription 名字.
subscriptionType:订阅类型,Pulsar 支持四种类型订阅:
Exclusive:独占模式,同一个 Topic 只能有一个消费者,如果多个消费者,就会出错。Failover:灾备模式,同一个 Topic 可以有多个消费者,但是只能有一个消费者消费,其他消费者作为故障转移备用,如果当前消费者出了故障,就从备用消费者中选择一个进行消费。如下图:
Shared:共享模式,同一个 Topic 可以由多个消费者订阅和消费。消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开,如果发送给它消息没有被消费,这些消息会被重新分发给其它存活的消费者。如下图:
Key_Shared:消息和消费者都会绑定一个key,消息只会发送给绑定同一个key的消费者。如果有新消费者建立连接或者有消费者断开连接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好处是既可以让消费者并发地消费消息,又能保证同一Key下的消息顺序。如下图:
subscriptionInitialPosition:创建新的 subscription 时从哪里开始消费,有两个选项:
Latest:从最新的消息开始消费Earliest:从最早的消息开始消费 。
negativeAckRedeliveryDelay:消费失败后间隔多久 broker 重新发送.
receiverQueueSize:在调用 receive 方法之前,最多能累积多少条消息。可以设置为 0,这样每次只从 broker 拉取一条消息。在 Shared 模式下,receiverQueueSize 设置为 0,可以防止批量消息多发给一个 Consumer 而导致其他 Consumer 空闲.
Consumer 接收消息有四种方式:同步单条、同步批量、异步单条和异步批量,代码如下:
对于批量接收,也可以设置批量接收的策略,代码如下:
代码中的参数说明如下:
maxNumMessages:批量接收的最大消息数量。maxNumBytes:批量接收消息的大小,这里是 1MB.
首先编写 Producer 发送消息的代码,如下:
然后编写一个 Consumer 消费消息的代码,如下:
最后编写一个 Controller 类,调用 Producer 发送消息,代码如下:
调用 Producer 发送一条消息,key=key1,data=data1,具体操作为在浏览器中输入下面的 url 后回车:
可以看到控制台输出下面日志:
从日志中看到,这里使用的 namespace 就是创建集群时生成的public/default.
从 SpringBoot 整合 Java 客户端使用来看,Pulsar 的 api 是非常友好的,使用起来方便简洁。Consumer 的使用需要考虑多一些,需要考虑到批量、异步以及订阅类型.
原文链接:https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ 。
最后此篇关于SpringBoot整合分布式消息平台Pulsar的文章就讲到这里了,如果你想了解更多关于SpringBoot整合分布式消息平台Pulsar的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我在 cordova@7.1.0、cordova-ios@4.5.2 下运行。安装平台:ios 4.5.2。 我运行 npm install、bower install,然后运行 cordova
我正在使用 VSTS 构建 IOS,运行命令后出现以下错误:cordova build ios 平台“android”似乎不是有效的 cordova 平台。它缺少 API.js。不支持安卓。 Cord
您使用什么软件/Wiki 来编写和分享有关开发人员、测试人员和管理人员的规范? 你使用维基系统,如果是,你使用什么维基软件? 或者您是否使用 Sharepoint 来管理和版本规范?将 SharePo
这是一家销售完整软件套件/平台的公司的示例 www.ql2.com/technology/platform.php 我想知道这样的套件/平台是如何开发的?你必须使用J2EE吗? 我更感兴趣的是这家公司
这个问题不太可能对任何 future 的访客有帮助;它只与一个较小的地理区域、一个特定的时间点或一个非常狭窄的情况相关,通常不适用于全世界的互联网受众。如需帮助使此问题更广泛适用,visit the
我有一个连接到套接字连接的应用程序,并且该连接向我发送了很多信息..可以说每秒 300 个订单(也许更多)..我有一个类(它就像一个监听器,对某个事件(并且该事件具有顺序)接收该顺序。创建一个对象,然
我即将开始一个 Netbeans 平台的项目。有没有人推荐他们用过并觉得有用的书籍或教程? 编辑: 这是一个已经开发好的swing应用。 最佳答案 除了 NetBeans 网站上的教程外,我还喜欢这本
有没有什么好的方法可以以非特定语言的方式定义接口(interface)/类层次结构,然后以特定语言生成相应的源代码?特别是,我需要同时针对 Java 和 C# 来创建一个相当全面的 API。我记得有一
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 8 年前。 Improve
大家晚上好我使用 API 平台,我想在创建实体时自动将所有者添加到我的实体中。我创建了一个事件来覆盖 API 平台,它获取当前用户并添加它。但是我的事件永远不会发生,但它确实存在于 debug:eve
这是一个有点奇怪的元编程问题,但我意识到我的新项目不需要完整的 MVC 框架,作为一个 Rails 人,我不确定现在该使用什么。 为您提供必要功能的要点;该网站将显示静态页面,但用户将能够登录并“编辑
这两天我的信息有点过载。 我打算建立自己的网站,允许本地企业列出他们的打折商品,然后用户可以进来搜索“Abercrombie T 恤”,然后就会列出出售它们的商店。 这是一个非常棒的小项目,我真的很兴
我的任务是为产品的下一代版本评估“企业”平台。我们目前正在考虑两种“类型”的平台——RAD(工作流引擎、集成 UI、工作流“技术插件”的小核心、状态的自动持久化……),例如 SalesForce.co
我需要一个不依赖于特定语言或构建系统的依赖管理器。我研究了几个优秀的工具(Gradle、Bazel、Hunter、Biicode、Conan 等),但没有一个能满足我的要求(见下文)。我还使用了 Gi
我在 Symfony 4 Flex 应用程序中使用 API Platform v2.2.5,该应用程序由一个功能 API 和 JWT Authentication 组成。 ,一些资源默认Open AP
虽然隐私法通常不属于我们开发人员的管辖范围,但我确实认为这是一个重要的话题,因为我们开发人员应该有责任警告我们的雇主,如果他们想要的东西会违反一些法律......在这种情况下,隐私法......通常情
我已经下载了 VisualVM 源代码,并尝试使用 Netbeans 7.01 编译 Glassfish 插件。这样做会导致以下错误: C:\source\visualvm\trunk\plugins
尝试 gradle 同步后...失败并在消息对话框中显示 Missing Android platform(s) detected: 'android-26' Install missing plat
大家好!我最近开始使用 Cordova,当我运行 Cordova platform add android 时,出现以下错误。我已经成功放置了 Java 和 Android SDK 的环境变量。但 n
已关闭。这个问题是 off-topic 。目前不接受答案。 想要改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 已关闭10 年前。 Improve th
我是一名优秀的程序员,十分优秀!