c# - Hangfire in a single application on multiple physical servers

Reposted. Author: 行者123. Updated: 2023-12-05 05:48:22
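I am running Hangfire in a single web application. The application runs on 2 physical servers, but Hangfire uses 1 database.

At the moment I am spawning one server per queue, because for each queue I need exactly 1 worker running at a time and the jobs must run in order. I set it up like this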

// core
services.AddHangfire(options =>
{
    options.SetDataCompatibilityLevel(CompatibilityLevel.Version_170);
    options.UseSimpleAssemblyNameTypeSerializer();
    options.UseRecommendedSerializerSettings();
    options.UseSqlServerStorage(appSettings.Data.DefaultConnection.ConnectionString, storageOptions);
});

// add multiple servers, this way we get to control how many workers are in each queue
services.AddHangfireServer(options =>
{
    options.ServerName = "workflow-queue";
    options.WorkerCount = 1;
    options.Queues = new string[] { "workflow-queue" };
    options.SchedulePollingInterval = TimeSpan.FromSeconds(10);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "alert-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "alert-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = string.Format("trigger-schedule");
    options.WorkerCount = 1;
    options.Queues = new string[] { "trigger-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "report-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "report-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "maintenance";
    options.WorkerCount = 5;
    options.Queues = new string[] { "maintenance" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(10);
});

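My problem is that it spawns multiple queues on the servers, each with a different port.

In my code I try to stop a job from running if it is already queued or retrying, but if the job is running on a different physical server, it is not found and gets queued again.

Here is the code that checks whether it is already running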

public async Task<bool> IsAlreadyQueuedAsync(PerformContext context)
{
    var disableJob = false;
    var monitoringApi = JobStorage.Current.GetMonitoringApi();

    // get the jobId, method and queue using performContext
    var jobId = context.BackgroundJob.Id;
    var methodInfo = context.BackgroundJob.Job.Method;
    var queueAttribute = (QueueAttribute)Attribute.GetCustomAttribute(context.BackgroundJob.Job.Method, typeof(QueueAttribute));

    // enqueuedJobs
    var enqueuedjobStatesToCheck = new[] { "Processing" };
    var enqueuedJobs = monitoringApi.EnqueuedJobs(queueAttribute.Queue, 0, 1000);
    var enqueuedJobsAlready = enqueuedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo) && enqueuedjobStatesToCheck.Contains(e.Value.State));

    if (enqueuedJobsAlready > 0)
        disableJob = true;

    // scheduledJobs
    if (!disableJob)
    {
        // check if there are any scheduledJobs that are processing
        var scheduledJobs = monitoringApi.ScheduledJobs(0, 1000);
        var scheduledJobsAlready = scheduledJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (scheduledJobsAlready > 0)
            disableJob = true;
    }

    // failedJobs
    if (!disableJob)
    {
        var failedJobs = monitoringApi.FailedJobs(0, 1000);
        var failedJobsAlready = failedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (failedJobsAlready > 0)
            disableJob = true;
    }

    // if runBefore is true, then lets remove the current job running, else it will write a "successful" message in the logs
    if (disableJob)
    {
        // use hangfire delete, for cleanup
        BackgroundJob.Delete(jobId);

        // create our sqlBuilder to remove the entries altogether including the count
        var sqlBuilder = new SqlBuilder()
            .DELETE_FROM("Hangfire.[Job]")
            .WHERE("[Id] = {0};", jobId);

        sqlBuilder.Append("DELETE TOP(1) FROM Hangfire.[Counter] WHERE [Key] = 'stats:deleted' AND [Value] = 1;");

        using (var cmd = _context.CreateCommand(sqlBuilder))
            await cmd.ExecuteNonQueryAsync();

        return true;
    }

    return false;
}

Each method has attributes like the following

public interface IAlertScheduleService
{
    [Hangfire.Queue("alert-schedule")]
    [Hangfire.DisableConcurrentExecution(60 * 60 * 5)]
    Task RunAllAsync(PerformContext context);
}

A simple implementation of the interface

public class AlertScheduleService : IAlertScheduleService
{
    public async Task RunAllAsync(PerformContext context)
    {
        // IsAlreadyQueuedAsync returns Task<bool>, so the result has to be awaited
        if (await IsAlreadyQueuedAsync(context))
            return;

        // guess it isn't queued, so run it here....
    }
}

This is how I add the recurring jobs

//// our recurring jobs
//// set these to run hourly, so they can play "catch-up" if needed
RecurringJob.AddOrUpdate<IAlertScheduleService>(e => e.RunAllAsync(null), Cron.Hourly(0), queue: "alert-schedule");

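Why does this happen? How can I stop it from happening?

Best answer

A bit of a shot in the dark: prevent a job from being enqueued if another job is already enqueued in the same queue. The try-catch logic is quite ugly, but I have no better idea right now... Also, I am really not sure the locking logic always prevents two jobs from ending up in EnqueuedState, but it should help anyway. It could possibly be combined with an IApplyStateFilter.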

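using System;
using Hangfire.States;
using Hangfire.Storage;

public class DoNotQueueIfAlreadyQueued : IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        // Only intervene when the job is about to move to the Enqueued state.
        if (context.CandidateState is EnqueuedState es)
        {
            IDisposable distributedLock = null;
            try
            {
                // Keep retrying until the per-queue lock is acquired; each attempt waits up to 1 second.
                while (distributedLock == null)
                {
                    try
                    {
                        distributedLock = context.Connection.AcquireDistributedLock($"{nameof(DoNotQueueIfAlreadyQueued)}-{es.Queue}", TimeSpan.FromSeconds(1));
                    }
                    catch (DistributedLockTimeoutException)
                    {
                        // The lock is held elsewhere; try again.
                    }
                }

                // If the queue already holds a job, elect Deleted instead of Enqueued.
                var m = context.Storage.GetMonitoringApi();
                if (m.EnqueuedCount(es.Queue) > 0)
                {
                    context.CandidateState = new DeletedState();
                }
            }
            finally
            {
                // Null when acquisition failed with an unexpected exception.
                distributedLock?.Dispose();
            }
        }
    }
}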

The filter can be declared as in this answer.
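Since the linked answer is not reproduced here, the following is a minimal sketch of one way such a filter might be registered globally, assuming the `AddHangfire` setup from the question. The extension method name `AddHangfireWithQueueGuard` and the `connectionString` parameter are hypothetical names introduced only for illustration; `UseFilter` and `GlobalJobFilters.Filters.Add` are existing Hangfire calls.

```csharp
using Hangfire;
using Microsoft.Extensions.DependencyInjection;

public static class HangfireFilterRegistration
{
    // Hypothetical helper (not from the original post): registers Hangfire
    // together with the DoNotQueueIfAlreadyQueued filter from the answer.
    public static IServiceCollection AddHangfireWithQueueGuard(
        this IServiceCollection services, string connectionString)
    {
        services.AddHangfire(options =>
        {
            options.UseSqlServerStorage(connectionString);

            // Adds the filter to the global filter collection, so it runs on
            // every state election for every job in this application.
            options.UseFilter(new DoNotQueueIfAlreadyQueued());
        });

        // Alternative outside of DI configuration (use one or the other):
        // GlobalJobFilters.Filters.Add(new DoNotQueueIfAlreadyQueued());

        return services;
    }
}
```

Note that a globally registered filter applies to every queue, including the "maintenance" queue that runs 5 workers, so it may be worth checking `es.Queue` inside the filter if only some queues should be restricted.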

Regarding "c# - Hangfire in a single application on multiple physical servers", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/70832860/
