- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章基于RocketMQ推拉模式详解由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
消费者客户端有两种方式从消息中间件获取消息并消费。严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现.
通过 Pull 不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息.
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的拉取频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”).
如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能; 。
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费.
但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久.
概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常; 。
主动推送的模式实现起来简单,避免了拉取的消费端业务逻辑的复杂度,消息的消费可以认为是实时的,同时也存在一定的弊端,要求消费端要有很强的消费能力.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public
class
Consumer1 {
public
static
void
main(String[] args){
try
{
DefaultMQPushConsumer consumer =
new
DefaultMQPushConsumer();
consumer.setConsumerGroup(
"consumer_push"
);
consumer.setNamesrvAddr(
"10.10.12.203:9876;10.10.12.204:9876"
);
consumer.subscribe(
"TopicTest"
,
"*"
);
consumer.registerMessageListener(
new
MessageListenerConcurrently(){
@Override
public
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
try
{
for
(MessageExt msg : paramList){
String msgbody =
new
String(msg.getBody(),
"utf-8"
);
SimpleDateFormat sd =
new
SimpleDateFormat(
"YYYY-MM-dd HH:mm:ss"
);
Date date =
new
Date(msg.getStoreTimestamp());
System.out.println(
"Consumer1=== 存入时间 : "
+ sd.format(date) +
" == MessageBody: "
+ msgbody);
//输出消息内容
}
}
catch
(Exception e) {
e.printStackTrace();
return
ConsumeConcurrentlyStatus.RECONSUME_LATER;
//稍后再试
}
return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//消费成功
}
});
consumer.start();
System.out.println(
"Consumer1===启动成功!"
);
}
catch
(Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
|
PUSH消费方式,需要注册一个监听器Listener,,用来监听最新的消息,进行业务处理,同时反馈消息的消费状态,消费成功(CONSUME_SUCCESS)、消费重试(RECONSUME_LATER),消息重试会根据配置的消息的延迟等级的时间间隔,定时重新发送消费失败的记录。(PS:延迟消息中会重点讨论) 。
PUSH消息方式由于返回了消息的状态,服务端会维护每个消费端的消费进度,内部会记录消费进度,消息发送成功后会更新消费进度.
PUSH消息方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中.
上一个章节说明了服务端存储的每个主题对应的消费组的每个消息队列的偏移量 。
查看服务器文件上的消费进度信息:
/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public
class
PullConsumer {
private
static
final
Map<MessageQueue, Long> offseTable =
new
HashMap<MessageQueue, Long>();
public
static
void
main(String[] args)
throws
MQClientException {
DefaultMQPullConsumer consumer =
new
DefaultMQPullConsumer(
"pullConsumer"
);
consumer.setNamesrvAddr(
"10.10.12.203:9876;10.10.12.204:9876"
);
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(
"TopicTest"
);
for
(MessageQueue mq : mqs) {
SINGLE_MQ:
while
(
true
) {
try
{
PullResult pullResult =
consumer.pullBlockIfNotFound(mq,
null
, getMessageQueueOffset(mq),
32
);
System.out.println(
"============================================================="
);
System.out.println(
"Consume from the queue: "
+ mq +
"offset:"
+ getMessageQueueOffset(mq) +
"结果:"
+ pullResult.getPullStatus());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch
(pullResult.getPullStatus()) {
case
FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for
(MessageExt m : messageExtList) {
System.out.print(
new
String(m.getBody()) +
" == "
);
}
System.out.println(
""
);
case
NO_MATCHED_MSG:
break
;
case
NO_NEW_MSG:
break
SINGLE_MQ;
case
OFFSET_ILLEGAL:
break
;
default
:
break
;
}
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private
static
void
putMessageQueueOffset(MessageQueue mq,
long
offset) {
offseTable.put(mq, offset);
}
private
static
long
getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if
(offset !=
null
)
return
offset;
return
0
;
}
}
|
结果:
每次拉取消息的时候需要提供偏移量和拉取的消息的个数,需要自己业务实现每个主题下的队列的消费进度.
代码实现(1)这种方式只能拉取历史的消息,最新的消息拉取不了,也可以进行改造,来实现一直拉取.
在MQPullConsumer这个类里面,有一个MessageQueueListener,它的目的就是当queue发生变化的时候,通知Consumer。也正是这个借口,帮助我们在Pull模式里面,实现负载均衡.
注意,这个接口在MQPushConsumer里面是没有的,那里面有的是上面代码里的MessageListener.
1
2
3
4
5
|
void
registerMessageQueueListener(
final
String topic,
final
MessageQueueListener listener);
public
interface
MessageQueueListener {
void
messageQueueChanged(
final
String topic,
final
Set<MessageQueue> mqAll,
final
Set<MessageQueue> mqDivided);
}
|
有了这个Listener,我们就可以动态的知道当前的Consumer分摊到了几个MessageQueue。然后对这些MessageQueue,我们可以开个线程池来消费.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
public
class
PullConsumerExtend {
public
static
void
main(String[] args)
throws
MQClientException {
//消费组
final
MQPullConsumerScheduleService scheduleService =
new
MQPullConsumerScheduleService(
"pullConsumer"
);
//MQ NameService地址
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(
"10.10.12.203:9876;10.10.12.204:9876"
);
//负载均衡模式
scheduleService.setMessageModel(MessageModel.CLUSTERING);
//需要处理的消息topic
scheduleService.registerPullTaskCallback(
"TopicTest"
,
new
PullTaskCallback() {
@Override
public
void
doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try
{
long
offset = consumer.fetchConsumeOffset(mq,
false
);
if
(offset <
0
)
offset =
0
;
PullResult pullResult = consumer.pull(mq,
"*"
, offset,
32
);
System.out.println(
""
);
System.out.println(
"Consume from the queue: "
+ mq +
"offset:"
+ offset +
"结果:"
+ pullResult.getPullStatus());
switch
(pullResult.getPullStatus()) {
case
FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for
(MessageExt m : messageExtList) {
System.out.print(
new
String(m.getBody()) +
" == "
);
}
break
;
case
NO_MATCHED_MSG:
break
;
case
NO_NEW_MSG:
case
OFFSET_ILLEGAL:
break
;
default
:
break
;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
//设置下一下拉取的间隔时间
context.setPullNextDelayTimeMillis(
10000
);
}
catch
(Exception e) {
e.printStackTrace();
}
}
});
scheduleService.start();
}
}
|
结果:
比较**代码实现(1)**这种方式改进了很多,不需要业务维护每个消费队列的消费进度,可以更新到服务端的.
弊端也很明显就是每次队列拉取消息的时间间隔,时间长导致消息挤压,时间段消息少,影响服务端性能.
以上为个人经验,希望能给大家一个参考,也希望大家多多支持我.
原文链接:https://my.oschina.net/mingxungu/blog/3083956 。
最后此篇关于基于RocketMQ推拉模式详解的文章就讲到这里了,如果你想了解更多关于基于RocketMQ推拉模式详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
对此感到疯狂,真的缺少一些东西。 我有webpack 4.6.0,webpack-cli ^ 2.1.2,所以是最新的。 在文档(https://webpack.js.org/concepts/mod
object Host "os.google.com" { import "windows" address = "linux.google.com" groups = ["linux"] } obj
每当我安装我的应用程序时,我都可以将数据库从 Assets 文件夹复制到 /data/data/packagename/databases/ .到此为止,应用程序工作得很好。 但 10 或 15 秒后
我在 cc 模式缓冲区中使用 hideshow.el 来折叠我不查看的文件部分。 如果能够在 XML 文档中做到这一点就好了。我使用 emacs 22.2.1 和内置的 sgml-mode 进行 xm
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
根据java: public Scanner useDelimiter(String pattern) Sets this scanner's delimiting pattern to a patt
我读过一些关于 PRG 模式以及它如何防止用户重新提交表单的文章。比如this post有一张不错的图: 我能理解为什么在收到 2xx 后用户刷新页面时不会发生表单提交。但我仍然想知道: (1) 如果
看看下面的图片,您可能会清楚地看到这一点。 那么如何在带有其他一些 View 的简单屏幕中实现没有任何弹出/对话框/模式的微调器日期选择器? 我在整个网络上进行了谷歌搜索,但没有找到与之相关的任何合适
我不知道该怎么做,我一直遇到问题。 以下是代码: rows = int(input()) for i in range(1,rows): for j in range(1,i+1):
我想为重写创建一个正则表达式。 将所有请求重写为 index.php(不需要匹配),它不是以/api 开头,或者不是以('.html',或'.js'或'.css'或'.png'结束) 我的例子还是这样
MVC模式代表 Model-View-Controller(模型-视图-控制器) 模式 MVC模式用于应用程序的分层开发 Model(模型) - 模型代表一个存取数据的对象或 JAVA PO
我想为组织模式创建一个 RDF 模式世界。您可能知道,组织模式文档基于层次结构大纲,其中标题是主要的分组实体。 * March auxiliary :PROPERTIES: :HLEVEL: 1 :E
我正在编写一个可以从文件中读取 JSON 数据的软件。该文件包含“person”——一个值为对象数组的对象。我打算使用 JSON 模式验证库来验证内容,而不是自己编写代码。符合代表以下数据的 JSON
假设我有 4 张 table 人 公司 团体 和 账单 现在bills/persons和bills/companys和bills/groups之间是多对多的关系。 我看到了 4 种可能的 sql 模式
假设您有这样的文档: doc1: id:1 text: ... references: Journal1, 2013, pag 123 references: Journal2, 2014,
我有这个架构。它检查评论,目前工作正常。 var schema = { id: '', type: 'object', additionalProperties: false, pro
这可能很简单,但有人可以解释为什么以下模式匹配不明智吗?它说其他规则,例如1, 0, _ 永远不会匹配。 let matchTest(n : int) = let ran = new Rand
我有以下选择序列作为 XML 模式的一部分。理想情况下,我想要一个序列: 来自 my:namespace 的元素必须严格解析。 来自任何其他命名空间的元素,不包括 ##targetNamespace和
我希望编写一个 json 模式来涵盖这个(简化的)示例 { "errorMessage": "", "nbRunningQueries": 0, "isError": Fals
首先,我是 f# 的新手,所以也许答案很明显,但我没有看到。所以我有一些带有 id 和值的元组。我知道我正在寻找的 id,我想从我传入的三个元组中选择正确的元组。我打算用两个 match 语句来做到这
我是一名优秀的程序员,十分优秀!