gpt4 book ai didi

spring-integration - Spring 集成 : How processing files sequentially

转载 作者:行者123 更新时间:2023-12-04 07:53:47 26 4
gpt4 key购买 nike

我使用“int-file: inbound-channel-adapter”来加载存在于目录中的文件。而且我喜欢按顺序处理文件:这意味着当第一个文件处理完成后,我加载第二个文件......等。

我看到一个 sample但我无法预测处理一个文件所需的时间,这取决于文件的大小。

我的源代码:

    <int-file:inbound-channel-adapter
directory="${directory.files.local}" id="filesIn" channel="channel.filesIn">
<int:poller fixed-delay="1000" max-messages-per-poll="1" />

</int-file:inbound-channel-adapter>

一个文件的流程是file:inbound-channel-adapter--->transformer--->splitter---->http:outbound-gateway--->outbound-mail-adapter---->一个文件的处理完成,那么此时,我是下一个要处理的文件。

我的项目配置太复杂了。下面,我向您展示更多配置:配置的第一部分是:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
auto-startup="true" channel="receiveChannel" session-factory="sftpSessionFactory"
local-directory="file:${directory.files.local}" remote-directory="${directory.files.remote}"
auto-create-local-directory="true" delete-remote-files="true"
filename-regex=".*\.txt$">
<int:poller fixed-delay="${sftp.interval.request}"
max-messages-per-poll="-1" />
</int-sftp:inbound-channel-adapter>
<!-- <int:poller cron="0 * 17 * * ?"></int:poller> -->

<int-file:inbound-channel-adapter
filter="compositeFileFilter" directory="${directory.files.local}" id="filesIn"
channel="channel.filesIn" prevent-duplicates="true">
<int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-file:inbound-channel-adapter>

<int:transformer input-channel="channel.filesIn"
output-channel="channel.file.router" ref="fileTransformer" method="transform" />

<int:recipient-list-router id="fileRouter"
input-channel="channel.file.router">

<int:recipient channel="channel.empty.files"
selector-expression="payload.length()==0" />
<int:recipient channel="channel.filesRejected"
selector-expression="payload.toString().contains('rejected')" />
<int:recipient channel="toSplitter"
selector-expression="(payload.length()>0) and(!payload.toString().contains('rejected'))" />

</int:recipient-list-router>

然后从 channel 到splitter,我的程序逐行读取一个文件:

    <int-file:splitter input-channel="toSplitter"
output-channel="router" requires-reply="false" />

<int:recipient-list-router id="recipentRouter"
input-channel="router">

<int:recipient channel="channelA"
selector-expression="headers['file_name'].startsWith('${filenameA.prefix}')" />

<int:recipient channel="channelB"
selector-expression="headers['file_name'].startsWith('${filenameB.prefix}')" />

</int:recipient-list-router>

每个 channel A 和 B 应该为每条线路调用两个不同的 WS。每个文件都使用异步调用 ws 代码如下文件 A:

<int:header-enricher input-channel="channelA"
output-channel="channelA.withHeader">
<int:header name="content-type" value="application/json" />
<int:header name="key1" expression="payload.split('${line.column.separator}')[0]" />
<int:header name="key2" expression="payload"></int:header>
</int:header-enricher>

<int:transformer input-channel="channelA.withHeader"
output-channel="channelA.request" ref="imsiMsgTransformer"
method="transform">
</int:transformer>


<int:channel id="channelA.request">
<int:queue capacity="10" />

<int-http:outbound-gateway id="maspUpdatorSimChangedGateway"
request-channel="channelA.request"
url="${url}"
http-method="PUT" expected-response-type="java.lang.String" charset="UTF-8"
reply-timeout="${ws.reply.timeout}" reply-channel="channelA.reply">
<int-http:uri-variable name="foo" expression="headers['key1']" />
<int:poller fixed-delay="1000" error-channel="channelA.error"
task-executor="executorA" />

<int-http:request-handler-advice-chain>
<int:retry-advice max-attempts="${ws.max.attempts}"
recovery-channel="recovery.channelA">
<int:fixed-back-off interval="${ws.interval.attempts}" />
</int:retry-advice>
</int-http:request-handler-advice-chain>

</int-http:outbound-gateway>

<int:service-activator input-channel="recovery.channelA"
ref="updateImsiHttpResponseErrorHandler" method="handleMessage" output-channel="updateImsi.channel.error.toenricher">
</int:service-activator>

<int:service-activator input-channel="channelA.reply"
ref="updateImsiHttpResponseMessageHandler" method="handleMessage">
<int:poller fixed-delay="1000"></int:poller>
</int:service-activator>

在(回复 channel 和恢复 channel )的每个激活器中,我计算文件的进度,直到文件完成此时我应该加载第二个文件 A2 或文件 B ...等

最佳答案

这是默认行为,只要

  1. 轮询器没有 task-executor (你的没有)。
  2. DirectChannel s(默认值)在适配器的下游使用 - 这意味着没有 QueueChannel s 或 ExecutorChannel (即 channel 上没有 task-executor<queue/>)。

在这种情况下,在当前轮询完成之前甚至不会考虑下一次轮询 - 流程在轮询器线程上运行并且一次只能处理一个轮询。

fixed-delay直到当前文件被完全处理后才开始。

编辑

如果你需要在流上使用异步处理,你需要使用一个Conditional Poller或者一个简单的 PollSkipAdvice .

您将提供 PollSkipStrategy在文件完成之前将返回 false 的实现。

这样,在您做出决定之前将跳过后续投票。

EDIT2

像这样的……

/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.scheduling;

/**
* @author Gary Russell
* @since 4.3
*
*/
public class SimplePollSkipStrategy implements PollSkipStrategy {

private volatile boolean skip;

@Override
public boolean skipPoll() {
return this.skip;
}

public void skipPolls() {
this.skip = true;
}

public void reset() {
this.skip = false;
}
}
  • 将其添加为 <bean/>根据您的情况。
  • 使用 PollSkipAdvice 将其添加到轮询器的建议链中
  • 如果您想跳过投票,请调用 skipPolls() .
  • 完成文件后,请调用 reset() .

关于spring-integration - Spring 集成 : How processing files sequentially,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34332972/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com