gpt4 book ai didi

java - 创建像环形缓冲区一样批处理的 Observable(需要建议)

转载 作者:行者123 更新时间:2023-11-29 07:37:37 25 4
gpt4 key购买 nike

在为我描述的问题找到合适的解决方案后here ,我决定实现这个。

但是我缺乏使用 monad 的经验,像 lift(..) 这样的东西对我来说仍然看起来有点神奇......

我打开这个的目的是让那些在 rxjava 之上实现了一些自定义东西的人可以给我关于如何实现它的建议。

现在这是怎么回事,这是界面。

enter image description here

我想这对你们大多数人来说都是不言自明的,但为了确保我会举一个例子。

假设我们有一个订阅者(消费者),它实际上对数据库进行持久化,很明显,如果你给它 1 或 1000 个对象来持久化,差异不会是 1000 的因数,而是 10 或更少的因数,这意味着它是一个可以采用加载的消费者......所以一次推送一个项目是愚蠢的,而你可以一次坚持多个项目,另一方面,等待一批 N 元素填满直到你是愚蠢的坚持(一秒钟你可能会得到 1000 个元素,而另一秒钟你可能什么也得不到,所以假设我们不知道传入数据的频率)...

所以我们现在拥有的是 Observable.batch(),它会请求一些 N 大小的批处理,我们通常会等待而不工作......在另一边我们有 Disruptor 完全符合我们的要求,但没有提供漂亮的 Observable 接口(interface)... Disruptor 将处理单个元素,当您处理它时,它将收集所有传入的元素,下次你会得到一批因为你的消费者忙于最后一个值而收集的所有东西......

目前我想我应该使用 Observable.from() 来实现这个或者 lift()...

请分享您对此的想法,也许已经有我不知道的可用解决方案,或者我将以错误的方式实现...

最佳答案

这是一个运算符,它会批处理在异步边界后面堆积的值:

public final class OperatorRequestBatcher<T> 
implements Operator<List<T>, T> {
final Scheduler scheduler;
public OperatorRequestBatcher(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber<? super T> call(Subscriber<? super List<T>> t) {
Scheduler.Worker w = scheduler.createWorker();
RequestBatcherSubscriber<T> parent =
new RequestBatcherSubscriber<>(t, w);

t.add(w);
t.add(parent);

return parent;
}

static final class RequestBatcherSubscriber<T>
extends Subscriber<T> implements Action0 {
final Subscriber<? super List<T>> actual;
final Scheduler.Worker w;
final Queue<T> queue;
final AtomicInteger wip;

volatile boolean done;
Throwable error;

public RequestBatcherSubscriber(
Subscriber<? super List<T>> actual,
Scheduler.Worker w) {
this.actual = actual;
this.w = w;
this.wip = new AtomicInteger();
this.queue = new SpscLinkedArrayQueue<>(256);
}

@Override
public void onNext(T t) {
if (done) {
return;
}
queue.offer(t);
schedule();
}

@Override
public void onError(Throwable e) {
if (done) {
return;
}
error = e;
done = true;
schedule();
}

@Override
public void onCompleted() {
done = true;
schedule();
}

void schedule() {
if (wip.getAndIncrement() == 0) {
w.schedule(this);
}
}

@Override
public void call() {
int missed = 1;

final Queue<T> q = queue;
final Subscriber<? super List<T>> a = actual;
final AtomicInteger wip = this.wip;

for (;;) {

List<T> list = new ArrayList<>();

for (;;) {
boolean d = done;
T v = q.poll();
boolean e = v == null;

if (isUnsubscribed()) {
q.clear();
return;
}

if (d) {
Throwable err = error;
if (err != null) {
a.onError(err);
return;
} else
if (e) {
if (!list.isEmpty()) {
a.onNext(list);
}
a.onCompleted();
return;
}
}

if (e) {
break;
}

list.add(v);
}

if (!list.isEmpty()) {
a.onNext(list);
}

missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}

public static void main(String[] args) {
PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler sch = Schedulers.test();

ps.lift(new OperatorRequestBatcher<>(sch))
.subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done"));

ps.onNext(1);
ps.onNext(2);

sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

ps.onNext(3);

sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

ps.onNext(4);
ps.onNext(5);
ps.onNext(6);
ps.onCompleted();

sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
}
}

但是请注意,您在 API 中描述的是热 Observable 的一种形式:冷源不跨多个订阅者进行协调。为此,您需要创建自定义 ConnectableObservable

publish() 可能适用于 disruptForEachSubscriberpublish().observeOn() 不适用于 disruptForAllSubscriber可能是因为 observeOn 将请求一堆值,而 publish 会将其解释为成功处理 N 个批处理。

关于java - 创建像环形缓冲区一样批处理的 Observable(需要建议),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33760083/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com