gpt4 book ai didi

java - 使两个操作原子化

转载 作者:行者123 更新时间:2023-12-01 12:28:02 26 4
gpt4 key购买 nike

我正在实现一个应遵循以下要求的自定义数据结构:

  1. 元素应该是可搜索的(即根据 .equals() 查找和检索元素)
  2. 当我尝试检索某个元素但未找到该元素时,该方法应该阻塞,直到该元素可用为止。如果从不,该方法将永远阻塞或在给定的超时时间内阻塞。

我已经实现了自己的类 - 我首先开始修改 LinkedBlockingQueue - 但我有一个点,我可能会遇到糟糕的交错,而且我不知道如何避免它。

为了测试数据结构,我创建了一个具有 3 个线程的方法。每个线程都会尝试接收一个唯一的字符串(例如,第一个线程的“Thread1”等)。如果在数据存储中找到该字符串,它将为下一个线程插入一个字符串(例如,线程 1 将插入“Thread2”)。

这样,线程将阻塞,并且仅当消息在存储中时才发送消息。首先,在启动线程后,我手动将“Thread1”添​​加到存储中。这应该触发线程 1 从存储中获取他的值并为线程 2 插入一个值。然后应该通知线程 2 并获取他的值并为线程 3 插入一个值,依此类推。

但是,我注意到循环在随机传递消息几次后停止。我认为这是由于 takeOrWait() 方法中可能存在错误的交错(如此处所示)。我尝试了几种解决方案,但找不到方法。

问题是 - 我认为 - 我必须释放锁才能修改,然后调用 sync.wait()。在这些调用之间,线程已经可以在队列中插入一个元素,这将导致所有等待线程错过通知。

在释放锁之前是否可以启动wait()

为了完整起见,我添加了用于测试的 Main.java 类。

阻止数据存储

public class BlockingStore<T> {

// Linked list node.
static class Node<E> {
/**
* The item, volatile to ensure barrier separating write and read
*/
volatile E item;
Node<E> next;

Node(E x) {
item = x;
}
}


private Node<T> _head;
private Node<T> _lastPtr;
private int _size;

// Locks
private Lock changeLock = new ReentrantLock();
private final Object sync = new Object();

//////////////////////////////////
// CONSTRUCTOR //
//////////////////////////////////
public BlockingStore() {
_head = null;
_lastPtr = null;
}

//////////////////////////////////
// INTERNAL MODIFICATION //
//////////////////////////////////

/**
* Locates an element in the storage and removes it.
* Returns null if the element is not found in the list.
*
* @param toRemove Element to remove.
* @return Returns the removed element.
*/
private T findAndRemove(T toRemove) {
T result = null;

// Empty queue.
if (_head == null)
return result;

// Do we want the head?
if (_head.item.equals(toRemove)) {
result = _head.item;
_head = _head.next;
this._size--;
return result;
}

Node<T> previous = _head;
Node<T> current = previous.next;

while (current != null) {
if (current.item.equals(toRemove)) {
// Take the element out of the list.
result = current.item;

// we have to update the last pointer.
if (current == _lastPtr)
_lastPtr = previous.next;
else
previous.next = current.next;
this._size--;
return result;
}
previous = current;
current = current.next;
}
return result;
}

/**
* Adds an element to the end of the list.
*
* @param toAdd Element to add to the end of the list.
*/
private void addToEnd(T toAdd) {
// If the queue is empty
if (_head == null) {
_head = new Node<T>(toAdd);
_lastPtr = _head;
} else {
_lastPtr.next = new Node<T>(toAdd);
_lastPtr = _lastPtr.next;
}
this._size++;
}

/**
* Takes an element from the front of the list.
* Returns null if list is empty.
*
* @return Element taken from the front of the list.
*/
private T takeFromFront() {
// Check for empty queue.
if (_head == null)
return null;

T result = _head.item;
_head = _head.next;
this._size--;
return result;
}

//////////////////////////////////
// API METHODS //
//////////////////////////////////

/**
* Takes an element from the datastore,
* if it is not found the method blocks
* and retries every time a new object
* is inserted into the store.
*
* @param toTake
* @return
*/
public T takeOrWait(T toTake) {
T value;
changeLock.lock();
value = findAndRemove(toTake);

// Wait until somebody adds to the store
// and then try again.
while (value == null)
// Sync on the notification object
// such that we are waken up when there
// is a new element.
synchronized (sync) {
changeLock.unlock(); // allow writes.
// I think I can have bad inter-leavings here.
// If an insert is interleaved here, the thread
// will never be notified..
try {
sync.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
changeLock.lock();
value = findAndRemove(toTake);
}

changeLock.unlock();
return value;
}

public T dequeue() {
T value;
changeLock.lock();
value = takeFromFront();
changeLock.unlock();
return value;
}

public void enqueue(T toPut) {
changeLock.lock();
addToEnd(toPut);

// Notify about new element in queue.
synchronized (sync) {
sync.notifyAll();
changeLock.unlock();
}

}

public int size() {
return _size;
}
}

** Main.java **

public class Main {
public static void main(String[] args) throws InterruptedException {
final BlockingStore<String> q = new BlockingStore<String>();

new Thread(new Runnable() {
public String name = "Thread 1: ";
public String to = "Thread 2: ";

public void print(String message) {
System.out.println(name + message);
}

@Override
public void run() {
while (true) {
String value = q.takeOrWait(name);
print("Got: " + value);
q.enqueue(to);
}
}
}).start();

new Thread(new Runnable() {
public String name = "Thread 2: ";
public String to = "Thread 3: ";

public void print(String message) {
System.out.println(name + message);
}

@Override
public void run() {
while (true) {
String value = q.takeOrWait(name);
print("Got: " + value);
q.enqueue(to);
}
}
}).start();

new Thread(new Runnable() {
public String name = "Thread 3: ";
public String to = "Thread 1: ";

public void print(String message) {
System.out.println(name + message);
}

@Override
public void run() {
while (true) {
String value = q.takeOrWait(name);
print("Got: " + value);
q.enqueue(to);
}
}
}).start();

Thread.sleep(1000);
System.out.println("Main: Sending new message to queue for thread 1");
q.enqueue("Thread 1: ");

}

}

最佳答案

The problem is - I think - that I have to release the lock to modify, followed by a call to sync.wait()

听起来像是丢失通知。要知道,如果 sync.wait() 中没有其他线程被阻塞,则 sync.notify() 根本不会执行任何操作。 sync 对象不记得它已被通知。

这不起作用(根据您的示例):

public void waitForFlag() {
...
while (! flag) {
synchronized (sync) {
try {
sync.wait();
} catch (InterruptedException e) { ... }
}
}
}

public void setFlag() {
flag = true;
synchronized (sync) {
sync.notifyAll();
}
}

假设线程A调用waitForFlag(),发现flag为假,然后被抢占。然后,线程 B 调用 setFlag(),不通知任何人。最后,线程A调用sync.wait()。

现在您已经设置了标志,并且线程 A 阻塞在 wait() 调用中,等待有人设置标志。这就是丢失通知的样子。

它应该是这样的:

public void waitForFlag() {
...
synchronized(sync) {
while (! flag) {
try {
sync.wait();
} catch (InterruptedException e) { ... }
}
}
}


public void setFlag() {
synchronized (sync) {
flag = true;
sync.notifyAll();
}
}

这样,通知就不会丢失,因为设置标志的语句和测试标志的语句都同步块(synchronized block)内。

关于java - 使两个操作原子化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26182461/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com