gpt4 book ai didi

java - 使用 ActiveMQ 发送 JSON 文件

转载 作者:太空宇宙 更新时间:2023-11-04 14:43:45 25 4
gpt4 key购买 nike

我正在编写一个使用 ActiveMQ 发送 JSON 文件的 JAVA 应用程序。我从这个开源下载了库:http://activemq.apache.org/download.html

应该发生的情况是,我发送 JSON 文件的地方应该使用它,并且屏幕上应该显示响应。我有另一个可以运行的应用程序,但我想要另一个可以打印到命令行的应用程序。然而,根据我所写的内容,这种情况不会发生。看起来我的代码确实连接到了某个 URL,但我不确定我是否正确执行了操作,因为这是我第一次使用它。有人可以确认我是否正确执行此操作或者我是否遗漏了某些内容?

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
//import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.MessageProducer;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
//import java.util.Iterator;

//import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class SimRunner {

static String topic = null;
static String url = null;
static String msg = null;
static String jsonFile = null;

public static void main(String[] args) throws Exception {

JSONParser parser = new JSONParser();

url = "tcp://localhost:45125";
jsonFile = "c:\\example\\file.json";
System.out.println("ActiveMQ url: " + url);
System.out.println("JSON File: " + jsonFile);

//read JSON file and parse/set topic
try {
Object obj = parser.parse(new FileReader(jsonFile));

JSONObject jsonObject = (JSONObject) obj;
msg = jsonObject.toJSONString().replace("\\/", "/");

topic = ((String) jsonObject.get("topic"));
System.out.println("Message queue/topic: " + topic);
System.out.println("Message (from file): " + msg);

} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}

thread(new Producer(), false);
thread(new Consumer(), false);
Thread.sleep(1000);
}

public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}

public static class Producer implements Runnable {
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();

// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the destination (Topic or Queue)
Destination destination = session.createQueue(topic);

// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// Create a message
TextMessage message = session.createTextMessage(msg);

// Tell the producer to send the message
producer.send(message);
System.out.println("Sent: "+ message.getText());

// Clean up
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Error occured in Producer. " + e);
e.printStackTrace();
}
}
}

public static class Consumer implements Runnable, ExceptionListener {
public void run() {
try {

// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();

connection.setExceptionListener(this);

// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the destination (Topic or Queue)
Destination destination = session.createQueue(topic);

// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);

// Wait for a message
Message message = consumer.receive(1000);

if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}

consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}

public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down client.");
}

}
}

最佳答案

尚不完全清楚您在做什么,但听起来您正在尝试使用单个队列向两个客户端接收相同的消息,这是行不通的。队列仅将消息分派(dispatch)给一个客户端,因此如果您有两个订阅的队列使用者,他们将共享消息负载,而不是每个都处理同一组消息。如果您希望有一个监听器可以查看发送到队列的每条消息,但不会中断正常的队列操作,请尝试使用 mirrored queues在 ActiveMQ 中。

关于java - 使用 ActiveMQ 发送 JSON 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24677906/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com