gpt4 book ai didi

javascript - 在服务器中运行node-rdkafka代码

转载 作者:行者123 更新时间:2023-11-29 00:06:01 25 4
gpt4 key购买 nike

我正在 Eclipse 中将以下 node-rdkafka 代码作为 Node.js 应用程序运行。这是来自 https://blizzard.github.io/node-rdkafka/current/tutorial-producer_.html 的示例代码

我想在测试服务器中运行它并从 iOS 移动应用程序进行调用。我知道如何在 AWS 中运行 node.js 应用程序。问题一:是否还有其他选项可以在 Tomcat 这样的免费测试服务器环境中运行?问题二:即使我能够在服务器中运行这个node.js应用程序,我如何从移动应用程序调用?我是否需要调用 Producer.on('ready', function(arg) (或)我需要从移动应用程序调用什么函数?

var Kafka = require('node-rdkafka');
//console.log(Kafka.features);
//console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
'metadata.broker.list': 'localhost:9092',
'dr_cb': true
});

var topicName = 'MyTest';

//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
console.log(log);
});

//logging all errors
producer.on('event.error', function(err) {
console.error('Error from producer');
console.error(err);
});

//counter to stop this sample after maxMessages are sent
var counter = 0;
var maxMessages = 10;

producer.on('delivery-report', function(err, report) {
console.log('delivery-report: ' + JSON.stringify(report));
counter++;
});

//Wait for the ready event before producing
producer.on('ready', function(arg) {
console.log('producer ready.' + JSON.stringify(arg));

for (var i = 0; i < maxMessages; i++) {
var value = new Buffer('MyProducerTest - value-' +i);
var key = "key-"+i;
// if partition is set to -1, librdkafka will use the default partitioner
var partition = -1;
producer.produce(topicName, partition, value, key);
}

//need to keep polling for a while to ensure the delivery reports are received
var pollLoop = setInterval(function() {
producer.poll();
if (counter === maxMessages) {
clearInterval(pollLoop);
producer.disconnect();
}
}, 1000);

});

/*
producer.on('disconnected', function(arg) {
console.log('producer disconnected. ' + JSON.stringify(arg));
});*/

//starting the producer
producer.connect();

最佳答案

首先,您需要一个 HTTP 服务器。可以使用 ExpressJS。然后,基本上在最后添加 Express 代码,但将生产者循环移动到请求路由中。

所以,从你拥有的开始

var Kafka = require('node-rdkafka');
//console.log(Kafka.features);
//console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
'metadata.broker.list': 'localhost:9092',
'dr_cb': true
});

var topicName = 'MyTest';

//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
console.log(log);
});

//logging all errors
producer.on('event.error', function(err) {
console.error('Error from producer');
console.error(err);
});

producer.on('delivery-report', function(err, report) {
console.log('delivery-report: ' + JSON.stringify(report));
counter++;
});

//Wait for the ready event before producing
producer.on('ready', function(arg) {
console.log('producer ready.' + JSON.stringify(arg));
});

producer.on('disconnected', function(arg) {
console.log('producer disconnected. ' + JSON.stringify(arg));
});

//starting the producer
producer.connect();

然后,您可以将其添加到同一文件中。

var express = require('express')
var app = express()

app.get('/', (req, res) => res.send('Ready to send messages!'))

app.post('/:maxMessages', function (req, res) {
if (req.params.maxMessages) {
var maxMessages = parseInt(req.params.maxMessages);
for (var i = 0; i < maxMessages; i++) {
var value = new Buffer('MyProducerTest - value-' +i);
var key = "key-"+i;
// if partition is set to -1, librdkafka will use the default partitioner
var partition = -1;
producer.produce(topicName, partition, value, key);
} // end for
} // end if
}); // end app.post()

app.listen(3000, () => console.log('Example app listening on port 3000!'))

我不认为轮询循环是必要的,因为你不关心 counter了。

现在,将您的移动应用连接到 http://<your server IP>:3000/并将带有 POST 请求的测试消息发送到 http://<your server IP>:3000/10 ,例如,并调整以更改要发送的消息数

关于javascript - 在服务器中运行node-rdkafka代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48055530/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com