gpt4 book ai didi

java - 不明原因的停止执行

转载 作者:行者123 更新时间:2023-12-01 11:20:17 24 4
gpt4 key购买 nike

我正在开发一个包含 2 个线程的 Java 进程:一个用于读取文件内容并将其添加到一个共享阻塞队列中;另一个用于读取文件内容并将其添加到一个共享阻塞队列中。一种用于从阻塞队列中检索行并通过网络发送它们(在指定的发送速率下)。我的两个类(class)如下:

更新了下面的代码

生产者线程:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;

public class SourceFileProducer implements Runnable {

private File file;

private BufferedReader reader;

private ArrayBlockingQueue<String> buffer;

private String fileName;

private String endMarker;

public SourceFileProducer(ArrayBlockingQueue<String> buffer,
String endMarker, String fileName) {
this.buffer = buffer;
this.endMarker = endMarker;
file = new File(fileName);
if(file.exists()) {
try {
reader = new BufferedReader(new FileReader(file));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
this.fileName = fileName;
}

@Override
public void run() {
System.out.println("SourceFileProducer thread-" + Thread.currentThread().getId() + " initiating with source file: " + fileName);
String line = "";
try {
while((line = reader.readLine()) != null) {
try {
buffer.put(line);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
buffer.put(endMarker);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("SourceFileProducer thread-" + Thread.currentThread().getId() + " scanned and buffered the whole file.");
} catch (IOException e) {
e.printStackTrace();
}
}
}

和消费者线程:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;

public class SourceFileConsumer implements Runnable {

private ArrayBlockingQueue<String> buffer;

private BufferedReader socketInput;

private PrintWriter socketOutput;

private Socket client;

private ServerSocket serverSocket;

private long checkpoint[] = null;

private int rate[] = null;

private String endMarker;

public SourceFileConsumer(ArrayBlockingQueue<String> buffer, String endMarker,
ServerSocket serverSocket, Socket client, long checkpoint[], int rate[]) {
this.buffer = buffer;
this.endMarker = endMarker;
this.client = client;
try {
socketOutput = new PrintWriter(client.getOutputStream(), true);
socketInput = new BufferedReader(new InputStreamReader(client.getInputStream()));
} catch (IOException e) {
e.printStackTrace();
}
this.checkpoint = new long[checkpoint.length];
this.rate = new int[rate.length];
for(int i = 0; i < checkpoint.length; i++) {
this.checkpoint[i] = checkpoint[i];
this.rate[i] = rate[i];
}
this.serverSocket = serverSocket;
}

@Override
public void run() {
String line = null;
long start = System.currentTimeMillis();
int index = 0;
boolean fileScanFlag = true;
while(fileScanFlag) {
long startTimestamp = System.currentTimeMillis();
long interval = (startTimestamp - start) / 1000L;
if(interval >= checkpoint[index]) {
if(index < checkpoint.length - 1) {
if(interval >= checkpoint[index + 1]) {
index += 1;
System.out.println("SourceFileConsumer thread-" + Thread.currentThread().getId() +
" progressed to checkpoint " + index + " with rate: " + rate[index]);
}
}
}
int counter = 0;
while(counter < rate[index]) {
try {
line = buffer.take();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if(line == endMarker) {
fileScanFlag = false;
break;
}
if(socketOutput != null && socketOutput.checkError()) {
System.out.println("SourceFileConsumer Thread-" + Thread.currentThread().getId() + " detected broken link...");
try {
client = serverSocket.accept();
socketOutput = new PrintWriter(client.getOutputStream(), true);
socketInput = new BufferedReader(new InputStreamReader(client.getInputStream()));
} catch(IOException e) {
e.printStackTrace();
}
System.out.println("SourceFileConsumer Thread-" + Thread.currentThread().getId() + " re-established connection...");
}
if(socketOutput != null)
socketOutput.println(line);
counter += 1;
}
long endTimestamp = System.currentTimeMillis();
if(endTimestamp - startTimestamp <= 1000) {
System.out.println("thread-" + Thread.currentThread().getId() + " input rate: " + counter +
", wait time: " + (1000 - (endTimestamp - startTimestamp)));
try {
Thread.sleep((1000 - (endTimestamp - startTimestamp)));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
if(socketInput != null && socketOutput != null && client != null) {
try {
socketInput.close();
socketOutput.close();
client.close();
} catch(IOException e) {
e.printStackTrace();
}
}
System.out.println("SourceFileConsumer Thread-" + Thread.currentThread().getId() + " transfer complete.");
}
}

问题是,一段时间后,两个线程都挂起并且没有发送任何元组。当我在 Linux 机器中运行 top 命令时,我发现运行两个线程的 Java 进程使用的 CPU 时间非常少。为什么会发生这种情况?这是饥饿的问题吗?我认为使用 LinkedBlockingQueue 可以避免饥饿。

有什么提示吗?

谢谢,尼克

最佳答案

这是相当多的代码,尤其是在您的消费者中。因此,不能排除存在多个错误。我建议简化您的代码以缩小问题范围,例如独立测试生产者-消费者交接和网络操作。

一个明显的问题是您试图通过 AtomicBoolean 表示文件结束。但您的消费者在购买商品之前并没有真正对其进行测试。如果你看看它的地方 take s 项,有一个内循环:

while(counter < rate[index]) {
try {
line = buffer.take();

由于生产者对 counter < rate[index] 没有影响力条件下,它无法控制消费者将尝试多少行take在检查 fileScanFlag 的状态之前.

但即使您尝试通过检查 take 之前的 boolean 标志来解决此问题,由于可能的竞争条件,结果被破坏。原子 boolean 值和阻塞队列本身都是线程安全的,但两者的组合却不是。

将最后一项放入队列和设置标志是两个不同的操作。就在这两个操作之间,消费者可以取出最后一个项目,重新检查标志并发现它是 false并转到下一次尝试 take而生产者即将将其设置为 true .

一种解决方案是改变消费者端的操作顺序,这需要诉诸轮询:

polling: for(;;) {
line = buffer.poll(timeout, timeOutUnit); // control the cpu consumption via timeout
if(line!=null) break polling;
if(fileScanFlag.get()) break outerLoop;
}

另一种选择是不使用两种不同的通信结构。一旦文件到达末尾,就将结束标记对象放入队列中,而不是维护 boolean 标志。这是使用 String 身份的罕见情况之一。而不是equals合适的是:

public class SourceFileProducer implements Runnable {
private String endMarker;

public SourceFileProducer(LinkedBlockingQueue<String> buffer,
String endMarker, String fileName) {
this.buffer = buffer;
this.endMarker = endMarker;


@Override
public void run() {
System.out.println("SourceFileProducer thread-" + Thread.currentThread().getId()
+ " initiating with source file: " + fileName);
String line;
try {
while((line = reader.readLine()) != null) buffer.put(line);
} catch (IOException|InterruptedException e) {
e.printStackTrace();
}
buffer.put(endMarker);
}

 

public class SourceFileConsumer implements Runnable {
private String endMarker;


public SourceFileConsumer(LinkedBlockingQueue<String> buffer, String endMarker,
ServerSocket serverSocket, Socket client, long checkpoint[], int rate[]) {
this.buffer = buffer;
this.endMarker = endMarker;


line = buffer.take();
if(line==endMarker) break;

结束标记的值并不重要,但它是对象标识。因此,创建两个线程的代码必须包含如下内容:

 // using new to ensure unique identity
private static final String EOF = new String("end of file");


new SourceFileProducer(queue, EOF, …)
new SourceFileConsumer(queue, EOF, …)

new运算符保证生成具有唯一标识的对象,因此,将该标记对象与任何其他对象进行比较 String ,即 BufferedReader 返回的行,通过==将始终评估为 false。必须小心,不要让标记对象逃逸到不知道其特殊作用的代码中。

关于java - 不明原因的停止执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31322557/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com