java - Advice precedence problem when one @Around advice does not proceed

Reposted. Author: 行者123. Updated: 2023-12-04 07:47:02

Updated to rephrase the question with additional information.
We have two annotations:

  • CustomLogging
  • PollableStreamListener

Both are implemented as aspects using Spring AOP.
The CustomLogging annotation:
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface CustomLogging {

    }
The CustomLoggingAspect class:
    @Aspect
    @Component
    @Slf4j
    @Order(value = 1)
    public class CustomLoggingAspect {

        @Before("@annotation(customLogging)")
        public void addCustomLogging(CustomLogging customLogging) {
            log.info("Logging some information");
        }

    }
The PollableStreamListener annotation:
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface PollableStreamListener {

    }
The PollableStreamListenerAspect class:
    @Aspect
    @Component
    @Slf4j
    public class PollableStreamListenerAspect {

        private final ExecutorService executor = Executors.newFixedThreadPool(1);

        private volatile boolean paused = false;

        @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
        public void receiveMessage(ProceedingJoinPoint joinPoint,
                PollableStreamListener pollableStreamListener, Object dataCapsule) {
            if (dataCapsule instanceof Message) {
                Message<?> message = (Message<?>) dataCapsule;
                AcknowledgmentCallback callback = StaticMessageHeaderAccessor
                        .getAcknowledgmentCallback(message);
                callback.noAutoAck();

                if (!paused) {
                    // The separate thread is not busy with a previous message, so process this message:
                    Runnable runnable = () -> {
                        try {
                            paused = true;

                            // Call method to process this Kafka message
                            joinPoint.proceed();

                            callback.acknowledge(Status.ACCEPT);
                        } catch (Throwable e) {
                            callback.acknowledge(Status.REJECT);
                            throw new PollableStreamListenerException(e);
                        } finally {
                            paused = false;
                        }
                    };

                    executor.submit(runnable);
                } else {
                    // The separate thread is busy with a previous message, so re-queue this message for later:
                    callback.acknowledge(Status.REQUEUE);
                    log.info("Re-queue");
                }
            }
        }

    }
We have a class named CleanupController that runs periodically on a schedule.
The CleanupController class:
    @Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
    public void pollForDeletionRequest() {
        log.trace("Polling for new messages");
        cleanupInput.poll(cleanupSubmissionService::submitDeletion);
    }
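When the schedule fires, it calls a method in another class that is annotated with both PollableStreamListener and CustomLogging. I added a Thread.sleep() to imitate a method that takes a while to execute.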
    @PollableStreamListener
    @CustomLogging
    public void submitDeletion(Message<?> received) {
        try {
            log.info("Starting processing");
            Thread.sleep(10000);
            log.info("Finished processing");
        } catch (Exception e) {
            log.info("Error", e);
        }
    }
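The problem I am facing is that the output produced by CustomLogging is printed every time we poll for new messages via @Scheduled, but I only want it printed when the annotated method is actually executed (which may happen now or in the future, depending on whether another message is currently being processed). This leads to confusing log messages, because they imply that the message is being processed now, when in fact it has been re-queued for future execution.
Is there a way to make these annotations work well together, so that the CustomLogging output only occurs when the annotated method executes?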

Update: using @Order on PollableStreamListener
Following @dunni's suggestion, I made the following changes to the original example above.
Set the order to 1 on PollableStreamListenerAspect:
    @Aspect
    @Component
    @Slf4j
    @Order(value = 1)
    public class PollableStreamListenerAspect {
    ...
    }
Increased the order of CustomLoggingAspect to 2:
    @Aspect
    @Component
    @Slf4j
    @Order(value = 2)
    public class CustomLoggingAspect {
    ...
    }
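After making these changes, I found that polling no longer detects new requests at all. It is the change to PollableStreamListenerAspect that introduces this problem (I commented out that line and re-ran, and things behaved as before).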

Update: using @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListener
I updated PollableStreamListener to use HIGHEST_PRECEDENCE and updated the @Around value:
    @Aspect
    @Component
    @Slf4j
    @Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class PollableStreamListenerAspect {

        private final ExecutorService executor = Executors.newFixedThreadPool(1);

        private volatile boolean paused = false;

        @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener)")
        public void receiveMessage(ProceedingJoinPoint joinPoint) {
            if (!paused) {
                // The separate thread is not busy with a previous message, so process this message:
                Runnable runnable = () -> {
                    try {
                        paused = true;

                        // Call method to process this Kafka message
                        joinPoint.proceed();
                    } catch (Throwable e) {
                        e.printStackTrace();
                        throw new PollableStreamListenerException(e);
                    } finally {
                        paused = false;
                    }
                };

                executor.submit(runnable);
            } else {
                // The separate thread is busy with a previous message, so re-queue this message for later:
                log.info("Re-queue");
            }
        }
    }
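This now partially works. When I send a Kafka message, it gets processed, and the logging from the CustomLogging annotation is only printed when no other Kafka message is being processed. So far so good.
The next challenge is to get the @Around advice to accept the Message that is delivered via Kafka. Using the example above, I tried changing the following lines:
    @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener) && args(dataCapsule,..)")
    public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) {
        ...
    }
The server starts correctly, but when I publish a Kafka message, I get the following exception: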
    2021-04-22 10:38:00,055 ERROR [scheduling-1] org.springframework.core.log.LogAccessor: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation), failedMessage=GenericMessage...
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:330)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.handle(DefaultPollableMessageSource.java:361)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:219)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:200)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:68)
    at xyx.pollForDeletionRequest(CleanupController.java:35)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:596)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at xyz.CleanupSubmissionServiceImpl$$EnhancerBySpringCGLIB$$8737f6f8.submitDeletion(<generated>)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:327)
    ... 17 more

Best answer

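Because of this problem, you need to use @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListenerAspect. It is quite weird indeed, but then it works as you wish. IMO the issue ought to be fixed in Spring, though. Having to use this workaround is ugly, and it only works if your aspect that calls proceed() asynchronously actually does have the highest precedence, which is not always the case. As an alternative, you could use native AspectJ and its own concept of declaring advice precedence, which is independent of Spring internals.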
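Why the order matters can be illustrated without Spring. The following is a minimal plain-Java sketch of an advice chain, not Spring's actual proxy code: `Interceptor`, `gate` and `call` are hypothetical names invented for this illustration. It only assumes that the outermost advice runs first and that an advice which does not proceed cuts off everything nested inside it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for a proxy's advice chain; none of these types are Spring or AspectJ classes.
final class AdviceChainSketch {

    /** Hypothetical interceptor: may call next.run() (like proceed()) or skip it. */
    interface Interceptor {
        void invoke(Runnable next, List<String> log);
    }

    /** Logs and then proceeds, like the @Before advice in CustomLoggingAspect. */
    static final Interceptor LOGGING = (next, log) -> {
        log.add("Logging");
        next.run();
    };

    /** Proceeds only when not busy, like the @Around advice in PollableStreamListenerAspect. */
    static Interceptor gate(boolean busy) {
        return (next, log) -> {
            if (busy) {
                log.add("Re-queue");
            } else {
                next.run();
            }
        };
    }

    /** Runs the interceptors outermost-first around a target that records "Processing". */
    static List<String> call(List<Interceptor> chain) {
        List<String> log = new ArrayList<>();
        Runnable next = () -> log.add("Processing");
        // Wrap from the innermost interceptor outwards so that chain.get(0) runs first.
        for (int i = chain.size() - 1; i >= 0; i--) {
            Interceptor current = chain.get(i);
            Runnable downstream = next;
            next = () -> current.invoke(downstream, log);
        }
        next.run();
        return log;
    }

    public static void main(String[] args) {
        // Logging outermost: it fires even though the busy gate never proceeds.
        System.out.println(call(List.of(LOGGING, gate(true))));  // [Logging, Re-queue]
        // Gate outermost: a busy gate short-circuits before logging can run.
        System.out.println(call(List.of(gate(true), LOGGING)));  // [Re-queue]
        // Gate outermost and idle: logging and processing both happen.
        System.out.println(call(List.of(gate(false), LOGGING))); // [Logging, Processing]
    }
}
```

The first call mirrors the behaviour described in the question (logging on every poll), the second and third mirror the behaviour wanted once the gating aspect has the highest precedence.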
Here is an MCVE, a simplified version of your application.
Annotations:

    package de.scrum_master.spring.q67155048;

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface CustomLogging {}
    package de.scrum_master.spring.q67155048;

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface PollableStreamListener {}
Component with a method carrying both annotations:
    package de.scrum_master.spring.q67155048;

    import org.springframework.stereotype.Component;

    @Component
    public class MyComponent {
        private int counter = 0;

        @PollableStreamListener
        @CustomLogging
        public void submitDeletion() {
            try {
                System.out.println(" Starting processing #" + ++counter);
                Thread.sleep(1000);
                System.out.println(" Finished processing #" + counter);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
Aspects:
    package de.scrum_master.spring.q67155048;

    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.springframework.stereotype.Component;

    @Aspect
    @Component
    public class CustomLoggingAspect {
        @Before("@annotation(de.scrum_master.spring.q67155048.CustomLogging)")
        public void addCustomLogging() {
            System.out.println("Logging");
        }
    }
    package de.scrum_master.spring.q67155048;

    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.core.Ordered;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    @Aspect
    @Component
    @Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class PollableStreamListenerAspect {
        public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
        private volatile boolean paused = false;

        @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener)")
        public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
            System.out.println("Receiving message");
            if (!paused) {
                // The separate thread is not busy with a previous message, so process this message:
                Runnable runnable = () -> {
                    try {
                        paused = true;
                        joinPoint.proceed();
                    }
                    catch (Throwable throwable) {
                        throwable.printStackTrace();
                    }
                    finally {
                        paused = false;
                    }
                };
                EXECUTOR_SERVICE.submit(runnable);
            }
            else {
                System.out.println(" Re-queue");
            }
        }
    }
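A side note on the aspect above: `paused` is set to true inside the Runnable, that is on the worker thread, so a second call arriving before the worker has started could still see `paused == false` and be queued on the executor instead of being re-queued. If that matters for your use case, one option is to claim the slot atomically on the calling thread. The helper below is a hypothetical sketch and not part of the MCVE; `BusyGate`, `tryAcquire` and `release` are names made up for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of the MCVE: a one-slot gate claimed on the calling thread.
final class BusyGate {
    private final AtomicBoolean busy = new AtomicBoolean(false);

    /** Atomically claims the slot; returns false if it is already taken. */
    boolean tryAcquire() {
        return busy.compareAndSet(false, true);
    }

    /** Frees the slot; intended to be called from the worker's finally block. */
    void release() {
        busy.set(false);
    }

    public static void main(String[] args) {
        BusyGate gate = new BusyGate();
        System.out.println(gate.tryAcquire()); // true: first caller gets the slot
        System.out.println(gate.tryAcquire()); // false: second caller is turned away
        gate.release();
        System.out.println(gate.tryAcquire()); // true: slot is available again
    }
}
```

In the advice, `if (!paused)` would become `if (gate.tryAcquire())`, and `paused = false` in the finally block would become `gate.release()`; the `paused = true` assignment inside the Runnable is then no longer needed.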
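Driver application:
The application calls the target method every 500 ms, but execution takes 1,000 ms. So in this case we expect to see about 50% of the calls being re-queued without any logging, because the higher-precedence aspect does not proceed to the target method.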
    package de.scrum_master.spring.q67155048;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;

    @SpringBootApplication
    @Configuration
    public class DemoApplication {
        public static void main(String[] args) throws InterruptedException {
            try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
                doStuff(appContext);
            }
        }

        private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
            MyComponent myComponent = appContext.getBean(MyComponent.class);
            for (int i = 0; i < 10; i++) {
                myComponent.submitDeletion();
                Thread.sleep(500);
            }
            // This is just to make the application exit cleanly
            PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
        }
    }
Console log:
      .   ____          _            __ _ _
    /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
    \\/ ___)| |_)| | | | | || (_| | ) ) ) )
    ' |____| .__|_| |_|_| |_\__, | / / / /
    =========|_|==============|___/=/_/_/_/
    :: Spring Boot :: (v2.1.8.RELEASE)

    2021-04-20 12:56:03.675 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Starting DemoApplication on Xander-Ultrabook with PID 13560 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
    ...
    2021-04-20 12:56:07.666 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Started DemoApplication in 4.65 seconds (JVM running for 7.181)
    Receiving message
    Logging
    Starting processing #1
    Receiving message
    Re-queue
    Finished processing #1
    Receiving message
    Logging
    Starting processing #2
    Receiving message
    Re-queue
    Finished processing #2
    Receiving message
    Logging
    Starting processing #3
    Receiving message
    Re-queue
    Finished processing #3
    Receiving message
    Logging
    Starting processing #4
    Receiving message
    Re-queue
    Finished processing #4
    Receiving message
    Logging
    Starting processing #5
    Receiving message
    Re-queue
    Finished processing #5
    2021-04-20 12:56:12.767 INFO 13560 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
    ...
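See? We count 10x "Receiving message", but only 5x "Re-queue" and 5x "Logging". Please note that I numbered the processing calls, because they are asynchronous. That way it is easier to follow when they start and finish.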
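The 5/5 split follows from the timing alone and can be checked with a small deterministic model. This is a sketch under simplifying assumptions: it uses a virtual clock instead of threads, and it assumes the worker is free again exactly when a call lands on the 1,000 ms boundary. `RequeueRatioSketch` and `simulate` are hypothetical names. Real runs depend on thread scheduling, so individual calls near a boundary can fall on either side.

```java
// Deterministic model of the driver loop: no threads, just a virtual clock in milliseconds.
final class RequeueRatioSketch {

    /** Returns {accepted, requeued} for `calls` invocations spaced `intervalMs` apart. */
    static int[] simulate(int calls, long intervalMs, long processingMs) {
        int accepted = 0;
        int requeued = 0;
        long busyUntil = 0; // virtual time at which the single worker becomes free
        for (int i = 0; i < calls; i++) {
            long now = i * intervalMs;
            if (now >= busyUntil) {
                // Worker is idle: the call proceeds and occupies the worker.
                accepted++;
                busyUntil = now + processingMs;
            } else {
                // Worker is still busy: the call is re-queued and nothing else runs.
                requeued++;
            }
        }
        return new int[] {accepted, requeued};
    }

    public static void main(String[] args) {
        int[] result = simulate(10, 500, 1000);
        System.out.println("accepted=" + result[0] + ", requeued=" + result[1]); // accepted=5, requeued=5
    }
}
```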

Update according to user comments:
I have updated my MCVE in order to reproduce your argument binding problem. The new or changed files are:
    package de.scrum_master.spring.q67155048;

    public class Message<T> {
        private T content;

        public Message(T content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return "Message{" +
                "content=" + content +
                '}';
        }
    }
    package de.scrum_master.spring.q67155048;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;

    @SpringBootApplication
    @Configuration
    public class DemoApplication {
        public static void main(String[] args) throws InterruptedException {
            try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
                doStuff(appContext);
            }
        }

        private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
            MyComponent myComponent = appContext.getBean(MyComponent.class);
            Message<String> message = new Message<>("Hi there!");
            for (int i = 0; i < 10; i++) {
                myComponent.submitDeletion(message);
                Thread.sleep(500);
            }
            // This is just to make the application exit cleanly
            PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
        }
    }
    package de.scrum_master.spring.q67155048;

    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.core.Ordered;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    @Aspect
    @Component
    @Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class PollableStreamListenerAspect {
        public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
        private volatile boolean paused = false;

        @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
        public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
            System.out.println("Receiving message");
            if (!paused) {
                // The separate thread is not busy with a previous message, so process this message:
                Runnable runnable = () -> {
                    try {
                        paused = true;
                        System.out.println("dataCapsule = " + dataCapsule);
                        joinPoint.proceed();
                    }
                    catch (Throwable throwable) {
                        throwable.printStackTrace();
                    }
                    finally {
                        paused = false;
                    }
                };
                EXECUTOR_SERVICE.submit(runnable);
            }
            else {
                System.out.println(" Re-queue");
            }
        }
    }
Just as you experienced yourself, this yields:
    Exception in thread "main" java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at de.scrum_master.spring.q67155048.MyComponent$$EnhancerBySpringCGLIB$$4baa410d.submitDeletion(<generated>)
    at de.scrum_master.spring.q67155048.DemoApplication.doStuff(DemoApplication.java:21)
    at de.scrum_master.spring.q67155048.DemoApplication.main(DemoApplication.java:13)
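You are hitting this problem. I have commented on the closed Spring issue #16956 about it, hoping to get it reopened and fixed by someone.
For now, your workaround is not to use the elegant AOP argument binding, but to fetch the parameter manually using JoinPoint.getArgs():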
    package de.scrum_master.spring.q67155048;

    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.core.Ordered;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    @Aspect
    @Component
    @Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class PollableStreamListenerAspect {
        public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
        private volatile boolean paused = false;

        //@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
        @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && execution(* *(*, ..))")
        //public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
        public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
            Object dataCapsule = joinPoint.getArgs()[0];
            System.out.println("Receiving message");
            if (!paused) {
                // The separate thread is not busy with a previous message, so process this message:
                Runnable runnable = () -> {
                    try {
                        paused = true;
                        System.out.println("dataCapsule = " + dataCapsule);
                        joinPoint.proceed();
                    }
                    catch (Throwable throwable) {
                        throwable.printStackTrace();
                    }
                    finally {
                        paused = false;
                    }
                };
                EXECUTOR_SERVICE.submit(runnable);
            }
            else {
                System.out.println(" Re-queue");
            }
        }
    }
Now it works again, like this:
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
    Starting processing #1, message = Message{content=Hi there!}
    Receiving message
    Re-queue
    Receiving message
    Re-queue
    Finished processing #1, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
    Starting processing #2, message = Message{content=Hi there!}
    Receiving message
    Re-queue
    Finished processing #2, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
    Starting processing #3, message = Message{content=Hi there!}
    Receiving message
    Re-queue
    Finished processing #3, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
    Starting processing #4, message = Message{content=Hi there!}
    Receiving message
    Re-queue
    Finished processing #4, message = Message{content=Hi there!}
    Receiving message
    dataCapsule = Message{content=Hi there!}
    Logging
    Starting processing #5, message = Message{content=Hi there!}
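The workaround reads the first argument with `joinPoint.getArgs()[0]` as a plain `Object`, whereas the advice in the question also checked `dataCapsule instanceof Message`. If you want that type check back, a small helper can do it on the raw argument array. This is only a sketch: `AdviceArgs` and `first` are hypothetical names, and the helper knows nothing about Spring or AspectJ.

```java
import java.util.Optional;

// Hypothetical helper for advice that reads arguments via getArgs() instead of args() binding.
final class AdviceArgs {

    /** Returns the first argument if it exists and has the expected type, else an empty Optional. */
    static <T> Optional<T> first(Object[] args, Class<T> type) {
        if (args == null || args.length == 0 || !type.isInstance(args[0])) {
            return Optional.empty();
        }
        return Optional.of(type.cast(args[0]));
    }

    public static void main(String[] args) {
        System.out.println(first(new Object[] {"Hi there!", 42}, String.class)); // Optional[Hi there!]
        System.out.println(first(new Object[] {}, String.class));                // Optional.empty
        System.out.println(first(new Object[] {42}, String.class));              // Optional.empty
    }
}
```

Inside the advice this could be used as `AdviceArgs.first(joinPoint.getArgs(), Message.class)`, handling the empty case however suits the application.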

Regarding "java - Advice precedence problem when one @Around advice does not proceed", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/67155048/
