- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
许多线程将进行数据库调用并阻塞,以提高和扩展性能。
我想要扩展一个需要发出 3000 个异步数据库请求的程序。我不想一次性发出 3000 个请求,而是希望在任何给定时间将其限制为 50 个,并将剩余的 2950 个请求排队,然后在任务完成时一次处理剩余的 2950 个请求。理想情况下,我想使用现有的库来使用新的自定义代码重新发明它,因为我假设有一种方法可以做到这一点,但我不确定如何使用不断出现的各种异步 Java SDK 的 API出来。
最佳答案
我认为有几种方法可以解决无限线程池的问题。正如其他人指出的,一种是从有界线程池支持的执行器创建 RxJava 调度程序。这非常简单,而且很可能是最好的方法。
但是,我确实想指出,RxJava 的“并行化”运算符(flatMap、concatMapEager)还有一个可选的 maxConcurrency 运算符,它允许我们将给定 Rx 管道中的泳道数量与用于执行的调度程序解耦它。
这是一个假设的示例,假设我们有一个执行阻塞查询的数据访问对象。在本例中,它仅 hibernate 1 秒并返回查询本身并附加时间戳:
public class MyDao
{
public Object blockingGetData( String query ) throws InterruptedException
{
Thread.sleep( 1000 );
return query.toUpperCase() + " - " + new Date().toString();
}
}
接下来,让我们将 DAO 包装在一个异步服务中,该服务维护一个 Rx 管道,其中每个元素代表一个查询及其异步结果:
public class MyService
{
private class QueryHolder
{
final String query;
final Subject<Object> result;
public QueryHolder( String query, Subject<Object> result )
{
this.query = query;
this.result = result;
}
}
private static final int MAX_CONCURRENCY = 2;
private final Subject<QueryHolder> querySubject;
private final MyDao dao;
public MyService()
{
dao = new MyDao();
querySubject = PublishSubject.<QueryHolder>create().toSerialized();
querySubject
.flatMap(
// For each element in the pipeline, perform blocking
// get on IO Scheduler, populating the result Subject:
queryHolder -> Observable.just( queryHolder )
.subscribeOn( Schedulers.io() )
.doOnNext( __ -> {
Object data = dao.blockingGetData( queryHolder.query );
queryHolder.result.onNext( data );
queryHolder.result.onComplete();
} ),
// With max concurrency limited:
MAX_CONCURRENCY )
.subscribe();
}
public Single<Object> getData( String query )
{
Subject<Object> result = AsyncSubject.create();
// Emit pipeline element:
querySubject.onNext( new QueryHolder( query, result ));
return result.firstOrError();
}
}
我建议您在谷歌上搜索不同的主题类型和运算符等 - 有大量可用文档。
简单的手动测试:
@Test
public void testService() throws InterruptedException
{
MyService service = new MyService();
// Issue 20 queries immediately, printing the results when they complete:
for ( int i = 0; i < 20; i++ )
{
service.getData( "query #" + i )
.subscribe( System.out::println );
}
// Sleep:
Thread.sleep( 11000 );
}
输出:
QUERY #0 - Wed Mar 11 11:08:21 EDT 2020
QUERY #1 - Wed Mar 11 11:08:21 EDT 2020
QUERY #2 - Wed Mar 11 11:08:22 EDT 2020
QUERY #3 - Wed Mar 11 11:08:22 EDT 2020
QUERY #4 - Wed Mar 11 11:08:23 EDT 2020
QUERY #5 - Wed Mar 11 11:08:23 EDT 2020
QUERY #6 - Wed Mar 11 11:08:24 EDT 2020
QUERY #7 - Wed Mar 11 11:08:24 EDT 2020
QUERY #8 - Wed Mar 11 11:08:25 EDT 2020
QUERY #9 - Wed Mar 11 11:08:25 EDT 2020
QUERY #10 - Wed Mar 11 11:08:26 EDT 2020
QUERY #11 - Wed Mar 11 11:08:26 EDT 2020
QUERY #12 - Wed Mar 11 11:08:27 EDT 2020
QUERY #13 - Wed Mar 11 11:08:27 EDT 2020
QUERY #14 - Wed Mar 11 11:08:28 EDT 2020
QUERY #15 - Wed Mar 11 11:08:28 EDT 2020
QUERY #16 - Wed Mar 11 11:08:29 EDT 2020
QUERY #17 - Wed Mar 11 11:08:29 EDT 2020
QUERY #18 - Wed Mar 11 11:08:30 EDT 2020
QUERY #19 - Wed Mar 11 11:08:30 EDT 2020
关于java - 如何从Java SDK或类似的SDK(即: rxJava,项目 react 器)获得可扩展的I/O绑定(bind)异步多线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60493245/
我正在进行 2 个 RX 调用,这些调用相互嵌套且相互依赖。服务器存在问题(由于各种原因目前无法解决),该问题在第二个嵌套调用中返回错误。 在这个问题得到解决之前,我需要确保如果第二次调用返回错误,则
这个问题在这里已经有了答案: How to resolve Duplicate files copied in APK META-INF/rxjava.properties (8 个答案) 关闭 5
我正在尝试将此 RxJava1 代码转换为 RxJava2 public static Observable listFolder(Path dir, String glob) { retur
这个问题在这里已经有了答案: RxJava 1 and RxJava 2 in the same project [duplicate] (1 个回答) How to resolve Duplica
有显示项目的RecyclerViewAdapter(该项目已经映射到数据库中)。 RecyclerViewAdapter 包含对 Presenter 的引用以加载项目。它还包含带有项目 ID 的 Ar
我想弄清楚如何在 Android 中使用 RxJava 将 Realm 对象保存在 Realm 中。到目前为止,结合所有这些的所有示例都是如何从 Realm 读取数据的。我想在 android 中使用
我在日志中收到此错误: Caused by java.lang.ClassCastException: java.net.UnknownHostException cannot be cast to
我有一个 API 服务类,其方法返回 Retrofit 提供的调用。 最近,Rx2Java 引入了 Single,所以我想将 Call 更改为 Single,但我不想更改逻辑。 例如 : 类接口(in
如何使用运算符让我始终获得以前和当前的值?如果可能的话,我想避免在管道外创建状态。 - time -> 1 2 3 4 | | | | Op
我正在努力实现以下目标。我加载了一个对象列表,我想获取稍后放入列表中的值。 首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然后当一切完成后,我填充一个适配器。 我无法做的是每
是否可以选择使用 timeout 的变体不发射 Throwable ? 我要 complete事件发出。 最佳答案 您不需要使用 onErrorResumeNext 映射错误。您可以使用以下方法提供备
我们可以在 C# Rx 中异步执行一些代码,如下所示,使用 Observable.Start()。我想知道 RxJava 中的等价物是什么。 void Main() { AddTwoNum
问题:我有一个用户可以输入查询字符串的功能,我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。我需要尽快显示来自数据库的结果,当 API
我正在尝试在 MVVM 中实现 ViewModel,将可观察对象作为“输入流”提供,将观察者作为“输出流”提供以供 View 绑定(bind)。 如果 getUser() 调用成功,下面的代码似乎可以
出于某种原因,我有时想使用 RxOperators 而不是普通的 java 方式来转换数据结构,因为它越来越干净。例如: Observable.from(listOfStrings) .filter(
我是 RxJava 新手,我需要以异步方式使用 Observable 功能。 我还需要使用超时:在我的示例中,我希望每个进程在 1 秒或更短的时间内结束。 这是我现在所做的: public stati
我正在尝试在网络请求期间在UI中显示进度条至少3秒钟。 此答案中描述的相同方法似乎不适用于Single。 RxJava Observable minimum execution time Single
我有一个可观察的(很热),它通过系统进程执行操作,并且我希望也运行一个间隔,直到该进程可观察达到 onComplete。 我看到区间运算符:http://reactivex.io/documentat
好吧,我是 RxJava2 的新手(嗯,我也不了解 RxJava),并且正在尝试使用 RxJava2 和 MVP 结构开发 Android 应用程序。 在该应用程序中,我正在对使用监听器的库进行异步调
如何将单个流拆分为单独的单个流,这样就可以执行以下操作而无需两次计算getUserId()? // getUserId() returns Single getUserId().flatMap { g
我是一名优秀的程序员,十分优秀!