gpt4 book ai didi

android - RxJava2如何观察UDP包?

转载 作者:行者123 更新时间:2023-11-29 17:03:04 26 4
gpt4 key购买 nike

我刚刚开始使用 RxJava2,想知道如何才能正确实现 UDP observable。
我已经有了一些工作代码,但我认为可能存在一些问题:请参阅下面源代码注释中的 4 个问题。

我还在 GitHub 上发布了代码 RxJava2_Udp :欢迎评论、问题和拉取请求。

class UdpObservable {

private static class UdpThread extends Thread {
private final int portNo;
private final int bufferSizeInBytes;
private final ObservableEmitter<DatagramPacket> emitter;
private DatagramSocket udpSocket;

private UdpThread(@NonNull ObservableEmitter<DatagramPacket> emitter
, int portNo, int bufferSizeInBytes) {
this.emitter = emitter;
this.portNo = portNo;
this.bufferSizeInBytes = bufferSizeInBytes;
}

@Override
public void run() {
try {
// we don't want to create the DatagramSocket in the constructor, because this
// might raise an Exception that the observer wants to handle
udpSocket = new DatagramSocket(portNo);
try {
/* QUESTION 1:
Do I really need to check isInterrupted() and emitter.isDisposed()?

When the thread is interrupted an interrupted exception will
be raised anyway and the emitter is being disposed (this is what
caused the interruption)
*/
while (!isInterrupted() && !emitter.isDisposed()) {
byte[] rcvBuffer = new byte[bufferSizeInBytes];
DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
udpSocket.receive(datagramPacket);
// QUESTION 1a: same as QUESTION 1 above
if (!isInterrupted() && !emitter.isDisposed()) {
emitter.onNext(datagramPacket);
}
}
} finally {
closeUdpSocket();
}
} catch (Throwable th) {
// the thread will only be interrupted when the observer has unsubscribed:
// so we need not report it
if (!isInterrupted()) {
if (!emitter.isDisposed()) {
emitter.onError(th);
} else {
// QUESTION 2: is this the correct way to handle errors, when the emitter
// is already disposed?
RxJavaPlugins.onError(th);
}
}
}
}

private void closeUdpSocket() {
if (!udpSocket.isClosed()) {
udpSocket.close();
}
}

@Override
public void interrupt() {
super.interrupt();
// QUESTION 3: this is called from an external thread, right, so
// how can we correctly synchronize the access to udpSocket?
closeUdpSocket();
}
}

/**
* creates an Observable that will emit all UDP datagrams of a UDP port.
* <p>
* This will be an infinite stream that ends when the observer unsubscribes, or when an error
* occurs. The observer does not handle backpressure.
* </p>
*/
public static Observable<DatagramPacket> create(final int portNo, final int bufferSizeInBytes) {
return Observable.create(
new ObservableOnSubscribe<DatagramPacket>() {
@Override
public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
final UdpThread udpThread = new UdpThread(emitter, portNo, bufferSizeInBytes);
/* QUESTION 4: Is this the right way to handle unsubscription?
*/
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
udpThread.interrupt();
}
});
udpThread.start();
}
}
);
}

}

最佳答案

  • 一般来说,我认为这不是创建它的正确方法,你不应该自己创建线程,因为 RxJava 和它的 Schedulers 应该为你做。
    考虑到在 ObservableOnSubscribe 执行的代码将根据您的 Scheduler 策略在一个线程中运行,因此您不需要自己构建它。只需在 create 中执行 ude while 循环即可。
  • 您不需要调用Thread.interrupt() 方法,当您处理(取消订阅)Observable 时,RxJava 会为您完成。 (当然是在while循环之前设置cancelable)

关于您的问题:

  1. 您不需要检查中断,因为异常会如果你正在等待 io 操作,你也不需要检查处置,因为 onNext() 会为您完成并将不发出取消订阅。

  2. 您可以再次调用 onError,发射器将负责检查 Observable 是否已取消订阅。

  3. 如前所述,应该没有Thread,但是对于资源清理,您可以使用emitter.setCancellable 方法。 (关闭流),这发生在您的代码运行的同一线程上。
  4. 之前回答过,Thread.interrput() 会被 RxJava 处理/取消订阅,资源清理应该转到 emitter.setCancellable 方法

关于android - RxJava2如何观察UDP包?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42326667/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com