- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章学会Pulsar Consumer的使用方式由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
引入依赖:
1
2
3
4
5
|
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>
2.6
.
1
</version>
</dependency>
|
在尝试使用Producer和Consumer前,我们先讲一下Pulsar客户端,因为不管是Producer还是Consumer,都是依靠PulsarClient来创建的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
* Pulsar工具类
* @author winfun
**/
public
class
PulsarUtils {
/**
* 根据serviceUrl创建PulsarClient
* @param serviceUrl 服务地址
* @return 客户端
* @throws PulsarClientException 异常
*/
public
static
PulsarClient createPulsarClient(String serviceUrl)
throws
PulsarClientException {
return
PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
}
}
|
我们这里简单使用,只借用ServiceUrl创建客户端,其实还有很多比较重要的参数,下面稍微列举一下:
Producer这里我们也先简单使用,只负责往指定Topic发送消息,其他功能不用,例如异步发送、延时发送等 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
* 初次使用Pulsar生产者,无任何封装
* @author winfun
**/
public
class
FirstProducerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(
"pulsar://127.0.0.1:6650"
)
.build();
ProducerBuilder<String> productBuilder = client.newProducer(Schema.STRING).topic(
"winfun/study/test-topic"
)
.blockIfQueueFull(Boolean.TRUE).batchingMaxMessages(
100
).enableBatching(Boolean.TRUE).sendTimeout(
3
, TimeUnit.SECONDS);
Producer<String> producer = productBuilder.create();
for
(
int
i =
0
; i <
100
; i++) {
producer.send(
"hello"
+i);;
}
producer.close();
}
}
|
下面我们将比较详细地介绍消费者的使用方式,因为这里能拓展的东西稍微多一点,下面开始使用旅程.
我们利用PulsarClient创建Consumer;接着在死循环中利用Consumer#receive方法接收消息然后进行消费.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
/**
* 初次使用Pulsar消费者,无任何封装
* @author winfun
**/
@Slf4j
public
class
FirstConsumerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(
"pulsar://127.0.0.1:6650"
)
.build();
/**
* The subscribe method will auto subscribe the consumer to the specified topic and subscription.
* One way to make the consumer listen on the topic is to set up a while loop.
* In this example loop, the consumer listens for messages, prints the contents of any received message, and then acknowledges that the message has been processed.
* If the processing logic fails, you can use negative acknowledgement to redeliver the message later.
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(
"winfun/study/test-topic"
)
.subscriptionName(
"my-subscription"
)
.ackTimeout(
10
, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
// 死循环接收
while
(
true
){
Message<String> message = consumer.receive();
String msgContent = message.getValue();
log.info(
"接收到消息: {}"
,msgContent);
consumer.acknowledge(message);
}
}
}
|
上面我们可以看到,我们是利用死循环来保证及时消费,但是这样会导致主线程;所以下面我们可以使用Pulsar提供的MessageListener,即监听器,当消息来了,会回调监听器指定的方法,从而避免阻塞主线程.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
/**
* 使用MessageListener,避免死循环代码&阻塞主线程
* @author winfun
**/
@Slf4j
public
class
SecondConsumerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient(
"pulsar://127.0.0.1:6650"
);
/**
* If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener.
*
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(
"winfun/study/test-topic"
)
.subscriptionName(
"my-subscription"
)
.ackTimeout(
10
, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
/**
* 当接收到一个新的消息,就会回调 MessageListener的receive方法。
* 消息将会保证按顺序投放到单个消费者的同一个线程,因此可以保证顺序消费
* 除非应用程序或broker崩溃,否则只会为每条消息调用此方法一次
* 应用程序负责调用消费者的确认方法来确认消息已经被消费
* 应用程序负责处理消费消息时可能出现的异常
*/
log.info(
"接收到消息:{}"
,msg.getValue());
try
{
consumer1.acknowledge(msg);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
}).subscribe();
}
}
|
上面利用监听器来解决死循环代码和阻塞主线程问题;但是我们可以发现,每次消费都是单线程,当一个消息消费完才能进行下一个消息的消费,这样会导致消费效率非常的低; 。
如果如果追求高吞吐量,不在乎消息消费的顺序性,那么我们可以接入线程池;一有消息来就丢进线程池中,这样不但可以支持异步消费,还能保证消费的效率非常的高.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
/**
* MessageListener 内使用线程池进行异步消费
* @author winfun
**/
@Slf4j
public
class
ThirdConsumerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
Executor executor =
new
ThreadPoolExecutor(
10
,
10
,
10
,
TimeUnit.SECONDS,
new
ArrayBlockingQueue<>(
100
)
);
PulsarClient client = PulsarUtils.createPulsarClient(
"pulsar://127.0.0.1:6650"
);
/**
* If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener.
*
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(
"winfun/study/test-topic"
)
.subscriptionName(
"my-subscription"
)
.ackTimeout(
10
, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
/**
* MessageListener还是保证了接收的顺序性
* 但是利用线程池进行异步消费后不能保证消费顺序性
*/
executor.execute(() -> handleMsg(consumer1, msg));
}).subscribe();
}
/**
* 线程池异步处理
* @param consumer 消费者
* @param msg 消息
*/
public
static
void
handleMsg(Consumer consumer, Message msg){
ThreadUtil.sleep(RandomUtil.randomInt(
3
),TimeUnit.SECONDS);
log.info(
"接收到消息:{}"
,msg.getValue());
try
{
consumer.acknowledge(msg);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
}
}
|
我们可以发现,在上面的三个例子中,如果在调用Consumer#acknowledge方法前,因为代码问题导致抛异常了,我们是没有做处理的,那么会导致消费者会一直重试没有被确认的消息.
那么我们此时需要接入Pulsar提供的死信队列:当Consumer消费消息时抛异常,并达到一定的重试次数,则将消息丢入死信队列;但需要注意的是,单独使用死信队列,Consumer的订阅类型需要是 Shared/Key_Shared;否则不会生效.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
/**
* 超过最大重试次数,进入死信队列
* @author: winfun
**/
@Slf4j
public
class
FourthConsumerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
/**
* 如果指定了死信队列策略,但是没指定死信队列
* 死信队列:String.format("%s-%s-DLQ", topic, this.subscription)
* 这里的this.subscription为上面指定的 subscriptionName。
*
* 一般在生产环境,会将pulsar的自动创建topic功能给关闭掉,所以上线前,记得先提工单创建指定的死信队列。
*
* 重点信息:
* 如果是单单使用死信队列,subscriptionType为 Shared/Key_Shared,否则死信队列不生效。
*/
PulsarClient client = PulsarUtils.createPulsarClient(
"pulsar://127.0.0.1:6650"
);
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(
"winfun/study/test-topic"
)
.subscriptionName(
"my-subscription"
)
.receiverQueueSize(
100
)
.ackTimeout(
1
, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Key_Shared)
.negativeAckRedeliveryDelay(
1
,TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder()
//可以指定最大重试次数,最大重试三次后,进入到死信队列
.maxRedeliverCount(
3
)
//可以指定死信队列
.deadLetterTopic(
"winfun/study/test-topic-dlq3"
)
.build())
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info(
"接收到队列「{}」消息:{}"
,msg.getTopicName(),msg.getValue());
if
(msg.getValue().equals(
"hello3"
)) {
throw
new
RuntimeException(
"hello3消息消费失败!"
);
}
else
{
try
{
consumer1.acknowledge(msg);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
}
}).subscribe();
}
}
|
死信队列一般是不做消费的,我们会关注死信队列的情况,从而作出下一步的动作。 而且,一般做消息重试,我们不希望在原Topic中做重试,这样会影响原有消息的消费进度.
那么我们可以同时使用重试队列和死信队列。 当代码抛出异常时,我们可以捕获住,然后调用Consumer#reconsumeLater方法,将消息丢入重试队列;当消息重试指定次数后还无法正常完成消费,即会将消息丢入死信队列.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
* 重试队列
* @author winfun
**/
@Slf4j
public
class
FifthConsumerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient(
"pulsar://127.0.0.1:6650"
);
/**
* 注意点:
* 1、使用死信策略,但是没有指定重试topic和死信topic名称
* 死信队列:String.format("%s-%s-DLQ", topic, this.subscription)
* 重试队列:String.format("%s-%s-RETRY", topic, this.subscription)
* 这里的this.subscription为上面指定的 subscriptionName。
*
* 2、是否限制订阅类型
* 同时开启重试队列和死信队列,不限制subscriptionType只能为Shared/Key_Shared;
* 如果只是单独使用死信队列,需要限制subscriptionType为Shared
*
* 3、重试原理
* 如果使用重试队列,需要保证 enableRetry 是开启的,否则调用 reconsumeLater 方法时会抛异常:org.apache.pulsar.client.api.PulsarClientException: reconsumeLater method not support!
* 如果配置了重试队列,consumer会同时监听原topic和重试topic,consumer的实现类对应是:MultiTopicsConsumerImpl
* 如果消费消息时调用了 reconsumeLater 方法,会将此消息丢进重试topic
* 如果在重试topic重试maxRedeliverCount次后都无法正确ack消息,即将消息丢到死信队列。
* 死信队列需要另起Consumer进行监听消费。
*
* 4、直接抛异常
* 如果我们不是业务层面上调用 reconsumeLater 方法来进行重试,而是代码层面抛异常了,如果subscriptionType不为Shared/Key_Shared,消息无法进入重试队列和死信队列,是当前消费者无限在原topic进行消费。
* 而如果如果subscriptionType为Shared/Key_Shared,则消息进行maxRedeliverCount次消费后,会直接进入到死信队列,此时不会用到重试队列。
* 因此,重试队列是仅仅针对 reconsumeLater 方法的,而不针对异常的重试。
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(
"winfun/study/test-retry-topic"
)
.subscriptionName(
"my-subscription"
)
.receiverQueueSize(
100
)
.ackTimeout(
1
, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.negativeAckRedeliveryDelay(
1
,TimeUnit.SECONDS)
.enableRetry(
true
)
.deadLetterPolicy(DeadLetterPolicy.builder()
//可以指定最大重试次数,最大重试三次后,进入到死信队列
.maxRedeliverCount(
3
)
.retryLetterTopic(
"winfun/study/test-retry-topic-retry"
)
//可以指定死信队列
.deadLetterTopic(
"winfun/study/test-retry-topic-dlq"
)
.build())
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info(
"接收到队列「{}」消息:{}"
,msg.getTopicName(),msg.getValue());
if
(msg.getValue().equals(
"hello3"
)) {
try
{
consumer1.reconsumeLater(msg,
1
,TimeUnit.SECONDS);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
//throw new RuntimeException("hello3消息消费失败!");
}
else
{
try
{
consumer1.acknowledge(msg);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
}
}).subscribe();
}
}
|
关于重试机制,其实是比较有意思的,下面我们会简单分析一下源码.
1.判断是否开启重试机制,如果没有开启重试机制,则直接抛异常 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public
void
reconsumeLater(Message<?> message,
long
delayTime, TimeUnit unit)
throws
PulsarClientException {
// 如果没开启重试机制,直接抛异常
if
(!
this
.conf.isRetryEnable()) {
throw
new
PulsarClientException(
"reconsumeLater method not support!"
);
}
else
{
try
{
// 当然了,reconsumeLaterAsync里面也会判断是否开启重试机制
this
.reconsumeLaterAsync(message, delayTime, unit).get();
}
catch
(Exception var7) {
Throwable t = var7.getCause();
if
(t
instanceof
PulsarClientException) {
throw
(PulsarClientException)t;
}
else
{
throw
new
PulsarClientException(t);
}
}
}
}
|
还有我们可以发现,pulsar很多方法是支持同步和异步的,而同步就是直接调用异步方法,再后调用get()方法进行同步阻塞等待即可.
2.调用 reconsumeLaterAsunc 方法,接着调用 get() 进行同步阻塞等待结果 。
1
2
3
4
5
6
7
8
9
10
11
|
public
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message,
long
delayTime, TimeUnit unit) {
if
(!
this
.conf.isRetryEnable()) {
return
FutureUtil.failedFuture(
new
PulsarClientException(
"reconsumeLater method not support!"
));
}
else
{
try
{
return
this
.doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
}
catch
(NullPointerException var6) {
return
FutureUtil.failedFuture(
new
InvalidMessageException(var6.getMessage()));
}
}
}
|
3.调用 doReconsumeLater 方法 。
我们知道,在 Pulsar 的 Consumer 中,可以支持多 Topic 监听,而如果我们加入了重试机制,默认是同个 Consumer 同时监听原队列和重试队列,所以 Consumer 接口的实现此时为 MultiTopicsConsumerImpl,而不是 ComsumerImpl。 那我们看看 MultiConsumerImpl 的 doReconsumeLater 是如何进行重新消费的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
protected
CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties,
long
delayTime, TimeUnit unit) {
MessageId messageId = message.getMessageId();
Preconditions.checkArgument(messageId
instanceof
TopicMessageIdImpl);
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)messageId;
if
(
this
.getState() != State.Ready) {
return
FutureUtil.failedFuture(
new
PulsarClientException(
"Consumer already closed"
));
}
else
{
MessageId innerId;
if
(ackType == AckType.Cumulative) {
Consumer individualConsumer = (Consumer)
this
.consumers.get(topicMessageId.getTopicPartitionName());
if
(individualConsumer !=
null
) {
innerId = topicMessageId.getInnerMessageId();
return
individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
}
else
{
return
FutureUtil.failedFuture(
new
NotConnectedException());
}
}
else
{
ConsumerImpl<T> consumer = (ConsumerImpl)
this
.consumers.get(topicMessageId.getTopicPartitionName());
innerId = topicMessageId.getInnerMessageId();
return
consumer.doReconsumeLater(message, ackType, properties, delayTime, unit).thenRun(() -> {
this
.unAckedMessageTracker.remove(topicMessageId);
});
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
protected
CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties,
long
delayTime, TimeUnit unit) {
MessageId messageId = message.getMessageId();
if
(messageId
instanceof
TopicMessageIdImpl) {
messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
}
Preconditions.checkArgument(messageId
instanceof
MessageIdImpl);
if
(
this
.getState() != State.Ready &&
this
.getState() != State.Connecting) {
this
.stats.incrementNumAcksFailed();
PulsarClientException exception =
new
PulsarClientException(
"Consumer not ready. State: "
+
this
.getState());
if
(AckType.Individual.equals(ackType)) {
this
.onAcknowledge(messageId, exception);
}
else
if
(AckType.Cumulative.equals(ackType)) {
this
.onAcknowledgeCumulative(messageId, exception);
}
return
FutureUtil.failedFuture(exception);
}
else
{
if
(delayTime < 0L) {
delayTime = 0L;
}
// 如果 retryLetterProducer 为null,则尝试创建
if
(
this
.retryLetterProducer ==
null
) {
try
{
this
.createProducerLock.writeLock().lock();
if
(
this
.retryLetterProducer ==
null
) {
this
.retryLetterProducer =
this
.client.newProducer(
this
.schema).topic(
this
.deadLetterPolicy.getRetryLetterTopic()).enableBatching(
false
).blockIfQueueFull(
false
).create();
}
}
catch
(Exception var28) {
log.error(
"Create retry letter producer exception with topic: {}"
,
this
.deadLetterPolicy.getRetryLetterTopic(), var28);
}
finally
{
this
.createProducerLock.writeLock().unlock();
}
}
// 如果 retryLetterProcuder 不为空,则尝试将消息丢进重试队列中
if
(
this
.retryLetterProducer !=
null
) {
try
{
MessageImpl<T> retryMessage =
null
;
String originMessageIdStr =
null
;
String originTopicNameStr =
null
;
if
(message
instanceof
TopicMessageImpl) {
retryMessage = (MessageImpl)((TopicMessageImpl)message).getMessage();
originMessageIdStr = ((TopicMessageIdImpl)message.getMessageId()).getInnerMessageId().toString();
originTopicNameStr = ((TopicMessageIdImpl)message.getMessageId()).getTopicName();
}
else
if
(message
instanceof
MessageImpl) {
retryMessage = (MessageImpl)message;
originMessageIdStr = ((MessageImpl)message).getMessageId().toString();
originTopicNameStr = ((MessageImpl)message).getTopicName();
}
SortedMap<String, String> propertiesMap =
new
TreeMap();
int
reconsumetimes =
1
;
if
(message.getProperties() !=
null
) {
propertiesMap.putAll(message.getProperties());
}
// 如果包含 RECONSUMETIMES,则最递增
if
(propertiesMap.containsKey(
"RECONSUMETIMES"
)) {
reconsumetimes = Integer.valueOf((String)propertiesMap.get(
"RECONSUMETIMES"
));
++reconsumetimes;
// 否则先加入「原始队列」和「原始messageId」信息
}
else
{
propertiesMap.put(
"REAL_TOPIC"
, originTopicNameStr);
propertiesMap.put(
"ORIGIN_MESSAGE_IDY_TIME"
, originMessageIdStr);
}
// 加入重试次数信息
propertiesMap.put(
"RECONSUMETIMES"
, String.valueOf(reconsumetimes));
// 加入延时时间信息
propertiesMap.put(
"DELAY_TIME"
, String.valueOf(unit.toMillis(delayTime)));
TypedMessageBuilder typedMessageBuilderNew;
// 判断是否超过最大重试次数,如果还未超过,则重新投放到重试队列
if
(reconsumetimes <=
this
.deadLetterPolicy.getMaxRedeliverCount()) {
typedMessageBuilderNew =
this
.retryLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap);
if
(delayTime > 0L) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
if
(message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
// 发送延时消息
typedMessageBuilderNew.send();
// 确认当前消息
return
this
.doAcknowledge(messageId, ackType, properties, (TransactionImpl)
null
);
}
// 先忽略
this
.processPossibleToDLQ((MessageIdImpl)messageId);
// 判断 deadLetterProducer 是否为null,如果为null,尝试创建
if
(
this
.deadLetterProducer ==
null
) {
try
{
if
(
this
.deadLetterProducer ==
null
) {
this
.createProducerLock.writeLock().lock();
this
.deadLetterProducer =
this
.client.newProducer(
this
.schema).topic(
this
.deadLetterPolicy.getDeadLetterTopic()).blockIfQueueFull(
false
).create();
}
}
catch
(Exception var25) {
log.error(
"Create dead letter producer exception with topic: {}"
,
this
.deadLetterPolicy.getDeadLetterTopic(), var25);
}
finally
{
this
.createProducerLock.writeLock().unlock();
}
}
// 如果 deadLetterProducer 不为null
if
(
this
.deadLetterProducer !=
null
) {
// 加入「原始队列」信息
propertiesMap.put(
"REAL_TOPIC"
, originTopicNameStr);
// 加入「原始MessageId」信息
propertiesMap.put(
"ORIGIN_MESSAGE_IDY_TIME"
, originMessageIdStr);
typedMessageBuilderNew =
this
.deadLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap);
// 将消息内容发往死信队列中
typedMessageBuilderNew.send();
// 确认当前消息
return
this
.doAcknowledge(messageId, ackType, properties, (TransactionImpl)
null
);
}
}
catch
(Exception var27) {
log.error(
"Send to retry letter topic exception with topic: {}, messageId: {}"
,
new
Object[]{
this
.deadLetterProducer.getTopic(), messageId, var27});
Set<MessageId> messageIds =
new
HashSet();
messageIds.add(messageId);
this
.unAckedMessageTracker.remove(messageId);
this
.redeliverUnacknowledgedMessages(messageIds);
}
}
return
CompletableFuture.completedFuture((Object)
null
);
}
}
|
分析了一波,我们可以看到和上面代码的注释描述的基本一致.
上面我们提到,当Consumer指定了重试队列,Consumer会同时监听原Topic和重试Topic,那么如果我们想多个Consumer消费重试Topic时,需要将Consumer的订阅类型指定为 Shared/Key_Shared,让重试队列支持多Consumer监听消费,提升重试队列的消费效率.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
/**
* 重试队列-Shared
* @author winfun
**/
@Slf4j
public
class
SixthConsumerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient(
"pulsar://127.0.0.1:6650"
);
/**
* 因为如果指定了重试策略,Consumer会同时监听「原队列」和「重试队列」
* 即如果我们想「重试队列」可以让多个 Consumer 监听,从而提高消费能力,那么 Consumer 需指定为 Shared 模式。
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(
"winfun/study/test-retry-topic"
)
.subscriptionName(
"my-subscription"
)
.receiverQueueSize(
100
)
.ackTimeout(
1
, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(
1
,TimeUnit.SECONDS)
.enableRetry(
true
)
.deadLetterPolicy(DeadLetterPolicy.builder()
//可以指定最大重试次数,最大重试三次后,进入到死信队列
.maxRedeliverCount(
3
)
.retryLetterTopic(
"winfun/study/test-retry-topic-retry"
)
//可以指定死信队列
.deadLetterTopic(
"winfun/study/test-retry-topic-dlq"
)
.build())
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info(
"接收到队列「{}」消息:{}"
,msg.getTopicName(),msg.getValue());
if
(msg.getValue().contains(
"1"
) || msg.getValue().contains(
"2"
) || msg.getValue().contains(
"3"
)) {
try
{
consumer1.reconsumeLater(msg,
1
,TimeUnit.SECONDS);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
//throw new RuntimeException("hello3消息消费失败!");
}
else
{
try
{
consumer1.acknowledge(msg);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
}
}).subscribe();
}
}
/**
* 监听重试队列-Shared订阅模式
* @author winfun
**/
@Slf4j
public
class
RetryConsumerDemo {
public
static
void
main(String[] args)
throws
PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient(
"pulsar://127.0.0.1:6650"
);
Consumer<String> deadLetterConsumer = client.newConsumer(Schema.STRING)
.topic(
"winfun/study/test-retry-topic-retry"
)
.subscriptionName(
"my-subscription2"
)
.receiverQueueSize(
100
)
.ackTimeout(
1
, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info(
"接收到队列「{}」消息:{}"
,msg.getTopicName(),msg.getValue());
try
{
consumer1.acknowledge(msg);
}
catch
(PulsarClientException e) {
e.printStackTrace();
}
}).subscribe();
}
}
|
到此,我们已经将Consmuer的几种使用方式都尝试了一遍,可以说基本包含了常用的操作;但是我们可以发现,如果我们每次新建一个Consumer都需要写一堆同样的代码,那其实挺麻烦的,又不好看;并且,现在我们大部分项目都是基于 SpringBoot 来做的,而 SpringBoot 也没有一个比较大众的Starter.
所以接下来的计划就是,自己写一个编写一个关于Pulsar的SpringBoot Starter,这个组件不会特别复杂,但是会支持 Producer 和 Cousnmer 的自动配置,并且支持 Consumer 上面提到的几个点:MessageListener 监听、线程池异步并发消费、重试机制等.
到此这篇关于学会Pulsar Consumer的使用方式的文章就介绍到这了,更多相关Pulsar Consumer 使用内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://blog.csdn.net/Howinfun/article/details/119970626 。
最后此篇关于学会Pulsar Consumer的使用方式的文章就讲到这里了,如果你想了解更多关于学会Pulsar Consumer的使用方式的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我已经开始研究 MassTransit 并正在编写将处理消息的类。当我从 Consumes 实现接口(interface)时我有四个选项:All , Selected , For和 Context .
我正在尝试找出消费者群体级别是否也有任何抵消。在 Kafka 中,Consumer Offset 是在 Consumer group 级别还是在该 consumer group 内的单个消费者? 最佳
我有一个我不理解的 java 编译器错误。看来消费者 和 Consumer(带有 T 扩展对象)在方法签名参数中不等效。请查看以下代码: import java.util.function.Consu
我在泛型方面遇到了一些麻烦,尽管找到了解决方法,但我不明白是什么阻止了我的代码编译。 我有一个显示 TreeTableView 的 JavaFX 项目:
C++11 标准定义了一个内存模型(1.7、1.10),其中包含内存排序,大致为“顺序一致”、“获取”、“消耗”、“释放”和“放松”。同样粗略地,一个程序只有在它是无种族的情况下才是正确的,如果所有
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在,在消费者方面,我有两种选择。 1。使用 KafkaConsumer - 下面是 kafkaConsumer 的代码
我有四个当前消费者在 Amazon AWS 上收听同一个队列。从队列中拉取消息时,有时会出现同一条消息被两个不同的消费者消费的情况。请看下面的日志: 18:01:46,515 [jmsContaine
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
我们有一个系统,我们希望将记录(例如联系人、客户、机会)从我们的系统推送到 SalesForce。 为此,我们使用了 ForceToolKit for .Net .我们成功地将联系人记录从我们的系统推
我怎样才能写一个方法来组合 Stream的 Consumers成单个 Consumer使用 Consumer.andThen(Consumer) ? 我的第一个版本是: Consumer combi
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我正在尝试在我的 scala play 应用程序中创建消费者 secret / key 对,但我似乎无法让它正常工作。我有以下代码 import org.apache.commons.codec.bi
我通过传递用户(消费者)名称使用 .NET 应用程序,我需要从 Salesforce 检索消费者 key 和消费者 key ,我该如何实现。 最佳答案 Consumer Key 和 Consumer
我想设置 至 0 .这似乎是另一个问题 ( JMS queue with multiple consumers ) 的答案,并在此 article 中进行了描述。在第 17.1.1 章中。我使用 JN
I have send message api to my users.When I send to message from my x numbers I need to wait 10-15
我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下- while (true) { ConsumerRecords records
我正在为 iPhone 编写 Twitter/Facebook 应用程序。我有自己的 Apache/PHP 服务器。我只想把Consumer Key放在app里,然后我把Consumer Secret
Spring AMQP:比较多个消费者与每个消费者多个线程的性能 我正处于从 Spring 文档学习 Spring AMQP 的阶段。我不清楚提高异步消息消费率的首选方法:根据 Spring 文档 (
我正在制作一个需要 oAuth 1.0 身份验证的应用程序。我可以访问客户提供的消费者 key 和消费者 secret 。我曾尝试使用 AFNetworking 进行此操作,但效果不佳。有人可以建议我
我是一名优秀的程序员,十分优秀!