gpt4 book ai didi

node.js - 如何使用 kafka-node 将 AWS Bitnami Certified Kafka AMI 与 Elastic Beanstalk nodejs 环境连接起来

转载 作者:搜寻专家 更新时间:2023-10-31 23:14:47 26 4
gpt4 key购买 nike

我正在尝试使用 kafka-node 将 Elastic Beanstalk nodejs 环境连接到 Bitnami Certified Kafka AMI，该怎么做呢？

在本地安装 apache Kafka 并成功使用 Kafka-node 对其进行测试后,我想使用 AWS kafka 服务器测试我的应用程序。

我将 AWS Bitnami Certified Kafka AMI 的监听器（listeners）配置为我的公共 DNS (IPv4)，并在入站规则中开放了 9092 和 2181 端口，如下所示：

Type            protocol     port    source

Custom TCP Rule TCP 9092 0.0.0.0/0
Custom TCP Rule TCP 2181 0.0.0.0/0

# server.properties
# Address the broker listens on, written as protocol://host:port.
# NOTE(review): SASL_PLAINTEXT presumably requires clients to authenticate via
# SASL, while the kafka-node client shown in this post passes no SASL options.
# Confirm that the broker and client security protocols match.
listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# Hostname and port the broker will advertise to producers and consumers.
# If not set it uses the value for "listeners" if configured. Otherwise, it
# will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# ZooKeeper connection string in host:port form.
# Root directory for all kafka znodes.
zookeeper.connect=<Public DNS (IPv4) from AWS>:2181

我正在像这样使用 kafka-node 设置我的生产者:

// Producer setup from the question. `kafka` is assumed to be the kafka-node
// module required elsewhere; the require line is not part of this snippet.
const Producer = kafka.Producer;

// kafkaHost must be a 'host:port' string. The original left the placeholder
// unquoted, which is a syntax error. Replace it with the broker's public
// address before running.
const client = new kafka.KafkaClient({ kafkaHost: '<kafka-public-ip>:9092' });
const producer = new Producer(client);

// 'ready' is emitted once the producer is able to send messages.
producer.on('ready', function () {
  console.log('Producer is ready');
});

// Log connection and send failures instead of letting them go unhandled.
producer.on('error', function (err) {
  console.log('Producer is in error state');
  console.log(err);
});

kafka-node 抛出超时错误 Error: Unable to find available brokers to try

我已经使用 telnet open <kafka-instance-public-ip> 22 测试了默认端口 22，它可以连通，但端口 9092 无法连通。

Bitnami Kafka AMI 问题总结:

1- 如何使用 AWS 配置 Bitnami Kafka AMI 以进行远程访问

最佳答案

我的设置如下：这是 2 个可以直接运行的文件，只需要 express、body-parser 和 kafka-node@3.0.1

// consumer.js
const kafka = require('kafka-node');

const { Consumer } = kafka;

// The client is given the ZooKeeper address (port 2181), not a broker address.
const client = new kafka.Client('<IP of kafka server>:2181');

// Declared with const: the original assigned `consumer` without declaring it,
// which creates an implicit global and throws a ReferenceError in strict mode.
// Replace '<>' with the topic to subscribe to.
const consumer = new Consumer(client, [{ topic: '<>' }]);

console.log('listening');

// Print every message received on the subscribed topic.
consumer.on('message', function (message) {
  console.log(message);
});

// Report consumer and connection errors.
consumer.on('error', function (err) {
  console.log('Error:', err);
});

// Report offsets that fall outside the range available on the broker.
consumer.on('offsetOutOfRange', function (err) {
  console.log('offsetOutOfRange:', err);
});

这是通过 zookeeper 进行连接的，所以我认为你需要使用 kafka-node 3.0.1 版本，安装时请执行：

npm install --save kafka-node@3.0.1

如果要直接连接到 broker（代理节点），您可能需要自己摸索解决。

// producer.js
// Third-party modules: HTTP server, Kafka client, request-body parsing.
const express = require('express');
const kafka = require('kafka-node');
const bodyParser = require('body-parser');

const app = express();

// Parse JSON-encoded bodies, then URL-encoded form bodies.
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

// Kafka producer built on a client pointed at the configured server address.
const Producer = kafka.Producer;
const client = new kafka.Client('<IP of kafka server>:2181');
const producer = new Producer(client);

// Logs that the producer has signalled readiness.
function onProducerReady() {
  console.log('Producer is ready');
}

// Logs a producer failure together with the error it reported.
function onProducerError(err) {
  console.log('Producer is in error state');
  console.log(err);
}

producer.on('ready', onProducerReady);
producer.on('error', onProducerError);

