gpt4 book ai didi

java - 使用生产者/消费者模型处理文件

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:22:58 25 4
gpt4 key购买 nike

在最近删除的帖子中,我提出了以下问题:


我正在尝试编写一个实现生产者/消费者模型的多线程程序。通常,我想使用一个生产者从文件中读取行并将它们放入 BlockingQueue,并让多个消费者在从 BlockingQueue 中检索行并将结果存储在新文件中后进行一些处理。

我希望您能给我一些反馈,告诉我应该考虑什么才能实现高性能。我花了数周时间阅读有关并发和同步的内容,因为我不想错过任何内容,但我正在寻找一些外部反馈。请在下面找到我需要的信息。

  • 我应该使用哪种类型的 BlockingQueue 实现以获得更好的性能?我不能使用固定大小的 BlockingQueue,因为我们不知道文件中有多少行。或者即使 Producer 会被锁定我也应该使用它吗? (如果队列已满)
  • 如果“f()”是生产者用来处理文件行的方法。知道我正在使用 BlockingQueue,我是否应该同步 f()?如果是,那不会影响我的申请吗?因为其他消费者将等待锁的释放。

希望我没有说错什么。


你建议在提问之前先实现一些东西,所以我删除了帖子并尝试实现模型。这是我的代码。

我有一个线程从文件中读取并将它们放入 BlockingQueue 中的生产者。

class Producer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;

private float numline=0;


protected transient BufferedReader bufferedReader;
protected transient BufferedWriter bufferedWriter;


public Producer (String location, BlockingQueue<String> blockingQueue) {
this.location=location;
this.blockingQueue=blockingQueue;

try {
bufferedReader = new BufferedReader(new FileReader(location));

// Create the file where the processed lines will be stored
createCluster();

} catch (FileNotFoundException e1) {
e1.printStackTrace();
}
}

@Override
public void run() {
String line=null;
try {
while ((line = bufferedReader.readLine()) != null) {
// Count the read lines
numline++;
blockingQueue.put(line);
}
} catch (IOException e) {
System.out.println("Problem reading the log file!");
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

}

public void createCluster () {
try {
String clusterName=location+".csv";
bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
bufferedWriter.write("\n");
} catch (IOException e) {
e.printStackTrace();
}
}

}

Consumer,其中多个线程将从 BlockingQueue 中执行一些处理“f()”并将结果存储在新文件中。

class Consumer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;

protected transient BufferedWriter bufferedWriter;

private String clusterName;

public Consumer (String location, BlockingQueue<String> blockingQueue) {
this.blockingQueue=blockingQueue;
this.location=location;

clusterName=location+".csv";
}

@Override
public void run() {
while (true) {
try {
//Retrieve the lines
String line = blockingQueue.take();
String result = doNormalize (line);
// TO DO
//
//bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
//BufferedWriter.write(result+ "\n");

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//Pattern pattern, Matcher matcher
private String doNormalize(String line){
String rules [] = getRules(); // return an array of Regex
String tmp="";

for (String rule : rules) {
Pattern pattern = Pattern.compile(rule);
Matcher matcher = pattern.matcher(line);

if (matcher.find()){
Set<String> namedGroups = getNamedGroupCandidates(rule);
Iterator<String> itr = namedGroups.iterator();
while(itr.hasNext()){
String value=itr.next();
tmp=tmp+matcher.group(value)+", ";
}


tmp = tmp + "\t";
break;
}
}
return tmp;

}
private Set<String> getNamedGroupCandidates(String regex) {
Set<String> namedGroups = new TreeSet<String>();
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
while (m.find()) {
namedGroups.add(m.group(1));
}
return namedGroups;
}
}

和我的主类中的代码。使用 1 个生产者和 3 个消费者

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

Producer readingThread = new Producer(location, queue);
new Thread(readingThread).start();

Consumer normalizers = new Consumer(location,queue);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
executor.submit(normalizers);
}
System.out.println("Stopped");
executor.shutdown();

我知道我的代码不完整,因为我需要关闭并刷新读取器和写入器等。但是你能告诉我到目前为止我在实现生产者/消费者模型时犯的错误吗?还有方法 f(),它是一个处理一行并产生结果的方法,我认为我不应该同步它,因为我希望所有消费者同时使用。

编辑

最后,这个post真的让我很困惑,它表明如果消费者将结果存储在文件中,它会减慢这个过程。这可能是个问题,因为我想要性能和速度。

最佳,

最佳答案

对于我的第二个问题:“SingleConsumer ‘知道’多个消费者已经完成消费/处理所有行。”。我的灵感来自于这个post结合这条评论:每个消费者都应该向队列 2 发送“我已终止”消息,如果单个输出消费者收到所有这些消息,它也可以终止。

因此,对于消费者而言;这是我在 run() 方法中写的:

@Override
public void run() {
// A Consumer keeps taking elements from the queue 1, as long as the Producer is
// producing and as long as queue 1 is not empty.
while (true) {
try {

//Retrieve the lines
String line = firstBlockingQueue.take();
If a special terminating value is found.
if (line==POISON_PILL) {
// The consumer notifies other consumers and the SignleConsumer that operates on queue 2
// and then terminates.
firstBlockingQueue.put(POISON_PILL);
secondBlockingQueue.put(SINGLE_POISIN_PILL);
return;
}
// Put the normalized events on the new Queue
String result = doNormalize (line);
if (result!=null) {
secondBlockingQueue.put(result);
}

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

至于 SinglerConsumer,它应该计算 Consumers 发送的“I finished processing”消息或者我将其用作 SINGLE_POISON_PILL。并在该计数器达到队列 1 中的消费者数量时终止。

while (true) {
try {
//Retrieve the lines
String line = secondBlockingQueue.take();
if (line==SINGLE_POISIN_PILL) {

setCounter(getCounter()+1);
if (getCounter()== threadNumber) {
System.out.println("All "+getCounter()+" threads have finished. \n Stopping..");
return;
}
}

try {
if (line != SINGLE_POISIN_PILL) {
System.out.println(line);
bufferedWriter.write(line+"\n");
}
} catch (IOException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

对于我的第二个问题,显然我所要做的就是添加:

        if (line==SINGLE_POISIN_PILL) {
setCounter(getCounter()+1);
if (getCounter()== threadNumber) {
System.out.println("All "+getCounter()+" threads have finished. \n Stopping..");
try {
if (bufferedWriter != null)
{
bufferedWriter.flush();
bufferedWriter.close();
}
} catch (IOException e) {
e.printStackTrace();
}
return;
}
}

一旦我刷新并关闭缓冲区,缓冲区就开始写入。

希望得到您的反馈。

关于java - 使用生产者/消费者模型处理文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50745284/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com