gpt4 book ai didi

NestJS 使用 Redis 实现外部事件总线

转载 作者:行者123 更新时间:2023-12-05 00:55:21 27 4
gpt4 key购买 nike

我正在尝试将我的 nestjs 应用程序的 cqrs 设置与外部消息服务(例如 Redis)集成。我找到了 pull requestcomment在nestJS github上说我应该能够将我的查询/事件/命令总线与cqrs 7.0版以来的外部服务集成。

我一直在尝试实现这一点,但我无法从nestjs 中找到关于该主题的太多信息。我唯一能找到的是 outdated configuration exampleopen topic在 github 上创建有关如何实现此功能的教程。我设法通过取消我可以在 github 上找到的关于这个主题的有限帮助来替换默认的发布者和订阅者,但我真的不明白如何使用它来连接到外部服务,或者这是否是最好的方法这个问题。

EventBus

import { RedisEventSubscriber } from '../busses/redisEventSubscriber';
import { RedisEventPublisher } from '../busses/redisEventPublisher';
import { OnModuleInit } from '@nestjs/common';
import { ModuleRef } from "@nestjs/core";
import { CommandBus, EventBus as NestJsEventBus } from "@nestjs/cqrs";

export class EventBus extends NestJsEventBus implements OnModuleInit {

constructor( commandBus: CommandBus, moduleRef: ModuleRef) {
super(commandBus, moduleRef);
}

onModuleInit() {

const subscriber = new RedisEventSubscriber();
subscriber.bridgeEventsTo(this._subject$);
this.publisher = new RedisEventPublisher();

}
}

出版商

export class RedisEventPublisher implements IEventPublisher {

publish<T extends IEvent = IEvent>(event: T) {
console.log("Event published to Redis")
}
}

订阅者

export class RedisEventSubscriber implements IMessageSource {

bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
console.log('bridged event to thingy')
}
}

如果之前使用外部消息系统设置了 nestjs 的任何人可以分享他们的想法或分享如何正确执行此操作的资源,我们将不胜感激。

最佳答案

所以几天后,我设法找到了两种连接到外部事件总线的方法。我发现我真的不需要外部命令或查询总线,因为它们是通过 API 调用进来的。因此,如果您想使用 NestJS 连接到外部事件总线,这是我发现的两个选项:

  1. 通过自定义发布者和订阅者
  2. 通过 NestJS 微服务包

这两种方法的主要区别在于它们连接到外部事件总线的方式以及它们处理传入消息的方式。根据您的需要,一个可能比另一个更适合您,但我选择了第一个选项。

自定义发布者和订阅者

在我的应用程序中,我已经通过使用 NestJS 中的 EventBus 类并为我的事件调用 .publish() 对我的事件总线进行手动发布调用。我创建了一个服务,它与自定义发布者和自定义订阅者一起围绕本地 NestJS 事件总线。

eventBusService.ts

export class EventBusService implements IEventBusService {

constructor(
private local: EventBus, // Injected from NestJS CQRS Module
@Inject('eventPublisher') private publisher: IEventPublisher,
@Inject('eventSubscriber') subscriber: IMessageSource) {
subscriber.bridgeEventsTo(this.local.subject$);
}

publish(event: IEvent): void {
this.publisher.publish(event);
};
}

事件服务使用自定义订阅者使用 .bridgeEventsTo() 将来自远程事件总线的任何传入事件重定向到本地事件总线。自定义订阅者使用 redis NPM 包的客户端与事件总线通信。

subscriber.ts

export class RedisEventSubscriber implements IMessageSource {

constructor(@Inject('redisClient') private client: RedisClient) {}

bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
this.client.subscribe('Foo');
this.client.on("message", (channel: string, message: string) => {

const { payload, header } = JSON.parse(message);
const event = Events[header.name];

subject.next(new event(data.event.payload));
});
}
};

此函数还包含将传入的 Redis 事件映射为事件的逻辑。为此,我创建了一个字典,将我的所有事件保存在 app.module 中,以便我可以查找事件是否知道如何处理传入事件。然后,它使用新事件调用 subject.next(),以便将其放在内部 NestJS 事件总线上。

publisher.ts

为了根据我自己的事件更新其他系统,我创建了一个发布者,将数据发送到 Redis。

export class RedisEventPublisher implements IEventPublisher {

constructor(@Inject('redisClient') private client: RedisClient) {}

publish<T extends IEvent = IEvent>(event: T) {
const name = event.constructor.name;
const request = {
header: {
name
},
payload: {
event
}
}
this.client.publish('Foo', JSON.stringify(request));
}
}

和订阅者一样,这个类使用 NPM 包客户端向 Redis 事件总线发送事件。


微服务设置

某些部分的微服务设置与自定义事件服务方法非常相似。它使用相同的发布者类,但订阅设置的完成方式不同。它使用 NestJS 微服务包来设置一个微服务,监听传入的消息,然后调用 eventService 将传入的事件发送到事件总线。

eventService.ts

export class EventBusService implements IEventBusService {

constructor(
private eventBus: EventBus,
@Inject('eventPublisher') private eventPublisher: IEventPublisher,) {
}

public publish<T extends IEvent>(event: T): void {

const data = {
payload: event,
eventName: event.constructor.name
}

this.eventPublisher.publish(data);
};

async handle(string: string) : Promise<void> {

const data = JSON.parse(string);
const event = Events[data.event.eventName];

if (!event) {
console.log(`Could not find corresponding event for
${data.event.eventName}`);
};

await this.eventBus.publish(new event(data.event.payload));
}
}

NestJS 有关于如何设置混合服务的文档,可以找到 here .微服务包为您提供了一个 @EventPattern() 装饰器,您可以使用它为传入的事件总线消息创建处理程序,您只需将它们添加到 NestJS Controller 并注入(inject) eventService。

controller.ts

@Controller()
export default class EventController {

constructor(@Inject('eventBusService') private eventBusService:
IEventBusService) {}

@EventPattern(inviteServiceTopic)
handleInviteServiceEvents(data: string) {
this.eventBusService.handle(data)
}
}

因为我不想创建一个混合应用程序来监听传入的事件,所以我决定使用第一个选项。代码很好地组合在一起,而不是使用带有 @EventPattern() 装饰器的随机 Controller 。

这需要很长时间才能弄清楚,所以我希望它对将来的人有所帮助。 :)

关于NestJS 使用 Redis 实现外部事件总线,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64769673/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com