- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
目前一个项目中数据持久化采用EF Core + MySQL,使用CodeFirst模式开发,并且对数据进行了分库,按照目前颗粒度分完之后,大概有一两百个库,每个库的数据都是相互隔离的。 借鉴了Github上一个开源的仓库 arch/UnitOfWork 实现UnitOfWork,核心操作就是每个api请求的时候带上库名,在执行CRUD之前先将DbContext切换到目标数据库,我们在切换数据库的时候加了一些操作,如检查数据库是否已创建、检查连接是否可用、判断是否需要 表结构迁移 等 。
/// <summary>
/// 切换数据库 这要求数据库在同一台机器上 注意:这只适用于MySQL。
/// </summary>
/// <param name="database">目标数据库</param>
public void ChangeDatabase(string database)
{
// 检查连接
......
// 检查数据库是否创建
......
var connection = _context.Database.GetDbConnection();
if (connection.State.HasFlag(ConnectionState.Open))
{
connection.ChangeDatabase(database);
}
else
{
var connectionString = Regex.Replace(connection.ConnectionString.Replace(" ", ""), @"(?<=[Dd]atabase=)\w+(?=;)", database, RegexOptions.Singleline);
connection.ConnectionString = connectionString;
}
// 判断是否需要执行表结构迁移
if(_context..Database.GetPendingMigrations().Any())
{
//自定义的迁移的一些逻辑
_context.Database.Migrate(_context);
}
}
但是当多个操作同时对一个库进行Migrate的时候,就会出现问题,比如“新增一张表”的操作已经被第一个迁移执行过了,第二个执行的迁移并不知道已经执行过了Migrate,就会报错表已存在。 于是考虑在执行Migrate的时候,加入一个锁的机制,对当前数据库执行Migrate之前先获取锁,然后再来决定接下来的操作。由于这边有的服务无法访问Redis,这里使用数据库来实现锁的机制,当然用Redis来实现更好,加入锁的机制只是一种解决问题的思路.
MigrationLocks
表来实现迁移锁CREATE TABLE IF NOT EXISTS MigrationLocks (
LockName VARCHAR(255) PRIMARY KEY,
LockedAt DATETIME NOT NULL
);
INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());
DELETE FROM MigrationLocks WHERE LockName = @database;
SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt > NOW() - INTERVAL 5 MINUTE;
/// <summary>
/// 迁移锁
/// </summary>
public interface IMigrateLock
{
/// <summary>
/// 尝试获取锁
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
bool TryAcquireLock(IDbConnection connection);
/// <summary>
/// 尝试获取锁
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
Task<bool> TryAcquireLockAsync(IDbConnection connection);
/// <summary>
/// 释放锁
/// </summary>
void ReleaseLock(IDbConnection connection);
/// <summary>
/// 释放锁
/// </summary>
/// <returns></returns>
Task ReleaseLockAsync(IDbConnection connection);
}
/// <summary>
/// 迁移锁
/// </summary>
public class MigrateLock : IMigrateLock
{
private readonly ILogger<MigrateLock> _logger;
public MigrateLock(ILogger<MigrateLock> logger)
{
_logger = logger;
}
private const string CreateTableSql = @"
CREATE TABLE IF NOT EXISTS MigrationLocks (
LockName VARCHAR(255) PRIMARY KEY,
LockedAt DATETIME NOT NULL
);";
private const string CheckLockedSql = "SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt > NOW() - INTERVAL 5 MINUTE;";
private const string AcquireLockSql = "INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());";
private const string ReleaseLockSql = "DELETE FROM MigrationLocks WHERE LockName = @database;";
/// <summary>
/// 尝试获取锁
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
public bool TryAcquireLock(IDbConnection connection)
{
try
{
CheckLocked(connection);
var result = connection.Execute(AcquireLockSql, new { database = connection.Database });
if (result == 1)
{
_logger.LogInformation("Lock acquired: {LockName}", connection.Database);
return true;
}
_logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database);
return false;
}
catch (Exception ex)
{
if (ex.Message.StartsWith("Duplicate"))
{
_logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", connection.Database);
}
else
{
_logger.LogError(ex, "Error acquiring lock: {LockName}", connection.Database);
}
return false;
}
}
/// <summary>
/// 尝试获取锁
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
public async Task<bool> TryAcquireLockAsync(IDbConnection connection)
{
try
{
await CheckLockedAsync(connection);
var result = await connection.ExecuteAsync(AcquireLockSql, new { database = connection.Database });
if (result == 1)
{
_logger.LogInformation("Lock acquired: {LockName}", connection.Database);
return true;
}
_logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database);
return false;
}
catch (Exception ex)
{
if (ex.Message.StartsWith("Duplicate"))
{
_logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", connection.Database);
}
else
{
_logger.LogError(ex, "Error acquiring lock: {LockName}", connection.Database);
}
return false;
}
}
/// <summary>
/// 释放锁
/// </summary>
public void ReleaseLock(IDbConnection connection)
{
try
{
connection.ExecuteAsync(ReleaseLockSql, new { database = connection.Database });
_logger.LogInformation("Lock released: {LockName}", connection.Database);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database);
}
}
/// <summary>
/// 释放锁
/// </summary>
public async Task ReleaseLockAsync(IDbConnection connection)
{
try
{
await connection.ExecuteAsync(ReleaseLockSql, new { database = connection.Database });
_logger.LogInformation("Lock released: {LockName}", connection.Database);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database);
}
}
/// <summary>
/// 检查锁
/// </summary>
private void CheckLocked(IDbConnection connection)
{
connection.Execute(CreateTableSql);
var databaseParam = new
{
database = connection.Database
};
var lockExists = connection.QueryFirstOrDefault<int>(CheckLockedSql, databaseParam);
if (lockExists <= 0)
{
return;
}
_logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
connection.Execute(ReleaseLockSql, databaseParam);
}
/// <summary>
/// 检查锁
/// </summary>
private async Task CheckLockedAsync(IDbConnection connection)
{
await connection.ExecuteAsync(CreateTableSql);
var databaseParam = new
{
database = connection.Database
};
var lockExists = await connection.QueryFirstOrDefaultAsync<int>(CheckLockedSql, databaseParam);
if (lockExists <= 0)
{
return;
}
_logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
await connection.ExecuteAsync(ReleaseLockSql, databaseParam);
}
}
/// <summary>
/// 数据库迁移执行器
/// </summary>
public interface IMigrateExcutor
{
/// <summary>
/// 执行迁移
/// </summary>
/// <param name="dbContext"></param>
void Migrate(DbContext dbContext);
/// <summary>
/// 执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <returns></returns>
Task MigrateAsync(DbContext dbContext);
/// <summary>
/// 并发场景执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
void ConcurrentMigrate(DbContext dbContext, bool wait = true);
/// <summary>
/// 并发场景执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
/// <returns></returns>
Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true);
/// <summary>
/// 并发场景执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <param name="connection"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true);
/// <summary>
/// 并发场景执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <param name="connection"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true);
}
/// <summary>
/// 数据库迁移执行器
/// </summary>
public class MigrateExcutor : IMigrateExcutor
{
private readonly IMigrateLock _migrateLock;
private readonly ILogger<MigrateExcutor> _logger;
public MigrateExcutor(
IMigrateLock migrateLock,
ILogger<MigrateExcutor> logger)
{
_migrateLock = migrateLock;
_logger = logger;
}
/// <summary>
/// 执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <returns></returns>
public void Migrate(DbContext dbContext)
{
try
{
if (dbContext.Database.GetPendingMigrations().Any())
{
dbContext.Database.Migrate();
}
}
catch (Exception e)
{
_logger.LogError(e, "Migration failed");
HandleError(dbContext, e);
}
}
/// <summary>
/// 执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <returns></returns>
public async Task MigrateAsync(DbContext dbContext)
{
try
{
if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
{
await dbContext.Database.MigrateAsync();
}
}
catch (Exception e)
{
_logger.LogError(e, "Migration failed");
await HandleErrorAsync(dbContext, e);
}
}
/// <summary>
/// 并发场景执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
/// <returns></returns>
public void ConcurrentMigrate(DbContext dbContext, bool wait = true)
{
if (!dbContext.Database.GetPendingMigrations().Any())
{
return;
}
using var connection = MySqlConnectionHelper.CreateConnection(dbContext.Database.GetDbConnection().Database);
ConcurrentMigrate(dbContext, connection, wait);
}
/// <summary>
/// 并发场景执行迁移
/// </summary>
/// <param name="dbContext"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
/// <returns></returns>
public async Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true)
{
if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
{
return;
}
await using var connection = await MySqlConnectionHelper.CreateConnectionAsync(dbContext.Database.GetDbConnection().Database);
await ConcurrentMigrateAsync(dbContext, connection, wait);
}
/// <summary>
/// 并发场景执行迁移(供数据同步相关服务使用,”迁移锁“ 使用传入的 <see cref="IDbConnection"/> 对象来完成)
/// </summary>
/// <param name="dbContext"></param>
/// <param name="connection"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
public void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true)
{
if (!dbContext.Database.GetPendingMigrations().Any())
{
return;
}
while (true)
{
if (_migrateLock.TryAcquireLock(connection))
{
try
{
Migrate(dbContext);
break;
}
finally
{
_migrateLock.ReleaseLock(connection);
}
}
if (wait)
{
_logger.LogWarning("Migration is locked, wait for 2 seconds");
Thread.Sleep(20000);
continue;
}
_logger.LogInformation("Migration is locked, skip");
}
}
/// <summary>
/// 并发场景执行迁移(供数据同步相关服务使用,”迁移锁“ 使用传入的 <see cref="IDbConnection"/> 对象来完成)
/// </summary>
/// <param name="dbContext"></param>
/// <param name="connection"></param>
/// <param name="wait">是否等待至正在进行中的迁移完成</param>
public async Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true)
{
if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
{
return;
}
while (true)
{
if (await _migrateLock.TryAcquireLockAsync(connection))
{
try
{
await MigrateAsync(dbContext);
break;
}
finally
{
await _migrateLock.ReleaseLockAsync(connection);
}
}
if (wait)
{
_logger.LogWarning("Migration is locked, wait for 2 seconds");
Thread.Sleep(20000);
continue;
}
_logger.LogInformation("Migration is locked, skip");
break;
}
}
private void HandleError(DbContext dbContext, Exception e)
{
var needChangeList = dbContext.Database.GetPendingMigrations().ToList();
var allChangeList = dbContext.Database.GetMigrations().ToList();
var hasChangeList = dbContext.Database.GetAppliedMigrations().ToList();
if (needChangeList.Count + hasChangeList.Count > allChangeList.Count)
{
int errIndex = allChangeList.Count - needChangeList.Count;
if (hasChangeList.Count - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0])
{
int index = needChangeList[0].IndexOf("_", StringComparison.Ordinal);
string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index);
if (hasChangeList[errIndex].EndsWith(errSuffix))
{
dbContext.Database.ExecuteSqlRaw($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'");
dbContext.Database.Migrate();
}
else
{
throw e;
}
}
else
{
throw e;
}
}
else
{
throw e;
}
_logger.LogInformation("Migration failed, but success on second try.");
}
private async Task HandleErrorAsync(DbContext dbContext, Exception e)
{
var needChangeList = (await dbContext.Database.GetPendingMigrationsAsync()).ToList();
var allChangeList = dbContext.Database.GetMigrations().ToList();
var hasChangeList = (await dbContext.Database.GetAppliedMigrationsAsync()).ToList();
if (needChangeList.Count + hasChangeList.Count > allChangeList.Count)
{
int errIndex = allChangeList.Count - needChangeList.Count;
if (hasChangeList.Count - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0])
{
int index = needChangeList[0].IndexOf("_", StringComparison.Ordinal);
string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index);
if (hasChangeList[errIndex].EndsWith(errSuffix))
{
await dbContext.Database.ExecuteSqlRawAsync($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'");
await dbContext.Database.MigrateAsync();
}
else
{
throw e;
}
}
else
{
throw e;
}
}
else
{
throw e;
}
_logger.LogInformation("Migration failed, but success on second try.");
}
}
最后此篇关于EntityFrameworkCore并发迁移解决方案的文章就讲到这里了,如果你想了解更多关于EntityFrameworkCore并发迁移解决方案的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我只是想知道要安装哪个版本的 Visual Studio 2010(专业版或高级版)提示升级项目.. 项目包括:asp.net mvc、数据库和silverlight。 最佳答案 通常,由不同版本的相
几种通过 iproute2 来打通不同节点间容器网络的方式 几种通过 iproute2 来打通不同节点间容器网络的方式 host-gw ipip vxlan 背景 之前由于需
目录 前言 1、TypeHandler 简介 1.1转换步骤 1.2转换规则 2、JSON 转换 3、枚举转换 4、文章小结
目录 前言 1、常见 key-value 2、时效性强 3、计数器相关 4、高实时性 5、排行榜系列 6、文章小结 前言 在笔者 3 年的
目录 前言 四、技术选型 五、后端接口设计 5.1业务系统接口 5.2App 端接口 六、关键逻辑实现 6.1Red
目录 前言 一、需求分析 1.1发送通知 1.2撤回通知 1.3通知消息数 1.4通知消息列表 二、数据模型设计
目录 前言 一、多租户的概念 二、隔离模式 2.1独立数据库模式 2.2共享数据库独立数据架构 2.3共享数据库共享数据架构
导读: 虽然锁在一定程度上能够解决并发问题,但稍有不慎,就可能造成死锁。本文介绍死锁的产生及处理。 死锁的产生和预防 发生死锁的必要条件有4个,分别为互斥条件、不可剥夺条件、请求与保持条件和循环等待条
在浏览网页后,我找不到任何功能来执行此操作,我有可行的个人解决方案。也许它对某人有用。 **使用 Moment 插件转换日期。***moment(currentPersianDate).clone()
是否有一种解决方案可以很好地处理数字(1-10)手写?我试过tesseract,但我得到的只是垃圾。 理想情况下是 OSS,但商业也可以。 最佳答案 OpenCV 现在带有手写数字识别 OCR 示例。
在服务器应用程序上,我们有以下内容:一个称为 JobManager 的单例类。另一个类,Scheduler,不断检查是否需要向 JobManager 添加任何类型的作业。 当需要这样做时,调度程序会执
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 5年前关闭。 Improve this qu
当您尝试从 GitHub 存储库安装某些 R 包时 install_github('rWBclimate', 'ropensci') 如果您遇到以下错误: Installing github repo
问题在以下链接中进行了描述和演示: Paul Stovell WPF: Blurry Text Rendering www.gamedev.net forum Microsoft Connect: W
我正在寻找一种解决方案,使用标准格式 a × 10 b 在科学记数法下格式化 R 中的数字。一些同行评审的科学期刊都要求这样做,并且手动修改图表可能会变得乏味。 下面是 R 标准“E 表示法”的示例,
已编辑解决方案(如下...) 我有一个启动画面,它被打包到它自己的 jar 中。它有效。 我可以通过以下方式从另一个 java 应用程序内部调用 Splash.jar: Desktop.getDesk
什么是创建像 PageFlakes 或 iGoogle 这样的门户网站的好框架/包? ?我们希望创建一个为员工提供 HR 服务的员工/HR 门户,但我们也需要一种足够灵活的产品,以便我们可以使用它来为
我正在寻找一种解决方案,使用标准格式 a × 10 b 在科学记数法下格式化 R 中的数字。一些同行评审的科学期刊都要求这样做,并且手动修改图表可能会变得乏味。 下面是 R 标准“E 表示法”的示例,
如何将 solr 与 heritrix 集成? 我想使用 heritrix 归档一个站点,然后使用 solr 在本地索引和搜索该文件。 谢谢 最佳答案 使用 Solr 进行索引的问题在于它是一个纯文本
完整日历不包含工作时间功能选项(在任何一天的议程 View 中选择第一行和最后一行 - 例如公司不工作)。我做到了类似的事情: viewDisplay: function(view){
我是一名优秀的程序员,十分优秀!