gpt4 book ai didi

java - 如何使用 MockIntegrationContext.substituteMessageHandlerFor 模拟 WebFluxRequestExecutingMessageHandler

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:24:02 27 4
gpt4 key购买 nike

我已经实现了一个IntegrationFlow,我想在其中执行以下任务:

  1. 轮询目录中的文件
  2. 将文件内容转化为字符串
  3. 通过 WebFluxRequestExecutingMessageHandler 将字符串发送到 REST 端点并使用 AdviceChain 处理成功和错误响应

实现

@Configuration
@Slf4j
public class JsonToRestIntegration {

@Autowired
private LoadBalancerExchangeFilterFunction lbFunction;

@Value("${json_folder}")
private String jsonPath;

@Value("${json_success_folder}")
private String jsonSuccessPath;

@Value("${json_error_folder}")
private String jsonErrorPath;

@Value("${rest-service-url}")
private String restServiceUrl;

@Bean
public DirectChannel httpResponseChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel successChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel failureChannel() {
return new DirectChannel();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1000).get();
}

@Bean
public IntegrationFlow jsonFileToRestFlow() {
return IntegrationFlows
.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
.transform(org.springframework.integration.file.dsl.Files.toStringTransformer())
.enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
.handle(reactiveOutbound())
.log()
.channel(httpResponseChannel())
.get();
}

@Bean
public FileReadingMessageSource fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(jsonPath));
source.setFilter(new SimplePatternFileListFilter("*.json"));
source.setUseWatchService(true);
source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

return source;
}

@Bean
public MessageHandler reactiveOutbound() {
WebClient webClient = WebClient.builder()
.baseUrl("http://jsonservice")
.filter(lbFunction)
.build();

WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(restServiceUrl, webClient);

handler.setHttpMethod(HttpMethod.POST);
handler.setCharset(StandardCharsets.UTF_8.displayName());
handler.setOutputChannel(httpResponseChannel());
handler.setExpectedResponseType(String.class);
handler.setAdviceChain(singletonList(expressionAdvice()));

return handler;
}

public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

advice.setTrapException(true);
advice.setSuccessChannel(successChannel());
advice.setOnSuccessExpressionString("payload + ' war erfolgreich'");
advice.setFailureChannel(failureChannel());
advice.setOnFailureExpressionString("payload + ' war nicht erfolgreich'");

return advice;
}

@Bean
public IntegrationFlow loggingFlow() {
return IntegrationFlows.from(httpResponseChannel())
.handle(message -> {
String originalFileName = (String) message.getHeaders().get(FileHeaders.FILENAME);
log.info("some log");
})
.get();
}

@Bean
public IntegrationFlow successFlow() {
return IntegrationFlows.from(successChannel())
.handle(message -> {
MessageHeaders messageHeaders = ((AdviceMessage) message).getInputMessage().getHeaders();

File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
String originalFileName = (String) messageHeaders.get(FILENAME);

if (originalFile != null && originalFileName != null) {

File jsonSuccessFolder = new File(jsonSuccessPath);
File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);

try {
Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
} catch (IOException e) {
log.error("some log", e);
}
}
})
.get();
}

@Bean
public IntegrationFlow failureFlow() {
return IntegrationFlows.from(failureChannel())
.handle(message -> {
Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

if (failedMessage != null) {

File originalFile = (File) failedMessage.getHeaders().get(FileHeaders.ORIGINAL_FILE);
String originalFileName = (String) failedMessage.getHeaders().get(FileHeaders.FILENAME);

if (originalFile != null && originalFileName != null) {

File jsonErrorFolder = new File(tonisJsonErrorPath);
File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

try {
Files.move(originalFile.toPath(), jsonErrorFile.toPath());
} catch (IOException e) {
log.error("some log", e);
}
}
}
})
.get();
}
}

到目前为止,它似乎在生产中有效。在测试中我想做以下步骤:

  1. 将 JSON 文件复制到输入目录
  2. 开始对 json 文件进行轮询
  3. 对通过我的建议链路由的 WebFluxRequestExecutingMessageHandler 的 HTTP 响应进行断言

但我在测试中遇到了以下任务:

  1. 使用 MockIntegrationContext.substituteMessageHandlerFor() 方法模拟 WebFluxRequestExecutingMessageHandler
  2. 手动启动 json 文件的轮询

测试

