- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我使用“int-file: inbound-channel-adapter”来加载存在于目录中的文件。而且我喜欢按顺序处理文件:这意味着当第一个文件处理完成后,我加载第二个文件......等。
我看到一个 sample但我无法预测处理一个文件所需的时间,这取决于文件的大小。
我的源代码:
<int-file:inbound-channel-adapter
directory="${directory.files.local}" id="filesIn" channel="channel.filesIn">
<int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-file:inbound-channel-adapter>
一个文件的流程是file:inbound-channel-adapter--->transformer--->splitter---->http:outbound-gateway--->outbound-mail-adapter---->一个文件的处理完成,那么此时,我是下一个要处理的文件。
我的项目配置太复杂了。下面,我向您展示更多配置:配置的第一部分是:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
auto-startup="true" channel="receiveChannel" session-factory="sftpSessionFactory"
local-directory="file:${directory.files.local}" remote-directory="${directory.files.remote}"
auto-create-local-directory="true" delete-remote-files="true"
filename-regex=".*\.txt$">
<int:poller fixed-delay="${sftp.interval.request}"
max-messages-per-poll="-1" />
</int-sftp:inbound-channel-adapter>
<!-- <int:poller cron="0 * 17 * * ?"></int:poller> -->
<int-file:inbound-channel-adapter
filter="compositeFileFilter" directory="${directory.files.local}" id="filesIn"
channel="channel.filesIn" prevent-duplicates="true">
<int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-file:inbound-channel-adapter>
<int:transformer input-channel="channel.filesIn"
output-channel="channel.file.router" ref="fileTransformer" method="transform" />
<int:recipient-list-router id="fileRouter"
input-channel="channel.file.router">
<int:recipient channel="channel.empty.files"
selector-expression="payload.length()==0" />
<int:recipient channel="channel.filesRejected"
selector-expression="payload.toString().contains('rejected')" />
<int:recipient channel="toSplitter"
selector-expression="(payload.length()>0) and(!payload.toString().contains('rejected'))" />
</int:recipient-list-router>
然后从 channel 到splitter,我的程序逐行读取一个文件:
<int-file:splitter input-channel="toSplitter"
output-channel="router" requires-reply="false" />
<int:recipient-list-router id="recipentRouter"
input-channel="router">
<int:recipient channel="channelA"
selector-expression="headers['file_name'].startsWith('${filenameA.prefix}')" />
<int:recipient channel="channelB"
selector-expression="headers['file_name'].startsWith('${filenameB.prefix}')" />
</int:recipient-list-router>
每个 channel A 和 B 应该为每条线路调用两个不同的 WS。每个文件都使用异步调用 ws 代码如下文件 A:
<int:header-enricher input-channel="channelA"
output-channel="channelA.withHeader">
<int:header name="content-type" value="application/json" />
<int:header name="key1" expression="payload.split('${line.column.separator}')[0]" />
<int:header name="key2" expression="payload"></int:header>
</int:header-enricher>
<int:transformer input-channel="channelA.withHeader"
output-channel="channelA.request" ref="imsiMsgTransformer"
method="transform">
</int:transformer>
<int:channel id="channelA.request">
<int:queue capacity="10" />
<int-http:outbound-gateway id="maspUpdatorSimChangedGateway"
request-channel="channelA.request"
url="${url}"
http-method="PUT" expected-response-type="java.lang.String" charset="UTF-8"
reply-timeout="${ws.reply.timeout}" reply-channel="channelA.reply">
<int-http:uri-variable name="foo" expression="headers['key1']" />
<int:poller fixed-delay="1000" error-channel="channelA.error"
task-executor="executorA" />
<int-http:request-handler-advice-chain>
<int:retry-advice max-attempts="${ws.max.attempts}"
recovery-channel="recovery.channelA">
<int:fixed-back-off interval="${ws.interval.attempts}" />
</int:retry-advice>
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
<int:service-activator input-channel="recovery.channelA"
ref="updateImsiHttpResponseErrorHandler" method="handleMessage" output-channel="updateImsi.channel.error.toenricher">
</int:service-activator>
<int:service-activator input-channel="channelA.reply"
ref="updateImsiHttpResponseMessageHandler" method="handleMessage">
<int:poller fixed-delay="1000"></int:poller>
</int:service-activator>
在(回复 channel 和恢复 channel )的每个激活器中,我计算文件的进度,直到文件完成此时我应该加载第二个文件 A2 或文件 B ...等
最佳答案
这是默认行为,只要
task-executor
(你的没有)。DirectChannel
s(默认值)在适配器的下游使用 - 这意味着没有 QueueChannel
s 或 ExecutorChannel
(即 channel 上没有 task-executor
或 <queue/>
)。在这种情况下,在当前轮询完成之前甚至不会考虑下一次轮询 - 流程在轮询器线程上运行并且一次只能处理一个轮询。
fixed-delay
直到当前文件被完全处理后才开始。
编辑
如果你需要在流上使用异步处理,你需要使用一个Conditional Poller或者一个简单的 PollSkipAdvice .
您将提供 PollSkipStrategy
在文件完成之前将返回 false 的实现。
这样,在您做出决定之前将跳过后续投票。
EDIT2
像这样的……
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.scheduling;
/**
* @author Gary Russell
* @since 4.3
*
*/
public class SimplePollSkipStrategy implements PollSkipStrategy {
private volatile boolean skip;
@Override
public boolean skipPoll() {
return this.skip;
}
public void skipPolls() {
this.skip = true;
}
public void reset() {
this.skip = false;
}
}
<bean/>
根据您的情况。PollSkipAdvice
将其添加到轮询器的建议链中skipPolls()
.reset()
.关于spring-integration - Spring 集成 : How processing files sequentially,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34332972/
我想知道是否可以访问放在 tomcat 的 conf 文件夹中的文件。通常我会在这个文件中放置多个 webapp 的配置,在 war 之外。 我想使用类路径独立于文件系统。 我过去使用过 lib 文件
我有一个 PowerShell 脚本,它获取文件列表并移动满足特定条件的文件。为什么即使对象为空,foreach 循环也会运行? 我假设如果 $i 不存在,它就不会运行。但是如果 $filePath
我已将 BasicAccountRule.drl 放置在我的 Web 应用程序中,位置为:C:/workspace/exim_design/src/main/resources/rules/drl/i
我使用 File.open('file.txt').class 和 File.open('file.txt').readlines.class 以及前者进行了检查一个返回 File,后者返回 Arra
我正在尝试使用 FileOutputStream 删除文件,在其中写入内容后。这是我用来编写的代码: private void writeContent(File file, String fileC
我正在尝试使用 flink 和 python 批处理 api 测试 Wordcount 经典示例。我的问题是,将数据源从 env.from_elements() 修改为 env.read_text()
我正在尝试制作一个可以同时处理多个不同文件的程序。我的想法是制作一个包含 20 个 FILE* 的数组,以便在我达到此限制时能够关闭其中一个并打开请求的新文件。 为此,我想到了一个函数,它选择一个选项
我有两个文件A和B文件A: 976464 792992 文件B TimeStamp,Record1,976464,8383,ABCD 我想搜索文件 A 和文件 B 中的每条记录并打印匹配的记录。打印的
我有一些保存在 map 中的属性文件。示例: Map map = new HashMap<>(); map.put("1", "One"); map.put("2", "Two"); map.put(
我正在尝试找出一个脚本文件,该文件接受一个包含文件列表的文件(每一行都是一个文件路径,即 path/to/file)并将它们合并到一个文件中。 例如: list.text -- path/to/fil
为了使用 File.CreateText() 和 File.AppendText() 你必须: 通过调用这些方法之一打开流 写消息 关闭流 处理流 为了使用 File.AppendAllText()
使用rsync时,如何在使用--files-from参数复制时重命名文件?我有大约190,000个文件,在从源复制到目标时,每个文件都需要重命名。我计划将文件列表放在一个文本文件中传递给--files
我在非服务器应用程序中使用 Spring(只需从 Eclipse 中某个类的 main() 编译并运行它)。 我的问题是作为 new FileSystemXmlApplicationContext 的
QNX (Neutrino 6.5.0) 使用 ksh 的开源实现作为其 shell 。许多提供的脚本,包括系统启动脚本,都使用诸如 if ! test /dev/slog -ef /dev/slog
当我尝试打开从我的应用程序下载的 xls 文件时,出现此错误: excel cannot open the file because the file format or file extension
有一些相关的概念,即文件指针、流和文件描述符。 我知道文件指针是指向数据类型 FILE 的指针(在例如 FILE.h 和 struct_FILE.h 中声明)。 我知道文件描述符是 int ,例如成员
好吧,这应该很容易... 我是groovy的新手,我希望实现以下逻辑: def testFiles = findAllTestFiles(); 到目前为止,我想出了下面的代码,该代码可以成功打印所有文
我理解为什么以下内容会截断文件的内容: Get-Content | Out-File 这是因为 Out-File 首先运行,它会在 Get-Content 有机会读取文件之前清空文件。 但是当我尝
您好,我正在尝试将文件位置表示为变量,因为最终脚本将在另一台机器上运行。这是我尝试过的代码,然后是我得到的错误。在我看来,python 是如何添加“\”的,这就是导致问题的原因。如果是这种情况,我如何
我有一个只包含一行的输入文件: $ cat input foo bar 我想在我的脚本中使用这一行,据我所知有 3 种方法: line=$(cat input) line=$( input"...,
我是一名优秀的程序员,十分优秀!