gpt4 book ai didi

java - 多线程 FTP 输入流的输出不一致

转载 作者:行者123 更新时间:2023-12-01 11:27:44 27 4
gpt4 key购买 nike

我正在尝试创建一个 java 程序,将某些 Assets 文件从 FTP 服务器下载到本地文件。由于我的(免费)FTP 服务器不支持超过几兆字节的文件大小,因此我决定在上传文件时将其拆分,并在程序下载文件时重新组合它们。这可行,但速度相当慢,因为对于每个文件,它都必须获取InputStream,这需要一些时间。

我使用的 FTP 服务器有一种无需实际登录服务器即可下载文件的方法,因此我使用此代码来获取 InputStream:

private static final InputStream getInputStream(String file) throws IOException {
return new URL("http://site.website.com/path/" + file).openStream();
}

要获取 Assets 文件一部分的InputStream,我使用以下代码:

public static InputStream getAssetInputStream(String asset, int num) throws IOException, FTPException {
try {
return getInputStream("assets/" + asset + "_" + num + ".raf");
} catch (Exception e) {
// error handling
}
}

由于 getAssetInputStreams(String, int) 方法需要一些时间来运行(特别是文件大小超过一兆字节时),我决定将实际下载文件的代码编写为多字节线程。这就是我的问题所在。

final Map<Integer, Boolean> done = new HashMap<Integer, Boolean>();
final Map<Integer, byte[]> parts = new HashMap<Integer, byte[]>();

for (int i = 0; i < numParts; i++) {
final int part = i;
done.put(part, false);

new Thread(new Runnable() {
@Override
public void run() {
try {
InputStream is = FTP.getAssetInputStream(asset, part);
ByteArrayOutputStream baos = new ByteArrayOutputStream();

byte[] buf = new byte[DOWNLOAD_BUFFER_SIZE];
int len = 0;

while ((len = is.read(buf)) > 0) {
baos.write(buf, 0, len);
curDownload.addAndGet(len);
totAssets.addAndGet(len);
}

parts.put(part, baos.toByteArray());
done.put(part, true);
} catch (IOException e) {
// error handling
} catch (FTPException e) {
// error handling
}
}
}, "Download-" + asset + "-" + i).start();
}

while (done.values().contains(false)) {
try {
Thread.sleep(100);
} catch(InterruptedException e) {
e.printStackTrace();
}
}

File assetFile = new File(dir, "assets/" + asset + ".raf");
assetFile.createNewFile();
FileOutputStream fos = new FileOutputStream(assetFile);

for (int i = 0; i < numParts; i++) {
fos.write(parts.get(i));
}

fos.close();

此代码有效,但并不总是有效。当我在台式计算机上运行它时,它几乎总是有效。不是 100% 的时候,但通常效果很好。在我的笔记本电脑上,它的互联网连接要差得多,它几乎无法工作。结果是文件不完整。有时,它会下载 50% 的文件。有时,它会下载 90% 的文件,每次都不同。

现在,如果我将 .start() 替换为 .run(),则代码 100% 正常工作,即使在我的笔记本电脑上也是如此。然而,它的速度非常慢,所以我宁愿不使用 .run()

有没有办法可以更改我的代码,使其能够多线程工作?任何帮助将不胜感激。

最佳答案

首先,更换您的 FTP 服务器,有很多免费的 FTP 服务器支持任意文件大小,并具有附加功能,但我离题了...

您的代码似乎存在许多不相关的问题,这些问题都可能导致您所看到的行为,如下所述:

  1. 通过多个线程的不 protected /不同步访问来访问 doneparts 映射会产生竞争条件。这可能会导致数据损坏以及线程之间这些变量失去同步,从而可能导致 done.values().contains(false) 返回 true,即使实际上并非如此。

  2. 您频繁地重复调用 done.values().contains()。虽然 javadoc 没有明确说明,但 HashMap 可能会以 O(n) 方式遍历每个值,以检查给定映射是否包含某个值。再加上其他线程正在修改映射,您将得到未定义的行为。根据 values() javadoc:

    If the map is modified while an iteration over the collection is in progress (except through the iterator's own remove operation), the results of the iteration are undefined.

  3. 您以某种方式调用new URL("http://site.website.com/path/"+ file).openStream();,但声明您正在使用 FTP。链接中的 http:// 定义了 openStream() 尝试打开的协议(protocol),并且 http:// 不是 ftp ://。不确定这是一个拼写错误还是您指的是 HTTP(或者您是否有提供相同文件的 HTTP 服务器)。

  4. 任何线程引发任何类型的异常都会导致代码失败,因为并非所有部分都会“完成”(基于您的忙等待循环设计)。当然,您可能会编辑一些其他逻辑来防止这种情况,但否则这是代码的潜在问题。

  5. 您没有关闭任何已打开的流。这可能意味着底层套接字本身也保持打开状态。这不仅会造成资源泄漏,如果服务器本身有某种最大同时连接数限制,则只会导致新连接失败,因为旧的已完成的传输不会关闭。

基于上述问题,我建议将下载逻辑移至 Callable 任务中,并通过 ExecutorService 运行它们,如下所示:

LinkedList<Callable<byte[]>> tasksToExecute = new LinkedList<>();

// Populate tasks to run
for(int i = 0; i < numParts; i++){
final int part = i;

// Lambda to
tasksToExecute.add(() -> {
InputStream is = null;

try{
is = FTP.getAssetInputStream(asset, part);
ByteArrayOutputStream baos = new ByteArrayOutputStream();

byte[] buf = new byte[DOWNLOAD_BUFFER_SIZE];
int len = 0;

while((len = is.read(buf)) > 0){
baos.write(buf, 0, len);
curDownload.addAndGet(len);
totAssets.addAndGet(len);
}

return baos.toByteArray();
}catch(IOException e){
// handle exception
}catch(FTPException e){
// handle exception
}finally{
if(is != null){
try{
is.close();
}catch(IOException ignored){}
}
}

return null;
});
}

// Retrieve an ExecutorService instance, note the use of work stealing pool is Java 8 only
// This can be substituted for newFixedThreadPool(nThreads) for Java < 8 as well for tight control over number of simultaneous links
ExecutorService executor = Executors.newWorkStealingPool(4);

// Tells the executor to execute all the tasks and give us the results
List<Future<byte[]>> resultFutures = executor.invokeAll(tasksToExecute);

// Populates the file
File assetFile = new File(dir, "assets/" + asset + ".raf");
assetFile.createNewFile();

try(FileOutputStream fos = new FileOutputStream(assetFile)){
// Iterate through the futures, writing them to file in order
for(Future<byte[]> result : resultFutures){
byte[] partData = result.get();

if(partData == null){
// exception occured during downloading this part, handle appropriately
}else{
fos.write(partData);
}
}
}catch(IOException ex(){
// handle exception
}

使用执行器服务,您可以进一步优化多线程场景,因为只要片段(按顺序)可用,输出文件就会开始写入,并且线程本身会被重用,以节省线程创建成本。

如上所述,可能存在太多同时链接导致服务器拒绝连接的情况(或者更危险的是,编写 EOF 让您认为该部分已下载)。在这种情况下,可以通过 newFixedThreadPool(nThreads) 调整工作线程的数量,以确保在任何给定时间,只能同时发生 nThreads 数量的下载。

关于java - 多线程 FTP 输入流的输出不一致,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30683475/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com