- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
一个合格的事务处理系统,应该具备四个性质:原子性(atomicity)、一致性(consistency)、隔离性(isolation)和持久性(durability)。隔离性保证了一个活跃的事务(还没提交或者回滚)对数据库所做的系统对于其他的活跃事务是不可见的,看起来就像某一时刻就只有一个事务在操作数据库。然而完美的隔离性会导致数据库系统并发性能的下降,有些时候我们可以容忍数据的不一致性,所以可以牺牲一部分隔离性,来换取更好的并发性。这篇博客将介绍 CMU15-445 Fall2020 第四个实验 Concurrency Control 的实现过程,使用两阶段锁机制实现读未提交、读已提交和可重复读这三种隔离级别,同时使用等待图来定期检测死锁.
可串行化是最高等级的隔离级别,如果一种事务调度是可串行化的,那么最终的效果等同于将多事务串行执行。如果两个事务都只对数据库进行读操作,就不会有冲突问题,不管怎么调度,都不会有一致性问题。但是一旦有一个事务有写操作,就会产生冲突.
对于下图所示的调度,可以看到里面存在冲突,T1 和 T2 中都存在读写操作。由于 R(B) 和 W(A) 、 W(B) 和 R(A) 互不影响,我们可以通过调整他们的顺序,实现和串行化相同的效果,这时候称为冲突可串行化.
调整之后的调度如下图所示,最终的效果就是 T2 写了记录 A 和 B.
但是有些调度,不管你如何对调顺序,也无法串行化.
如果无法通过调度实现冲突串行化,我们就只能回滚事务,这样就做了很多的无用功。一种鉴定冲突是否可串行化的方式是根据调度构造出一个优先图,每个事务是图中的顶点,图中的边代表一种冲突操作。下图存在 W-R 和 R-W 冲突,构造出来的优先图是有环的,所以无法串行化.
虽然构造优先图的方法很好使,但是这要求我们事先知道整个调度长什么样子。为了解决此问题,两阶段锁协议横空出世。该协议要求每个事务分为两个阶段提出锁请求:
最初事务处于增长阶段,根据需要获得锁,一旦释放锁就进入缩减阶段。假设下图中 T1 在 R(A) 之前加的是读锁,那么在 R(A) 之后不能通过释放读锁的方式来重新获取写锁,只能通过锁升级的方式将读锁转换为写锁.
虽然两阶段锁可以实现冲突可串行化,但是他无法避免脏读和级联回滚问题,这时候需要使用强两阶段锁协议,只在 Commit 之后一次性释放事务持有的所有锁.
为确保事务操作正确交错,数据库管理系统将使用锁管理器来控制何时允许事务访问数据项。锁管理器的基本思想是维护一个关于当前被活动事务所持有的锁的内部数据结构,事务在允许访问数据项之前向锁管理器发出锁请求。锁管理器将要么将锁授予调用事务,要么阻塞该事务,要么将其中止.
Bustub 中使用的数据结构是锁表 unordered_map<RID, LockRequestQueue> lock_table_ ,如下图所示。锁表实际上是一个哈希表,键为 tuple 的行 ID,值为一个链表,记录了所有对此 tuple 发出加锁请求的事务。深蓝色的事务持有锁,浅蓝色的事务处于阻塞状态.
锁管理器处理锁请求的机制为:
可以看到, 这个机制和读写锁是一样的 ,可以保证锁请求不会发生饿死现象.
LockRequestQueue 的定义如下所示,我们在 LockRequestQueue 中添加了 reader_count_ 和 writer_enter_ 成员,搭配条件变量 cv_ 和锁管理器中的 std::mutex latch_ 可以实现读写锁:
enum class LockMode { SHARED, EXCLUSIVE };
class LockRequest {
public:
LockRequest(txn_id_t txn_id, LockMode lock_mode) : txn_id_(txn_id), lock_mode_(lock_mode), granted_(false) {}
txn_id_t txn_id_;
LockMode lock_mode_;
bool granted_;
};
class LockRequestQueue {
public:
std::list<LockRequest> request_queue_;
std::condition_variable cv_; // for notifying blocked transactions on this rid
bool upgrading_ = false;
uint32_t reader_count_ = 0;
bool writer_enter_ = false;
};
实验要求我们实现三种隔离级别:
LockShared()
的时候需要抛出异常 对于加共享锁的请求,如果前面有排它锁请求,就需要阻塞当前事务,直到持有排它锁的事务释放锁.
bool LockManager::LockShared(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
// 收缩阶段不允许上锁
CheckShrinking(txn);
// 不需要重复上锁
if (txn->IsSharedLocked(rid)) {
return true;
}
auto txn_id = txn->GetTransactionId();
// 读未提交不需要加读锁
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn_id, AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
}
// 创建一个加锁请求
auto &queue = lock_table_[rid];
auto &request = queue.request_queue_.emplace_back(txn_id, LockMode::SHARED);
// 没拿到锁就进入阻塞状态
queue.cv_.wait(lock, [&] { return !queue.writer_enter_ || txn->IsAborted(); });
// 死锁会导致事务中止
CheckAborted(txn);
// 更新锁请求的状态
queue.reader_count_++;
request.granted_ = true;
txn->GetSharedLockSet()->emplace(rid);
return true;
}
void LockManager::CheckShrinking(Transaction *txn) {
if (txn->GetState() == TransactionState::SHRINKING) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
}
}
void LockManager::CheckAborted(Transaction *txn) {
if (txn->IsAborted()) {
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
}
}
对于加排它锁的请求,需要等待前方的写请求和读请求完成才能拿到锁:
bool LockManager::LockExclusive(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
CheckShrinking(txn);
if (txn->IsExclusiveLocked(rid)) {
return true;
}
// 创建加锁请求
auto &queue = lock_table_[rid];
auto &request = queue.request_queue_.emplace_back(txn->GetTransactionId(), LockMode::EXCLUSIVE);
// 没有拿到写锁就进入阻塞状态
queue.cv_.wait(lock, [&] { return (!queue.writer_enter_ && queue.reader_count_ == 0) || txn->IsAborted(); });
// 死锁会导致事务中止
CheckAborted(txn);
queue.writer_enter_ = true;
request.granted_ = true;
txn->GetExclusiveLockSet()->emplace(rid);
return true;
}
如果前方有事务提出升级锁的请求,需要抛出异常并中止当前事务,否则得等到当前请求变成请求队列的第一个请求时才能完成升级操作:
bool LockManager::LockUpgrade(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
txn->GetSharedLockSet()->erase(rid);
auto &queue = lock_table_[rid];
queue.reader_count_--;
auto request_it = GetRequest(txn->GetTransactionId(), rid);
request_it->lock_mode_ = LockMode::EXCLUSIVE;
request_it->granted_ = false;
// 如果前面有正在排队升级锁的事务就直接返回
if (queue.upgrading_) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::UPGRADE_CONFLICT);
}
queue.upgrading_ = true;
queue.cv_.wait(lock, [&] { return (!queue.writer_enter_ && queue.reader_count_ == 0) || txn->IsAborted(); });
// 死锁会导致事务中止
CheckAborted(txn);
queue.upgrading_ = false;
queue.writer_enter_ = true;
request_it->granted_ = true;
txn->GetExclusiveLockSet()->emplace(rid);
return true;
}
释放锁之后需要及时唤醒其他被阻塞的事务:
bool LockManager::Unlock(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->erase(rid);
auto request_it = GetRequest(txn->GetTransactionId(), rid);
auto lock_mode = request_it->lock_mode_;
// 更新事务状态,读已提交不需要两阶段锁机制
if (txn->GetState() == TransactionState::GROWING &&
!(lock_mode == LockMode::SHARED && txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED)) {
txn->SetState(TransactionState::SHRINKING);
}
// 从加锁请求队列中移除事务的请求
auto &queue = lock_table_[rid];
queue.request_queue_.erase(request_it);
if (lock_mode == LockMode::SHARED) {
// 唤醒等待读锁的线程
if (--queue.reader_count_ == 0) {
queue.cv_.notify_all();
}
} else {
// 唤醒等待写锁的线程
queue.writer_enter_ = false;
queue.cv_.notify_all();
}
return true;
}
两阶段锁无法避免死锁现象的出现,所以我们需要有一个后台线程来定期做死锁检测。死锁检测的方式是根据事务的等待情况构建一个有向的等待图,如果等待图中有环,就中止环中最年轻的事务,直到没有环出现.
LockManager 中的 unordered_map<txn_id_t, vector<txn_id_t>> waits_for_ 代表等待图,键为事务 T(顶点),值为事务 T 在等待解锁的其他事务的集合,二者组成了等待图中的边.
void LockManager::AddEdge(txn_id_t t1, txn_id_t t2) {
txns_.insert(t1);
txns_.insert(t2);
auto &neighbors = waits_for_[t1];
auto it = std::find(neighbors.begin(), neighbors.end(), t2);
if (it == neighbors.end()) {
neighbors.push_back(t2);
}
}
void LockManager::RemoveEdge(txn_id_t t1, txn_id_t t2) {
auto &neighbors = waits_for_[t1];
auto it = std::find(neighbors.begin(), neighbors.end(), t2);
if (it != neighbors.end()) {
neighbors.erase(it);
}
}
std::vector<std::pair<txn_id_t, txn_id_t>> LockManager::GetEdgeList() {
std::vector<std::pair<txn_id_t, txn_id_t>> edges;
for (auto &[t1, neighbors] : waits_for_) {
for (auto t2 : neighbors) {
edges.emplace_back(t1, t2);
}
}
return edges;
}
要检测环,需要使用深度优先算法:从某个顶点 \(v_s\) 出发,遍历顶点的邻接顶点,如果最终能回到 \(v_s\) ,就说明存在环。具体实现时是在集合 on_stack_txns_ 中维护正在访问的顶点,如果遍历临接顶点的时候能在集合中找到 \(v_s\) 就说明有环.
bool LockManager::HasCycle(txn_id_t *txn_id) {
for (auto &t1 : txns_) {
DFS(t1);
if (has_cycle_) {
*txn_id = *on_stack_txns_.rbegin();
on_stack_txns_.clear();
has_cycle_ = false;
return true;
}
}
on_stack_txns_.clear();
return false;
}
void LockManager::DFS(txn_id_t txn_id) {
if (has_cycle_) {
return;
}
on_stack_txns_.insert(txn_id);
auto &neighbors = waits_for_[txn_id];
std::sort(neighbors.begin(), neighbors.end());
for (auto t2 : neighbors) {
if (!on_stack_txns_.count(t2)) {
DFS(t2);
} else {
has_cycle_ = true;
return;
}
}
on_stack_txns_.erase(txn_id);
}
定期检测环时需要先根据锁表构建等待图,然后移除所有环并中止相关事务,最后清空等待图:
void LockManager::RunCycleDetection() {
while (enable_cycle_detection_) {
std::this_thread::sleep_for(cycle_detection_interval);
{
std::unique_lock<std::mutex> l(latch_);
// 构建等待图
for (auto &[rid, queue] : lock_table_) {
std::vector<txn_id_t> grants;
auto it = queue.request_queue_.begin();
while (it != queue.request_queue_.end() && it->granted_) {
grants.push_back(it->txn_id_);
it++;
}
while (it != queue.request_queue_.end()) {
for (auto &t2 : grants) {
AddEdge(it->txn_id_, t2);
}
wait_rids_[it->txn_id_] = rid;
it++;
}
}
// 移除环中 id 最小的事务
txn_id_t txn_id;
while (HasCycle(&txn_id)) {
AbortTransaction(txn_id);
}
// 清空图
waits_for_.clear();
wait_rids_.clear();
txns_.clear();
}
}
}
void LockManager::AbortTransaction(txn_id_t txn_id) {
auto txn = TransactionManager::GetTransaction(txn_id);
txn->SetState(TransactionState::ABORTED);
waits_for_.erase(txn_id);
// 释放所有 txn 持有的写锁
for (auto &rid : *txn->GetExclusiveLockSet()) {
for (auto &req : lock_table_[rid].request_queue_) {
if (!req.granted_) {
RemoveEdge(req.txn_id_, txn_id);
}
}
}
// 释放所有 txn 持有的读锁
for (auto &rid : *txn->GetSharedLockSet()) {
for (auto &req : lock_table_[rid].request_queue_) {
if (!req.granted_) {
RemoveEdge(req.txn_id_, txn_id);
}
}
}
// 通知 txn 所在线程事务被终止了
lock_table_[wait_rids_[txn_id]].cv_.notify_all();
}
上一节中我们实现了单线程的执行器,现在需要对其进行修改,使其支持三种隔离级别的并发事务.
如果隔离级别高于读未提交,就给事务加上共享锁,读已提交的事务需要在返回 tuple 之前释放锁
void SeqScanExecutor::Unlock(Transaction *txn, const RID &rid) {
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED) {
exec_ctx_->GetLockManager()->Unlock(txn, rid);
}
}
bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
auto predicate = plan_->GetPredicate();
auto txn = exec_ctx_->GetTransaction();
while (it_ != table_metadata_->table_->End()) {
*rid = it_->GetRid();
// 上锁
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED && !txn->IsExclusiveLocked(*rid)) {
exec_ctx_->GetLockManager()->LockShared(txn, *rid);
}
*tuple = *it_++;
if (!predicate || predicate->Evaluate(tuple, &table_metadata_->schema_).GetAs<bool>()) {
// 只保留输出列
std::vector<Value> values;
for (auto &col : GetOutputSchema()->GetColumns()) {
values.push_back(col.GetExpr()->Evaluate(tuple, &table_metadata_->schema_));
}
*tuple = {values, GetOutputSchema()};
// 解锁
Unlock(txn, *rid);
return true;
}
Unlock(txn, *rid);
}
return false;
}
插入的时候需要给 tuple 加写锁,但是不应该在 TableHeap::InsertTuple() 后面加,而是应该在函数里面加,这样插入之后才不会有其他事务改动了此 tuple。为了实现回滚操作,还需要添加一条 IndexWriteRecord 。由于 TableHeap::InsertTuple() 内部已经添加了 TableWriteRecord ,所以我们无需再次添加.
void InsertExecutor::InsertTuple(Tuple *tuple, RID *rid) {
// 更新数据表,需要在 TablePage::InsertTuple 中加锁
table_metadata_->table_->InsertTuple(*tuple, rid, exec_ctx_->GetTransaction());
// 更新索引
for (auto &index_info : index_infos_) {
index_info->index_->InsertEntry(
tuple->KeyFromTuple(table_metadata_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
IndexWriteRecord record(*rid, table_metadata_->oid_, WType::INSERT, *tuple, index_info->index_oid_,
exec_ctx_->GetCatalog());
exec_ctx_->GetTransaction()->AppendTableWriteRecord(record);
}
}
更新之前需要将读锁升级为写锁,如果先前没有拿锁就需要直接请求写锁.
bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
if (!child_executor_->Next(tuple, rid)) {
return false;
}
// 更新数据表
auto new_tuple = GenerateUpdatedTuple(*tuple);
// 加锁
auto txn = exec_ctx_->GetTransaction();
if (txn->IsSharedLocked(*rid)) {
exec_ctx_->GetLockManager()->LockUpgrade(txn, *rid);
} else {
exec_ctx_->GetLockManager()->LockExclusive(txn, *rid);
}
table_info_->table_->UpdateTuple(new_tuple, *rid, exec_ctx_->GetTransaction());
// 更新索引
for (auto &index_info : index_infos_) {
// 删除旧的 tuple
index_info->index_->DeleteEntry(
tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
// 插入新的 tuple
index_info->index_->InsertEntry(
new_tuple.KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
IndexWriteRecord record(*rid, table_info_->oid_, WType::UPDATE, *tuple, index_info->index_oid_,
exec_ctx_->GetCatalog());
exec_ctx_->GetTransaction()->AppendTableWriteRecord(record);
}
return true;
}
通过这次实验,可以加深对隔离级别和两阶段锁协议的理解,代码上层面多了对多线程同步技术的要求,不过做过去年实验的话应该问题也不大,以上~ 。
最后此篇关于CMU15445(Fall2020)数据库系统Project#4-ConcurrencyControl详解的文章就讲到这里了,如果你想了解更多关于CMU15445(Fall2020)数据库系统Project#4-ConcurrencyControl详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
全称“Java Virtual Machine statistics monitoring tool”(statistics 统计;monitoring 监控;tool 工具) 用于监控虚拟机的各种运
主要是讲下Mongodb的索引的查看、创建、删除、类型说明,还有就是Explain执行计划的解释说明。 可以转载,但请注明出处。  
1>单线程或者单进程 相当于短链接,当accept之后,就开始数据的接收和数据的发送,不接受新的连接,即一个server,一个client 不存在并发。 2>循环服务器和并发服务器
详解 linux中的关机和重启命令 一 shutdown命令 shutdown [选项] 时间 选项: ?
首先,将json串转为一个JObject对象: ? 1
matplotlib官网 matplotlib库默认英文字体 添加黑体(‘SimHei')为绘图字体 代码: plt.rcParams['font.sans-serif']=['SimHei'
在并发编程中,synchronized关键字是常出现的角色。之前我们都称呼synchronized关键字为重量锁,但是在jdk1.6中对synchronized进行了优化,引入了偏向锁、轻量锁。本篇
一般我们的项目中会使用1到2个数据库连接配置,同程艺龙的数据库连接配置被收拢到统一的配置中心,由DBA统一配置和维护,业务方通过某个字符串配置拿到的是Connection对象。  
实例如下: ? 1
1. MemoryCahe NetCore中的缓存和System.Runtime.Caching很相似,但是在功能上做了增强,缓存的key支持object类型;提供了泛型支持;可以读缓存和单个缓存
argument是javascript中函数的一个特殊参数,例如下文,利用argument访问函数参数,判断函数是否执行 复制代码 代码如下: <script
一不小心装了一个Redis服务,开了一个全网的默认端口,一开始以为这台服务器没有公网ip,结果发现之后悔之莫及啊 某天发现cpu load高的出奇,发现一个minerd进程 占了大量cpu,googl
今天写这个是为了 提醒自己 编程过程 不仅要有逻辑 思想 还有要规范 代码 这样可读性 1、PHP 编程规范与编码习惯最主要的有以下几点: 1 文件说明 2 funct
摘要:虚拟机安装时一般都采用最小化安装,默认没有lspci工具。一台测试虚拟网卡性能的虚拟机,需要lspci工具来查看网卡的类型。本文描述了在一个虚拟机中安装lspci工具的具体步骤。 由于要测试
1、修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统
目录 算术运算符 基本四则运算符 增量赋值运算符 自增/自减运算符 关系运算符 逻
如下所示: ? 1
MapperScannerConfigurer之sqlSessionFactory注入方式讲解 首先,Mybatis中的有一段配置非常方便,省去我们去写DaoImpl(Dao层实现类)的时间,这个
Linux的网络虚拟化是LXC项目中的一个子项目,LXC包括文件系统虚拟化,进程空间虚拟化,用户虚拟化,网络虚拟化,等等,这里使用LXC的网络虚拟化来模拟多个网络环境。 本文从基本的网络设备讲
? 1
我是一名优秀的程序员,十分优秀!