gpt4 book ai didi

java - 生产者/消费者多线程

转载 作者:太空狗 更新时间:2023-10-29 22:41:17 25 4
gpt4 key购买 nike

背景

由于缺钱,我正在夜类的收费站工作,并使用互联网来教自己一些编码技能,希望明天能有更好的工作或网上销售我制作的一些应用程序。漫长的夜晚,很少的顾客。

我将多线程作为一个主题来解决,因为我在文学中(例如Android SDK)遇到了很多使用它的代码,但是我仍然觉得它晦涩难懂。

精神

在这一点上,我的方法是:尝试编写我能想到的最基本的多线程示例,将头撞到墙上,看看我是否可以使自己的大脑适应某种新颖的思维方式。我使自己处于极限,希望能超越极限。随意批评,挑剔,并指出更好的方法来做我想做的事情。

客观的

  • Get some advice on how to do the above, based on my efforts so far (code provided)

  • 练习

    这是我定义的范围:

    定义

    创建两个类,它们共同作用于数据对象的产生及其消耗。一个 Thread 创建对象并将其交付到共享空间,以供其他对象拾取和使用。我们将其称为生产线程 Producer,消费线程 Consumer和共享空间 SharedSpace。可以通过类似于这种情况的方式来吸收生产供他人消费的对象的行为:
    `Producer`    (a busy mum making chocolate-covered cakes for his child, up to a limit)
    `Consumer` (a hungry child waiting to eat all cakes the mum makes, until told to stop)
    `SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
    `dataValue` (a chocolate-dripping cake which MUST be eaten immediately or else...)

    为了简化运动,我决定在 child 吃蛋糕时不允许妈妈做饭。她只会等待 child 吃完蛋糕,然后立即再做一个蛋糕(达到一定限制),以养育 child 。该练习的本质是实践 Thread的信号传递,以完全实现并发。相反,我专注于完美的序列化,没有轮询或“我还能走吗?”检查。我想我将必须编写后续练习,在该练习中,母子接下来要并行工作。

    方法
  • 让我的类实现 Runnable 接口(interface),以便它们具有自己的
  • 的代码入口点
  • 使用我的类作为 Thread 对象的构造函数参数,这些对象是从程序的main入口点
  • 实例化并启动的
  • 通过 Thread
  • 确保 main程序在 Thread.join()之前没有终止
  • 设置一个限制,即Producer将为Consumer创建数据的次数
  • 同意哨兵值,Produce将用于表示数据生产结束
  • 记录共享资源上的锁以及数据生产/消耗事件的锁,包括最终注销工作线程
  • 从程序的SharedSpace创建单个main对象,并将其传递给每个工作程序,然后再启动
  • 将对private对象的SharedSpace引用内部存储到每个工作人员
  • 提供和防范信息,以描述在生成任何数据之前准备使用Consumer的情况
  • 在给定的迭代次数后停止Producer
  • 在读取哨兵值
  • 后停止 Consumer
    代码
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    class Consumer extends Threaded {
    public Consumer(SharedSpace sharedSpace) {
    super(sharedSpace);
    }
    @Override
    public void run() {
    super.run();
    int consumedData = 0;
    while (consumedData != -1) {
    synchronized (sharedSpace) {
    logger.info("Acquired lock on sharedSpace.");
    consumedData = sharedSpace.dataValue;
    if (consumedData == 0) {
    try {
    logger.info("Data production has not started yet. "
    + "Releasing lock on sharedSpace, "
    + "until notification that it has begun.");
    sharedSpace.wait();
    } catch (InterruptedException interruptedException) {
    logger.error(interruptedException.getStackTrace().toString());
    }
    } else if (consumedData == -1) {
    logger.info("Consumed: END (end of data production token).");
    } else {
    logger.info("Consumed: {}.", consumedData);
    logger.info("Waking up producer to continue data production.");
    sharedSpace.notify();
    try {
    logger.info("Releasing lock on sharedSpace "
    + "until notified of new data availability.");
    sharedSpace.wait();
    } catch (InterruptedException interruptedException) {
    logger.error(interruptedException.getStackTrace().toString());
    }
    }
    }
    }
    logger.info("Signing off.");
    }
    }
    class Producer extends Threaded {
    private static final int N_ITERATIONS = 10;
    public Producer(SharedSpace sharedSpace) {
    super(sharedSpace);
    }
    @Override
    public void run() {
    super.run();
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
    synchronized (sharedSpace) {
    logger.info("Acquired lock on sharedSpace.");
    nIterations++;
    if (nIterations <= N_ITERATIONS) {
    sharedSpace.dataValue = nIterations;
    logger.info("Produced: {}", nIterations);
    } else {
    sharedSpace.dataValue = -1;
    logger.info("Produced: END (end of data production token).");
    }
    logger.info("Waking up consumer for data consumption.");
    sharedSpace.notify();
    if (nIterations <= N_ITERATIONS) {
    try {
    logger.info("Releasing lock on sharedSpace until notified.");
    sharedSpace.wait();
    } catch (InterruptedException interruptedException) {
    logger.error(interruptedException.getStackTrace().toString());
    }
    }
    }
    }
    logger.info("Signing off.");
    }
    }
    class SharedSpace {
    volatile int dataValue = 0;
    }
    abstract class Threaded implements Runnable {
    protected Logger logger;
    protected SharedSpace sharedSpace;
    public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
    }
    @Override
    public void run() {
    logger.info("Started.");
    String workerName = getClass().getName();
    Thread.currentThread().setName(workerName);
    }
    }
    public class ProducerConsumer {
    public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    producer.start();
    consumer.start();
    try {
    producer.join();
    consumer.join();
    } catch (InterruptedException interruptedException) {
    interruptedException.printStackTrace();
    }
    }
    }

    执行日志
    Consumer - Started.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
    Producer - Started.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 1
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 1.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 2
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 2.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 3
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 3.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 4
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 4.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 5
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 5.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 6
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 6.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 7
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 7.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 8
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 8.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 9
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 9.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: 10
    Producer - Waking up consumer for data consumption.
    Producer - Releasing lock on sharedSpace until notified.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: 10.
    Consumer - Waking up producer to continue data production.
    Consumer - Releasing lock on sharedSpace until notified of new data availability.
    Producer - Acquired lock on sharedSpace.
    Producer - Produced: END (end of data production token).
    Producer - Waking up consumer for data consumption.
    Producer - Signing off.
    Consumer - Acquired lock on sharedSpace.
    Consumer - Consumed: END (end of data production token).
    Consumer - Signing off.

    问题
  • 以上是否正确? (例如,它使用正确的语言工具,正确的方法,是否包含任何愚蠢的代码,...)

  • 但是它“看起来正确”吗?

    即使输出“看起来不错”,我也会询问正确性,因为您无法想象在测试中“一次”而不是“另一次”出现错误的次数(例如,当消费者第一次启动时,生产者从未退出时)制作哨兵后等)。我学会了不要从“一次成功的运行”中要求正确性。相反,我对伪并行代码非常怀疑! (根据定义,这甚至都不是并行的!0

    扩展答案

    一个很好的问题仅针对 one requested piece of advice(上面的一个),但是如果您愿意,可以在答案中随意提及对以下其他主题的任何见解:
  • 在编写下一次尝试代码时,如何测试并行代码?
  • 哪些工具可以帮助我进行开发和调试?考虑我使用 Eclipse
  • 如果我允许Producer继续生产,而每次生产花费一些可变的时间,而Consumer消耗了可用的任何东西,这种方法会改变吗?锁是否必须移至其他位置?信号通知是否需要从此等待/通知范式改变?
  • 这种处理方法是否过时了,我是否应该学习其他方法?从这个收费站,我不知道“在Java的真实世界中”
  • 会发生什么

    下一步
  • 我应该从这里去哪里?我已经在某处提到了“ future ”的概念,但是我可以使用编号的主题列表按教学顺序从头到尾按顺序进行工作,并链接到相关的学习资源


  • 蒂诺·诺诺(Tino Sino)

    最佳答案

    Is the above correct?



    我看到的唯一问题是@Tudor和@Bhaskar提到的内容。每当您在等待条件时测试条件时,都必须使用 while循环。但是,这更多是关于与多个生产者和消费者的竞争条件。可能会发生虚假唤醒,但竞争条件更有可能发生。参见 my page on the subject

    是的,您只有1个生产者和1个使用者,但是您可以尝试将代码扩展为多个使用者,或者将您的代码复制到另一个方案中。

    I have learned not to claim correctness from "a successful run". On the contrary, I have become very suspicious of pseudo-parallel code!



    好本能。

    How could I test parallel code as I code my next attempts?



    这很难。扩大规模是一种方法。添加多个生产者和消费者,看看是否有问题。在具有不同数量/类型的处理器的多个体系结构上运行。您最好的防御将是代码正确性。紧密同步,善用 BlockingQueueExecutorService等类,可以使您的关闭变得更加简单/整洁。

    没有简单的答案。测试多线程代码非常困难。

    Which tools can help me in both development and debugging?



    在一般方面,我将研究像 Emma这样的覆盖率工具,以便您可以确保单元测试覆盖了所有代码。

    在多线程代码测试方面,要了解如何读取 kill -QUIT线程转储并查看Jconsole中正在运行的线程。像 YourKit这样的Java分析器也可能会有所帮助。

    Would the approach change if I allowed the Producer to continue producing, with each production taking some variable amount of time...



    我不这么认为。消费者将永远等待生产者。也许我不明白这个问题?

    Is this method of doing things obsolete and should I rather be learning something else? From this tollbooth, I have no idea of what happens "in the real world of Java"



    接下来是 ExecutorService classes的学习。它们处理了大部分 new Thread()样式代码-尤其是当您处理大量使用线程执行的异步任务时。这是 tutorial

    Where should I go from here?



    同样, ExecutorService。我假设您已经阅读 this starting docs。正如@Bhaskar所提到的, Java Concurrency in Practice是一本好圣经。

    以下是有关您的代码的一些一般注释:
  • SharedSpaceThreaded类似乎是一种人为设计的方法。如果您正在玩基类之类的话,那就好了。但总的来说,我从不使用这样的模式。生产者和消费者通常使用BlockingQueue(例如 LinkedBlockingQueue ),在这种情况下,同步和volatile有效负载将由您来照顾。另外,我倾向于将共享信息注入(inject)到对象构造函数中,而不是从基类中获取共享信息。
  • 通常,如果我使用synchronized,则它位于private final字段上。我经常创建一个private final Object lockObject = new Object();进行锁定,除非我已经在使用一个对象。
  • 小心巨大的synchronized块,并将日志消息放在synchronized节中。日志通常对文件系统执行synchronized IO,这可能会非常昂贵。如果可能,您应该有很小的,很紧的synchronized块。
  • 您可以在循环外部定义consumedData。我会在分配时定义它,然后使用break从循环中保释(如果它是== -1)。确保尽可能限制局部变量的范围。
  • 您的日志消息将支配您的代码性能。这意味着当您删除它们时,您的代码将完全不同地执行。当您要调试它的问题时,意识到这一点非常重要。当您转移到具有不同CPU/内核的不同体系结构时,性能也会(最有可能)发生变化。
  • 您可能知道这一点,但是当您调用sharedSpace.notify();时,这仅意味着如果另一个线程当前在sharedSpace.wait();中,则该线程将得到通知。如果没有其他内容,它将丢失通知。仅供引用。
  • 进行if (nIterations <= N_ITERATIONS)有点奇怪,然后在else下3行再次进行一次。复制notify()会更好地简化分支。
  • 您需要先输入int nIterations = 0;,再输入while,然后再输入++。那是for循环的秘诀:
    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {


  • 这是您的代码的紧密版本。这只是我如何编写它的一个示例。再次,除了缺少的 while之外,您的版本似乎没有任何问题。
    public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
    this.queue = queue;
    }
    @Override
    public void run() {
    while (true) {
    int consumedData = queue.take();
    if (consumedData == Producer.FINAL_VALUE) {
    logger.info("Consumed: END (end of data production token).");
    break;
    }
    logger.info("Consumed: {}.", consumedData);
    }
    logger.info("Signing off.");
    }
    }

    public class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
    this.queue = queue;
    }
    @Override
    public void run() {
    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
    logger.info("Produced: {}", nIterations);
    queue.put(nIterations);
    }
    queue.put(FINAL_VALUE);
    logger.info("Produced: END (end of data production token).");
    logger.info("Signing off.");
    }
    }

    public class ProducerConsumer {
    public static void main(String[] args) {
    // you can add an int argument to the LinkedBlockingQueue constructor
    // to only allow a certain number of items in the queue at one time
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
    Thread producer = new Thread(new Producer(queue), "Producer");
    Thread consumer = new Thread(new Consumer(queue), "Consumer");
    // start and join go here
    }
    }

    关于java - 生产者/消费者多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12554390/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com