@RunWith(SpringRunner.class)
@SpringIntegrationTest()
@Import({JsonToRestIntegration.class})
@JsonTest
public class JsonToRestIntegrationTest {

@Autowired
public DirectChannel httpResponseChannel;

@Value("${json_folder}")
private String jsonPath;

@Value("${json_success_folder}")
private String jsonSuccessPath;

@Value("${json_error_folder}")
private String jsonErrorPath;

@Autowired
private MockIntegrationContext mockIntegrationContext;

@Autowired
private MessageHandler reactiveOutbound;

@Before
public void setUp() throws Exception {
Files.createDirectories(Paths.get(jsonPath));
Files.createDirectories(Paths.get(jsonSuccessPath));
Files.createDirectories(Paths.get(jsonErrorPath));
}

@After
public void tearDown() throws Exception {
FileUtils.deleteDirectory(new File(jsonPath));
FileUtils.deleteDirectory(new File(jsonSuccessPath));
FileUtils.deleteDirectory(new File(jsonErrorPath));
}

@Test
public void shouldSendJsonToRestEndpointAndReceiveOK() throws Exception {
File jsonFile = new ClassPathResource("/test.json").getFile();
Path targetFilePath = Paths.get(jsonPath + "/" + jsonFile.getName());
Files.copy(jsonFile.toPath(), targetFilePath);

httpResponseChannel.subscribe(httpResponseHandler());

this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);
}

private MessageHandler httpResponseHandler() {
return message -> Assert.assertThat(message.getPayload(), is(notNullValue()));
}

@Configuration
@Import({JsonToRestIntegration.class})
public static class JsonToRestIntegrationTest {

@Autowired
public MessageChannel httpResponseChannel;

@Bean
public MessageHandler reactiveOutbound() {
ArgumentCaptor<Message<?>> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);

MockMessageHandler mockMessageHandler = mockMessageHandler(messageArgumentCaptor).handleNextAndReply(m -> m);
mockMessageHandler.setOutputChannel(httpResponseChannel);
return mockMessageHandler;
}

}

}

使用模拟的 WebFluX 网络客户端更新工作示例:

实现

public class JsonToRestIntegration {

private final LoadBalancerExchangeFilterFunction lbFunction;

private final BatchConfigurationProperties batchConfigurationProperties;

@Bean
public DirectChannel httpResponseChannel() {
return new DirectChannel();
}

@Bean
public DirectChannel errorChannel() {
return new DirectChannel();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(100, TimeUnit.MILLISECONDS).get();
}

@Bean
public IntegrationFlow jsonFileToRestFlow() {
return IntegrationFlows
.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
.transform(org.springframework.integration.file.dsl.Files.toStringTransformer("UTF-8"))
.enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
.handle(reactiveOutbound())
.channel(httpResponseChannel())
.get();
}

@Bean
public FileReadingMessageSource fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(batchConfigurationProperties.getJsonImportFolder()));
source.setFilter(new SimplePatternFileListFilter("*.json"));
source.setUseWatchService(true);
source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

return source;
}

@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound() {
WebClient webClient = WebClient.builder()
.baseUrl("http://service")
.filter(lbFunction)
.build();

WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(batchConfigurationProperties.getServiceUrl(), webClient);

handler.setHttpMethod(HttpMethod.POST);
handler.setCharset(StandardCharsets.UTF_8.displayName());
handler.setOutputChannel(httpResponseChannel());
handler.setExpectedResponseType(String.class);
handler.setAdviceChain(singletonList(expressionAdvice()));

return handler;
}

public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

advice.setTrapException(true);
advice.setFailureChannel(errorChannel());

return advice;
}

@Bean
public IntegrationFlow responseFlow() {
return IntegrationFlows.from(httpResponseChannel())
.handle(message -> {
MessageHeaders messageHeaders = message.getHeaders();
File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
String originalFileName = (String) messageHeaders.get(FILENAME);

if (originalFile != null && originalFileName != null) {

File jsonSuccessFolder = new File(batchConfigurationProperties.getJsonSuccessFolder());
File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);

try {
Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
} catch (IOException e) {
log.error("Could not move file", e);
}
}
})
.get();
}

@Bean
public IntegrationFlow failureFlow() {
return IntegrationFlows.from(errorChannel())
.handle(message -> {
Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

if (failedMessage != null) {

File originalFile = (File) failedMessage.getHeaders().get(ORIGINAL_FILE);
String originalFileName = (String) failedMessage.getHeaders().get(FILENAME);

if (originalFile != null && originalFileName != null) {

File jsonErrorFolder = new File(batchConfigurationProperties.getJsonErrorFolder());
File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

try {
Files.move(originalFile.toPath(), jsonErrorFile.toPath());
} catch (IOException e) {
log.error("Could not move file", originalFileName, e);
}
}
}
})
.get();
}
}

测试

