gpt4 book ai didi

java - 并发 - 条件同步方法

转载 作者:行者123 更新时间:2023-12-03 13:18:39 24 4
gpt4 key购买 nike

我正在逐行同时读取两个文本文件。

我特别想做的是当lineCount在每个线程上都是相同的我想看看扫描仪当前正在读取的字符串。

我环顾四周寻找可以实现的某些模式,例如 Compare and SwapSlipped Condition但我无法理解它将如何帮助我实现目标。我是并发编程的新手。

到目前为止,我所做的是将字符串读取和打印与 counterSync 同步。方法,我知道我已经在那里执行了我的线程锁定/暂停操作并查看了字符串。

public class concurrencyTest {

public static void main(String[] args) throws IOException {
String filePath1 = "path1.txt";
String filePath2 = "path2.txt";
reader reader = new reader();
MyThread source = new MyThread(reader, filePath1);
MyThread target = new MyThread(reader, filePath2);

source.start();
target.start();
}
static public class reader {
void read(String filePath) throws IOException {
readFile(filePath);
}
}

static synchronized void counterSync(String thread) {
System.out.println(thread);
}

static class MyThread extends Thread {
reader reader;
String filePath;

MyThread(reader reader, String filePath) {
this.reader = reader;
this.filePath = filePath;
}

@Override
public void run() {
try {
reader.read(filePath);
} catch (IOException e) {
e.printStackTrace();
}
}
}

static void readFile(String filePath) throws IOException {
FileInputStream inputStream = null;
Scanner sc = null;
int lineCount = 0;
try {
inputStream = new FileInputStream(filePath);
sc = new Scanner(inputStream, "UTF-8");
while (sc.hasNextLine()) {
lineCount++;
System.out.println(lineCount + "===" + sc.nextLine());
counterSync(sc.nextLine());
}
if (sc.ioException() != null) {
throw sc.ioException();
}
} finally {
if (inputStream != null) {
inputStream.close();
}
if (sc != null) {
sc.close();
}
}
}
}

最佳答案

好的,您正在寻找的是有点复杂但仍然可能。
你的问题缺少一些例子,所以如果我在某些方面错了,请纠正我。

您有 2 个线程:

  • 线程1
  • 线程2


  • 和2个文件:
  • 文件1
  • 文件2

  • 文件1的内容:
    file1  
    file2
    file3
    file4
    file5
    file6
    file7
    file8
    file9

    文件2的内容:
    file11  
    file22
    file33
    file44
    file55
    file66
    file77
    file88
    file99

    您想停止同一行号上的所有线程并对输出进行一些操作。

    这是读取文件的线程实现,我们将实例化它的 2 个实例,每个实例将管理一个文件。
    static class ReaderThread extends Thread {
    private File fileToRead;

    public final Object lock = new Object();
    private String currentLine;
    private AtomicInteger lineCount = new AtomicInteger(0);

    public ReaderThread(File fileToRead) {
    this.fileToRead = fileToRead;
    }

    @Override
    public void run() {
    synchronized (lock) {
    try {
    Stream<String> lines = Files.lines(Path.of(fileToRead.getPath()));
    lines.forEach(line -> {
    currentLine = line;
    // Here's your logic on different lines
    if (lineCount.get() == 4 || lineCount.get() == 5 || lineCount.get() == 6) {
    try {
    lock.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    lineCount.getAndIncrement();
    });
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }

    public String getCurrentLine() {
    return currentLine;
    }

    public boolean isLocked() {
    return getState().equals(State.WAITING);
    }
    }

    然后我们将使用一个辅助线程来通知阅读器线程,当我们的 elboration 正常时:
    static class HelperThread extends Thread {
    private List<ReaderThread> threads;

    @Override
    public void run() {
    while (true) {
    if (threads.stream().allMatch(ReaderThread::isLocked)) {
    System.out.println("next line:");

    threads.forEach(thread -> {
    synchronized (thread.lock) {
    System.out.println(thread.getCurrentLine());
    thread.lock.notify();
    }
    });

    System.out.println("\n");
    }
    }

    }

    public HelperThread(List<ReaderThread> threads) {
    this.threads = threads;
    }
    }

    最后是测试所有的主类:
    public static void main(String[] args) {
    File f1 = new File(Objects.requireNonNull(Main.class.getClassLoader().getResource("file1.txt")).getFile());
    File f2 = new File(Objects.requireNonNull(Main.class.getClassLoader().getResource("file2.txt")).getFile());

    ReaderThread t1 = new ReaderThread(f1);
    ReaderThread t2 = new ReaderThread(f2);

    HelperThread helperThread = new HelperThread(List.of(t1, t2));

    helperThread.start();
    t1.start();
    t2.start();
    }

    执行程序将产生以下输出:

    next line:
    file5
    file55


    next line:
    file6
    file66


    next line:
    file7
    file77


    以下是完整的进口 list :
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Stream;

    请注意:这是一个粗鲁的示例,您需要通过正确关闭线程来进行管理,一些修饰符是公共(public)的,因此请按照 java 指南对其进行封装,正确管理所有异常并进行一些常规重构。



    如果您想要一个更通用的实现,插入不同的行,以下应该是可以的:
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Stream;

    public class Main2 {

    public static void main(String[] args) {
    File f1 = new File(Objects.requireNonNull(Main2.class.getClassLoader().getResource("file1.txt")).getFile());
    File f2 = new File(Objects.requireNonNull(Main2.class.getClassLoader().getResource("file2.txt")).getFile());

    ReaderThread t1 = new ReaderThread(f1);
    ReaderThread t2 = new ReaderThread(f2);

    HelperThread helperThread = new HelperThread(List.of(t1, t2));

    helperThread.start();

    t1.setName("Reader1");
    t1.setName("Reader2");
    t1.start();
    t2.start();
    }

    static class ReaderThread extends Thread {
    private final File fileToRead;
    private final Object lock = new Object();
    private final AtomicInteger lineCount = new AtomicInteger(0);
    private String currentLine;

    public ReaderThread(File fileToRead) {
    this.fileToRead = fileToRead;
    }

    @Override
    public void run() {
    synchronized (lock) {
    try {
    Stream<String> lines = Files.lines(Path.of(fileToRead.getPath()));
    lines.forEach(line -> {
    currentLine = line;
    lineCount.getAndIncrement();
    });
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }

    public void lock() throws InterruptedException {
    this.lock.wait();
    }

    public void unlock() {
    this.lock.notify();
    }

    public boolean isLocked() {
    return getState().equals(State.WAITING);
    }

    public Object getLock() {
    return lock;
    }

    public AtomicInteger getLineCount() {
    return lineCount;
    }

    public String getCurrentLine() {
    return currentLine;
    }

    }

    static class HelperThread extends Thread {
    private List<ReaderThread> threads;

    @Override
    public void run() {
    while (true) {
    threads.forEach(t -> {
    try {
    if (t.getName().equals("Reader1") && t.getLineCount().get() == 3) t.lock();
    if (t.getName().equals("Reader2") && t.getLineCount().get() == 4) t.lock();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });

    if (threads.stream().allMatch(ReaderThread::isLocked)) {
    System.out.println("next line:");

    threads.forEach(t -> {
    synchronized (t.getLock()) {
    System.out.println(t.getCurrentLine());
    t.unlock();
    }
    });

    System.out.println("\n");
    }
    }

    }

    public HelperThread(List<ReaderThread> threads) {
    this.threads = threads;
    }
    }

    }

    确保 HelperThread 在其他线程之前启动,否则可能会丢失一些数据。

    关于java - 并发 - 条件同步方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60759643/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com