
python - How to send a new message from an Azure IoT Edge module in Python

Reposted. Author: 行者123. Updated: 2023-12-01 09:15:46

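There doesn't seem to be much support for what I'm trying to do, but it should be possible, since it is demonstrated in the temperature sensor and sensor filter tutorial. However, there is no example of creating an actual message from an edge module in Python. The tutorial only shows forwarding messages. There are examples of sending from a device, but devices use a different class than edge modules. From the filter example and a couple of device examples, I pieced together the following: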

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.

import random
import time
import sys
import iothub_client
from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError

# messageTimeout - the maximum time in milliseconds until a message times out.
# The timeout period starts at IoTHubModuleClient.send_event_async.
# By default, messages do not expire.
MESSAGE_TIMEOUT = 10000

# global counters
RECEIVE_CALLBACKS = 0
SEND_CALLBACKS = 0

# Choose HTTP, AMQP or MQTT as transport protocol. Currently only MQTT is supported.
PROTOCOL = IoTHubTransportProvider.MQTT

# Callback received when the message that we're forwarding is processed.
def send_confirmation_callback(message, result, user_context):
    global SEND_CALLBACKS
    print("Confirmation[%d] received for message with result = %s" % (user_context, result))
    map_properties = message.properties()
    key_value_pair = map_properties.get_internals()
    print(" Properties: %s" % key_value_pair)
    SEND_CALLBACKS += 1
    print(" Total calls confirmed: %d" % SEND_CALLBACKS)


# receive_message_callback is invoked when an incoming message arrives on the specified
# input queue (in the case of this sample, "input1"). Because this is a filter module,
# we will forward this message onto the "output1" queue.
def receive_message_callback(message, hubManager):
    global RECEIVE_CALLBACKS
    message_buffer = message.get_bytearray()
    size = len(message_buffer)
    print(" Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size))
    map_properties = message.properties()
    key_value_pair = map_properties.get_internals()
    print(" Properties: %s" % key_value_pair)
    RECEIVE_CALLBACKS += 1
    print(" Total calls received: %d" % RECEIVE_CALLBACKS)
    hubManager.forward_event_to_output("output1", message, 0)
    return IoTHubMessageDispositionResult.ACCEPTED


def construct_message(message_body, topic):
    try:
        msg_txt_formatted = message_body
        message = IoTHubMessage(msg_txt_formatted)

        # Add a custom application property to the message.
        # An IoT hub can filter on these properties without access to the message body.
        prop_map = message.properties()
        prop_map.add("topic", topic)

        # TODO Use logging
        # Send the message.
        print("Sending message: %s" % message.get_string())

    except IoTHubError as iothub_error:
        print("Unexpected error %s from IoTHub" % iothub_error)
        return

    return message


class HubManager(object):

    def __init__(
            self,
            protocol=IoTHubTransportProvider.MQTT):
        self.client_protocol = protocol
        self.client = IoTHubModuleClient()
        self.client.create_from_environment(protocol)

        # set the time until a message times out
        self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)

        # sets the callback when a message arrives on "input1" queue. Messages sent to
        # other inputs or to the default will be silently discarded.
        self.client.set_message_callback("input1", receive_message_callback, self)

    # Forwards the message received onto the next stage in the process.
    def forward_event_to_output(self, outputQueueName, event, send_context):
        self.client.send_event_async(
            outputQueueName, event, send_confirmation_callback, send_context)

    def send_message(self, message):
        # No callback
        # TODO what is the third arg?
        self.client.send_event_async(
            "output1", message, send_confirmation_callback, 0)
        self.client.send_message()

    def mypublish(self, topic, msg):
        message = construct_message(msg, topic)
        self.send_message(message)
        print('publishing %s' % msg)

def main(protocol):
    try:
        print("\nPython %s\n" % sys.version)
        print("IoT Hub Client for Python")

        hub_manager = HubManager(protocol)

        print("Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol)
        print("The sample is now waiting for messages and will indefinitely. Press Ctrl-C to exit. ")

        while True:
            hub_manager.mypublish('testtopic', 'hello world this is a module')
            time.sleep(1)

    except IoTHubError as iothub_error:
        print("Unexpected error %s from IoTHub" % iothub_error)
        return
    except KeyboardInterrupt:
        print("IoTHubModuleClient sample stopped")

if __name__ == '__main__':
    main(PROTOCOL)

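When I build and deploy this, it runs on the edge device without errors, and in the logs the callback reports that the messages were sent OK. However, when I try to monitor D2C messages, nothing comes through.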

Best answer

I use this to create and send a message from a JSON dictionary.

import json
from iothub_client import IoTHubMessage

new_message = json.dumps(json_obj)
new_message = IoTHubMessage(new_message)
hubManager.forward_event_to_output("output1", new_message, 0)

You can send whatever you need, even a plain string or anything else.
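As a minimal sketch of just the serialization step, using only the standard library: the helper below turns either a dictionary or a plain string into the text that would be handed to `IoTHubMessage(...)`. The name `build_payload` and the sample `reading` dictionary are hypothetical and not part of the `iothub_client` SDK. The snippet does not touch the IoT Hub client, so it can be tried outside an Edge module.

```python
import json


def build_payload(body):
    """Return the text that would be passed to IoTHubMessage(...).

    Hypothetical helper, not part of the iothub_client SDK.
    Strings pass through unchanged; anything else is JSON-encoded.
    """
    if isinstance(body, str):
        return body
    return json.dumps(body)


# Illustrative data only; the field names are assumptions.
reading = {"machine": {"temperature": 21.5}, "topic": "testtopic"}
payload = build_payload(reading)
print(payload)
```

Inside the module, the result would presumably be wrapped as `IoTHubMessage(build_payload(reading))` and passed to `hubManager.forward_event_to_output("output1", ...)` exactly as in the answer.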

Regarding "python - How to send a new message from an Azure IoT Edge module in Python", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51274664/
