gpt4 book ai didi

java - Akka + Camel + FTP2 + localWorkingDirectory 不能可靠地工作

转载 作者:太空狗 更新时间:2023-10-29 11:46:01 28 4
gpt4 key购买 nike

我在使用 Camel 的 FTP2 组件时遇到问题,消费者参与者生活在 Akka 系统中。

基本思想是监视文件的 FTP 目录,然后生成一个子 actor 来单独处理每个文件。 Akka 被用来管理并发性和可靠性。父消费者 actor 使用 noop=true 轮询目录,因此它什么都不做,然后子消费者 actor 应该下载文件,并使用“include”Camel 选项进行过滤。重要的是下载是并发的,并且文件没有加载到内存中(因此使用 localWorkDirectory)。

我写了一个简单的重现:

package camelrepro;

import java.io.InputStream;

import org.mockftpserver.core.command.Command;
import org.mockftpserver.core.command.ReplyCodes;
import org.mockftpserver.core.session.Session;
import org.mockftpserver.core.session.SessionKeys;
import org.mockftpserver.fake.FakeFtpServer;
import org.mockftpserver.fake.UserAccount;
import org.mockftpserver.fake.command.AbstractFakeCommandHandler;
import org.mockftpserver.fake.filesystem.FileEntry;
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.testkit.JavaTestKit;

public class Main {

public static class ParentActor extends UntypedConsumerActor {

public ParentActor() {
System.out.println("Parent started");
}
@Override
public String getEndpointUri() {
return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true";
}

@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof CamelMessage) {
getContext().actorOf(new Props(ChildActor.class), "0");
} else {
unhandled(msg);
}
}
}

public static class ChildActor extends UntypedConsumerActor {

public ChildActor() {
System.out.println("Child started");
}

@Override
public String getEndpointUri() {
return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp";
}

@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof CamelMessage) {
System.out.println("Child got message");
CamelMessage camelMsg = (CamelMessage) msg;

InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext());
System.out.println(source.getClass().getName());
System.exit(0);
} else {
unhandled(msg);
}
}
}

public static void main(String[] args) {

ActorSystem system = ActorSystem.create("default");

FakeFtpServer ftpServer = new FakeFtpServer();
UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem();
ftpServer.setFileSystem(ftpFileSystem);
ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/"));
ftpServer.setServerControlPort(8021);

// fix bug in PWD handling (either Apache FTP client or mock server depending on opinion)
ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() {
@Override
protected void handle(Command command, Session session) {
String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY);
this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR;
verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet");
int replyCode = ReplyCodes.PWD_OK;
String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\""));
session.sendReply(replyCode, replyText);
}
});
ftpFileSystem.add(new FileEntry("/test.txt", "hello world"));
ftpServer.start();

new JavaTestKit(system) {{
getSystem().actorOf(new Props(ParentActor.class));
}};
}
}

显示版本的 Maven 依赖项:

    <dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-camel_2.10</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.10</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ftp</artifactId>
<version>2.10.3</version>
</dependency>
<dependency>
<groupId>org.mockftpserver</groupId>
<artifactId>MockFtpServer</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>

我希望看到 BufferedInputStream 写入标准输出 - 并检查 ByteArrayInputStream 是否不是。

但相反,我看到文件未找到异常:

[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162)

有几次,它奏效了,让我怀疑它可能是某处的一场比赛。但它几乎总是失败。

有什么线索、想法、建议吗?

FWW:

uname -a: Linux 3.2.0-37-generic #58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
java: 1.7.0_11-b21

最佳答案

我已经找到了上述问题的解决方案。

事实是 child 消费者autoAck()返回 true(默认情况下这样做)。在这种情况下,akka-camel 将发送 CamelMessage即发即弃,然后继续清理。与此同时, child Actor 实际上并没有打开 InputStream直到 getBodyAs() 调用的类型转换器之一打开它。因此,通过 getBodyAs() 打开文件的 child Actor 之间存在竞争。 ,并且 Camel 清理在异步发送消息后删除文件。

因此解决方法是覆盖 autoAck()返回 false,并发送 Ack.getInstance() (或 new Status.Failure(<cause>),如果您愿意)在子消息处理程序的末尾。

关于java - Akka + Camel + FTP2 + localWorkingDirectory 不能可靠地工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14893400/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com