java - Is it fine to communicate with RabbitMQ using a single channel and a single-threaded executor?

Reposted. Author: 行者123. Updated: 2023-11-30 06:39:52

I am trying to interact with a RabbitMQ server using the RabbitMQ Java client API. I read the following in the Java client API guide:

As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

I am using a ThreadPoolExecutor with a corePoolSize of 1 and submitting Runnable tasks to it that save messages to RabbitMQ queues. This is the code I am using:

package common;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.JsonObject;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

public class RabbitMQUtil {
    private static final Logger log = LoggerFactory.getLogger("logger");
    // volatile is needed for the double-checked locking in getInstance()
    private static volatile RabbitMQUtil gmInstance;
    // One worker thread, so only that thread ever uses the channel
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000));
    private final String PROPERTIES_FILE_NAME = "config/rabbitmq.properties";
    private final Properties properties = new Properties();
    private String host = null;
    private int port = 0;
    private String username = null;
    private String password = null;
    private String useSSL = "false";
    private ConnectionFactory factory;
    private Connection connection;
    private Channel channel;

    private RabbitMQUtil() throws IOException, TimeoutException {
        // try-with-resources closes the stream; a null resource is skipped
        try (InputStream stream = RabbitMQUtil.class.getClassLoader().getResourceAsStream(PROPERTIES_FILE_NAME)) {
            if (stream != null) {
                properties.load(stream);
            }
        } catch (Exception ex) {
            log.error("Exception while loading the rabbitmq properties file:", ex);
        }

        // Fall back to the broker defaults when a property is missing
        host = properties.getProperty("rabbitmq.host", "localhost");
        port = Integer.parseInt(properties.getProperty("rabbitmq.port", "5672"));
        username = properties.getProperty("rabbitmq.username", "guest");
        password = properties.getProperty("rabbitmq.password", "guest");
        useSSL = properties.getProperty("rabbitmq.usessl", "false");

        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        if ("true".equalsIgnoreCase(useSSL)) {
            try {
                factory.useSslProtocol();
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                log.error("Exception while applying the tls for rabbitmq:", e);
            }
        }
        connection = factory.newConnection();
        connection.addBlockedListener(new RabbitMQBlockedListener());
        connection.addShutdownListener(new RabbitMQShutDownListener());

        channel = connection.createChannel();
    }

    public static RabbitMQUtil getInstance() {
        if (gmInstance == null) {
            synchronized (RabbitMQUtil.class) {
                if (gmInstance == null) {
                    try {
                        gmInstance = new RabbitMQUtil();
                    } catch (Exception e) {
                        // Covers IOException, TimeoutException and runtime failures
                        log.error("Exception in getInstance:", e);
                    }
                }
            }
        }
        return gmInstance;
    }

    public static void saveErrorMessagesInLogs(JsonObject obj, String queueName) {
        log.info("data to be saved for :{} is:{}", queueName, obj);
    }

    public void saveMsgInQueue(JsonObject gson, String queueName) {
        this.executor.execute(new RabbitMQData(gson, queueName));
    }

    private class RabbitMQBlockedListener implements BlockedListener {
        @Override
        public void handleBlocked(String reason) throws IOException {
            // slf4j needs a {} placeholder, otherwise the argument is dropped
            log.warn("blocked listener called: {}", reason);
        }

        @Override
        public void handleUnblocked() throws IOException {
            log.warn("unblocked listener called");
        }
    }

    private class RabbitMQShutDownListener implements ShutdownListener {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            log.error("Shutdown event listener called:", cause);
            log.error("shutdown event listener, hard error: {}", cause.isHardError());
        }
    }

    private class RabbitMQData implements Runnable {
        private final JsonObject obj;
        private final String queueName;

        public RabbitMQData(JsonObject obj, String queueName) {
            this.obj = obj;
            this.queueName = queueName;
        }

        @Override
        public void run() {
            // Name the worker thread here; in the constructor this would rename the caller's thread
            Thread.currentThread().setName("RabbitMQ Thread:" + obj.get("userid") + " -->" + queueName);
            try {
                channel.queueDeclare(this.queueName, true, false, false, null);
                channel.basicPublish("", this.queueName, MessageProperties.PERSISTENT_BASIC, this.obj.toString().getBytes(StandardCharsets.UTF_8));
            } catch (Exception e) {
                log.error("Error while running the scheduled rabbitmq task:", e);
                log.error("data to be saved for :{} is:{}", this.queueName, this.obj);
            }
        }
    }

    public static void saveRabbitMQData(JsonObject obj, String queueName) {
        RabbitMQUtil util = RabbitMQUtil.getInstance();
        if (util != null) {
            util.saveMsgInQueue(obj, queueName);
        } else {
            // No connection available: keep the payload in the logs instead
            RabbitMQUtil.saveErrorMessagesInLogs(obj, queueName);
        }
    }
}

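I would like to know the following:

  1. Is it fine to use a single channel when a thread pool of only 1 thread is used?
  2. How should the connection and channel objects be handled when blocked/unblocked and shutdown events are triggered? (The API does re-establish the connection automatically once the RabbitMQ server is up again.)

Any other feedback would be appreciated.

Thanks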

Best answer

1.- Is it fine to use a single channel when a thread pool of only 1 thread is used?

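Yes, that is fine, and it is how you should do it. Only one thread should use a given Channel instance; otherwise confirms may be lost (see the Channel Javadoc: https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.1/rabbitmq-java-client-javadoc-3.1.1/com/rabbitmq/client/Channel.html).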
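
To make the point above concrete, here is a minimal, self-contained sketch of why a single-threaded executor satisfies the one-thread-per-Channel rule. `FakeChannel` is a hypothetical stand-in for `com.rabbitmq.client.Channel` (it is not part of the RabbitMQ client) that simply records which thread called it. The class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadPublisherSketch {

    /** Hypothetical stand-in for a Channel; deliberately not thread-safe. */
    static class FakeChannel {
        final List<String> published = new ArrayList<>();
        final Set<String> callerThreads = new HashSet<>();

        void basicPublish(String body) {
            callerThreads.add(Thread.currentThread().getName());
            published.add(body);
        }
    }

    /** Many producer threads submit work, but only the executor's worker touches the channel. */
    static FakeChannel publishAll(int producers, int messagesPerProducer) {
        FakeChannel channel = new FakeChannel();
        ExecutorService publisher = Executors.newSingleThreadExecutor();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                for (int m = 0; m < messagesPerProducer; m++) {
                    final String body = "producer-" + id + "-msg-" + m;
                    // Submitting is thread-safe; the publish itself runs on the worker
                    publisher.execute(() -> channel.basicPublish(body));
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            publisher.shutdown(); // queued tasks still run
            if (!publisher.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("publisher did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return channel;
    }

    public static void main(String[] args) {
        FakeChannel channel = publishAll(4, 100);
        System.out.println("messages published: " + channel.published.size());
        System.out.println("distinct publishing threads: " + channel.callerThreads.size());
    }
}
```

However many threads call `execute`, the unsynchronized `FakeChannel` only ever sees one caller thread, which is the property the question's `ThreadPoolExecutor(1, 1, ...)` relies on.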

2.- How should connection and channel objects be handled when blocked/unblocked and shutdown events are triggered? Although the API automatically establishes connection when RabbitMQ server is up again.

When the application shuts down, close the channel first and then the connection to RabbitMQ:

    channel.close();
    connection.close();
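
Because the question queues its publishes on an executor, a shutdown routine would probably also want to drain that executor before closing anything. The following is a sketch of that ordering under stated assumptions, not the RabbitMQ client's own API: `Resource` is a hypothetical stand-in for the channel and connection that records when it is closed, and the 10-second timeout is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderlyShutdownSketch {

    /** Hypothetical stand-in for a Channel or Connection; records when it is closed. */
    static class Resource implements AutoCloseable {
        private final String name;
        private final List<String> events;

        Resource(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override
        public void close() {
            events.add("closed " + name);
        }
    }

    /** Drains the publisher, then closes the channel before the connection. */
    static List<String> shutDown(int pendingMessages) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Resource connection = new Resource("connection", events);
        Resource channel = new Resource("channel", events);
        ExecutorService publisher = Executors.newSingleThreadExecutor();

        for (int i = 0; i < pendingMessages; i++) {
            final int n = i;
            publisher.execute(() -> events.add("published " + n));
        }

        publisher.shutdown(); // stop accepting new tasks; queued ones still run
        try {
            if (!publisher.awaitTermination(10, TimeUnit.SECONDS)) {
                publisher.shutdownNow(); // give up on anything still queued
            }
        } catch (InterruptedException e) {
            publisher.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            channel.close();    // channel first ...
            connection.close(); // ... then the connection
        }
        return events;
    }

    public static void main(String[] args) {
        shutDown(3).forEach(System.out::println);
    }
}
```

In a real application this routine could be called from a JVM shutdown hook or the container's lifecycle callback, with the stand-ins replaced by the actual `channel` and `connection` fields.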

As for blocked/unblocked, read this part of the API guide (https://www.rabbitmq.com/api-guide.html):

Callbacks to Consumers are dispatched in a thread pool separate from the thread that instantiated its Channel. This means that Consumers can safely call blocking methods on the Connection or Channel, such as Channel#queueDeclare or Channel#basicCancel.

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.
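
The effect described in the quoted passage can be sketched without a broker. The example below models each channel's dispatch thread as one single-threaded executor, which is a simplification for illustration and not the client's actual internals. The slow callback blocks for up to 500 ms to represent long-running work; the latch lets it finish early only so that the two-channel run is deterministic. All names here are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {

    /**
     * Dispatches one callback to a slow consumer and one to a fast consumer.
     * With sharedChannel = true both callbacks run on the same dispatch thread.
     * Returns the consumers in the order their callbacks completed.
     */
    static List<String> deliver(boolean sharedChannel) {
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService slowDispatch = Executors.newSingleThreadExecutor();
        ExecutorService fastDispatch = sharedChannel ? slowDispatch : Executors.newSingleThreadExecutor();
        CountDownLatch fastFinished = new CountDownLatch(1);

        // Slow consumer: busy until the fast one is done or 500 ms have passed
        slowDispatch.execute(() -> {
            try {
                fastFinished.await(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.add("slow");
        });

        // Fast consumer: on a shared channel it is queued behind the slow callback
        fastDispatch.execute(() -> {
            completed.add("fast");
            fastFinished.countDown();
        });

        slowDispatch.shutdown();
        fastDispatch.shutdown(); // harmless if both names refer to the same executor
        try {
            slowDispatch.awaitTermination(5, TimeUnit.SECONDS);
            fastDispatch.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("same channel:      " + deliver(true));
        System.out.println("separate channels: " + deliver(false));
    }
}
```

On a shared channel the fast consumer cannot run until the slow callback returns, so it completes second; with one channel per consumer it completes first.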

Regarding "java - Is it fine to communicate with RabbitMQ using a single channel and a single-threaded executor?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44560031/
