gpt4 book ai didi

javascript - 每次生产 Kafka 消息时都需要连接吗?

转载 作者:行者123 更新时间:2023-12-04 17:19:13 24 4
gpt4 key购买 nike

我创建了这个类,以使代码在发送/生成 Kafka 消息时更加可重用和干净。
我正在使用 使用 KafkaJS 的 Node 和 Kafka .
我是新手而且我在互联网上的任何地方都找不到在生产应用程序中使用它的完美/好方法。
问题(简而言之) :我们是否需要在每次生成新消息时都以生产者身份连接。我们不能保持联系吗 Redis 或 NATS .
这是我迄今为止尝试过的:
用例
假设每次创建新用户时我都需要发送一条消息。
1.创建afka客户端

Created kafka client so that we can don't have to re-configure it everytime


import { Kafka } from 'kafkajs';

class KafkaClient {
private _client: Kafka;

get client() {
if (!this._client) {
throw new Error('Cannot access client before initializing it');
} else {
return this._client;
}
}
connect(clientId: string, brokers: string[]) {
this._client = new Kafka({
clientId,
brokers,
});
}
}

export const producerClient = new KafkaClient();
2.创建抽象发布者类

Created kafka producer abstract class for all types of producers


import { Kafka, Message } from 'kafkajs';
import { Topics } from './topics';

interface Event {
topic: Topics;
data: Message;
}

export abstract class Publisher<T extends Event> {
private client: Kafka;
abstract topic: Topics;
constructor(client: Kafka) {
this.client = client;
}

async publish(data: T['data']): Promise<void> {
const producer = this.client.producer();
await producer.connect();
await producer.send({
topic: this.topic,
messages: [data],
});
}
}
3. 最后一个用户创建的生产者(继承自基类)

All producers will have these classes of their own


import { Publisher } from './publisher';
import { Topics } from './topics';

interface Event {
topic: Topics.USER_CREATED;
data: {
value: string;
};
}

export class UserCreatedPublisher extends Publisher<Event> {
topic: Topics = Topics.USER_CREATED;
}
生产用户创建的事件/消息

Using it in the nodejs route

import { Router } from 'express';
import { producerClient } from './kafka-client';
import { UserCreatedPublisher } from './user-created-publisher';

const router = Router();

router.post('/create-user', async (req, res) => {
const { email, password } = req.body;

// send this to the other service using kafka
await new UserCreatedPublisher(producerClient.client).publish({
value: JSON.stringify({ email, password }),
console.log('Message published');

res.status(201).send();
});

最佳答案

问:每次生成新消息时,我们是否需要以生产者身份连接?
我的回答:没有
在这种情况下我会做的是创建一个单例类来共享同一个 kafka 生产者实例,并在我优雅地关闭我的应用程序时断开它。
例如为 kafka producer 创建一个类

import { Kafka } from 'kafkajs';

export class KafkaProducer {
private static instance: KafkaProducer;
private _producer = null;
private _isConnected = false;

private constructor() {
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENTID,
brokers: [process.env.KAFKA_SERVER],
});
this._producer = kafka.producer();
}

public static getInstance(): KafkaProducer {
if (!KafkaProducer.instance) {
KafkaProducer.instance = new KafkaProducer();
}
return KafkaProducer.instance;
}

public get isConnected() {
return this._isConnected;
}

async connect(): Promise<void> {
try {
await this._producer.connect();
this._isConnected = true;
} catch (err) {
console.error(err);
}
}

get producer() {
return this._producer;
}
}
并使用此生产者在其他代码位置发送消息(我创建了 isConnected() 方法,因为我找不到用于检查客户端是否已连接的内置函数)
     let kafka = KafkaProducer.getInstance();
if (!kafka.isConnected) {
await kafka.connect();
}
await kafka.producer.send({
topic: 'topicName',
messages: [
{
value: JSON.stringify(valueObj),
},
],
});
并在应用程序关闭之前断开连接
try {
let kafka = KafkaProducer.getInstance();
if (kafka.isConnected) {
let producer = kafka.producer;
await producer.disconnect();
}
} catch (err) {
console.error(err);
}

关于javascript - 每次生产 Kafka 消息时都需要连接吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67243831/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com