
apache-spark - Websphere MQ as a data source for Apache Spark Streaming

Reposted. Author: 行者123. Updated: 2023-12-03 07:23:19

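I am looking into the possibility of using Websphere MQ as a data source for Spark Streaming, because one of our use cases requires it. I understand that MQTT is a protocol that supports communication with MQ data structures, but since I am new to Spark Streaming, I need some working examples of this. Has anyone tried connecting MQ to Spark Streaming? Please suggest the best approach.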

Best answer

So, I am posting here the working code for CustomMQReceiver, which connects to Websphere MQ and reads data:

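import java.util.Enumeration;
import javax.jms.JMSException;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import com.google.gson.Gson;
import com.ibm.jms.JMSMessage;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueBrowser;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;

public class CustomMQReceiver extends Receiver<String> {

    String host = null;
    int port = -1;
    String qm = null;       // queue manager name
    String qn = null;       // queue name
    String channel = null;  // channel used for the client connection
    // transient fields are not serialized when Spark ships the receiver to an executor
    transient Gson gson = new Gson();  // not used in the code shown here
    transient MQQueueConnection qCon = null;

    // Messages returned by the queue browser; set in initConnection()
    transient Enumeration<?> enumeration = null;

    public CustomMQReceiver(String host, int port, String qm, String channel, String qn) {
        super(StorageLevel.MEMORY_ONLY_2());
        this.host = host;
        this.port = port;
        this.qm = qm;
        this.qn = qn;
        this.channel = channel;
    }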

    @Override
    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override
            public void run() {
                try {
                    initConnection();
                    receive();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();
    }

    @Override
    public void onStop() {
        // There is nothing much to do here: the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
    }

    /** Read messages from the MQ queue browser until the receiver is stopped or no messages remain */
    private void receive() {
        System.out.println("Started receiving messages from MQ");

        try {
            JMSMessage receivedMessage = null;

            // A queue browser only reads messages; it does not remove them from the queue
            while (!isStopped() && enumeration.hasMoreElements()) {
                receivedMessage = (JMSMessage) enumeration.nextElement();
                // convertStreamToString is not shown in the original answer; it is
                // assumed to return the message body as a String
                String userInput = convertStreamToString(receivedMessage);
                //System.out.println("Received data :'" + userInput + "'");
                store(userInput);
            }

            // Restart in an attempt to connect again when server is active again
            //restart("Trying to connect again");

            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");
        } catch (Exception e) {
            e.printStackTrace();
            restart("Trying to connect again");
        } catch (Throwable t) {
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }

    /** Open a client connection to the queue manager and create a browser over the queue */
    public void initConnection() throws JMSException {
        MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
        conFactory.setHostName(host);
        conFactory.setPort(port);
        conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
        conFactory.setQueueManager(qm);
        conFactory.setChannel(channel);

        qCon = (MQQueueConnection) conFactory.createQueueConnection();
        // Non-transacted session; the original passed the literal 1, which is AUTO_ACKNOWLEDGE
        MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        MQQueue queue = (MQQueue) qSession.createQueue(qn);
        MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
        qCon.start();

        enumeration = browser.getEnumeration();
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }
}
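
The control flow of the receive loop above can be tried without Spark or MQ on the classpath. The following is a minimal sketch, not the author's code: BrowseLoopSketch, drain, the AtomicBoolean flag, and the Consumer sink are hypothetical stand-ins for the receiver, its while loop, isStopped(), and store(). It only illustrates that the loop ends either when the stop flag is set or when the enumeration has no more elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Dependency-free stand-in for the receiver's browse loop (all names are hypothetical). */
class BrowseLoopSketch {

    /**
     * Hands elements of source to sink until the source is exhausted
     * or the stopped flag is set. Returns the number of elements handed over.
     */
    static <T> int drain(Enumeration<T> source, AtomicBoolean stopped, Consumer<T> sink) {
        int count = 0;
        while (!stopped.get() && source.hasMoreElements()) {
            sink.accept(source.nextElement());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        Enumeration<String> messages =
                Collections.enumeration(Arrays.asList("m1", "m2", "m3"));
        AtomicBoolean stopped = new AtomicBoolean(false);

        // Set the flag after the second message to mimic a stop request mid-browse.
        int delivered = drain(messages, stopped, msg -> {
            stored.add(msg);
            if (stored.size() == 2) {
                stopped.set(true);
            }
        });

        System.out.println(delivered + " stored: " + stored); // prints "2 stored: [m1, m2]"
    }
}
```

In the answer's receiver, the same loop is followed by a call to stop(), so the receiver as written shuts down once the browser's enumeration is exhausted. Whether that suits a long-running stream depends on the use case.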

Regarding apache-spark - Websphere MQ as a data source for Apache Spark Streaming, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30434555/