@RunWith(SpringRunner.class)
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
@Import({JsonToRestIntegration.class, BatchConfigurationProperties.class})
@JsonTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JsonToRestIntegrationIT {

private static final FilenameFilter JSON_FILENAME_FILTER = (dir, name) -> name.endsWith(".json");

@Autowired
private BatchConfigurationProperties batchConfigurationProperties;

@Autowired
private ObjectMapper om;

@Autowired
private MessageHandler reactiveOutbound;

@Autowired
private DirectChannel httpResponseChannel;

@Autowired
private DirectChannel errorChannel;

@Autowired
private FileReadingMessageSource fileReadingMessageSource;

@Autowired
private SourcePollingChannelAdapter fileReadingEndpoint;

@MockBean
private LoadBalancerExchangeFilterFunction lbFunction;

private String jsonImportPath;
private String jsonSuccessPath;
private String jsonErrorPath;

@Before
public void setUp() throws Exception {
jsonImportPath = batchConfigurationProperties.getJsonImportFolder();
jsonSuccessPath = batchConfigurationProperties.getJsonSuccessFolder();
jsonErrorPath = batchConfigurationProperties.getJsonErrorFolder();

Files.createDirectories(Paths.get(jsonImportPath));
Files.createDirectories(Paths.get(jsonSuccessPath));
Files.createDirectories(Paths.get(jsonErrorPath));
}

@After
public void tearDown() throws Exception {
FileUtils.deleteDirectory(new File(jsonImportPath));
FileUtils.deleteDirectory(new File(jsonSuccessPath));
FileUtils.deleteDirectory(new File(jsonErrorPath));
}

@Test
public void shouldMoveJsonFileToSuccessFolderWhenHttpResponseIsOk() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);

httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
latch.countDown();
super.postSend(message, channel, sent);
}
});
errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
fail();
}
});

ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

DataBufferFactory bufferFactory = response.bufferFactory();

String valueAsString = null;
try {
valueAsString = om.writeValueAsString(new ResponseDto("1"));
} catch (JsonProcessingException e) {
fail();
}
return response.writeWith(Mono.just(bufferFactory.wrap(valueAsString.getBytes())))
.then(Mono.defer(response::setComplete));

});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.reactiveOutbound)
.setPropertyValue("webClient", webClient);

File jsonFile = new ClassPathResource("/test.json").getFile();
Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
Files.copy(jsonFile.toPath(), targetFilePath);

fileReadingEndpoint.start();

assertThat(latch.await(12, TimeUnit.SECONDS), is(true));

File[] jsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonImportFolder, is(notNullValue()));
assertThat(filesInJsonImportFolder.length, is(0));

File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonSuccessFolder, is(notNullValue()));
assertThat(filesInJsonSuccessFolder.length, is(1));

File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonErrorFolder, is(notNullValue()));
assertThat(filesInJsonErrorFolder.length, is(0));
}

@Test
public void shouldMoveJsonFileToErrorFolderWhenHttpResponseIsNotOk() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);

errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
latch.countDown();
super.postSend(message, channel, sent);
}
});
httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
fail();
}
});

ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.BAD_REQUEST);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

DataBufferFactory bufferFactory = response.bufferFactory();

return response.writeWith(Mono.just(bufferFactory.wrap("SOME BAD REQUEST".getBytes())))
.then(Mono.defer(response::setComplete));

});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.reactiveOutbound)
.setPropertyValue("webClient", webClient);

File jsonFile = new ClassPathResource("/error.json").getFile();
Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
Files.copy(jsonFile.toPath(), targetFilePath);

fileReadingEndpoint.start();

assertThat(latch.await(11, TimeUnit.SECONDS), is(true));

File[] filesInJsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonImportFolder, is(notNullValue()));
assertThat(filesInJsonImportFolder.length, is(0));

File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonSuccessFolder, is(notNullValue()));
assertThat(filesInJsonSuccessFolder.length, is(0));

File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonErrorFolder, is(notNullValue()));
assertThat(filesInJsonErrorFolder.length, is(1));
}
}

最佳答案

  this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);

这个方法的第一个参数是一个endpoint id。 (我想我们只是缺少有关这些方法的 Javadocs...)。

所以,你需要的是这样的东西:

.handle(reactiveOutbound(), e -> e.id("webFluxEndpoint"))

然后在那个测试用例中你做:

 this.mockIntegrationContext.substituteMessageHandlerFor("webFluxEndpoint", reactiveOutbound);

您不需要在测试类配置中覆盖 bean。 MockMessageHandler 可以只在测试方法体中使用。

您通过 .from(fileReadingMessageSource()) 轮询文件。要进行手动控制,您需要在开始时将其停止。为此,您再次添加一个 endpoint id:

.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))

然后在测试配置中执行此操作:

@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")

WebFlux 的另一种方法是通过自定义的 WebClient 来模拟对服务器的请求。例如:

ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes())))
.then(Mono.defer(response::setComplete));
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.reactiveOutbound)
.setPropertyValue("webClient", webClient);

关于java - 如何使用 MockIntegrationContext.substituteMessageHandlerFor 模拟 WebFluxRequestExecutingMessageHandler,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52114448/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com