/**
 * POST /kafkaproducer: publish one message to a Kafka topic.
 *
 * Expects a parsed request body of the form { topic, message }. The key
 * `messages` is accepted as an alias for `message`, because the request
 * example that accompanies this code uses that spelling.
 *
 * Responds with the producer's result on success, 400 when the body is
 * incomplete, and 500 when the producer reports an error.
 */
app.post('/kafkaproducer', (req, res) => {
  const body = req.body || {};
  const topic = body.topic;
  const message = body.message !== undefined ? body.message : body.messages;

  // Without this guard JSON.stringify(undefined) yields undefined, and an
  // empty payload would be handed to the producer.
  if (!topic || message === undefined) {
    res.status(400).json({ error: 'Request body must include "topic" and "message".' });
    return;
  }

  const sentMessage = JSON.stringify(message);
  const payloads = [
    { topic: topic, messages: sentMessage, partition: 0 },
  ];

  producer.send(payloads, (err, data) => {
    if (err) {
      // An Error instance serialises to {}, so send its message text with a
      // failure status instead of the raw object with HTTP 200.
      res.status(500).json({ error: String(err.message || err) });
      return;
    }
    // Always answer, so the request cannot hang when `data` is empty.
    res.json(data);
  });
});

// Root route: answers with a static JSON greeting.
app.get('/', (_req, res) => {
  res.json({ greeting: 'Kafka Producer' });
});

// Start the HTTP server on port 5001; the callback logs a startup message.
app.listen(5001, () => {
  console.log('Kafka producer running at 5001');
});

您可以使用 Postman 向 http://localhost:5001/kafkaproducer 发送一个 HTTP POST 请求，请求体采用以下格式：

{
  "topic": "<TOPIC YOU WANT>",
  "message": "<Can be any format you want even a json but i would advise just testing with a basic string at first>"
}

然后消费者将接收消息,但要确保主题已在 kafka 服务器上创建,并且您的消费者拥有正确的主题。

附注：如果您使用 EC2 实例，则可以将生产者和消费者合并在一起：

const express = require('express');
const kafka = require('kafka-node');

const app = express();
const bodyParser = require('body-parser');

app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true }));

const { Producer, Consumer } = kafka;

// The producer and the consumer share a single client.
const client = new kafka.Client('13.56.240.35:2181');
const producer = new Producer(client);

// Declared with const: the original assigned `consumer` without declaring it,
// which creates an implicit global and throws a ReferenceError in strict mode.
const consumer = new Consumer(client, [{ topic: 'memes-to-mturk' }]);

producer.on('ready', () => {
  console.log('Producer is ready');
});

producer.on('error', (err) => {
  console.log('Producer is in error state');
  console.log(err);
});

// Print every message received on the subscribed topic.
consumer.on('message', function (message) {
  console.log(message);
});

// Report consumer and connection errors.
consumer.on('error', function (err) {
  console.log('Error:', err);
});


// Root route: answers with a static JSON greeting.
app.get('/', (_req, res) => {
  res.json({ greeting: 'Kafka Producer' });
});

/**
 * POST /kafkaproducer: publish one message to a Kafka topic.
 *
 * Expects a parsed request body of the form { topic, message }; the key
 * `messages` is accepted as an alias for `message`.
 *
 * Responds with the producer's result on success, 400 when the body is
 * incomplete, and 500 when the producer reports an error.
 */
app.post('/kafkaproducer', (req, res) => {
  const body = req.body || {};
  const topic = body.topic;
  const message = body.message !== undefined ? body.message : body.messages;

  // Without this guard JSON.stringify(undefined) yields undefined, and an
  // empty payload would be handed to the producer.
  if (!topic || message === undefined) {
    res.status(400).json({ error: 'Request body must include "topic" and "message".' });
    return;
  }

  const sentMessage = JSON.stringify(message);
  console.log(sentMessage);
  const payloads = [
    { topic: topic, messages: sentMessage, partition: 0 },
  ];

  producer.send(payloads, (err, data) => {
    if (err) {
      // An Error instance serialises to {}, so send its message text with a
      // failure status instead of the raw object with HTTP 200.
      res.status(500).json({ error: String(err.message || err) });
      return;
    }
    // Always answer, so the request cannot hang when `data` is empty.
    res.json(data);
  });
});

// Start the HTTP server on port 5002. The original startup message reported
// port 5001, which did not match the port actually being listened on.
app.listen(5002, function () {
  console.log('Kafka producer running at 5002');
});

关于node.js - 如何使用 kafka-node 将 AWS Bitnami Certified Kafka AMI 与 Elastic Beanstalk nodejs 环境连接起来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56650664/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com