- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我已经实现了一个IntegrationFlow
,我想在其中执行以下任务:
WebFluxRequestExecutingMessageHandler
将字符串发送到 REST 端点并使用 AdviceChain
处理成功和错误响应实现
@Configuration
@Slf4j
public class JsonToRestIntegration {
@Autowired
private LoadBalancerExchangeFilterFunction lbFunction;
@Value("${json_folder}")
private String jsonPath;
@Value("${json_success_folder}")
private String jsonSuccessPath;
@Value("${json_error_folder}")
private String jsonErrorPath;
@Value("${rest-service-url}")
private String restServiceUrl;
@Bean
public DirectChannel httpResponseChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel successChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel failureChannel() {
return new DirectChannel();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1000).get();
}
@Bean
public IntegrationFlow jsonFileToRestFlow() {
return IntegrationFlows
.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
.transform(org.springframework.integration.file.dsl.Files.toStringTransformer())
.enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
.handle(reactiveOutbound())
.log()
.channel(httpResponseChannel())
.get();
}
@Bean
public FileReadingMessageSource fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(jsonPath));
source.setFilter(new SimplePatternFileListFilter("*.json"));
source.setUseWatchService(true);
source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);
return source;
}
@Bean
public MessageHandler reactiveOutbound() {
WebClient webClient = WebClient.builder()
.baseUrl("http://jsonservice")
.filter(lbFunction)
.build();
WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(restServiceUrl, webClient);
handler.setHttpMethod(HttpMethod.POST);
handler.setCharset(StandardCharsets.UTF_8.displayName());
handler.setOutputChannel(httpResponseChannel());
handler.setExpectedResponseType(String.class);
handler.setAdviceChain(singletonList(expressionAdvice()));
return handler;
}
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
advice.setSuccessChannel(successChannel());
advice.setOnSuccessExpressionString("payload + ' war erfolgreich'");
advice.setFailureChannel(failureChannel());
advice.setOnFailureExpressionString("payload + ' war nicht erfolgreich'");
return advice;
}
@Bean
public IntegrationFlow loggingFlow() {
return IntegrationFlows.from(httpResponseChannel())
.handle(message -> {
String originalFileName = (String) message.getHeaders().get(FileHeaders.FILENAME);
log.info("some log");
})
.get();
}
@Bean
public IntegrationFlow successFlow() {
return IntegrationFlows.from(successChannel())
.handle(message -> {
MessageHeaders messageHeaders = ((AdviceMessage) message).getInputMessage().getHeaders();
File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
String originalFileName = (String) messageHeaders.get(FILENAME);
if (originalFile != null && originalFileName != null) {
File jsonSuccessFolder = new File(jsonSuccessPath);
File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);
try {
Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
} catch (IOException e) {
log.error("some log", e);
}
}
})
.get();
}
@Bean
public IntegrationFlow failureFlow() {
return IntegrationFlows.from(failureChannel())
.handle(message -> {
Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();
if (failedMessage != null) {
File originalFile = (File) failedMessage.getHeaders().get(FileHeaders.ORIGINAL_FILE);
String originalFileName = (String) failedMessage.getHeaders().get(FileHeaders.FILENAME);
if (originalFile != null && originalFileName != null) {
File jsonErrorFolder = new File(tonisJsonErrorPath);
File jsonErrorFile = new File(jsonErrorFolder, originalFileName);
try {
Files.move(originalFile.toPath(), jsonErrorFile.toPath());
} catch (IOException e) {
log.error("some log", e);
}
}
}
})
.get();
}
}
到目前为止,它似乎在生产中有效。在测试中我想做以下步骤:
WebFluxRequestExecutingMessageHandler
的 HTTP 响应进行断言但我在测试中遇到了以下任务:
MockIntegrationContext.substituteMessageHandlerFor()
方法模拟 WebFluxRequestExecutingMessageHandler
测试
@RunWith(SpringRunner.class)
@SpringIntegrationTest()
@Import({JsonToRestIntegration.class})
@JsonTest
public class JsonToRestIntegrationTest {
@Autowired
public DirectChannel httpResponseChannel;
@Value("${json_folder}")
private String jsonPath;
@Value("${json_success_folder}")
private String jsonSuccessPath;
@Value("${json_error_folder}")
private String jsonErrorPath;
@Autowired
private MockIntegrationContext mockIntegrationContext;
@Autowired
private MessageHandler reactiveOutbound;
@Before
public void setUp() throws Exception {
Files.createDirectories(Paths.get(jsonPath));
Files.createDirectories(Paths.get(jsonSuccessPath));
Files.createDirectories(Paths.get(jsonErrorPath));
}
@After
public void tearDown() throws Exception {
FileUtils.deleteDirectory(new File(jsonPath));
FileUtils.deleteDirectory(new File(jsonSuccessPath));
FileUtils.deleteDirectory(new File(jsonErrorPath));
}
@Test
public void shouldSendJsonToRestEndpointAndReceiveOK() throws Exception {
File jsonFile = new ClassPathResource("/test.json").getFile();
Path targetFilePath = Paths.get(jsonPath + "/" + jsonFile.getName());
Files.copy(jsonFile.toPath(), targetFilePath);
httpResponseChannel.subscribe(httpResponseHandler());
this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);
}
private MessageHandler httpResponseHandler() {
return message -> Assert.assertThat(message.getPayload(), is(notNullValue()));
}
@Configuration
@Import({JsonToRestIntegration.class})
public static class JsonToRestIntegrationTest {
@Autowired
public MessageChannel httpResponseChannel;
@Bean
public MessageHandler reactiveOutbound() {
ArgumentCaptor<Message<?>> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
MockMessageHandler mockMessageHandler = mockMessageHandler(messageArgumentCaptor).handleNextAndReply(m -> m);
mockMessageHandler.setOutputChannel(httpResponseChannel);
return mockMessageHandler;
}
}
}
使用模拟的 WebFluX 网络客户端更新工作示例:
实现
public class JsonToRestIntegration {
private final LoadBalancerExchangeFilterFunction lbFunction;
private final BatchConfigurationProperties batchConfigurationProperties;
@Bean
public DirectChannel httpResponseChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel errorChannel() {
return new DirectChannel();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(100, TimeUnit.MILLISECONDS).get();
}
@Bean
public IntegrationFlow jsonFileToRestFlow() {
return IntegrationFlows
.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
.transform(org.springframework.integration.file.dsl.Files.toStringTransformer("UTF-8"))
.enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
.handle(reactiveOutbound())
.channel(httpResponseChannel())
.get();
}
@Bean
public FileReadingMessageSource fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(batchConfigurationProperties.getJsonImportFolder()));
source.setFilter(new SimplePatternFileListFilter("*.json"));
source.setUseWatchService(true);
source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);
return source;
}
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound() {
WebClient webClient = WebClient.builder()
.baseUrl("http://service")
.filter(lbFunction)
.build();
WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(batchConfigurationProperties.getServiceUrl(), webClient);
handler.setHttpMethod(HttpMethod.POST);
handler.setCharset(StandardCharsets.UTF_8.displayName());
handler.setOutputChannel(httpResponseChannel());
handler.setExpectedResponseType(String.class);
handler.setAdviceChain(singletonList(expressionAdvice()));
return handler;
}
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
advice.setFailureChannel(errorChannel());
return advice;
}
@Bean
public IntegrationFlow responseFlow() {
return IntegrationFlows.from(httpResponseChannel())
.handle(message -> {
MessageHeaders messageHeaders = message.getHeaders();
File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
String originalFileName = (String) messageHeaders.get(FILENAME);
if (originalFile != null && originalFileName != null) {
File jsonSuccessFolder = new File(batchConfigurationProperties.getJsonSuccessFolder());
File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);
try {
Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
} catch (IOException e) {
log.error("Could not move file", e);
}
}
})
.get();
}
@Bean
public IntegrationFlow failureFlow() {
return IntegrationFlows.from(errorChannel())
.handle(message -> {
Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();
if (failedMessage != null) {
File originalFile = (File) failedMessage.getHeaders().get(ORIGINAL_FILE);
String originalFileName = (String) failedMessage.getHeaders().get(FILENAME);
if (originalFile != null && originalFileName != null) {
File jsonErrorFolder = new File(batchConfigurationProperties.getJsonErrorFolder());
File jsonErrorFile = new File(jsonErrorFolder, originalFileName);
try {
Files.move(originalFile.toPath(), jsonErrorFile.toPath());
} catch (IOException e) {
log.error("Could not move file", originalFileName, e);
}
}
}
})
.get();
}
}
测试
@RunWith(SpringRunner.class)
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
@Import({JsonToRestIntegration.class, BatchConfigurationProperties.class})
@JsonTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JsonToRestIntegrationIT {
private static final FilenameFilter JSON_FILENAME_FILTER = (dir, name) -> name.endsWith(".json");
@Autowired
private BatchConfigurationProperties batchConfigurationProperties;
@Autowired
private ObjectMapper om;
@Autowired
private MessageHandler reactiveOutbound;
@Autowired
private DirectChannel httpResponseChannel;
@Autowired
private DirectChannel errorChannel;
@Autowired
private FileReadingMessageSource fileReadingMessageSource;
@Autowired
private SourcePollingChannelAdapter fileReadingEndpoint;
@MockBean
private LoadBalancerExchangeFilterFunction lbFunction;
private String jsonImportPath;
private String jsonSuccessPath;
private String jsonErrorPath;
@Before
public void setUp() throws Exception {
jsonImportPath = batchConfigurationProperties.getJsonImportFolder();
jsonSuccessPath = batchConfigurationProperties.getJsonSuccessFolder();
jsonErrorPath = batchConfigurationProperties.getJsonErrorFolder();
Files.createDirectories(Paths.get(jsonImportPath));
Files.createDirectories(Paths.get(jsonSuccessPath));
Files.createDirectories(Paths.get(jsonErrorPath));
}
@After
public void tearDown() throws Exception {
FileUtils.deleteDirectory(new File(jsonImportPath));
FileUtils.deleteDirectory(new File(jsonSuccessPath));
FileUtils.deleteDirectory(new File(jsonErrorPath));
}
@Test
public void shouldMoveJsonFileToSuccessFolderWhenHttpResponseIsOk() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
latch.countDown();
super.postSend(message, channel, sent);
}
});
errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
fail();
}
});
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);
DataBufferFactory bufferFactory = response.bufferFactory();
String valueAsString = null;
try {
valueAsString = om.writeValueAsString(new ResponseDto("1"));
} catch (JsonProcessingException e) {
fail();
}
return response.writeWith(Mono.just(bufferFactory.wrap(valueAsString.getBytes())))
.then(Mono.defer(response::setComplete));
});
WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();
new DirectFieldAccessor(this.reactiveOutbound)
.setPropertyValue("webClient", webClient);
File jsonFile = new ClassPathResource("/test.json").getFile();
Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
Files.copy(jsonFile.toPath(), targetFilePath);
fileReadingEndpoint.start();
assertThat(latch.await(12, TimeUnit.SECONDS), is(true));
File[] jsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonImportFolder, is(notNullValue()));
assertThat(filesInJsonImportFolder.length, is(0));
File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonSuccessFolder, is(notNullValue()));
assertThat(filesInJsonSuccessFolder.length, is(1));
File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonErrorFolder, is(notNullValue()));
assertThat(filesInJsonErrorFolder.length, is(0));
}
@Test
public void shouldMoveJsonFileToErrorFolderWhenHttpResponseIsNotOk() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
latch.countDown();
super.postSend(message, channel, sent);
}
});
httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
fail();
}
});
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.BAD_REQUEST);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);
DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(Mono.just(bufferFactory.wrap("SOME BAD REQUEST".getBytes())))
.then(Mono.defer(response::setComplete));
});
WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();
new DirectFieldAccessor(this.reactiveOutbound)
.setPropertyValue("webClient", webClient);
File jsonFile = new ClassPathResource("/error.json").getFile();
Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
Files.copy(jsonFile.toPath(), targetFilePath);
fileReadingEndpoint.start();
assertThat(latch.await(11, TimeUnit.SECONDS), is(true));
File[] filesInJsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonImportFolder, is(notNullValue()));
assertThat(filesInJsonImportFolder.length, is(0));
File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonSuccessFolder, is(notNullValue()));
assertThat(filesInJsonSuccessFolder.length, is(0));
File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
assertThat(filesInJsonErrorFolder, is(notNullValue()));
assertThat(filesInJsonErrorFolder.length, is(1));
}
}
最佳答案
this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);
这个方法的第一个参数是一个endpoint id
。 (我想我们只是缺少有关这些方法的 Javadocs...)。
所以,你需要的是这样的东西:
.handle(reactiveOutbound(), e -> e.id("webFluxEndpoint"))
然后在那个测试用例中你做:
this.mockIntegrationContext.substituteMessageHandlerFor("webFluxEndpoint", reactiveOutbound);
您不需要在测试类配置中覆盖 bean。 MockMessageHandler
可以只在测试方法体中使用。
您通过 .from(fileReadingMessageSource())
轮询文件。要进行手动控制,您需要在开始时将其停止。为此,您再次添加一个 endpoint id
:
.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
然后在测试配置中执行此操作:
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
WebFlux 的另一种方法是通过自定义的 WebClient
来模拟对服务器的请求。例如:
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes())))
.then(Mono.defer(response::setComplete));
});
WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();
new DirectFieldAccessor(this.reactiveOutbound)
.setPropertyValue("webClient", webClient);
关于java - 如何使用 MockIntegrationContext.substituteMessageHandlerFor 模拟 WebFluxRequestExecutingMessageHandler,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52114448/
我在网上搜索但没有找到任何合适的文章解释如何使用 javascript 使用 WCF 服务,尤其是 WebScriptEndpoint。 任何人都可以对此给出任何指导吗? 谢谢 最佳答案 这是一篇关于
我正在编写一个将运行 Linux 命令的 C 程序,例如: cat/etc/passwd | grep 列表 |剪切-c 1-5 我没有任何结果 *这里 parent 等待第一个 child (chi
所以我正在尝试处理文件上传,然后将该文件作为二进制文件存储到数据库中。在我存储它之后,我尝试在给定的 URL 上提供文件。我似乎找不到适合这里的方法。我需要使用数据库,因为我使用 Google 应用引
我正在尝试制作一个宏,将下面的公式添加到单元格中,然后将其拖到整个列中并在 H 列中复制相同的公式 我想在 F 和 H 列中输入公式的数据 Range("F1").formula = "=IF(ISE
问题类似于this one ,但我想使用 OperatorPrecedenceParser 解析带有函数应用程序的表达式在 FParsec . 这是我的 AST: type Expression =
我想通过使用 sequelize 和 node.js 将这个查询更改为代码取决于在哪里 select COUNT(gender) as genderCount from customers where
我正在使用GNU bash,版本5.0.3(1)-发行版(x86_64-pc-linux-gnu),我想知道为什么简单的赋值语句会出现语法错误: #/bin/bash var1=/tmp
这里,为什么我的代码在 IE 中不起作用。我的代码适用于所有浏览器。没有问题。但是当我在 IE 上运行我的项目时,它发现错误。 而且我的 jquery 类和 insertadjacentHTMl 也不
我正在尝试更改标签的innerHTML。我无权访问该表单,因此无法编辑 HTML。标签具有的唯一标识符是“for”属性。 这是输入和标签的结构:
我有一个页面,我可以在其中返回用户帖子,可以使用一些 jquery 代码对这些帖子进行即时评论,在发布新评论后,我在帖子下插入新评论以及删除 按钮。问题是 Delete 按钮在新插入的元素上不起作用,
我有一个大约有 20 列的“管道分隔”文件。我只想使用 sha1sum 散列第一列,它是一个数字,如帐号,并按原样返回其余列。 使用 awk 或 sed 执行此操作的最佳方法是什么? Accounti
我需要将以下内容插入到我的表中...我的用户表有五列 id、用户名、密码、名称、条目。 (我还没有提交任何东西到条目中,我稍后会使用 php 来做)但由于某种原因我不断收到这个错误:#1054 - U
所以我试图有一个输入字段,我可以在其中输入任何字符,但然后将输入的值小写,删除任何非字母数字字符,留下“。”而不是空格。 例如,如果我输入: 地球的 70% 是水,-!*#$^^ & 30% 土地 输
我正在尝试做一些我认为非常简单的事情,但出于某种原因我没有得到想要的结果?我是 javascript 的新手,但对 java 有经验,所以我相信我没有使用某种正确的规则。 这是一个获取输入值、检查选择
我想使用 angularjs 从 mysql 数据库加载数据。 这就是应用程序的工作原理;用户登录,他们的用户名存储在 cookie 中。该用户名显示在主页上 我想获取这个值并通过 angularjs
我正在使用 autoLayout,我想在 UITableViewCell 上放置一个 UIlabel,它应该始终位于单元格的右侧和右侧的中心。 这就是我想要实现的目标 所以在这里你可以看到我正在谈论的
我需要与 MySql 等效的 elasticsearch 查询。我的 sql 查询: SELECT DISTINCT t.product_id AS id FROM tbl_sup_price t
我正在实现代码以使用 JSON。 func setup() { if let flickrURL = NSURL(string: "https://api.flickr.com/
我尝试使用for循环声明变量,然后测试cols和rols是否相同。如果是,它将运行递归函数。但是,我在 javascript 中执行 do 时遇到问题。有人可以帮忙吗? 现在,在比较 col.1 和
我举了一个我正在处理的问题的简短示例。 HTML代码: 1 2 3 CSS 代码: .BB a:hover{ color: #000; } .BB > li:after {
我是一名优秀的程序员,十分优秀!