gpt4 book ai didi

java - 使用 cloudhopper 发送异步 PDU 响应

转载 作者:行者123 更新时间:2023-11-30 09:05:51 29 4
gpt4 key购买 nike

我有一个项目使用 Cloudhopper 5.0.6 库来保持 SMPP 连接(3.4 版本)并发送或接收 PDU。我需要修改默认 PDUResopnse,因此,自定义 PDU 处理是通过以下方式扩展 DefaultSmppSessionHandler 来组织的:

 public class SmppSessionHandlerController extends DefaultSmppSessionHandler {

@Override
public PduResponse firePduRequestReceived(PduRequest pduRequest) {
PduRequestHandler pduReqHandler = pduRequestHandler;
PduResponse resultPduResponse = pduRequest.createResponse();
return processDefaultPduResponse(resultPduResponse);
}

private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
//do some transformations here on pduResponse...
return pduResponse;
}
}

它仅适用于以下目的:

  1. 更改结果命令状态或一些 pdu 字段/tlv 参数
  2. 不对当前的 PDU 请求发送任何响应。为此,firePduRequestReceived 方法必须返回 null

现在我需要添加延迟的 PDU 响应发送,问题就在这里开始了。我的第一次尝试是这样的:

    @Override
public PduResponse firePduRequestReceived(PduRequest pduRequest) {
PduRequestHandler pduReqHandler = pduRequestHandler;
PduResponse resultPduResponse = pduRequest.createResponse();
return processDefaultPduResponse(resultPduResponse);
}

private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
try {
Thread.sleep(responseDelay);
} catch (InterruptedException e) {
throw new RuntimeException("Response delay interrupted", e);
}
return pduResponse;
}

添加了当前线程的 hibernate 以延迟发送响应,因此调用线程被保留 responseDelay 毫秒。如果此 session 没有同时发出更多请求,则此方法可以正常工作。在同一 session 中添加一些 submit_sm 负载导致错误:

com.cloudhopper.smpp.type.SmppTimeoutException: Unable to get response within [10000 ms]
at com.cloudhopper.smpp.impl.DefaultSmppSession.sendRequestAndGetResponse(DefaultSmppSession.java:471) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.cloudhopper.smpp.impl.DefaultSmppSession.enquireLink(DefaultSmppSession.java:439) ~[ch-smpp-5.0.6.jar:5.0.6]

在 coudhopper 源中搜索后我发现了问题,它是 DefaultSmppSession 类中任何操作的独占窗口锁定:

 future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutMillis, configuration.getRequestExpiryTimeout(), synchronous);

问题出在 com.cloudhopper.commons.util.windowing.Window 类中,该类使用独占锁来执行任何操作,因此不可能在一个线程中返回 PRUResponse 并发出请求之前等待来自另一个。

接下来的尝试是返回 null 作为请求处理(丢弃请求而不发送任何响应)并使用 com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse) 方法。这种方法工作了一段时间,但总是以以下异常结束:

com.cloudhopper.smpp.type.SmppChannelException: null
at com.cloudhopper.smpp.impl.DefaultSmppSession.sendResponsePdu(DefaultSmppSession.java:581) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.svzn.autotest.smppclient.impl.cloudhopper.SmppSendingManager.sendPduResponse(SmppSendingManager.java:84) ~[smpp-client-1.0.1.jar:na]
at com.svzn.autotest.smppclient.impl.cloudhopper.util.SendPduCommand.sendPduResponse(SendPduCommand.java:80) [smpp-client-1.0.1.jar:na]
at com.svzn.autotest.smppclient.impl.cloudhopper.SmppClientImpl.sendPduResponse(SmppClientImpl.java:91) [smpp-client-1.0.1.jar:na]
at com.svzn.autotest.example.testng_aggr.lib.smpp.event.BaseEventProcessor$1.run(BaseEventProcessor.java:62) [test-classes/:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_37]
at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) [na:1.6.0_37]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
Caused by: org.jboss.netty.handler.timeout.WriteTimeoutException: null
at org.jboss.netty.handler.timeout.WriteTimeoutHandler.<clinit>(WriteTimeoutHandler.java:79) ~[netty-3.9.0.Final.jar:na]
at com.cloudhopper.smpp.impl.DefaultSmppClient.createSession(DefaultSmppClient.java:259) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.cloudhopper.smpp.impl.DefaultSmppClient.doOpen(DefaultSmppClient.java:226) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.cloudhopper.smpp.impl.DefaultSmppClient.bind(DefaultSmppClient.java:193) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.svzn.autotest.smppclient.impl.cloudhopper.tasks.RebindTask.run(RebindTask.java:37) ~[smpp-client-1.0.1.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) [na:1.6.0_37]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoo

lExecutor.java:204) [na:1.6.0_37

]
... 3 common frames omitted

不知道如何修复此错误或以其他方式在同一 session 中发送异步 PDUResponse。你对此有什么想法吗?

最佳答案

终于找到问题了。问题出在不正确的同步块(synchronized block)中,它以通常的方式阻止了并行异步事件处理(发送 pdu 响应)以及处理请求和响应而没有处理。

