gpt4 book ai didi

spring-amqp - 如何为@RabbitListener 注解编写集成测试?

转载 作者:行者123 更新时间:2023-12-04 15:32:38 25 4
gpt4 key购买 nike

我的问题实际上是一个后续问题

RabbitMQ Integration Test and Threading

在那里它声明包装“你的听众”并传入一个 CountDownLatch ,最终所有线程都将合并。如果我们手动创建和注入(inject)消息监听器但对于 @RabbitListener 注释,则此答案有效...我不确定如何传入 CountDownLatch。该框架在幕后自动神奇地创建消息监听器。

还有其他方法吗?

最佳答案

在@Gary Russell 的帮助下,我得到了答案并使用了以下解决方案。

结论:我必须承认我对这个解决方案漠不关心(感觉就像一个黑客),但这是我唯一可以开始工作的事情,一旦你完成了最初的一次性设置并真正理解了“工作流程”,它就不会那么痛苦了.基本上归结为定义 (2) @Beans 并将它们添加到您的集成测试配置中。

下面发布的示例解决方案带有解释。请随时提出对此解决方案的改进建议。

1. 定义一个 ProxyListenerBPP,它在 spring 初始化期间将监听指定的 clazz(即我们包含 @RabbitListener 的测试类)和
注入(inject)我们在下一步中定义的自定义 CountDownLatchListenerInterceptor 建议。

import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;

/**
* Implements BeanPostProcessor bean... during spring initialization we will
* listen for a specified clazz
* (i.e our @RabbitListener annotated class) and
* inject our custom CountDownLatchListenerInterceptor advice
* @author sjacobs
*
*/
public class ProxyListenerBPP implements BeanPostProcessor, BeanFactoryAware, Ordered, PriorityOrdered{

private BeanFactory beanFactory;
private Class<?> clazz;
public static final String ADVICE_BEAN_NAME = "wasCalled";

public ProxyListenerBPP(Class<?> clazz) {
this.clazz = clazz;
}


@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

if (clazz.isAssignableFrom(bean.getClass())) {
ProxyFactoryBean pfb = new ProxyFactoryBean();
pfb.setProxyTargetClass(true); // CGLIB, false for JDK proxy (interface needed)
pfb.setTarget(bean);
pfb.addAdvice(this.beanFactory.getBean(ADVICE_BEAN_NAME, Advice.class));
return pfb.getObject();
}
else {
return bean;
}
}

@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE - 1000; // Just before @RabbitListener post processor
}

2. 创建 MethodInterceptor 通知 impl,它将保存对 CountDownLatch 的引用。 CountDownLatch 需要在集成测试线程和@RabbitListener 中的异步工作线程中引用。所以我们可以稍后发布回集成测试线程 尽快 @RabbitListener 异步线程已完成执行。无需轮询。

import java.util.concurrent.CountDownLatch;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

/**
* AOP MethodInterceptor that maps a <b>Single</b> CountDownLatch to one method and invokes
* CountDownLatch.countDown() after the method has completed execution. The motivation behind this
* is for integration testing purposes of Spring RabbitMq Async Worker threads to be able to merge
* the Integration Test thread after an Async 'worker' thread completed its task.
* @author sjacobs
*
*/
public class CountDownLatchListenerInterceptor implements MethodInterceptor {

private CountDownLatch countDownLatch = new CountDownLatch(1);

private final String methodNameToInvokeCDL ;

public CountDownLatchListenerInterceptor(String methodName) {
this.methodNameToInvokeCDL = methodName;
}

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
String methodName = invocation.getMethod().getName();

if (this.methodNameToInvokeCDL.equals(methodName) ) {

//invoke async work
Object result = invocation.proceed();

//returns us back to the 'awaiting' thread inside the integration test
this.countDownLatch.countDown();

//"reset" CountDownLatch for next @Test (if testing for more async worker)
this.countDownLatch = new CountDownLatch(1);

return result;
} else
return invocation.proceed();
}


public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
}

3. 接下来将以下 @Bean(s) 添加到您的集成测试配置中

public class SomeClassThatHasRabbitListenerAnnotationsITConfig extends BaseIntegrationTestConfig {

// pass into the constructor the test Clazz that contains the @RabbitListener annotation into the constructor
@Bean
public static ProxyListenerBPP listenerProxier() { // note static
return new ProxyListenerBPP(SomeClassThatHasRabbitListenerAnnotations.class);
}

// pass the method name that will be invoked by the async thread in SomeClassThatHasRabbitListenerAnnotations.Class
// I.E the method name annotated with @RabbitListener or @RabbitHandler
// in our example 'listen' is the method name inside SomeClassThatHasRabbitListenerAnnotations.Class
@Bean(name=ProxyListenerBPP.ADVICE_BEAN_NAME)
public static Advice wasCalled() {
String methodName = "listen";
return new CountDownLatchListenerInterceptor( methodName );
}

// this is the @RabbitListener bean we are testing
@Bean
public SomeClassThatHasRabbitListenerAnnotations rabbitListener() {
return new SomeClassThatHasRabbitListenerAnnotations();
}

}

4. 最后,在集成@Test调用...通过rabbitTemplate发送消息触发异步线程...现在调用从拦截器获取的CountDownLatch#await(...)方法并确保传入一个TimeUnit args所以它可以在长时间运行的过程或出现问题的情况下超时。一旦异步集成测试线程被通知(唤醒),现在我们终于可以开始实际测试/验证/验证异步工作的结果。

@ContextConfiguration(classes={ SomeClassThatHasRabbitListenerAnnotationsITConfig.class } )
public class SomeClassThatHasRabbitListenerAnnotationsIT extends BaseIntegrationTest{

@Inject
private CountDownLatchListenerInterceptor interceptor;

@Inject
private RabbitTemplate rabbitTemplate;

@Test
public void shouldReturnBackAfterAsyncThreadIsFinished() throws Exception {

MyObject payload = new MyObject();
rabbitTemplate.convertAndSend("some.defined.work.queue", payload);
CountDownLatch cdl = interceptor.getCountDownLatch();

// wait for async thread to finish
cdl.await(10, TimeUnit.SECONDS); // IMPORTANT: set timeout args.

//Begin the actual testing of the results of the async work
// check the database?
// download a msg from another queue?
// verify email was sent...
// etc...
}

关于spring-amqp - 如何为@RabbitListener 注解编写集成测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34454159/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com