java - How to achieve thread safety in the following code

Reposted. Author: 行者123. Updated: 2023-12-02 11:20:20

I have been reading about thread safety and the synchronized keyword, but I am struggling to work out how to apply it correctly.

I have a scenario in which Thread A saves data into a buffer and Thread B reads that data and saves it to a database.

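How can I achieve thread safety with the following code?
By thread safety I mean that Thread B does not start until Thread A has finished its work, and the same applies to Thread A.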

public class Main {
    public static void main(String args[]) throws InterruptedException {

        LinkedBlockingQueue<Document> queue = new LinkedBlockingQueue<>();
        ProducerThread mProducer = new ProducerThread(queue);
        ConsumerThread mConsumer = new ConsumerThread(queue);
        new Thread(mProducer).start();
        new Thread(mConsumer).start();
    }
}

--

public class ProducerThread implements Runnable {
    private LinkedBlockingQueue<Document> queue;

    public ProducerThread(LinkedBlockingQueue<Document> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while(true) {
            Timestamp timestamp = new Timestamp(System.currentTimeMillis());
            Document document = new Document("timeAdded", timestamp);
            try {
                queue.put(document);
                System.out.println("Document added " + document.toString());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

--

public class ConsumerThread implements Runnable {
    private LinkedBlockingQueue<Document> queue;
    private Database mDatabase;

    public ConsumerThread(LinkedBlockingQueue<Document> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Document doc;
            mDatabase = Database.getInstance();
            if (Client.isAlive()) {
                while (queue.take() != null) {
                    mDatabase.insert(queue.take());
                    // Thread.sleep(10);
                    System.out.println("Document consumed " + queue.take().toString());
                    if (!Client.isAlive()) {
                        wait();
                        Client.reconnect();
                    }
                }
            } else {
                wait();
            }
        } catch(InterruptedException | IllegalMonitorStateException e){
            e.printStackTrace();
        }
    }
}

I get the following output:

> Document added{{timeAdded=2018-04-22 13:20:27.88}} 
> Document{{timeAdded=2018-04-22 13:20:27.88}} Document added
> Document{{timeAdded=2018-04-22 13:20:28.881}} Document added
> Document{{timeAdded=2018-04-22 13:20:29.882}} Document added
> Document{{timeAdded=2018-04-22 13:20:30.882}} Consumed
> Document{{timeAdded=2018-04-22 13:20:30.882}} Document added
> Document{{timeAdded=2018-04-22 13:20:31.883}} Document added
> Document{{timeAdded=2018-04-22 13:20:32.883}} Consumed
> Document{{timeAdded=2018-04-22 13:20:33.884}} Document added
> Document{{timeAdded=2018-04-22 13:20:33.884}} Document added
> Document{{timeAdded=2018-04-22 13:20:34.885}} Document added
> Document{{timeAdded=2018-04-22 13:20:35.885}} Document added
> Document{{timeAdded=2018-04-22 13:20:36.886}} Consumed
> Document{{timeAdded=2018-04-22 13:20:36.886}} Document added

Best answer

I don't know whether this helps, but I would write a third class named ProductionProcess or something similar...

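A short additional note: ProductionProcess holds the queue that stores the individual Document objects. The producer thread keeps "producing" objects as long as the queue (currently with a capacity of 10) is not full; once it is full, the producer waits. Whenever a slot in the queue becomes free because the consumer thread has removed a Document object, the producer thread receives a signal and resumes producing Document objects until the queue is full again. The complete source code should be thread-safe.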

import java.sql.Timestamp; // assumed: the question does not show its imports
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Document and Database are the asker's own types and are not shown here.
public class ProductionProcess
{
    private static final int CAPACITY = 10;

    private final Queue<Document> QUEUE;
    private final Lock LOCK;
    private final Condition BUFFER_FULL;  // producers wait here while the queue is full
    private final Condition BUFFER_EMPTY; // consumers wait here while the queue is empty

    ProductionProcess()
    {
        this.QUEUE = new LinkedList<Document>();
        this.LOCK = new ReentrantLock();

        this.BUFFER_FULL = this.LOCK.newCondition();
        this.BUFFER_EMPTY = this.LOCK.newCondition();
    }

    public void produce() throws InterruptedException
    {
        this.LOCK.lock();

        try
        {
            // Re-check the condition in a loop: await() may return spuriously.
            while(ProductionProcess.CAPACITY == this.QUEUE.size())
            {
                System.out.println(Thread.currentThread().getName() + " : Buffer is full, waiting!");
                this.BUFFER_FULL.await();
            }

            Timestamp timestamp = new Timestamp(System.currentTimeMillis());
            Document document = new Document("timeAdded", timestamp);

            if(this.QUEUE.offer(document))
            {
                System.out.println("Added to queue: " + document);
                // Wake consumers that are waiting for an element.
                this.BUFFER_EMPTY.signalAll();
            }
        }
        finally
        {
            this.LOCK.unlock();
        }
    }

    public void receive() throws InterruptedException
    {
        this.LOCK.lock();

        try
        {
            Database mDatabase = Database.getInstance();

            while(this.QUEUE.isEmpty())
            {
                System.out.println(Thread.currentThread().getName() + " : Buffer is empty, waiting!");
                this.BUFFER_EMPTY.await();
            }

            // Remove exactly one element and use that same element below.
            Document mDocument = this.QUEUE.poll();

            if(null != mDocument)
            {
                mDatabase.insert(mDocument);
                System.out.println("Consumed from queue: " + mDocument);
                System.out.println(Thread.currentThread().getName() + " : Signalling that buffer may be empty now");
                // Wake producers that are waiting for a free slot.
                this.BUFFER_FULL.signalAll();
            }
        }
        finally
        {
            this.LOCK.unlock();
        }
    }
}
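One detail of the class above is that receive() calls mDatabase.insert(...) while still holding the lock, so the producer cannot add anything during a slow insert. A possible variation, sketched here under assumptions, separates the buffer from the work done on each element. The names BoundedBuffer and BoundedBufferDemo are hypothetical, Integer stands in for Document, and an ArrayList stands in for the database.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical generic bounded buffer; not part of the original answer. */
final class BoundedBuffer<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // wait for a free slot
            }
            queue.add(item);
            notEmpty.signal(); // one new element: wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // wait for an element
            }
            T item = queue.remove();
            notFull.signal(); // one free slot: wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}

final class BoundedBufferDemo {
    /** Moves count integers through the buffer and returns what the consumer received. */
    static List<Integer> transfer(int count, int capacity) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(capacity);
        List<Integer> sink = new ArrayList<>(); // stand-in for the database

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    Integer item = buffer.take(); // the lock is released once take() returns
                    sink.add(item);               // the slow work happens outside the lock
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes to sink visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

Whether moving the insert outside the lock is worthwhile depends on how slow the database call is; with a fast insert the original layout is simpler.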

Then use it like this...

final ProductionProcess process = new ProductionProcess();

Runnable runnableProducer = new Runnable() {
    @Override public void run()
    {
        while(true)
        {
            try
            {
                process.produce();
                Thread.sleep(1000);
            }
            catch(InterruptedException exc)
            {
                // Restore the interrupt flag and stop instead of looping forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
};

Runnable runnableConsumer = new Runnable() {
    @Override public void run()
    {
        while(true)
        {
            try
            {
                process.receive();
                Thread.sleep(1000);
            }
            catch(InterruptedException exc)
            {
                // Restore the interrupt flag and stop instead of looping forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
};

new Thread(runnableProducer).start();
new Thread(runnableConsumer).start();
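For comparison, the LinkedBlockingQueue used in the question is already thread-safe, and giving it a capacity makes put() block when the queue is full. In the question's ConsumerThread, each loop iteration calls queue.take() three times, and every call removes a different element: one is discarded in the loop condition, one is inserted, and a third is printed. The following is a minimal sketch that takes exactly once per iteration. The class name SingleTakeDemo is hypothetical, String stands in for Document, and an ArrayList stands in for Database.insert(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class SingleTakeDemo {
    /** Produces count items and returns the items the consumer stored, in order. */
    static List<String> run(int count) {
        // Bounded queue: put() blocks while 10 items are waiting.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        List<String> stored = new ArrayList<>(); // stand-in for the database

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put("doc-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String doc = queue.take(); // exactly one take() per iteration
                    stored.add(doc);           // store the same element that is logged
                    System.out.println("Document consumed " + doc);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stored;
    }

    public static void main(String[] args) {
        System.out.println(run(3).size()); // prints 3 after the three "Document consumed" lines
    }
}
```

The loops are finite only so that the sketch terminates; a long-running consumer would loop until interrupted.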

I have not tested it extensively, but it should work. If it doesn't, leave a comment... Also, this is not my code; if you speak German, have a look at this link.

Regarding "java - How to achieve thread safety in the following code", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49965036/