完全可以调用com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse)一个线程中的方法,并通过从另一个线程扩展 DefaultSmppSessionHandler 来处理请求和响应。一切都将在同一个 session 中处理。

更新:这是我用来处理 pdu 请求的实现:

public class SmppSessionHandlerController extends DefaultSmppSessionHandler {
private static final Logger log = LoggerFactory.getLogger(SmppSessionHandlerController.class);

private volatile PduHandler pduHandler;
private PduResponseHandler pduResponseHandler;
private PduRequestHandler pduRequestHandler;

public SmppSessionHandlerController() {
super(log);
}

public PduHandler getPduHandler() {
return pduHandler;
}

public void setPduHandler(PduHandler pduHandler) {
this.pduHandler = pduHandler;
}

public PduResponseHandler getPduResponseHandler() {
return pduResponseHandler;
}

public void setPduResponseHandler(PduResponseHandler pduResponseHandler) {
this.pduResponseHandler = pduResponseHandler;
}

public PduRequestHandler getPduRequestHandler() {
return pduRequestHandler;
}

public void setPduRequestHandler(PduRequestHandler pduRequestHandler) {
this.pduRequestHandler = pduRequestHandler;
}

@Override
public void fireExpectedPduResponseReceived(PduAsyncResponse pduAsyncResponse) {
log.trace("Handling response PDU: {}", pduAsyncResponse);
pduAsyncResponse.getResponse().setReferenceObject(pduAsyncResponse.getRequest().getReferenceObject());
processPduResponse(pduAsyncResponse.getResponse());
}


@Override
public void fireUnexpectedPduResponseReceived(PduResponse pduResponse) {
log.warn("Handling unexpected response PDU: {}", pduResponse);
processPduResponse(pduResponse);
}

@Override
public boolean firePduReceived(Pdu pdu) {
PduHandler currPduHandler = pduHandler;
if (currPduHandler != null) {
SmppPdu smppPdu = PduToApiConverter.convertToApiObject(pdu);
currPduHandler.handlePduReceived(smppPdu);
}
// default handling is to accept pdu for processing up chain
return true;
}

public void firePduRequestExpired(PduRequest pduRequest) {
super.firePduRequestExpired(pduRequest);
}

private void processPduResponse(PduResponse pduResponse) {
HandlersContextHelper referenceObj = (HandlersContextHelper) pduResponse.getReferenceObject();
if (referenceObj != null) {
referenceObj.getSequenceIdHolder().addReceivedSequenceId(pduResponse.getSequenceNumber());
}
PduResponseHandler pduRespHandler = pduResponseHandler;
if (pduRespHandler != null) {
SmppPduResponse smppPduResponse = PduToApiConverter.convertToApiResponse(pduResponse);
if (smppPduResponse != null) {
pduRespHandler.handlePduResponse(smppPduResponse);
}
}
if (referenceObj != null) {
referenceObj.getSequenceIdHolder().checkSentAndReceivedClosed();
}
}

@Override
public PduResponse firePduRequestReceived(PduRequest pduRequest) {
PduRequestHandler pduReqHandler = pduRequestHandler;
PduResponse resultPduResponse = pduRequest.createResponse();
if (pduReqHandler == null) {
return resultPduResponse;
}
PduResponse defaultPduResponse = pduRequest.createResponse();
SmppPduRequest smppPduRequest = PduToApiConverter.convertToApiRequest(pduRequest);
SmppPduResponse defaultSmppPduResponse = PduToApiConverter.convertToApiResponse(defaultPduResponse);
if (smppPduRequest == null || defaultSmppPduResponse == null) {
return resultPduResponse;
}
SmppPduResponse resultSmppPduResponse = pduReqHandler.handlePduRequest(smppPduRequest, defaultSmppPduResponse);
if (resultSmppPduResponse == null) {
return null;
}
PduResponse convertedPduResponse = ApiToPduConverter.convertToPduResponse(resultSmppPduResponse);
if (convertedPduResponse == null) {
return resultPduResponse;
}
if (!resultPduResponse.getClass().isAssignableFrom(convertedPduResponse.getClass())) {
return resultPduResponse;
}
return convertedPduResponse;
}
}

Wich 是这样添加到 clowdhopper smpp 客户端的

SmppSession session = smppClient.bind(SmppSessionConfiguration_instance, SmppSessionHandlerController_instance );

我为 PduHandler PduRequestHandler 和 PduResponseHandler 定义了自定义接口(interface),它们处理 smpp 事件的特殊情况,您可以看到 SmppSessionHandlerController 只是将调用委托(delegate)给其中之一。

使用方法

public PduResponse firePduRequestReceived(PduRequest pduRequest) 

defiend 在 SmppSessionHandler 中,您可以在同步模式下发送您想要的任何响应。如果您想在异步模式下执行此操作,请返回 null pduResponse 并使用 SmppSession.sendResponsePdu(Pdu) 而不是当前或任何其他线程。

关于java - 使用 cloudhopper 发送异步 PDU 响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24702356/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com