gpt4 book ai didi

c# - 带有 Masstransit 发布的中间件

转载 作者:太空狗 更新时间:2023-10-30 01:12:34 28 4
gpt4 key购买 nike

我有 .net 核心 WEB API 应用程序 MassTransit (用于实现 RabbitMQ 消息代理)。 RabbitMQ-MassTransit 配置很简单,只需在 Startup.cs 文件中用几行代码即可完成。

services.AddMassTransit(x =>
{
x.AddConsumer<CustomLogConsume>();

x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://rabbitmq/"), h =>
{
h.Username("guest");
h.Password("guest");
});

cfg.ExchangeType = ExchangeType.Fanout;

cfg.ReceiveEndpoint(host, "ActionLog_Queue", e =>
{
e.PrefetchCount = 16;
});

// or, configure the endpoints by convention
cfg.ConfigureEndpoints(provider);
}));
});

我在我的项目解决方案中使用依赖注入(inject)以获得更好的代码标准。发布消息适用于 Controller 依赖注入(inject)。但是当我实现自定义 middle ware 时对于日志操作,Masstransit 未能正确发布消息,它在 RabbitMQ Web 控制台中创建了一个带有 _error 的额外队列。

public class RequestResponseLoggingMiddleware
{
#region Private Variables

/// <summary>
/// RequestDelegate
/// </summary>
private readonly RequestDelegate _next;

/// <summary>
/// IActionLogPublish
/// </summary>
private readonly IActionLogPublish _logPublish;

#endregion

#region Constructor
public RequestResponseLoggingMiddleware(RequestDelegate next, IActionLogPublish logPublish)
{
_next = next;
_logPublish = logPublish;
}
#endregion

#region PrivateMethods

#region FormatRequest
/// <summary>
/// FormatRequest
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
private async Task<ActionLog> FormatRequest(HttpRequest request)
{
ActionLog actionLog = new ActionLog();
var body = request.Body;
request.EnableRewind();

var context = request.HttpContext;

var buffer = new byte[Convert.ToInt32(request.ContentLength)];
await request.Body.ReadAsync(buffer, 0, buffer.Length);
var bodyAsText = Encoding.UTF8.GetString(buffer);
request.Body = body;

var injectedRequestStream = new MemoryStream();

var requestLog = $"REQUEST HttpMethod: {context.Request.Method}, Path: {context.Request.Path}";

using (var bodyReader = new StreamReader(context.Request.Body))
{
bodyAsText = bodyReader.ReadToEnd();

if (string.IsNullOrWhiteSpace(bodyAsText) == false)
{
requestLog += $", Body : {bodyAsText}";
}

var bytesToWrite = Encoding.UTF8.GetBytes(bodyAsText);
injectedRequestStream.Write(bytesToWrite, 0, bytesToWrite.Length);
injectedRequestStream.Seek(0, SeekOrigin.Begin);
context.Request.Body = injectedRequestStream;
}

actionLog.Request = $"{bodyAsText}";
actionLog.RequestURL = $"{request.Scheme} {request.Host}{request.Path} {request.QueryString}";

return actionLog;
}
#endregion

#region FormatResponse
private async Task<string> FormatResponse(HttpResponse response)
{
response.Body.Seek(0, SeekOrigin.Begin);
var text = await new StreamReader(response.Body).ReadToEndAsync();
response.Body.Seek(0, SeekOrigin.Begin);

return $"Response {text}";
}
#endregion

#endregion

#region PublicMethods

#region Invoke
/// <summary>
/// Invoke - Hits before executing any action. Actions call executes from _next(context)
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public async Task Invoke(HttpContext context)
{
ActionLog actionLog = new ActionLog();

actionLog = await FormatRequest(context.Request);


var originalBodyStream = context.Response.Body;

using (var responseBody = new MemoryStream())
{
context.Response.Body = responseBody;

await _next(context);

actionLog.Response = await FormatResponse(context.Response);

await _logPublish.Publish(actionLog);
await responseBody.CopyToAsync(originalBodyStream);
}
}
#endregion

#endregion
}

启动时配置中间件

  public async void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
{
............
app.UseMiddleware<RequestResponseLoggingMiddleware>();
....................
}

MassTransit 在启动时是否有任何额外的配置来与中间件一起工作

编辑

IActionLogPublish

public interface IActionLogPublish
{
Task Publish(ActionLog model);
}

ActionLogPublish

public class ActionLogPublish : IActionLogPublish
{

private readonly IBus _bus;

public ActionLogPublish(IBus bus)
{
_bus = bus;
}

public async Task Publish(ActionLog actionLogData)
{
/* Publish values to RabbitMQ Service Bus */

await _bus.Publish(actionLogData);

/* Publish values to RabbitMQ Service Bus */
}

}

编辑

RabbitMQ 网络控制台

enter image description here

最佳答案

中间件需要将原始主体放回响应中。

注入(inject)的依赖项也适用于 Controller 而不是中间件,因为它可能在作用域生命周期内注册。

在那种情况下,它不应该被构造函数注入(inject)到 middlewre 中,而是直接注入(inject)到 Invoke

Because middleware is constructed at app startup, not per-request, scoped lifetime services used by middleware constructors aren't shared with other dependency-injected types during each request. If you must share a scoped service between your middleware and other types, add these services to the Invoke method's signature. The Invoke method can accept additional parameters that are populated by DI:

//...omitted for brevity

public RequestResponseLoggingMiddleware(RequestDelegate next) {
_next = next;
}

//...

private async Task<string> FormatResponseStream(Stream stream) {
stream.Seek(0, SeekOrigin.Begin);
var text = await new StreamReader(stream).ReadToEndAsync();
stream.Seek(0, SeekOrigin.Begin);
return $"Response {text}";
}

public async Task Invoke(HttpContext context, IActionLogPublish logger) {
ActionLog actionLog = await FormatRequest(context.Request);
//keep local copy of response stream
var originalBodyStream = context.Response.Body;

using (var responseBody = new MemoryStream()) {
//replace stream for down stream calls
context.Response.Body = responseBody;

await _next(context);

//put original stream back in the response object
context.Response.Body = originalBodyStream; // <-- THIS IS IMPORTANT

//Copy local stream to original stream
responseBody.Position = 0;
await responseBody.CopyToAsync(originalBodyStream);

//custom logging
actionLog.Response = await FormatResponse(responseBody);
await logger.Publish(actionLog);
}
}

引用Dependency injection in ASP.NET Core: Scoped Service lifetime

When using a scoped service in a middleware, inject the service into the Invoke or InvokeAsync method. Don't inject via constructor injection because it forces the service to behave like a singleton. For more information, see Write custom ASP.NET Core middleware.

强调我的

关于c# - 带有 Masstransit 发布的中间件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58297986/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com