- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我目前正在使用 Spring Integration 4.1.0 和 Spring 4.1.2。我需要能够逐行读取文件并将读取的每一行用作消息。基本上我想允许“重播”我们的消息源之一,但消息不会保存在单个文件中,而是保存在单个文件中。我对这个用例没有交易要求。我的要求与此帖子类似,除了驻留在与运行 JVM 的服务器相同的服务器上的文件:spring integration - read a remote file line by line
在我看来,我有以下选择:
1.使用 int-file:inbound-channel-adapter
读取文件,然后“拆分”该文件,这样 1 条消息现在变成多条消息。示例配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
问题是文件非常大,当使用上述技术时,整个文件首先被读入内存,然后被拆分,JVM 会耗尽堆空间。实际上所需的步骤是:读取一行并将行转换为消息、发送消息、从内存中删除消息、重复。
将 int-file:tail-inbound-channel-adapter
与 end="false"
一起使用(这基本上表示从文件的开头读取).根据每个文件的需要启动和停止此适配器(在每次启动前更改文件名)。示例配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:tail-inbound-channel-adapter id="apache"
channel="exchangeSpringQueueChannel"
task-executor="exchangeFileReplayTaskExecutor"
file="C:\p2-test.txt"
delay="1"
end="false"
reopen="true"
file-delay="10000" />
<int:channel id="exchangeSpringQueueChannel" />
<task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
</beans>
让 Spring Integration 调用 Spring Batch 并使用 ItemReader
来处理文件。当然允许对整个过程进行更细粒度的控制,但需要大量工作来设置作业存储库等(而且我不关心作业历史,所以我要么告诉作业不要记录状态和/或使用内存中的 MapJobRepository
)。
4.通过扩展 MessageProducerSupport
创建我自己的 FileLineByLineInboundChannelAdapter
。大部分代码可以从 ApacheCommonsFileTailingMessageProducer
中借用(另请参阅 http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter)。下面是一个示例,但需要做一些工作才能将读取内容放入它自己的 Thread
中,以便我在逐行读取时遵守 stop()
命令。
package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
// Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
// TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
// and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
// TODO Auto-generated method stub
return null;
}
}
在我看来,我的用例并不超出人们可能喜欢做的典型事情的范围,所以我很惊讶我找不到开箱即用的解决方案。不过,我进行了相当多的搜索,并查看了很多示例,不幸的是,我还没有找到适合我需要的内容。
我假设我可能错过了框架已经提供的一些明显的东西(尽管这可能属于 Spring Integraton 和 Spring Batch 之间的模糊界限)。有人可以让我知道我的想法是否完全偏离实际,或者是否有一个我错过的简单解决方案,或者提供替代建议?
最佳答案
Spring Integration 4.x 有一个很好的新特性,使用 Iterator 作为消息:
Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.
这允许将 Iterator 作为消息发送,而不是将整个文件读入内存。
Here is Spring 上下文将 CSV 文件拆分为每行一条消息的简单示例:
<int-file:inbound-channel-adapter
directory="${inputFileDirectory:/tmp}"
channel="inputFiles"/>
<int:channel id="inputFiles">
<int:dispatcher task-executor="executor"/>
</int:channel>
<int:splitter
input-channel="inputFiles"
output-channel="output">
<bean
class="FileSplitter"
p:commentPrefix="${commentPrefix:#}" />
</int:splitter>
<task:executor
id="executor"
pool-size="${poolSize:8}"
queue-capacity="${aueueCapacity:0}"
rejection-policy="CALLER_RUNS" />
<int:channel id="output"/>
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
public class FileSplitter extends AbstractMessageSplitter {
private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
private String commentPrefix = "#";
public Object splitMessage(Message<?> message) {
if(log.isDebugEnabled()) {
log.debug(message.toString());
}
try {
Object payload = message.getPayload();
Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
return new BufferedReaderFileIterator((File) payload);
}
catch (IOException e) {
String msg = "Unable to transform file: " + e.getMessage();
log.error(msg);
throw new MessageTransformationException(msg, e);
}
}
public void setCommentPrefix(String commentPrefix) {
this.commentPrefix = commentPrefix;
}
public class BufferedReaderFileIterator implements Iterator<String> {
private File file;
private BufferedReader bufferedReader;
private String line;
public BufferedReaderFileIterator(File file) throws IOException {
this.file = file;
this.bufferedReader = new BufferedReader(new FileReader(file));
readNextLine();
}
@Override
public boolean hasNext() {
return line != null;
}
@Override
public String next() {
try {
String res = this.line;
readNextLine();
return res;
}
catch (IOException e) {
log.error("Error reading file", e);
throw new RuntimeException(e);
}
}
void readNextLine() throws IOException {
do {
line = bufferedReader.readLine();
}
while(line != null && line.trim().startsWith(commentPrefix));
if(log.isTraceEnabled()) {
log.trace("Read next line: {}", line);
}
if(line == null) {
close();
}
}
void close() throws IOException {
bufferedReader.close();
file.delete();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}
请注意从 splitMessage() 处理程序方法返回的 Iterator 对象。
关于java - Spring Integration Inbound-Channel-Adapter 逐行读取大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27064737/
我正在制作一个 android 应用程序,它允许用户在 editText 中输入关键字,当他们点击提交时,下面的 recyclerview 将显示来自 API 请求的结果。 我的 recyclerVi
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎不是关于 a specific programming problem, a softwar
我想在我的 Svelte Kit 应用程序中使用 @sveltejs/adapter-static(想把它变成一个 SPA)。 我使用 npm i @sveltejs/adapter-static 安
我有一个简单的表单,在提交重定向到 AuthController 内的 processAction 之后,在这个 action 中我想要创建一个简单的 table bar。 已编辑: 引用Zend f
当我使用 ListView 时,我扩展了 ArrayAdapter 并将数据(一个列表)存储在 ArrayAdapter 中,因此当数据更改时,我调用: adapter.add(item).; 还有另
我有一个使用 Webpack 捆绑的 commonjs 浏览器应用程序( typescript )。它使用 webrtc,所以我想使用 webrtc-adapter package from npm
在 “DesignPatterns: Elements of Reusable Object-Oriented Software” 一书中谈到 C++ 实现 Adapter 模式时,它是这样的这个:
只需要帮助理解 ZF2 中的一些简单数据库查询。在 ZF1 中,我有这样简单的方法: public function recordset() { // listing of all records $
当使用Spring的Java配置时,你一定会看到像WebMvcConfigurerAdapter这样的类& HandlerInterceptorAdapter它们实现单个接口(interface),并
我将 doctrine2 与 ZF2 一起使用,我的一些库与 Zend\Db\Adapter\Adapter 一起使用,其他的与 doctrine2 一起使用。现在,他们两次连接到数据库。是否可以在原
过去几天我一直在工作灯上工作,现在正在尝试制作连接到数据库的应用程序。以下是截图: 1) Worklight\server\conf\worklight.properties 2) mySQLAdap
几年前,我在 this tutorial 之后开始学习 Zend 框架。 .在那里,它显示映射器是使用 Zend\Db\Adapter\Adapter 创建的。类来获取数据库连接,这就是我使用数据库的
我正在学习 RecyclerView 并在 developer 中的站点 Adapter 类扩展 RecyclerView.Adapter 。实现显示: public class MyAdapter
查看文档:http://docs.spring.io/spring-integration/reference/html/ip.html#tcp-adapters我知道适配器仅用于单向通信。但是,这里
当我移动到另一个 fragment 时,我如何恢复分页适配器项状态?我试过下面的文章,但没有用。 https://medium.com/@florina.muntenescu private fun
我正在使用 capistrano 并收到此错误消息: Please install the pg adapter: `gem install activerecord-pg-adapter` (can
我正在尝试在本地主机上安装 Magento 2,当我想连接数据库时出现错误。 错误是: “Magento\Framework\DB\Adapter\Pdo\MysqlFactory”生成的源类“\Ma
我的 Rails 服务器通过正常的开发脚本运行良好,我计划使用 Netbeans for Rails。当我使用 Netbeans 启动我现有的项目并重新启动我的 Webrick 服务器时,它显示错误
运行 Ruby on Rails (RoR) 应用程序或使用 ActiveRecord 框架的 Ruby 代码,您会收到错误消息: Please install the postgresql adap
我在运行 Redmine2.3-stable 时遇到这个错误,随后无法成功安装 mysql2 适配器。 这是错误: Please install the mysql2 adapter: `ge
我是一名优秀的程序员,十分优秀!