gpt4 book ai didi

project-reactor - 使用 Hooks 和 Lift 将 Context 插入 ThreadLocal

转载 作者:行者123 更新时间:2023-12-02 02:59:07 26 4
gpt4 key购买 nike

我问自己是否有办法在订阅者收到 onNext 信号之前将 react 上下文推送到 ThreadLocal 变量中。在挖掘 react 堆核心时,我发现了 Hooks 类和 Lift BiFunction。

我创建了一个具有以下实现的类。该类由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口(interface)。它将所有调用委托(delegate)给实际订阅者,如果在实际订阅者上调用 onNext 之前修改到 ThreadLocal 变量中,它还将推送上下文。

package com.example.demo;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;

import java.util.function.BiFunction;

public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);

private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();

public static Context getContext() {
Context context = contextHolder.get();
if (context == null) {
context = Context.empty();
contextHolder.set(context);
}
return context;
}

public static void setContext(Context context) {
contextHolder.set(context);
}

@Override
public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
}

final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {

private CoreSubscriber<? super U> delegate;

public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
this.delegate = delegate;
}

@Override
public Context currentContext() {
return delegate.currentContext();
}

@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
}

@Override
public void onNext(U u) {
Context context = delegate.currentContext();
if (!context.isEmpty()) {
Context currentContext = ThreadLocalContextLifter.getContext();
if (!currentContext.equals(context)) {
logger.info("Pushing reactive context to holder {}", context);
ThreadLocalContextLifter.setContext(context);
}
}
delegate.onNext(u);
}

@Override
public void onError(Throwable t) {
delegate.onError(t);
}

@Override
public void onComplete() {
delegate.onComplete();
}
}

}

使用以下代码将该实例加载到 Hooks 中:
Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));

我已经进行了一些测试,它似乎工作正常,但我不相信该解决方案。我猜这个钩子(Hook)会降低 react 器的性能,或者在我不知道的某些情况下它不会工作。

我的问题很简单:这是一个坏主意吗?

最佳答案

我不认为这个想法有什么问题......这个钩子(Hook)被每个 Reactor 提供的运算符(operator)使用。
Context onNext 之间不变, 所以电梯 ThreadLocalContextCoreSubscriber可以在 onSubscribe 中捕获它.但是您仍然需要在 onNext 中至少检查一次 ThreadLocal , 自 onNextonSubscribe可能发生在两个不同的线程上,所以您使用 delegate.currentContext() 的解决方案也可以。最后,您的方法看起来不错。

关于project-reactor - 使用 Hooks 和 Lift 将 Context 插入 ThreadLocal,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47501349/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com