- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我刚刚开始使用 RxJava2,想知道如何才能正确实现 UDP observable。
我已经有了一些工作代码,但我认为可能存在一些问题:请参阅下面源代码注释中的 4 个问题。
我还在 GitHub 上发布了代码 RxJava2_Udp :欢迎评论、问题和拉取请求。
class UdpObservable {
private static class UdpThread extends Thread {
private final int portNo;
private final int bufferSizeInBytes;
private final ObservableEmitter<DatagramPacket> emitter;
private DatagramSocket udpSocket;
private UdpThread(@NonNull ObservableEmitter<DatagramPacket> emitter
, int portNo, int bufferSizeInBytes) {
this.emitter = emitter;
this.portNo = portNo;
this.bufferSizeInBytes = bufferSizeInBytes;
}
@Override
public void run() {
try {
// we don't want to create the DatagramSocket in the constructor, because this
// might raise an Exception that the observer wants to handle
udpSocket = new DatagramSocket(portNo);
try {
/* QUESTION 1:
Do I really need to check isInterrupted() and emitter.isDisposed()?
When the thread is interrupted an interrupted exception will
be raised anyway and the emitter is being disposed (this is what
caused the interruption)
*/
while (!isInterrupted() && !emitter.isDisposed()) {
byte[] rcvBuffer = new byte[bufferSizeInBytes];
DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
udpSocket.receive(datagramPacket);
// QUESTION 1a: same as QUESTION 1 above
if (!isInterrupted() && !emitter.isDisposed()) {
emitter.onNext(datagramPacket);
}
}
} finally {
closeUdpSocket();
}
} catch (Throwable th) {
// the thread will only be interrupted when the observer has unsubscribed:
// so we need not report it
if (!isInterrupted()) {
if (!emitter.isDisposed()) {
emitter.onError(th);
} else {
// QUESTION 2: is this the correct way to handle errors, when the emitter
// is already disposed?
RxJavaPlugins.onError(th);
}
}
}
}
private void closeUdpSocket() {
if (!udpSocket.isClosed()) {
udpSocket.close();
}
}
@Override
public void interrupt() {
super.interrupt();
// QUESTION 3: this is called from an external thread, right, so
// how can we correctly synchronize the access to udpSocket?
closeUdpSocket();
}
}
/**
* creates an Observable that will emit all UDP datagrams of a UDP port.
* <p>
* This will be an infinite stream that ends when the observer unsubscribes, or when an error
* occurs. The observer does not handle backpressure.
* </p>
*/
public static Observable<DatagramPacket> create(final int portNo, final int bufferSizeInBytes) {
return Observable.create(
new ObservableOnSubscribe<DatagramPacket>() {
@Override
public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
final UdpThread udpThread = new UdpThread(emitter, portNo, bufferSizeInBytes);
/* QUESTION 4: Is this the right way to handle unsubscription?
*/
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
udpThread.interrupt();
}
});
udpThread.start();
}
}
);
}
}
最佳答案
Schedulers
应该为你做。ObservableOnSubscribe
执行的代码将根据您的 Scheduler
策略在一个线程中运行,因此您不需要自己构建它。只需在 create 中执行 ude while 循环即可。Thread.interrupt()
方法,当您处理(取消订阅)Observable
时,RxJava 会为您完成。 (当然是在while循环之前设置cancelable
)关于您的问题:
您不需要检查中断,因为异常会如果你正在等待 io 操作,你也不需要检查处置,因为 onNext()
会为您完成并将不发出取消订阅。
您可以再次调用 onError
,发射器将负责检查 Observable
是否已取消订阅。
emitter.setCancellable
方法。 (关闭流),这发生在您的代码运行的同一线程上。emitter.setCancellable
方法关于android - RxJava2如何观察UDP包?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42326667/
我想知道是否有一种方法可以重复记录而不进行排序?有时候,我想保持原始顺序,只想删除重复的记录。 是否可以? 顺便说一句,以下是我所知道的有关重复记录的信息,这些记录最终会进行排序。 1。 proc s
我想更新我的 Activity 中依赖于另一个列表的数据的列表。这两个数据列表都是从我的 View 模型的 Activity 中观察到的。从第一个列表获取数据后,我需要在此列表上运行 for 循环以获
我无法理解这个问题。我怎样才能等待 i==2 完成然后再继续其他 i 的操作? class Observable { constructor() { this.observer
我正在观察这样的 Ember Data RecordArray: myArray: function() { return MyRecord.find(); }.property(), isDir
我想在动画开始时观察 strokeEnd 键路径。但是它不起作用,我哪里出错了? - (void)addAnimation { // do animation CABasicAnima
是否可以在 Algorand 中观看某个交易,就像在以太坊中观看某个事件一样? 最佳答案 官方 algod 和 indexer API 目前不支持在 Algorand 上观看交易/事件。 您可以通过使
我有一个可以拖放到其他 View 之上的 View (可以说是类别)。为了检测我在哪个类别 View 之上,我将它们的帧存储在一个帧数组中,这发生在它们不可见叠加层的 onAppear 中。 (这基于
是否可以将观察者添加到可见性更改(即调用 show() 和 hide())时触发的 DOM 元素?谢谢! 最佳答案 如果您想观察任何对 .show() 或 .hide() 的调用,并且可以访问 jQu
我对保存在 NSUserdefaults 中的特定键的值变化感兴趣。然而,我所拥有的并不适合我。 observeValueForKeyPath 不会被触发。 更新:我想我已经发现了这个问题。如果我使用
我正在寻找在 UITableView 顶部实现捏入/捏出,我已经研究了几种方法,包括这个: Similar question 但是,虽然我可以创建一个 UIViewTouch 对象并将其覆盖到我的 U
我有一个在界面中公开的可变数组。我还公开了数组访问器来修改数组。如果数组内发生任何修改,我将不得不使用 KVO 重置并重新计算一些数据。为了支持 KVO,我使用 array accessors如下图:
当 NSPopupButton 发生变化时如何获得方法调用?谢谢! 最佳答案 您只需添加一个操作方法,就像使用 NSButton 或任何其他控件一样。 关于iphone - 观察 NSPopupBut
我正在尝试让键值观察适用于 NSMutableArray。下面是被观察类 MyObservee 的 .h 文件: @interface MyObservee : NSObject { @pri
我很难理解让 Node.js 进程(异步)运行但仍然触发“退出”状态,以便在 CPU 处理完成后我可以做更多事情。 例如,我有一个 Google 地方信息抓取工具,可以在所有可用的 CPU 上高效地分
我正在尝试编写行为类似于kubectl get pods --watch . 这样,每次 pod 的状态发生变化时,我都会被触发。 我创建了一个 go项目(在集群中运行)并添加以下代码: podsWa
我有这个代码: 当时我需要触发Javascript方法或具有给定 id 的 div 隐藏或显示,这将在屏幕调整大小期间发生(因此 u k-hidden-small ),这可以
我想使用 Couchbase,但我想在一些类似于 RethinkDB 的方式实现更改跟踪。 似乎有很多方法可以将更改从 Couchbase 服务器推送给我。 DCP 点击 XDCR 哪一个是正确的选择
虽然 MutationObserver 允许监视 HTMLElement 属性的显式大小更改,但它似乎没有一种方法/配置允许我监视其大小的隐式更改,这些更改是由浏览器。 这是一个例子: const o
我有一个 auto-carousel 指令,它循环访问链接元素的子元素。 但是,子级尚未加载到 DOM 中,因为它们的 ng-if 表达式尚未解析。 如何确保父指令知道其 DOM 树已发生更改?
有没有办法观察 AngularJS 指令中函数表达式的值变化?我有以下 HTML 和 JavaScript,模板中 {{editable()}} 的插值显示该值计算为 true,而检查 Chrome
我是一名优秀的程序员,十分优秀!