- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章springBoot整合RocketMQ及坑的示例代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
版本:
pom 配置: 。
1
2
3
4
5
6
7
8
9
10
|
<parent>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-parent</artifactid>
<version>
1.5
.
10
.release</version>
</parent>
<dependency>
<groupid>org.apache.rocketmq</groupid>
<artifactid>rocketmq-client</artifactid>
<version>
4.2
.
0
</version>
</dependency>
|
application.properties 配置
1
2
3
4
5
6
|
# 消费者的组名
apache.rocketmq.consumer.pushconsumer=pushconsumer
# 生产者的组名
apache.rocketmq.producer.producergroup=producer
# nameserver地址
apache.rocketmq.namesrvaddr=localhost:
9876
|
java代码:
生产者 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
package
test.config.rocketmq;
import
org.apache.rocketmq.client.producer.defaultmqproducer;
import
org.apache.rocketmq.client.producer.sendresult;
import
org.apache.rocketmq.common.message.message;
import
org.apache.rocketmq.remoting.common.remotinghelper;
import
org.springframework.beans.factory.annotation.value;
import
org.springframework.stereotype.component;
import
org.springframework.util.stopwatch;
import
javax.annotation.postconstruct;
@component
public
class
rocketmqclient {
/**
* 生产者的组名
*/
@value
(
"${apache.rocketmq.producer.producergroup}"
)
private
string producergroup;
/**
* nameserver 地址
*/
@value
(
"${apache.rocketmq.namesrvaddr}"
)
private
string namesrvaddr;
@postconstruct
public
void
defaultmqproducer() {
//生产者的组名
defaultmqproducer producer =
new
defaultmqproducer(producergroup);
//指定nameserver地址,多个地址以 ; 隔开
producer.setnamesrvaddr(namesrvaddr);
producer.setvipchannelenabled(
false
);
try
{
/**
* producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
//创建一个消息实例,包含 topic、tag 和 消息体
//如下:topic 为 "topictest",tag 为 "push"
message message =
new
message(
"topictest"
,
"push"
,
"发送消息----zhisheng-----"
.getbytes(remotinghelper.default_charset));
stopwatch stop =
new
stopwatch();
stop.start();
for
(
int
i =
0
; i <
1
; i++) {
sendresult result = producer.send(message);
system.out.println(
"发送响应:msgid:"
+ result.getmsgid() +
",发送状态:"
+ result.getsendstatus());
}
stop.stop();
system.out.println(
"----------------发送一万条消息耗时:"
+ stop.gettotaltimemillis());
}
catch
(exception e) {
e.printstacktrace();
}
finally
{
producer.shutdown();
}
}
}
|
消费者: 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
import
org.apache.rocketmq.client.consumer.defaultmqpushconsumer;
import
org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import
org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import
org.apache.rocketmq.common.consumer.consumefromwhere;
import
org.apache.rocketmq.common.message.messageext;
import
org.apache.rocketmq.remoting.common.remotinghelper;
import
org.springframework.beans.factory.annotation.value;
import
org.springframework.stereotype.component;
import
javax.annotation.postconstruct;
@component
public
class
rocketmqserver {
/**
* 消费者的组名
*/
@value
(
"${apache.rocketmq.consumer.pushconsumer}"
)
private
string consumergroup;
/**
* nameserver 地址
*/
@value
(
"${apache.rocketmq.namesrvaddr}"
)
private
string namesrvaddr;
@postconstruct
public
void
defaultmqpushconsumer() {
//消费者的组名
defaultmqpushconsumer consumer =
new
defaultmqpushconsumer(consumergroup);
//指定nameserver地址,多个地址以 ; 隔开
consumer.setnamesrvaddr(namesrvaddr);
consumer.setvipchannelenabled(
false
);
try
{
//订阅pushtopic下tag为push的消息
consumer.subscribe(
"topictest"
,
"push"
);
//设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);
consumer.registermessagelistener((messagelistenerconcurrently) (list, context) -> {
try
{
for
(messageext messageext : list) {
system.out.println(
"messageext: "
+ messageext);
//输出消息内容
string messagebody =
new
string(messageext.getbody(), remotinghelper.default_charset);
system.out.println(
"消费响应:msgid : "
+ messageext.getmsgid() +
", msgbody : "
+ messagebody);
//输出消息内容
}
}
catch
(exception e) {
e.printstacktrace();
return
consumeconcurrentlystatus.reconsume_later;
//稍后再试
}
return
consumeconcurrentlystatus.consume_success;
//消费成功
});
consumer.start();
}
catch
(exception e) {
e.printstacktrace();
}
}
}
|
掉坑总结:
1.rocketmq启动时,命令不是 mqbroker -n 127.0.0.1:9876 。
正确应该是:mqbroker -n 127.0.0.1:9876 butiautocreatetopicenable=true 。
否则会抛出:no route info of this topic, topictest 。
2.客户端连接时抛出异常 。
org.apache.rocketmq.client.exception.mqclientexception: 。
send [3] times, still failed, cost [3180]ms, topic: topictest, brokerssent: \ 。
[win-93cgo0s5g25, win-93cgo0s5g25, win-93cgo0s5g25] 。
解决方式两种 。
1.producer.setvipchannelenabled(false); 生产者和消费者添加这行代买.
2.降rocketmq版本,降成3.2.6 。
关于spring.rocketmq.name-server的坑 。
看下图:
注意:
如果你是springboot2.0+的框架,或者是jdk10.
你需要将你自己的项目配置文件中的,spring.rocketmq.name-server改成 。
spring.rocketmq.nameserver。注意是nameserver.
不然就会报各种稀奇古怪的bug.
关于启动报内存不足的错 。
在安装启动name server和broker的时候,一定要修改配置文件,不然内存会爆炸.
native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory 。
将下面的配置文件根据你的需要改 。
我这里以前默认是xms4g,都是g,我修改到m就行了.
java_opt="${java_opt} -server -xms256m -xmx256m -xmn128m -xx:metaspacesize=128m -xx:maxmetaspacesize=320m" 。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:https://blog.csdn.net/qq_24853627/article/details/79443437 。
最后此篇关于springBoot整合RocketMQ及坑的示例代码的文章就讲到这里了,如果你想了解更多关于springBoot整合RocketMQ及坑的示例代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
这个问题在这里已经有了答案: 关闭 11 年前。 Possible Duplicate: Sample data for IPv6? 除了 wireshark 在其网站上提供的内容之外,是否有可以下
我正在寻找可以集成到现有应用程序中并使用多拖放功能的示例或任何现成的解决方案。我在互联网上找到的大多数解决方案在将多个项目从 ListBox 等控件拖放到另一个 ListBox 时效果不佳。谁能指出我
我是 GATE Embedded 的新手,我尝试了简单的示例并得到了 NoClassDefFoundError。首先我会解释我尝试了什么 在 D:\project\gate-7.0 中下载并提取 Ga
是否有像 Eclipse 中的 SWT 示例那样的多合一 JFace 控件示例?搜索(在 stackoverflow.com 上使用谷歌搜索和搜索)对我没有帮助。 如果它是一个独立的应用程序或 ecl
我找不到任何可以清楚地解释如何通过 .net API(特别是 c#)使用谷歌计算引擎的内容。有没有人可以指点我什么? 附言我知道 API 引用 ( https://developers.google.
最近在做公司的一个项目时,客户需要我们定时获取他们矩阵系统的数据。在与客户进行对接时,提到他们的接口使用的目前不常用的BASIC 认证。天呢,它好不安全,容易被不法人监听,咋还在使用呀。但是没办法呀,
最近在做公司的一个项目时,客户需要我们定时获取他们矩阵系统的数据。在与客户进行对接时,提到他们的接口使用的目前不常用的BASIC 认证。天呢,它好不安全,容易被不法人监听,咋还在使用呀。但是没办法呀,
我正在尝试为我的应用程序设计配置文件格式并选择了 YAML。但是,这(显然)意味着我需要能够定义、解析和验证正确的 YAML 语法! 在配置文件中,必须有一个名为 widgets 的集合/序列。 .这
你能给我一个使用 pysmb 库连接到一些 samba 服务器的例子吗?我读过有类 smb.SMBConnection.SMBConnection(用户名、密码、my_name、remote_name
linux服务器默认通过22端口用ssh协议登录,这种不安全。今天想做限制,即允许部分来源ip连接服务器。 案例目标:通过iptables规则限制对linux服务器的登录。 处理方法:编
我一直在寻找任何 PostProjectAnalysisTask 工作代码示例,但没有看。 This页面指出 HipChat plugin使用这个钩子(Hook),但在我看来它仍然使用遗留的 Po
我发现了 GWT 的 CustomScrollPanel 以及如何自定义滚动条,但我找不到任何示例或如何设置它。是否有任何示例显示正在使用的自定义滚动条? 最佳答案 这是自定义 native 滚动条的
我正在尝试开发一个 Backbone Marionette 应用程序,我需要知道如何以最佳方式执行 CRUD(创建、读取、更新和销毁)操作。我找不到任何解释这一点的资源(仅适用于 Backbone)。
关闭。这个问题需要details or clarity .它目前不接受答案。 想改进这个问题?通过 editing this post 添加详细信息并澄清问题. 去年关闭。 Improve this
我需要一个提交多个单独请求的 django 表单,如果没有大量定制,我找不到如何做到这一点的示例。即,假设有一个汽车维修店使用的表格。该表格将列出商店能够进行的所有可能的维修,并且用户将选择他们想要进
我有一个 Multi-Tenancy 应用程序。然而,这个相同的应用程序有 liquibase。我需要在我的所有数据源中运行 liquibase,但是我不能使用这个 Bean。 我的应用程序.yml
我了解有关单元测试的一般思想,并已在系统中发生复杂交互的场景中使用它,但我仍然对所有这些原则结合在一起有疑问。 我们被警告不要测试框架或数据库。好的 UI 设计不适合非人工测试。 MVC 框架不包括一
我正在使用 docjure并且它的 select-columns 函数需要一个列映射。我想获取所有列而无需手动指定。 如何将以下内容生成为惰性无限向量序列 [:A :B :C :D :E ... :A
$condition使用说明和 $param在 findByAttributes在 Yii 在大多数情况下,这就是我使用 findByAttributes 的方式 Person::model()->f
我在 Ubuntu 11.10 上安装了 qtcreator sudo apt-get install qtcreator 安装的版本有:QT Creator 2.2.1、QT 4.7.3 当我启动
我是一名优秀的程序员,十分优秀!