gpt4 book ai didi

node.js - 未收到从 azure-event-hubs onMessage 函数发送的 azure-iothub 消息

转载 作者:太空宇宙 更新时间:2023-11-04 01:48:22 26 4
gpt4 key购买 nike

我有一个 Web API,可以接收消息并向 Raspberry Pi 发送消息。连接工作正常,使用 Web API 上的 azure-event-hubs 接收消息,使用 azure-iothub 将消息发送到树莓派。

我遇到的问题是当我尝试在 onMessage 函数中发送消息时(因此每当我在 webapi 中收到消息时),设备不会收到它。这是我的代码:

WebApi:

const { EventHubClient, EventPosition } = require('azure-event-hubs');
var connectionString = 'myConnectionString'
var sendingClient = require('../azure/sendingClient')

async function main() {
sendingClient.sendMessage('raspberry',{},"allDevices") //The raspberry receives this
const client = await
EventHubClient.createFromIotHubConnectionString(connectionString);

const onError = (err) => {
console.log("An error occurred on the receiver ", err);
};

const onMessage = (msg) => {
console.log(msg.body);
sendingClient.sendMessage('raspberry',{},"allDevices")// the raspberry doesn't receive this
};

const receiveHandler = client.receive("1", onMessage, onError, {
eventPosition: EventPosition.fromEnqueuedTime(Date.now())
});

// To stop receiving events later on...
await receiveHandler.stop();
await client.close();
}

main().catch((err) => {
console.log(err);
});

发送客户端:

var Client = require('azure-iothub').Client;
var Message = require('azure-iot-common').Message;

var connectionString = 'myConnectionString'

var sendingClient = Client.fromConnectionString(connectionString);

exports.sendMessage = (targetDevice, content, messageId) => {
sendingClient.open(function (err) {
if (err) {
console.error('Could not connect: ' + err.message);
} else {
console.log('Service client connected');
var message = new Message(content);
message.ack = 'full';
message.messageId = 'message'
message.properties.add('message',messageId)
console.log('Sending message: ' + message.getData());
console.log('Sending message to : ' + targetDevice);
sendingClient.send(targetDevice, message,);
}
});
}

树莓派上的接收器:

var iothub = require('azure-iothub');
var Protocol = require('azure-iot-device-mqtt').Mqtt;
var Client = require('azure-iot-device').Client;
var Message = require('azure-iot-device').Message;
var client = Client.fromConnectionString(connectionString, Protocol)
client.open((err) => {
if (err) console.error('Could not connect: ' + err.message)
else {
client.on('message', (msg) => {
switch (msg.properties.propertyList[1].value) {
case 'allDevices':
devices = JSON.parse(msg.data.toString())
response(devices) //passing the message content
}
});

client.on('error', (err) => {
console.error(err.message);
});

client.on('disconnect', () => {
clearInterval(sendInterval);
client.removeAllListeners();
client.open(connectCallback);
});
}
})

RaspberryPi 上的发送者:

exports.sendMessage = (data, connectionString, key) => {
var client = Client.fromConnectionString(connectionString, Protocol)
client.open((err) => {
if (err) console.error('Send message error: ' + err.message)
else {
data = JSON.stringify(data);
var message = new Message(data);
message.properties.add('message', key);
client.sendEvent(message);
console.log('Message sent ' + key);
}
})
}

最佳答案

我认为这是由于事件中心客户端无法接收来自分区“1”的消息。无法将消息发送到 IoT 中心中的特定分区,分区在内部使用以允许扩展 IoT(事件中心)并允许扩展消费者应用程序。您可以尝试使用以下代码来接收来自所有分区的消息。

  const partitionIds = await client.getPartitionIds();
partitionIds.forEach(function(id,index){
const receiveHandler = client.receive(id, onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
});

更新:

WebApi.js

const { EventHubClient, EventPosition } = require('azure-event-hubs');
var connectionString = '{iot-hub-connectionstring}';
var sendingClient = require('./sendingClient');

async function main() {
sendingClient.sendMessage('raspberry',{},"raspberry-first") //The raspberry receives this
const client = await EventHubClient.createFromIotHubConnectionString(connectionString);

const onError = (err) => {
console.log("An error occurred on the receiver ", err);
};

const onMessage = (msg) => {
console.log("receive response");
console.log(msg.body);
sendingClient.sendMessage('raspberry',{},"raspberry-second")// the raspberry doesn't receive this
};

const partitionIds = await client.getPartitionIds();
partitionIds.forEach(function(id,index){
const receiveHandler = client.receive(id, onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
});
}

main().catch((err) => {
console.log(err);
});

sendingClient.js

var Client = require('azure-iothub').Client;
var Message = require('azure-iot-common').Message;

var connectionString = '{iot-hub-connectionstring}';

var sendingClient = Client.fromConnectionString(connectionString);

exports.sendMessage = (targetDevice, content, messageId) => {
sendingClient.open(function (err) {
if (err) {
console.error('Could not connect: ' + err.message);
} else {
console.log('Service client connected');
var message = new Message(content);
message.ack = 'full';
message.messageId = 'message'
message.properties.add('message',messageId)
console.log('Sending message: ' + message.getData());
console.log('Sending message to : ' + targetDevice);
sendingClient.send(targetDevice, message,);
}
});
}

Receiver.js var Protocol = require('azure-iot-device-mqtt').Mqtt; var Client = require('azure-iot-device').Client; var Message = require('azure-iot-device').Message;

var connectionString = "{iot-hub-device-connectionstring}";

// fromConnectionString must specify a transport constructor, coming from any transport package.
var client = Client.fromConnectionString(connectionString, Protocol);

var connectCallback = function (err) {
if (err) {
console.error('Could not connect: ' + err.message);
} else {

console.log('Client connected');
client.on('message', function (msg) {

// When using MQTT the following line is a no-op.
client.complete(msg, printResultFor('completed'));
console.log(msg.properties.propertyList[1].value);
});

client.on('error', function (err) {
console.error(err.message);
});

client.on('disconnect', function () {
client.removeAllListeners();
client.open(connectCallback);
});
}
};

client.open(connectCallback);

// Helper function to print results in the console
function printResultFor(op) {
return function printResult(err, res) {
if (err) console.log(op + ' error: ' + err.toString());
if (res) console.log(op + ' status: ' + res.constructor.name);
};
}

测试结果 enter image description here

顺便说一句,我已将所有 azure-* 库更新到最新版本。

关于node.js - 未收到从 azure-event-hubs onMessage 函数发送的 azure-iothub 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50624195/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com