- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
一个合格的事务处理系统,应该具备四个性质:原子性(atomicity)、一致性(consistency)、隔离性(isolation)和持久性(durability)。隔离性保证了一个活跃的事务(还没提交或者回滚)对数据库所做的系统对于其他的活跃事务是不可见的,看起来就像某一时刻就只有一个事务在操作数据库。然而完美的隔离性会导致数据库系统并发性能的下降,有些时候我们可以容忍数据的不一致性,所以可以牺牲一部分隔离性,来换取更好的并发性。这篇博客将介绍 CMU15-445 Fall2020 第四个实验 Concurrency Control 的实现过程,使用两阶段锁机制实现读未提交、读已提交和可重复读这三种隔离级别,同时使用等待图来定期检测死锁.
可串行化是最高等级的隔离级别,如果一种事务调度是可串行化的,那么最终的效果等同于将多事务串行执行。如果两个事务都只对数据库进行读操作,就不会有冲突问题,不管怎么调度,都不会有一致性问题。但是一旦有一个事务有写操作,就会产生冲突.
对于下图所示的调度,可以看到里面存在冲突,T1 和 T2 中都存在读写操作。由于 R(B) 和 W(A) 、 W(B) 和 R(A) 互不影响,我们可以通过调整他们的顺序,实现和串行化相同的效果,这时候称为冲突可串行化.
调整之后的调度如下图所示,最终的效果就是 T2 写了记录 A 和 B.
但是有些调度,不管你如何对调顺序,也无法串行化.
如果无法通过调度实现冲突串行化,我们就只能回滚事务,这样就做了很多的无用功。一种鉴定冲突是否可串行化的方式是根据调度构造出一个优先图,每个事务是图中的顶点,图中的边代表一种冲突操作。下图存在 W-R 和 R-W 冲突,构造出来的优先图是有环的,所以无法串行化.
虽然构造优先图的方法很好使,但是这要求我们事先知道整个调度长什么样子。为了解决此问题,两阶段锁协议横空出世。该协议要求每个事务分为两个阶段提出锁请求:
最初事务处于增长阶段,根据需要获得锁,一旦释放锁就进入缩减阶段。假设下图中 T1 在 R(A) 之前加的是读锁,那么在 R(A) 之后不能通过释放读锁的方式来重新获取写锁,只能通过锁升级的方式将读锁转换为写锁.
虽然两阶段锁可以实现冲突可串行化,但是他无法避免脏读和级联回滚问题,这时候需要使用强两阶段锁协议,只在 Commit 之后一次性释放事务持有的所有锁.
为确保事务操作正确交错,数据库管理系统将使用锁管理器来控制何时允许事务访问数据项。锁管理器的基本思想是维护一个关于当前被活动事务所持有的锁的内部数据结构,事务在允许访问数据项之前向锁管理器发出锁请求。锁管理器将要么将锁授予调用事务,要么阻塞该事务,要么将其中止.
Bustub 中使用的数据结构是锁表 unordered_map<RID, LockRequestQueue> lock_table_ ,如下图所示。锁表实际上是一个哈希表,键为 tuple 的行 ID,值为一个链表,记录了所有对此 tuple 发出加锁请求的事务。深蓝色的事务持有锁,浅蓝色的事务处于阻塞状态.
锁管理器处理锁请求的机制为:
可以看到, 这个机制和读写锁是一样的 ,可以保证锁请求不会发生饿死现象.
LockRequestQueue 的定义如下所示,我们在 LockRequestQueue 中添加了 reader_count_ 和 writer_enter_ 成员,搭配条件变量 cv_ 和锁管理器中的 std::mutex latch_ 可以实现读写锁:
enum class LockMode { SHARED, EXCLUSIVE };
class LockRequest {
public:
LockRequest(txn_id_t txn_id, LockMode lock_mode) : txn_id_(txn_id), lock_mode_(lock_mode), granted_(false) {}
txn_id_t txn_id_;
LockMode lock_mode_;
bool granted_;
};
class LockRequestQueue {
public:
std::list<LockRequest> request_queue_;
std::condition_variable cv_; // for notifying blocked transactions on this rid
bool upgrading_ = false;
uint32_t reader_count_ = 0;
bool writer_enter_ = false;
};
实验要求我们实现三种隔离级别:
LockShared()
的时候需要抛出异常 对于加共享锁的请求,如果前面有排它锁请求,就需要阻塞当前事务,直到持有排它锁的事务释放锁.
bool LockManager::LockShared(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
// 收缩阶段不允许上锁
CheckShrinking(txn);
// 不需要重复上锁
if (txn->IsSharedLocked(rid)) {
return true;
}
auto txn_id = txn->GetTransactionId();
// 读未提交不需要加读锁
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn_id, AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
}
// 创建一个加锁请求
auto &queue = lock_table_[rid];
auto &request = queue.request_queue_.emplace_back(txn_id, LockMode::SHARED);
// 没拿到锁就进入阻塞状态
queue.cv_.wait(lock, [&] { return !queue.writer_enter_ || txn->IsAborted(); });
// 死锁会导致事务中止
CheckAborted(txn);
// 更新锁请求的状态
queue.reader_count_++;
request.granted_ = true;
txn->GetSharedLockSet()->emplace(rid);
return true;
}
void LockManager::CheckShrinking(Transaction *txn) {
if (txn->GetState() == TransactionState::SHRINKING) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
}
}
void LockManager::CheckAborted(Transaction *txn) {
if (txn->IsAborted()) {
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
}
}
对于加排它锁的请求,需要等待前方的写请求和读请求完成才能拿到锁:
bool LockManager::LockExclusive(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
CheckShrinking(txn);
if (txn->IsExclusiveLocked(rid)) {
return true;
}
// 创建加锁请求
auto &queue = lock_table_[rid];
auto &request = queue.request_queue_.emplace_back(txn->GetTransactionId(), LockMode::EXCLUSIVE);
// 没有拿到写锁就进入阻塞状态
queue.cv_.wait(lock, [&] { return (!queue.writer_enter_ && queue.reader_count_ == 0) || txn->IsAborted(); });
// 死锁会导致事务中止
CheckAborted(txn);
queue.writer_enter_ = true;
request.granted_ = true;
txn->GetExclusiveLockSet()->emplace(rid);
return true;
}
如果前方有事务提出升级锁的请求,需要抛出异常并中止当前事务,否则得等到当前请求变成请求队列的第一个请求时才能完成升级操作:
bool LockManager::LockUpgrade(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
txn->GetSharedLockSet()->erase(rid);
auto &queue = lock_table_[rid];
queue.reader_count_--;
auto request_it = GetRequest(txn->GetTransactionId(), rid);
request_it->lock_mode_ = LockMode::EXCLUSIVE;
request_it->granted_ = false;
// 如果前面有正在排队升级锁的事务就直接返回
if (queue.upgrading_) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::UPGRADE_CONFLICT);
}
queue.upgrading_ = true;
queue.cv_.wait(lock, [&] { return (!queue.writer_enter_ && queue.reader_count_ == 0) || txn->IsAborted(); });
// 死锁会导致事务中止
CheckAborted(txn);
queue.upgrading_ = false;
queue.writer_enter_ = true;
request_it->granted_ = true;
txn->GetExclusiveLockSet()->emplace(rid);
return true;
}
释放锁之后需要及时唤醒其他被阻塞的事务:
bool LockManager::Unlock(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->erase(rid);
auto request_it = GetRequest(txn->GetTransactionId(), rid);
auto lock_mode = request_it->lock_mode_;
// 更新事务状态,读已提交不需要两阶段锁机制
if (txn->GetState() == TransactionState::GROWING &&
!(lock_mode == LockMode::SHARED && txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED)) {
txn->SetState(TransactionState::SHRINKING);
}
// 从加锁请求队列中移除事务的请求
auto &queue = lock_table_[rid];
queue.request_queue_.erase(request_it);
if (lock_mode == LockMode::SHARED) {
// 唤醒等待读锁的线程
if (--queue.reader_count_ == 0) {
queue.cv_.notify_all();
}
} else {
// 唤醒等待写锁的线程
queue.writer_enter_ = false;
queue.cv_.notify_all();
}
return true;
}
两阶段锁无法避免死锁现象的出现,所以我们需要有一个后台线程来定期做死锁检测。死锁检测的方式是根据事务的等待情况构建一个有向的等待图,如果等待图中有环,就中止环中最年轻的事务,直到没有环出现.
LockManager 中的 unordered_map<txn_id_t, vector<txn_id_t>> waits_for_ 代表等待图,键为事务 T(顶点),值为事务 T 在等待解锁的其他事务的集合,二者组成了等待图中的边.
void LockManager::AddEdge(txn_id_t t1, txn_id_t t2) {
txns_.insert(t1);
txns_.insert(t2);
auto &neighbors = waits_for_[t1];
auto it = std::find(neighbors.begin(), neighbors.end(), t2);
if (it == neighbors.end()) {
neighbors.push_back(t2);
}
}
void LockManager::RemoveEdge(txn_id_t t1, txn_id_t t2) {
auto &neighbors = waits_for_[t1];
auto it = std::find(neighbors.begin(), neighbors.end(), t2);
if (it != neighbors.end()) {
neighbors.erase(it);
}
}
std::vector<std::pair<txn_id_t, txn_id_t>> LockManager::GetEdgeList() {
std::vector<std::pair<txn_id_t, txn_id_t>> edges;
for (auto &[t1, neighbors] : waits_for_) {
for (auto t2 : neighbors) {
edges.emplace_back(t1, t2);
}
}
return edges;
}
要检测环,需要使用深度优先算法:从某个顶点 \(v_s\) 出发,遍历顶点的邻接顶点,如果最终能回到 \(v_s\) ,就说明存在环。具体实现时是在集合 on_stack_txns_ 中维护正在访问的顶点,如果遍历临接顶点的时候能在集合中找到 \(v_s\) 就说明有环.
bool LockManager::HasCycle(txn_id_t *txn_id) {
for (auto &t1 : txns_) {
DFS(t1);
if (has_cycle_) {
*txn_id = *on_stack_txns_.rbegin();
on_stack_txns_.clear();
has_cycle_ = false;
return true;
}
}
on_stack_txns_.clear();
return false;
}
void LockManager::DFS(txn_id_t txn_id) {
if (has_cycle_) {
return;
}
on_stack_txns_.insert(txn_id);
auto &neighbors = waits_for_[txn_id];
std::sort(neighbors.begin(), neighbors.end());
for (auto t2 : neighbors) {
if (!on_stack_txns_.count(t2)) {
DFS(t2);
} else {
has_cycle_ = true;
return;
}
}
on_stack_txns_.erase(txn_id);
}
定期检测环时需要先根据锁表构建等待图,然后移除所有环并中止相关事务,最后清空等待图:
void LockManager::RunCycleDetection() {
while (enable_cycle_detection_) {
std::this_thread::sleep_for(cycle_detection_interval);
{
std::unique_lock<std::mutex> l(latch_);
// 构建等待图
for (auto &[rid, queue] : lock_table_) {
std::vector<txn_id_t> grants;
auto it = queue.request_queue_.begin();
while (it != queue.request_queue_.end() && it->granted_) {
grants.push_back(it->txn_id_);
it++;
}
while (it != queue.request_queue_.end()) {
for (auto &t2 : grants) {
AddEdge(it->txn_id_, t2);
}
wait_rids_[it->txn_id_] = rid;
it++;
}
}
// 移除环中 id 最小的事务
txn_id_t txn_id;
while (HasCycle(&txn_id)) {
AbortTransaction(txn_id);
}
// 清空图
waits_for_.clear();
wait_rids_.clear();
txns_.clear();
}
}
}
void LockManager::AbortTransaction(txn_id_t txn_id) {
auto txn = TransactionManager::GetTransaction(txn_id);
txn->SetState(TransactionState::ABORTED);
waits_for_.erase(txn_id);
// 释放所有 txn 持有的写锁
for (auto &rid : *txn->GetExclusiveLockSet()) {
for (auto &req : lock_table_[rid].request_queue_) {
if (!req.granted_) {
RemoveEdge(req.txn_id_, txn_id);
}
}
}
// 释放所有 txn 持有的读锁
for (auto &rid : *txn->GetSharedLockSet()) {
for (auto &req : lock_table_[rid].request_queue_) {
if (!req.granted_) {
RemoveEdge(req.txn_id_, txn_id);
}
}
}
// 通知 txn 所在线程事务被终止了
lock_table_[wait_rids_[txn_id]].cv_.notify_all();
}
上一节中我们实现了单线程的执行器,现在需要对其进行修改,使其支持三种隔离级别的并发事务.
如果隔离级别高于读未提交,就给事务加上共享锁,读已提交的事务需要在返回 tuple 之前释放锁
void SeqScanExecutor::Unlock(Transaction *txn, const RID &rid) {
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED) {
exec_ctx_->GetLockManager()->Unlock(txn, rid);
}
}
bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
auto predicate = plan_->GetPredicate();
auto txn = exec_ctx_->GetTransaction();
while (it_ != table_metadata_->table_->End()) {
*rid = it_->GetRid();
// 上锁
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED && !txn->IsExclusiveLocked(*rid)) {
exec_ctx_->GetLockManager()->LockShared(txn, *rid);
}
*tuple = *it_++;
if (!predicate || predicate->Evaluate(tuple, &table_metadata_->schema_).GetAs<bool>()) {
// 只保留输出列
std::vector<Value> values;
for (auto &col : GetOutputSchema()->GetColumns()) {
values.push_back(col.GetExpr()->Evaluate(tuple, &table_metadata_->schema_));
}
*tuple = {values, GetOutputSchema()};
// 解锁
Unlock(txn, *rid);
return true;
}
Unlock(txn, *rid);
}
return false;
}
插入的时候需要给 tuple 加写锁,但是不应该在 TableHeap::InsertTuple() 后面加,而是应该在函数里面加,这样插入之后才不会有其他事务改动了此 tuple。为了实现回滚操作,还需要添加一条 IndexWriteRecord 。由于 TableHeap::InsertTuple() 内部已经添加了 TableWriteRecord ,所以我们无需再次添加.
void InsertExecutor::InsertTuple(Tuple *tuple, RID *rid) {
// 更新数据表,需要在 TablePage::InsertTuple 中加锁
table_metadata_->table_->InsertTuple(*tuple, rid, exec_ctx_->GetTransaction());
// 更新索引
for (auto &index_info : index_infos_) {
index_info->index_->InsertEntry(
tuple->KeyFromTuple(table_metadata_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
IndexWriteRecord record(*rid, table_metadata_->oid_, WType::INSERT, *tuple, index_info->index_oid_,
exec_ctx_->GetCatalog());
exec_ctx_->GetTransaction()->AppendTableWriteRecord(record);
}
}
更新之前需要将读锁升级为写锁,如果先前没有拿锁就需要直接请求写锁.
bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
if (!child_executor_->Next(tuple, rid)) {
return false;
}
// 更新数据表
auto new_tuple = GenerateUpdatedTuple(*tuple);
// 加锁
auto txn = exec_ctx_->GetTransaction();
if (txn->IsSharedLocked(*rid)) {
exec_ctx_->GetLockManager()->LockUpgrade(txn, *rid);
} else {
exec_ctx_->GetLockManager()->LockExclusive(txn, *rid);
}
table_info_->table_->UpdateTuple(new_tuple, *rid, exec_ctx_->GetTransaction());
// 更新索引
for (auto &index_info : index_infos_) {
// 删除旧的 tuple
index_info->index_->DeleteEntry(
tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
// 插入新的 tuple
index_info->index_->InsertEntry(
new_tuple.KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
IndexWriteRecord record(*rid, table_info_->oid_, WType::UPDATE, *tuple, index_info->index_oid_,
exec_ctx_->GetCatalog());
exec_ctx_->GetTransaction()->AppendTableWriteRecord(record);
}
return true;
}
通过这次实验,可以加深对隔离级别和两阶段锁协议的理解,代码上层面多了对多线程同步技术的要求,不过做过去年实验的话应该问题也不大,以上~ 。
最后此篇关于CMU15445(Fall2020)数据库系统Project#4-ConcurrencyControl详解的文章就讲到这里了,如果你想了解更多关于CMU15445(Fall2020)数据库系统Project#4-ConcurrencyControl详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我是一名优秀的程序员,十分优秀!