gpt4 book ai didi

java - Spring Integration Inbound-Channel-Adapter 逐行读取大文件

转载 作者:搜寻专家 更新时间:2023-11-01 03:04:34 26 4
gpt4 key购买 nike

我目前正在使用 Spring Integration 4.1.0 和 Spring 4.1.2。我需要能够逐行读取文件并将读取的每一行用作消息。基本上我想允许“重播”我们的消息源之一,但消息不会保存在单个文件中,而是保存在单个文件中。我对这个用例没有交易要求。我的要求与此帖子类似,除了驻留在与运行 JVM 的服务器相同的服务器上的文件:spring integration - read a remote file line by line

在我看来,我有以下选择:

1.使用 int-file:inbound-channel-adapter 读取文件,然后“拆分”该文件,这样 1 条消息现在变成多条消息。示例配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>

问题是文件非常大,当使用上述技术时,整个文件首先被读入内存,然后被拆分,JVM 会耗尽堆空间。实际上所需的步骤是:读取一行并将行转换为消息、发送消息、从内存中删除消息、重复。

  1. int-file:tail-inbound-channel-adapterend="false" 一起使用(这基本上表示从文件的开头读取).根据每个文件的需要启动和停止此适配器(在每次启动前更改文件名)。示例配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
    http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int-file:tail-inbound-channel-adapter id="apache"
    channel="exchangeSpringQueueChannel"
    task-executor="exchangeFileReplayTaskExecutor"
    file="C:\p2-test.txt"
    delay="1"
    end="false"
    reopen="true"
    file-delay="10000" />

    <int:channel id="exchangeSpringQueueChannel" />
    <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
  2. 让 Spring Integration 调用 Spring Batch 并使用 ItemReader 来处理文件。当然允许对整个过程进行更细粒度的控制,但需要大量工作来设置作业存储库等(而且我不关心作业历史,所以我要么告诉作业不要记录状态和/或使用内存中的 MapJobRepository)。

4.通过扩展 MessageProducerSupport 创建我自己的 FileLineByLineInboundChannelAdapter。大部分代码可以从 ApacheCommonsFileTailingMessageProducer 中借用(另请参阅 http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter)。下面是一个示例,但需要做一些工作才能将读取内容放入它自己的 Thread 中,以便我在逐行读取时遵守 stop() 命令。

    package com.xxx.exchgateway.common.util.springintegration;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;

/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}

protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}

@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}

private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());

BufferedReader br = new BufferedReader(new InputStreamReader(fstream));

String strLine;

// Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}

//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

@Override
protected void doStart() {
super.doStart();

// TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
// and we want to honor the stop() command while we read line-by-line
readFile();
}

protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}

@Override
public Message<String> receive() {
// TODO Auto-generated method stub
return null;
}
}

在我看来,我的用例并不超出人们可能喜欢做的典型事情的范围,所以我很惊讶我找不到开箱即用的解决方案。不过,我进行了相当多的搜索,并查看了很多示例,不幸的是,我还没有找到适合我需要的内容。

我假设我可能错过了框架已经提供的一些明显的东西(尽管这可能属于 Spring Integraton 和 Spring Batch 之间的模糊界限)。有人可以让我知道我的想法是否完全偏离实际,或者是否有一个我错过的简单解决方案,或者提供替代建议?

最佳答案

Spring Integration 4.x 有一个很好的新特性,使用 Iterator 作为消息:

Spring Integration Reference

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.

这允许将 Iterator 作为消息发送,而不是将整个文件读入内存。

Here is Spring 上下文将 CSV 文件拆分为每行一条消息的简单示例:

<int-file:inbound-channel-adapter 
directory="${inputFileDirectory:/tmp}"
channel="inputFiles"/>

<int:channel id="inputFiles">
<int:dispatcher task-executor="executor"/>
</int:channel>

<int:splitter
input-channel="inputFiles"
output-channel="output">
<bean
class="FileSplitter"
p:commentPrefix="${commentPrefix:#}" />
</int:splitter>

<task:executor
id="executor"
pool-size="${poolSize:8}"
queue-capacity="${aueueCapacity:0}"
rejection-policy="CALLER_RUNS" />

<int:channel id="output"/>

这是 splitter implementation :

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class FileSplitter extends AbstractMessageSplitter {
private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

private String commentPrefix = "#";

public Object splitMessage(Message<?> message) {
if(log.isDebugEnabled()) {
log.debug(message.toString());
}
try {

Object payload = message.getPayload();
Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");

return new BufferedReaderFileIterator((File) payload);
}
catch (IOException e) {
String msg = "Unable to transform file: " + e.getMessage();
log.error(msg);
throw new MessageTransformationException(msg, e);
}
}

public void setCommentPrefix(String commentPrefix) {
this.commentPrefix = commentPrefix;
}

public class BufferedReaderFileIterator implements Iterator<String> {

private File file;
private BufferedReader bufferedReader;
private String line;

public BufferedReaderFileIterator(File file) throws IOException {
this.file = file;
this.bufferedReader = new BufferedReader(new FileReader(file));
readNextLine();
}

@Override
public boolean hasNext() {
return line != null;
}

@Override
public String next() {
try {
String res = this.line;
readNextLine();
return res;
}
catch (IOException e) {
log.error("Error reading file", e);
throw new RuntimeException(e);
}
}

void readNextLine() throws IOException {
do {
line = bufferedReader.readLine();
}
while(line != null && line.trim().startsWith(commentPrefix));

if(log.isTraceEnabled()) {
log.trace("Read next line: {}", line);
}

if(line == null) {
close();
}
}

void close() throws IOException {
bufferedReader.close();
file.delete();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

}

}

请注意从 splitMessage() 处理程序方法返回的 Iterator 对象。

关于java - Spring Integration Inbound-Channel-Adapter 逐行读取大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27064737/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com