
java - Create a queue for processing jobs

Reposted. Author: 行者123. Updated: 2023-12-02 11:42:22

I want to build a thread that listens to a queue and executes a job whenever I add an item to the queue.

But I'm not sure how to build it. I have tried some of the Flowable examples for RxJava2, but couldn't figure out how to do this with them.

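I'm open to any example for Android or plain Java; maybe a message handler or an executor would be a simple solution. Unfortunately, I lack the know-how. An RxJava2 solution in particular would be great.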
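For reference, the executor idea mentioned above could look roughly like the following. This is a minimal sketch using only the JDK: the class name `ExecutorJobQueue` and its methods are made up for illustration and are not part of the original question or answer. A single-thread executor already contains a FIFO queue and one worker thread, so "adding an item to the queue" becomes submitting a task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical name: a job queue backed by one worker thread.
public class ExecutorJobQueue {

    // A single-thread executor queues submitted tasks internally and
    // runs them one at a time, in submission order, on the same thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Enqueue a job; it runs as soon as the worker thread is free.
    public void submit(Runnable job) {
        worker.execute(job);
    }

    // Stop accepting new jobs and wait for the already queued ones to finish.
    // Returns false if the timeout elapsed before the queue was drained.
    public boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorJobQueue queue = new ExecutorJobQueue();
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());

        for (int i = 0; i < 5; i++) {
            final int id = i;
            queue.submit(() -> {
                processed.add(id);
                System.out.printf("[%s] : Processing job : %d%n",
                        Thread.currentThread().getName(), id);
            });
        }

        queue.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println("Processed in order: " + processed);
    }
}
```

Because there is exactly one worker thread, jobs never overlap, which is the property needed when the jobs write to a shared output such as a log.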

Update

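In other words, I want to build a queue mechanism on top of the class below, because long log messages are printed as separate pieces, and whenever two of them are logged close together, the timing causes their pieces to interleave.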

public final class Logcat {

    private static final String TAG = "HOWDY";

    public static void v(String message) {
        Log.v(TAG, message);
    }

    public static void d(String message) {
        Log.d(TAG, message);
        //TODO I will add a for-loop later for long messages to make sure to show all of them for each method.
    }

    public static void e(Throwable throwable) {
        Log.e(TAG, throwable.getMessage());
    }

    public static void e(String message) {
        Log.e(TAG, message);
    }

    public static void e(ApiError error) {
        Log.e(TAG, error.message);
    }
}
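The for-loop mentioned in the TODO, combined with a queue, might be sketched as follows. This is only an illustration under stated assumptions: `SerializedLogger` is a hypothetical name, the `Consumer<String>` sink stands in for a call such as `Log.d(TAG, chunk)` so the example runs on a plain JVM, and the chunk size is a parameter because the exact logcat entry limit (roughly 4 KB) depends on the platform. Each message is queued as one task, so all of its chunks are written back to back before the next message starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: splits long messages into chunks and writes the chunks
// of each message consecutively on a single worker thread.
public class SerializedLogger {

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<String> sink; // stand-in for Log.d(TAG, chunk) on Android
    private final int chunkSize;

    public SerializedLogger(Consumer<String> sink, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.sink = sink;
        this.chunkSize = chunkSize;
    }

    // Split a message into pieces of at most chunkSize characters.
    // An empty message yields an empty list.
    static List<String> split(String message, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        for (int start = 0; start < message.length(); start += chunkSize) {
            int end = Math.min(message.length(), start + chunkSize);
            chunks.add(message.substring(start, end));
        }
        return chunks;
    }

    // One queued task per message: chunks of different messages cannot interleave.
    public void d(String message) {
        worker.execute(() -> {
            for (String chunk : split(message, chunkSize)) {
                sink.accept(chunk);
            }
        });
    }

    // Stop accepting messages and wait until the queued ones are written.
    public boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        SerializedLogger logger = new SerializedLogger(System.out::println, 10);
        logger.d("first message that is longer than one chunk");
        logger.d("second message, also split into several chunks");
        logger.shutdownAndWait(5, TimeUnit.SECONDS);
    }
}
```

Note that the log call returns before the message is written, so messages still in the queue are lost if the process dies first.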

Best answer

OK, this is how I would do it.

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

public class DemoRxJava2 {

    public static void testWithQueue() {

        CompletableFuture<String> allDone = new CompletableFuture<>();
        AtomicBoolean submitDone = new AtomicBoolean(Boolean.FALSE);
        final Queue<Long> queue = new ConcurrentLinkedQueue<>();

        // Consumer: wake up every 2 seconds and drain whatever is in the queue.
        Observable.interval(2, TimeUnit.SECONDS)
                // Keep polling while items remain or the submitter is still running.
                .takeWhile(tick -> !queue.isEmpty() || !submitDone.get())
                .flatMap(tick -> {
                    // Explicit type argument so downstream operators see Long, not Object.
                    return Observable.<Long>create(sub -> {
                        while (!queue.isEmpty()) {
                            sub.onNext(queue.poll());
                        }
                        sub.onComplete();
                    });
                })
                .subscribeOn(Schedulers.single())
                .doOnSubscribe(dis -> System.out.println("Queue processing active"))
                .doOnComplete(() -> {
                    System.out.println("Queue processing done");
                    allDone.complete("DONE");
                })
                .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));

        // Producer: add one timestamp per second, ten times, then signal completion.
        Observable.interval(1, TimeUnit.SECONDS)
                .take(10)
                .doOnSubscribe(dis -> System.out.println("Job submitter start"))
                .doOnNext(tick -> {
                    // Current time in epoch seconds.
                    long ts = System.currentTimeMillis() / 1000;
                    System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ts);
                    queue.add(ts);
                })
                .doOnComplete(() -> submitDone.set(Boolean.TRUE))
                .blockingSubscribe();

        // Wait until the consumer has drained the queue.
        try {
            allDone.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void testWithSubject() {

        CompletableFuture<String> allDone = new CompletableFuture<>();

        // The subject acts as the queue: every onNext() is pushed to the subscriber.
        PublishSubject<Long> queue = PublishSubject.create();

        queue.observeOn(Schedulers.single())
                // Simulate 2 seconds of work per item. Note that delay() emits on the
                // computation scheduler by default, which the printed thread name reflects.
                .flatMap(tx -> Observable.just(tx).delay(2, TimeUnit.SECONDS))
                .doOnSubscribe(dis -> System.out.println("Queue processing active"))
                .doOnComplete(() -> allDone.complete("DONE"))
                .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));

        // Producer: push one timestamp per second, ten times, then complete the subject.
        Observable.interval(1, TimeUnit.SECONDS)
                .take(10)
                .doOnSubscribe(dis -> System.out.println("Job submitter start"))
                .doOnNext(tick -> {
                    // Current time in epoch seconds.
                    long ts = System.currentTimeMillis() / 1000;
                    System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ts);
                    queue.onNext(ts);
                })
                .doOnComplete(() -> queue.onComplete())
                .blockingSubscribe();

        //wait until all done
        try {
            allDone.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        testWithQueue();
        testWithSubject();
    }
}

This is just a demo of how to process a queue of objects on a separate thread with RxJava; adapt it to your needs.
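As a point of comparison with the interval-based polling in testWithQueue, a dedicated thread that blocks on a queue can be sketched with the JDK alone. This is an assumption-laden illustration, not part of the original answer: `BlockingQueueWorker`, its method names, and the stop sentinel are all hypothetical. The worker sleeps inside `take()` and wakes up as soon as an item arrives, so there is no polling delay.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical name: one thread that listens to a queue and runs each job it receives.
public class BlockingQueueWorker {

    // Sentinel job ("poison pill") that tells the worker to exit; compared by identity.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this::drain, "job-worker");

    public void start() {
        thread.start();
    }

    // Enqueue a job; the worker thread picks it up as soon as it is free.
    public void submit(Runnable job) {
        queue.add(job);
    }

    // Let the worker finish everything queued so far, then wait for it to exit.
    public void stopAndJoin() throws InterruptedException {
        queue.add(STOP);
        thread.join();
    }

    private void drain() {
        try {
            while (true) {
                Runnable job = queue.take(); // blocks until an item is available
                if (job == STOP) {
                    return;
                }
                try {
                    job.run();
                } catch (RuntimeException e) {
                    // Keep the worker alive if a single job fails.
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueueWorker worker = new BlockingQueueWorker();
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        worker.start();
        for (int i = 0; i < 5; i++) {
            final int id = i;
            worker.submit(() -> processed.add(id));
        }
        worker.stopAndJoin();
        System.out.println("Processed in order: " + processed);
    }
}
```

On Android, a HandlerThread with a Handler plays a similar role; the version above avoids Android classes so that it runs on any JVM.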

Regarding java - Create a queue for processing jobs, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/48459754/
