gpt4 book ai didi

python - 从python 2到3的UnicodeDecodeError,带有.decode()和套接字

转载 作者:行者123 更新时间:2023-12-03 12:08:49 24 4
gpt4 key购买 nike

您好,我一直在研究Python代码,该代码将通过WiFi刷新我的Arduino,而我几乎可以将它如此接近地工作。

问题是它是为Python 2创建的,我希望它在最新的python 3上运行。我在套接字发送或接收字符串的地方添加了.encode和解码。直到停止读取字符串并想要读取十六进制字符串为止,它的效果都很好。我得到这个错误

Traceback (most recent call last): File "C:\Users\edwin\Desktop\FlashTesting\pythonFlash.py", line 258, in main() File "C:\Users\edwin\Desktop\FlashTesting\pythonFlash.py", line 250, in main if wait_for (cli, "\x00", MAX_TIMEOUT) [0]: File "C:\Users\edwin\Desktop\FlashTesting\pythonFlash.py", line 123, in wait_for received += cli.recv(1).decode() # .decode() is new TypeError: Can't convert 'bytes' object to str implicitly UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe1 in position 0: unexpected end of data.



通常,我会将所有代码缩减为所需的部分,但是由于在整个代码中都使用了wait_for,因此我应该粘贴整个代码,函数之间用注释行分隔,以使其易于阅读。自从圣诞节前夕以来,我一直在为此做任何努力,感谢任何帮助。

该错误始于代码底部的main函数中,当它调用wait_for并想要读取“\x00”,然后它将最终调用program_process函数,我认为它将得到相同的错误,如您所见,我已经添加了。 encode()到那里的十六进制字符串,所以您也可以帮助其中之一,以便我知道如何在发生这种情况时解决该问题。

在此先感谢您,祝您新年快乐:)

main code that i am converting, note his indentation is shocking
import sys 
import binascii
import struct
import select
import socket
import errno

MAX_TIMEOUT = 500
SUCCESS = "success"
FAILED = "failed"
PORT = 50000
#-------------------------------------------------------------------------------------------------
'''
Class containing the data from non-contiguous memory allocations
'''
class Data :
def __init__ (self, begin, data):
self.begin = begin
self.data = data
self.count = len (data)
#-------------------------------------------------------------------------------------------------
'''
Parameters:
line: The line to parse
Returns:
The size of data. The address of data. The type of data. The line checksum. True if the checksum is correct, otherwise False.
Description:
It parses a line from the .hex file.
'''
def parse_line (line):
ok = False
size = int (line [ 1 : 3 ], 16)
address = int (line [ 3 : 7 ], 16)
type = int (line [ 7 : 9 ], 16)
next_index = (9 + size * 2)
data = binascii.a2b_hex (line [9: next_index])
checksum = int (line [ next_index :], 16)

