gpt4 book ai didi

apache-spark - Spark 流 : Custom Receiver : Data source : Websphere Message Queue

转载 作者:行者123 更新时间:2023-12-04 04:58:52 31 4
gpt4 key购买 nike

我正在尝试在 Spark 流中为 WSMQ 数据源实现客户接收器。我遵循了提供的示例 here .

后来我仿照 this Github repository 的例子.

我遇到了三个问题:

1:错误(程序运行一段时间后出现此错误)

java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
  1. 即使我在创建 session 时使用了这段代码,程序也不会从 WSMQ 中删除消息

    MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
  2. 我需要实现在 Custom Receiver Spark API 中解释的可靠的 Receiver。它说:

    要实现可靠的接收器,您必须使用 store(multiple-records) 来存储数据。这种存储方式是一种阻塞调用,只有在所有给定记录都存储在 Spark 中后才会返回。如果接收方配置的存储级别使用复制(默认启用),则此调用将在复制完成后返回。因此它确保数据被可靠地存储,并且接收者现在可以适本地确认来源。这确保当接收方在复制数据的过程中发生故障时不会产生任何数据——缓冲的数据将不会被确认,因此稍后将由源重新发送。

我不知道如何处理存储(多记录)?

我不知道为什么会发生这些错误,也不知道如何实现可靠的 Receiver

代码如下:

public class JavaConnector extends Receiver<String> {

String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
String topic=null;

Enumeration enumeration =null;
private static MQQueueReceiver receiver = null;


public JavaConnector(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;


}

public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
}.start();
}

public void onStop() {

// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false

}

/** Create a MQ connection and receive data until receiver is stopped */
private void receive() throws InterruptedException {
System.out.print("Started receiving messages from MQ");


try {

JMSTextMessage receivedMessage= null;
int cnt =0;

//JMSTextMessage receivedMessage = (JMSTextMessage) receiver.receive(10000);

boolean flag=false;
while (!isStopped() && enumeration.hasMoreElements()&&cnt<50 )
{

receivedMessage= (JMSTextMessage) enumeration.nextElement();
receivedMessage.acknowledge();
String userInput = receivedMessage.getText();

ArrayList<String> list = new ArrayList<String>();
list.add(userInput);
Iterator<String> itr = list.iterator();
store(itr);
cnt++;

}
/*while (!isStopped() && receivedMessage !=null)
{

// receivedMessage= (JMSTextMessage) enumeration.nextElement();
String userInput = receivedMessage.getText();

store(userInput);
receivedMessage.acknowledge();

}*/

// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");

stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");

}
catch(Exception e)
{ Thread.sleep(100);
System.out.println("WRONG"+e.toString());
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
Thread.sleep(100);
System.out.println("WRONG-1"+t.toString());
// restart if there is any other error
restart("Error receiving data", t);
}



}

public void initConnection() throws JMSException,InterruptedException {
try {
MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
conFactory.setMsgBatchSize(100);


qCon = (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
MQQueue queue = (MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
//receiver = (MQQueueReceiver) qSession.createReceiver(queue);
enumeration= browser.getEnumeration();


} catch (Exception e) {
Thread.sleep(1000);
}
}

@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}

最佳答案

终于解决了这个问题。解决方案 1:Steaming 上下文尝试写入 Kafka,因为 kafka 已关闭并且它给我 IO 错误。我真傻。 :)

解决方案 2:我应该使用 MessageListener,QueueBrowser 用于读取消息它实际上并不使用消息。

关于apache-spark - Spark 流 : Custom Receiver : Data source : Websphere Message Queue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35095286/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com