- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我是 RxJava2 新手,不确定我是否正确使用它。
我有几个从 TCP 流读取数据的输入流,并将数据读入 byte[]
。然后,我采用 byte[]
并为每个 InputStream
创建一个 Flowable
,并将各个 Flowable
合并为一个。
我希望能够关闭 1 个 InputStream
,但仍然让合并的 Flowable
继续从非关闭的 InputStream
读取。
目前,我可以正确地从 2 个 InputStream 读取数据,但是当我调用 close
时,会导致抛出异常,java.net.SocketException: Socket closeed
。
我的问题是,如何正确关闭InputStream或Flowable,以便我可以同时继续从另一个InputStream读取数据。
InputStream inputStream = response.body().asInputStream();
InputStream inputStream2 = response2.body().asInputStream();
Flowable<byte[]> flowable = Bytes.from(inputStream)
.distinct();
Flowable<byte[]> flowable2 = Bytes.from(inputStream2)
.distinct();
Flowable.merge(flowable.subscribeOn(Schedulers.newThread()),
flowable2.subscribeOn(Schedulers.newThread()))
.subscribe(s -> System.out.println("Data: " + new String(s)),
e -> System.out.println("Error: " + e.getLocalizedMessage() + " : " + e.getMessage()),
() -> System.out.println("Complete!"));
//Calling this results in an exception being thrown.
inputSteam2.close();
堆栈跟踪:
io.reactivex.exceptions.UndeliverableException: java.net.SocketException: Socket closed
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:604)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:665)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onError(FlowableSubscribeOn.java:102)
at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.onError(FlowableGenerate.java:189)
at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.request(FlowableGenerate.java:114)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.requestUpstream(FlowableSubscribeOn.java:133)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onSubscribe(FlowableSubscribeOn.java:90)
at io.reactivex.internal.operators.flowable.FlowableGenerate.subscribeActual(FlowableGenerate.java:52)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.Flowable.subscribe(Flowable.java:12932)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at okio.Okio$2.read(Okio.java:140)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:238)
at okio.RealBufferedSource.read(RealBufferedSource.java:45)
at okhttp3.internal.http.Http1xStream$UnknownLengthSource.read(Http1xStream.java:476)
at okio.RealBufferedSource$1.read(RealBufferedSource.java:386)
at java.io.InputStream.read(InputStream.java:101)
at com.github.davidmoten.rx2.Bytes$1.accept(Bytes.java:47)
at com.github.davidmoten.rx2.Bytes$1.accept(Bytes.java:43)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$SimpleGenerator.apply(FlowableInternalHelper.java:44)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$SimpleGenerator.apply(FlowableInternalHelper.java:35)
at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.request(FlowableGenerate.java:109)
... 14 more
Exception in thread "RxNewThreadScheduler-3" io.reactivex.exceptions.UndeliverableException: java.net.SocketException: Socket closed
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:604)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:665)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onError(FlowableSubscribeOn.java:102)
at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.onError(FlowableGenerate.java:189)
at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.request(FlowableGenerate.java:114)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.requestUpstream(FlowableSubscribeOn.java:133)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onSubscribe(FlowableSubscribeOn.java:90)
at io.reactivex.internal.operators.flowable.FlowableGenerate.subscribeActual(FlowableGenerate.java:52)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.Flowable.subscribe(Flowable.java:12932)
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
最佳答案
我能够通过创建自己的运算符并从中创建一个Flowable
来成功关闭流。
这个类是一个新的 RxJava Operator,它实现并记录我从关闭 InputStream
中得到的异常,这对于我的用例来说是没问题的。
public class MyFlowableOperator implements FlowableOperator {
@Override
public Subscriber apply(Subscriber subscriber) throws Exception {
return new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
}
@Override
public void onNext(Object o) {
subscriber.onNext(o);
}
@Override
public void onError(Throwable t) {
if(t instanceof SocketException) {
log.debug("Input Stream Closed");
} else {
subscriber.onError(t);
}
}
@Override
public void onComplete() {
subscriber.onComplete();
}
};
}
}
下面是我如何声明 RxJava 运算符的新实例并在从 InputStream
创建 Flowable
时使用它
// Private method to utilize the new RxJava Operator
private Subscriber getFlows(Subscriber subscriber) {
try {
return new MyFlowableOperator().apply(subscriber);
} catch (Exception e) {
log.error(e.getMessage());
}
return null;
}
在创建 Flowable
本身时,我使用 lift
方法来调用我的方法并返回带有错误处理的 Flowable
。
// Declaration of the Flowables
Flowable<byte[]> flowable = Bytes.from(inputStream)
.subscribeOn(Schedulers.newThread())
.distinct()
.lift(this::getFlows);
Flowable<byte[]> flowable2 = Bytes.from(inputStream2)
.subscribeOn(Schedulers.newThread())
.distinct()
.lift(this::getFlows);
关于java - 如果输入流的合并 Flowable 已关闭,如何继续合并 Flowable,RxJava 2.x,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45043127/
我在 Eclipse 中使用 Spring Boot 和 Flowable API 创建了一个 Flowable 应用程序。现在我的要求是从 Flowable UI 运行该流程。我们是否可以部署现有的
当我在两个由 Flowable.create() 创建的 Flowable (a1, a2) 上使用 Flowable.merge() 时,它只会发出 a2。但是当我合并两个由 Flowable.in
RxJava 2.0.5 引入了 ParallelFlowable类型,对应Flowable.parallel()运算符(operator)。我发现实现并行性的一般建议是使用 flatMap。像这样:
我对 RxJava 还很陌生,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。 我有 Dao,它提供 Flowable在某些范围内到 Da
我是 RxJava2 新手,不确定我是否正确使用它。 我有几个从 TCP 流读取数据的输入流,并将数据读入 byte[]。然后,我采用 byte[] 并为每个 InputStream 创建一个 Flo
我如何从 room 中读取可流动的值列表并将其转换为另一个对象,该对象是 room 中更多值的组合 database.leadsDao().getLeads(leadState.name) .
我有一个用于将 UserScoreTO 列表分组到单个用户对象(多对一关系)的 Rx 流。 public void execute() { getUsers() .fl
如何使用 Flowable 指定流程持续时间?如果从 startEvent 过去,例如30 天,流程不会完成,应该关闭。我将 Spring Boot 与 flowable 一起使用。应该用图表还是代码
我有一个返回 Flowable 的函数接口(interface) interface Iface{ Flowable get(); } 现在让我们假设具体实现是: class IFaceImpl
根据下面的流程,在基数 = 5 的情况下并行执行“simpleLog”(异步)服务任务时,我们遇到了并发问题,此后流程不再继续执行“dummy”(独占)服务任务下一步也不会重试“simpleLog”中
如何中断长期任务? public Flowable simple(int number) { Flowable flowable = Flowable.create(emitter -> {
使用改造,我从服务器获取 Flowable 数据,然后在 View 模型中,我将 flowable 转换为 Livedata,最后观察 Activity 中的 LiveData。在 Activity
我是 Flowable 新手。我尝试创建 BPMN 部署到 MYSQL,并且部署成功。 但是在取消部署过程中,我将所有部署存储在列表中,然后根据我的条件执行取消部署,就像如果 resouceName
我对 RxJava 还很陌生,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。 首先我有自己写的dao,它处理InputStream,并提
您好,我是 RxJava 新手,我有一个接收 Flowable f2 的类,我需要从中获取值,而不更改任何数据(将值保存到本地缓存)。然后将其与其他 Flowable f1 连接起来并将其发送到更高级
我假设 DefaultAsyncJobExecutor 是默认选择作为 AsyncExecutor 接口(interface)实现的类(不确定这个假设是否正确) 所以基本上我想修改一个异步作业的默认超
我想在 Flowable 中的所有项目发出后发出最终项目。现在我可以通过以下代码实现这一点。 Flowable flowable = getFlowable(); flowable.toList()
我正在使用 reportlab 3.2.0。 SPACER = Spacer(0, 10) buff = BytesIO() doc = SimpleDocTemplate(buff, rightMa
我正在尝试为 Flowable 动态生成工作流文件并在旅途中部署它。 有两个挑战:1. 创建 BAR 文件来打包生成的 XML2. 动态部署。 有人试过吗?如果是,您能否提供帮助或建议替代方案 最佳答
我有一个数据访问对象,它将数据源中的每个项目传递给消费者: public interface Dao { void forEachItem(Consumer item); } 这总是以单线程方
我是一名优秀的程序员,十分优秀!