#checking if checksum is correct
sum = size + (address >> 8) + (address & 0xFF) + type
for byte in data:
sum += byte # was ord(byte) removed to fix TypeError
if ( ~ (sum & 0xFF) + 1) & 0xFF == checksum:
ok = True
return (size, address, type, data, checksum, ok)
#-------------------------------------------------------------------------------------------------
'''
Parameters:
chunks: An array with different chunks of data.
path: The path to the .hex file to read
Returns:
True if the reading was successfully, otherwise False.
Description:
It reads a .hex file and stores the data in memory.
'''
def read_hex_file(chunks, path):
try:
file = open (path, 'r')
except IOError:
print ("Hex file not loaded")
return False
line = file.readline()
if line [0] != ':':
print ("The file seems to be a not valid .hex file")
file.close ()
return False
size, address, type, data, checksum, ok = parse_line(line.strip())
if not ok:
print ("The checksum in line 1 is wrong")
file.close()
return False
chunks.append(Data(address, data))
# Read the other lines
index = 0
count = 2
for line in file:
size, address, type, data, checksum, ok = parse_line(line.strip())
if not ok:
print ("The checksum in line", count, "is wrong")
file.close()
return False
if chunks [index].begin + chunks[index].count == address:
chunks [index].count += size
for code in data:
chunks [index].data += bytes(code) # added bytes to fix TypeError
else:
chunks.append(Data(address, data))
index += 1
count += 1
return True
#-------------------------------------------------------------------------------------------------
'''
Parameters:
None
Returns:
The server socket
Description:
It opens a server socket at the specified port and listens to connections.
'''
def init_server():
server = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
server.bind (('', PORT))
server.listen (1)
return server
#-------------------------------------------------------------------------------------------------
'''
Parameters:
cli: The client socket
response: The search string
timeout: The maximum time in milliseconds the function can be running before a time out.
Returns:
True if the string was found, otherwise False. The received string.
Description:
It waits for the expected string.
'''
def wait_for(cli, response, timeout):
inputs = [cli]
received = ""
milliseconds = 0
while milliseconds < timeout:
rlist, wlist, xlist = select.select(inputs,[], [], 0.001)
if len (rlist) > 0:
received += cli.recv(1).decode() # .decode() is new fixed TypeError
if response in received:
return True, received
milliseconds += 1
return False, received
#-------------------------------------------------------------------------------------------------
'''
Parameters:
cli: The client socket
timeout: The maximum time in milliseconds the function can be running before a time out.
length: The number of bytes to receive.
Returns:
True if the string has the required length, otherwise False. The received string.
Description:
It waits for the required length of bytes.
'''
def return_data(cli, timeout, length = 1):
inputs = [cli]
received = ""
milliseconds = 0
while milliseconds < timeout:
rlist, wlist, xlist = select.select(inputs,[], [], 0.001)
if len (rlist) > 0:
received = cli.recv(length).decode() # .decode() is new
return True, received
milliseconds += 1
return False, received
#-------------------------------------------------------------------------------------------------
'''
Parameters:
cli: The client socket
Returns:
True if the string was found, otherwise False
Description:
It waits for the acknowledge string.
'''
def acknowledge(cli):
if wait_for (cli, "\x14\x10", MAX_TIMEOUT) [0]: #STK_INSYNC, STK_OK
print (SUCCESS)
return True
else:
print (FAILED)
return False
#-------------------------------------------------------------------------------------------------
'''
Parameters:
chunks: An array with different chunks of data.
cli: The client socket
Returns:
Nothing
Description:
It starts the STK500 protocol to program the data at their respective memory address.
'''
def program_process(chunks, cli): # I HAVE ADDED .encode() to any strings inside cli.send IDK if they are correct ? the code has not got to this part yet....
print ("Connection to Arduino bootloader:"),
counter = 0
cli.send(("\x30\x20").encode()) #STK_GET_SYNCH, SYNC_CRC_EOP
if not acknowledge(cli):
return
print ("Enter in programming mode:"),
cli.send (("\x50\x20").encode()) #STK_ENTER_PROGMODE, SYNC_CRC_EOP
if not acknowledge(cli):
return
print ("Read device signature:"),
cli.send(("\x75\x20").encode()) #STK_READ_SIGN, SYNC_CRC_EOP
if wait_for (cli, "\x14", MAX_TIMEOUT) [0]: #STK_INSYNC
ok ,received = return_data (cli, MAX_TIMEOUT, 3)
print (binascii.b2a_hex (received))
if not wait_for (cli, "\x10", MAX_TIMEOUT) [0]: #STK_INSYNC
print (FAILED)
return
else:
print (FAILED)
return
for chunk in chunks:
total = chunk.count
if total > 0: #avoid the last block (the last line of .hex file)
current_page = chunk.begin
pages = total / 0x80
index = 0
for page in range (pages):
print ("Load memory address", current_page , ":"),
cli.send (struct.pack ((("<BHB").encode()), 0x55, current_page , 0x20)) #STK_LOAD_ADDRESS, address, SYNC_CRC_EOP
if not acknowledge (cli):
return
print ("Program memory address:"),
cli.send ((("\x64\x00\x80\x46").encode()) + chunk.data [index:index + 0x80] + (("\x20").encode())) #STK_PROGRAM_PAGE, page size, flash memory, data, SYNC_CRC_EOP
if not acknowledge (cli):
return
current_page += 0x40
total -= 0x80
index += 0x80
if total > 0:
print ("Load memory address", current_page , ":"),
cli.send (struct.pack ((("<BHB").encode()), 0x55, current_page, 0x20)) #STK_LOAD_ADDRESS, address, SYNC_CRC_EOP
if not acknowledge (cli):
return
print ("Program memory address:"),
cli.send (struct.pack(((">BHB").encode()), 0x64, total, 0x20) + chunk.data [index:index + total] + (("\x20").encode())) #STK_PROGRAM_PAGE, page size, flash memory, data, SYNC_CRC_EOP
if not acknowledge (cli):
return
print ("Leave programming mode:"),
cli.send (("\x51\x20").encode()) #STK_LEAVE_PROGMODE, SYNC_CRC_EOP
acknowledge (cli)
#-------------------------------------------------------------------------------------------------
def main():
print ("Arduino program via ESP V1.1")
print ("Listen to connections")
ser = init_server()
inputs = [ser]
while True:
rlist, wlist, xlist = select.select(inputs,[],[])
for s in rlist:
if s == ser:
cli, addr = s.accept()
print (addr[0], "connected")
# It assures the connection is for programming an Arduino and not other service.
if wait_for (cli, "hello", 20000) [0]:
cli.send(("welcome").encode()) # encode(0 is new
ok, received = wait_for(cli, "hex", 10000)
if ok:
chunks = []
print ("Read hex file", received.strip()) # prints out Read hex file Blink.ino.hex
if read_hex_file(chunks, received.strip()):
cli.send (("ok").encode()) # encode(0 is new
print ("listening for blank Serial")
# Wait for the byte '0' sent by Arduino after resetting
if wait_for (cli, "\x00", MAX_TIMEOUT) [0]: # CODE CURRENTLY STOPS HERE DUE TO UNICODE DECODE ERROR
program_process (chunks, cli)
else:
cli.send(("error").encode()) # encode(0 is new
cli.close()
print ("Listen to connections")
#-------------------------------------------------------------------------------------------------
if __name__ == "__main__":
main()

最佳答案

正如bjorn所暗示的那样,您不能(总是)按字节解码UTF-8字符串,仅仅是因为一个字符可以由多个字节组成。因此,您可以做的是组装字符串,直到(希望)包含完整的UTF-8序列,然后再对其进行解码。

def wait_for(cli, response, timeout): 
inputs = [cli]
response = response.encode() # Make bytes!
received = b"" # Make bytes!
milliseconds = 0
while milliseconds < timeout:
rlist, wlist, xlist = select.select(inputs, [], [], 0.001)
if len(rlist) > 0:
received += cli.recv(1) # Do not decode yet!
if response in received:
return True, received.decode() # .decode() is new fixed TypeError
milliseconds += 1
return False, received.decode() # .decode() is new fixed TypeError

关于python - 从python 2到3的UnicodeDecodeError,带有.decode()和套接字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48043848/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com