gpt4 book ai didi

python - 将参数传递给paho mqtt回调

转载 作者:行者123 更新时间:2023-12-03 13:08:01 24 4
gpt4 key购买 nike

我是Python的新手,并且每次收到MQTT消息/主题时,我都试图运行一个控件以使RGB LED发光。因此,我创建了一个对象threadObj来运行线程并循环播放脉冲动画。

我面临的问题是我不知道如何将对象实例传递给on_message mqtt回调。我一直在寻找部分函数应用程序,但是现在我不确定是正确的方法。

这是主要脚本:


from P9813 import P9813
from safeGPIO import safeGPIO as GPIO
import json
from math import ceil
import os
import threading
import queue
import sys
import subprocess
import threading
import time
from functools import partial
from ledPulse import ledPulse
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
print("Connected to {0} with result code {1}".format(HOST, rc))
# Subscribe to any hotword topic --- old code: # client.subscribe("hermes/hotword/default/detected")
client.subscribe("hermes/hotword/#")

# Subscribe to any topic starting with 'hermes/intent/'
client.subscribe('hermes/intent/#')

def on_message(client, userdata, msg):

print("- Message received on topic {0}: {1}".format(msg.topic, msg.payload))
print('* ledPulse instance count {0} nameid {1}'.format(ledPulse.instance_count, threadObj))

if msg.topic == 'hermes/hotword/default/detected':
print("Wakeword detected! Wakeword light!")
#Wakeword light

print('ledPulseThread.Status -> ON and status {0}'.format(threadObj.status))
threadObj.Status = "ON" # T.StreamV could be modified to run at creation and wait for this to change to "ON"
threadObj.msgTopic = msg.topic
threadObj.ledDriver = ledDriver
threadObj.ledColor = LEDS_CYAN
#threadObj.daemon = True
threadObj.ledPulseThread.start() # this calls the T.StreamV function in a separate thread

if msg.topic == 'hermes/intent/mywai:ShowMarketplace':
# intent ShowMarketplace light
print("Intent detected! ShowMarketplace light!")
ledDriver[0] = LEDS_GREEN
ledDriver.write()

if msg.topic == 'hermes/hotword/toggleOn':
# light for hotword toggled-on
threadObj.ledColor = LEDS_WARM_WHITE
threadObj.Status = "OFF"
print('status ', threadObj.Status)
print('T.Status -> "OFF"')
print("Intent detected! Hotword ended!")
ledDriver[0] = LEDS_WARM_WHITE
ledDriver.write()
#threadObj.ledPulseThread.terminate() # this calls the T.StreamV function in a separate thread

#########################################################################
# MAIN
#########################################################################
if __name__ == '__main__':
#main()
HOST = 'localhost'
PORT = 1883
gpio = GPIO()
gpio.cleanup()

# Construct the object
ledDriver = P9813(32, 33)
# Create led (R,G,B) list
leds = [[0, 0, 0]]

# Define color constants
# [R,G,B]
LED_STRENGTH = 100
LEDS_OFF = [0, 0, 0]
LEDS_RED = [LED_STRENGTH, 0, 0]
LEDS_GREEN = [0, LED_STRENGTH, 0]
LEDS_BLUE = [0, 0, LED_STRENGTH]
LEDS_YELLOW = [LED_STRENGTH, LED_STRENGTH, 0]
LEDS_MAGENTA = [LED_STRENGTH, 0, LED_STRENGTH]
LEDS_CYAN = [0, LED_STRENGTH, LED_STRENGTH]
LEDS_WHITE = [LED_STRENGTH, LED_STRENGTH, LED_STRENGTH]
LEDS_WARM_WHITE = [210, 155, 35]

# Default light is warm white
#ledDriver[0] = LEDS_WARM_WHITE
#ledDriver.write()

# t1 = threading.Thread(target=loop_test, args=(deca,))
# t1.start()

threadObj = ledPulse(ledDriver) # our thread is actually created here, but is hasn't started yet

client = mqtt.Client()
client.on_connect = on_connect
#client.on_message = on_message
client.on_message = partial(on_message, threadObj) #https://stackoverflow.com/questions/15331726/how-does-the-functools-partial-work-in-python

client.connect(HOST, PORT, 60)
client.loop_forever()

# Ask the user repeatedly for LED brightness setting
try:
while (True):
str = raw_input("Input R, G, B [Enter] or Ctrl-C to quit. R, G, B range from 0 - 255: ")
leds[0] = list(map(int, str.split(",")))
print(leds[0])
ledDriver[0] = leds[0]
ledDriver.write()
except KeyboardInterrupt:
print("\r")
except:
print(str)

# Turn off LEDs before we quit
leds[0] = [0, 0, 0]
ledDriver[0] = leds[0]
ledDriver.write()

ledPulse 类别:


import threading
import time

class ledPulse(object):

instance_count = 0 #count how many istance of the class are created
def __init__(self, ledDriver):
self.status = "OFF"
self.ledPulseThread = threading.Thread(target=self.ledPulseThread) # create a thread for our streamV function
self.ledColor = [30, 40, 50]
self.ledDriver = ledDriver
self.msgTopic = ""
self.counter = 0
ledPulse.instance_count += 1

def ledPulseThread(self):
print("starting thread")
while self.status == "ON":
print("nonstop | status = ", self.Status)
self.ledDriver[0] = [0, 0, 0]
self.ledDriver.write()
time.sleep(0.2)
print("{0} sec | color {2} | driver {3} | instance {4} | deca {1}".format(self.counter, self.msgTopic, self.ledColor, self.ledDriver.write(), ledPulse.instance_count ))
self.ledDriver[0] = self.ledColor
self.ledDriver.write()
time.sleep(0.2)
self.counter = self.counter + 1
print("stopping thread")
self.counter = 0
#raise SystemExit

最佳答案

将对象实例作为值放置在字典中(或任何可变对象,如list,如果要更改回调中的值并查看反射(reflect)在主代码中的更改),则在实例化时将对象实例作为用户数据的一部分传递MQTT客户端。例如:

client_userdata = {'myobject':threadObj}
client = mqtt.Client(userdata=client_userdata)

在on_message回调中,您可以在userdata变量中访问此字典。
userdata['myobject'] <- this is your threadObj

关于python - 将参数传递给paho mqtt回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52796149/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com