基于Python+PyQt5实现串口数据采集和显示

本节我们将会通过pyqt5实现串口数据采集和实时通信,涉及到的技术栈包括:pythonpyqt5

一、环境搭建

1.1 python 3.x安装

直接从官网下载安装包:index of /ftp/python/;

这里我下载的包为https://www.python.org/ftp/python/3.9.6/python-3.9.6-amd64.exe,安装版本:python 3.9.6

双击开始安装的时候,一定要把下面的 add path勾上 (表示添加到环境变量,这样cmd也能使用了),其他一路next安装完成。默认会安装一键式工具pip

pip工具镜像源配置。配置方法如下:

  • cmd窗口下执行echo %homepath%获取用户home目录,并在该目录下创建pip目录;
  • pip目录下创建pip.ini文件。记住,后缀必须是.ini格式。并在该文件中写入如下内容;

内容如下:

[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = pypi.tuna.tsinghua.edu.cn

1.2 安装pycharm

官方网站:http://www.jetbrains.com/pycharm/,提供以下安装版本:

  • professional:专业版(收费,网上一大堆破解方法)
  • community:社区版(免费,我用的这个),下载版本为pycharm-community-2023.3.4.exe

1.3 pyqt5安装

使用pip工具安装pyqt5工具,执行:

pip install pyqt5

如果慢,用国内源:

pip install pyqt5 -i https://pypi.tuna.tsinghua.edu.cn/simple

使用pip工具安装pyqt5-tools工具,执行:

pip install pyqt5-tools

如果慢,用国内源:

pip install pyqt5-tools -i https://pypi.tuna.tsinghua.edu.cn/simple

工具安装完成后的路径在e:\program files\python\lib\site-packages

pyqt5主要有三个部分:

  • qtcore: 包含了核心的非gui的功能。主要和时间、文件与文件夹、各种数据、模型、流、urlsmime类文件、进程与线程一起使用;
  • qtgui: 包含了窗口系统、事件处理、2d图像、基本绘画、字体和文字类;
  • qtwidgets: 包含了一些创建桌面的ui元素和控件;

1.4 环境配置

pycharm是开发python程序主流常用的ide。为方便调用qt designer实现界面开发和编译相应完成,可以在pycharm配置qt designerpyuicpyrcc

其中qt designerqt 设计师,pyuics是把ui界面转换成py文件,pyrcc是资源系统转换。

打开pycharm, 新建一个项目,项目名称为serial_port

这里python path选择我们之前安装的python 3.9.6的路径:e:\program files\python\python.exe

1.4.1 配置qt designer

菜单file ->settings -> tools -> external tools -> +号,进行添加。 参数配置说明:

  • name:填入qt designer,实际可以任意取值;
  • program:designer.exe程序绝对路径。根据实际安装路径填写,这里我配置的是e:\program files\python\lib\site-packages\qt5_applications\qt\bin\designer.exe;
  • working directory: 填入$filedir$,固定取值;

具体如下:

1.4.2 配置pyuic

该工具是用于将qt designer工具开发完成的.ui文件转化为.py文件。配置打开路径同qt designer,参数配置说明:

  • name:填入pyuic,实际可以任意取值。
  • programpython.exe程序绝对路径,根据实际安装路径填写,这里我配置的是e:\program files\python\python.exe
  • arguments:-m pyqt5.uic.pyuic $filename$ -o $filenamewithoutextension$.py
  • working directory: 填入$filedir$,固定取值;

具体如下:

1.4.3 配置pyqcc

配置打开路径同qt designer。参数配置说明:

  • name:填入pyqcc,实际可以任意取值。
  • program:这里我配置的是e:\program files\python\scripts\pyrcc5.exe
  • arguments:$filename$ -o $filenamewithoutextension$_rc.py
  • working directory: 填入$filedir$,固定取值;

具体如下:

1.5 测试

测试qt designerpyuicpyqcc配置是否成功。打开路径:菜单栏tools ->external tools ->qt designer/pyuic/pyqcc

1.5.1 ui_serial_port.ui

点击qt designer,打开designer程序主主界面,会弹出一个窗口,这里一般是选择main window或者widget,其中main window继承自widget,添加了一些内容,本质二者差不多。这里选择的是main window

将左侧widget boxpush button空间拖到主界面,ctrl + s保存名称ui_serial_port.ui,默认后缀就是.ui

1.5.2 ui_serial_port.py

选中ui_serial_port.ui文件,同理点击pyuic,自动完成ui_serial_port.ui文件的转换,生成文件名为ui_serial_port.py

1.5.3 serial_port.py

ui界面代码,还需要有一个逻辑代码,而逻辑代码个人感觉使用类的形式来组织更加方便,也更优雅。

还记得创建ui时选择的类吗?是widget还是main window,逻辑代码类最好是继承这个这个类,即qwidgetqmainwindow。一般的代码结构如下所示:

from pyqt5.qtwidgets import qmainwindow
 
# 导入设计的ui界面转换成的py文件
from ui_serial_port import ui_mainwindow
 
 
class serialport(qmainwindow):
    """
     串口行为
    """
 
    def __init__(self):
        # qmainwindow构造函数初始化
        super().__init__()
        self.ui = ui_mainwindow()
        # 这个函数本身需要传递一个mainwindow类,而该类本身就继承了这个,所以可以直接传入self
        self.ui.setupui(self)

1.5.4 main.py

在当前项目下,新建main.py 文件;

import sys
from serial_port import serialport
from pyqt5.qtwidgets import qapplication, qmainwindow
 
if __name__ == '__main__':
    # 先建立一个app
    app = qapplication(sys.argv)
    # 初始化一个对象,调用init函数,已加载设计的ui文件
    ui = serialport()
    # 显示这个ui
    ui.show()
    # 运行界面,响应按钮等操作
    sys.exit(app.exec_())

运行程序:

二、程序设计

2.1 需求

客户这里有一款惯性设备,在惯性装置的axs31接口,里面有两路数据,一路称为导航解算,一路称为原始信息。我们通过串口读取该设备的数据并在界面显示处理,同时还需要将读取到的数据保存到文本中。

实际在测试时候发现只能接收到导航解算报文,因此猜测原始信息已经被传感器内部转换为导航解算报文了。

2.1.1 导航解算报文

波特率:230400,数据位8,停止位1,无校验;

字节意义类型所占字节备注
1-2报文头2字节5a 5a
3工作状态1字节0xff等待对准
0x00码头对准
0x01海上对准
0x02牵引对准
0x03 是无阻尼
0x04是惯导阻尼
0x05 点校
0x06 综合校正
0x07 位置组合
4参数状态1字节b8 =0手动 b8=1自动
5-8运行时间4字节单位0.05s
9-11纬度3字节量纲93206.75556
12-14经度3字节量纲46603.37778
15-16升沉2字节最小量纲100m
17-18东速2字节最小量纲100kn
19-20北速2字节最小量纲100kn
21-22垂速2字节最小量纲100m/s
23-25姿态角13字节最小量纲0.25*93206.75556
26-28姿态角23字节最小量纲93206.75556
29-31姿态角33字节最小量纲93206.75556
32-34姿态角速率1(纵摇角速率)3字节93206.75556 度每秒
35-37姿态角速率2 (横摇角速率)3字节93206.75556 度每秒
38-40姿态角速率3(航向角速率)3字节93206.75556度每秒
41故障码1字节b0=1 imu接收错
b1=1 测角采样错
b2=1 接收缓存错
b3=1 测角控制板错
b4=1 驱动错
b5=1 测角错
b6=1 激磁错
b7=1 转台保护错
42-43imutime2字节
44-47备用4字节
48应答标志1字节可忽略
49校验和1字节3-48字节累加和

2.1.2 原始信息报文

波特率:230400,数据位8,停止位1,无校验;

字节意义类型所占字节备注
1-2报文头int2字节5a 5a
3-6imutimeint4字节整型时戳
7-10gyroxfloat4字节直接浮点数
11-14gyroyfloat4字节直接浮点数
15-18gyrozfloat4字节直接浮点数
19-22accexfloat4字节直接浮点数
23-26acceyfloat4字节直接浮点数
27-30accezfloat4字节直接浮点数
31-34备用int4字节
35-37转台角1int3字节量纲2.330168888888889*e4°
38-40转台角2int3字节量纲2.330168888888889*e4°
41-43gps经度int3字节量纲93206.75556
44-46gps纬度int3字节量纲46603.37778
47-48para4int2字节
49-50para[2]/para[5]int2字节
51-52para[3]/para[6]int2字节
53-54para[7]int2字节
55-56para[8]int2字节
57comdatavalidint1字节
58校验和int1字节3-57和校验

2.2 界面设计

首先,我们设计一个简单的用户界面,包括:

  • 串口配置区域:【串口】、【波特率】、【数据位】、【停止位】的设置,以及一个按钮用于开始打开和关闭串口;
  • 接收设置区域:用于设置接收和发送的数据格式,支持16进制以及ascii两种格式;这里为了简单起见,程序中发送/接收采用一样的数据格式;
    • 16进制:例如:5a 5a 02 03 5a
    • ascii格式:例如:ddr v1.12 52218f4949 cym 23/07/0
  • 数据发送区域:由一个文本框和一个发送按钮组成;
  • 数据接收区域:由一个文本域组成;
  • 如果接收到的数据时导航解算或者原始信息报文,则将解析后的数据显示在导航结算和原始信息区域;

界面效果如下(参考网上串口助手工具);

三、程序实现

我们将界面原型划分成了五个区域;

  • 串口设置区域;
  • 接收设置区域;
  • 数据发送区域;
  • 数据接收区域;
  • 导航解算和原始信息区域;

我们针对这五个区域编写相关实现代码,其具体流程如下;

  • 使用qt designer工具按照界面原型设置窗口,主要使用到一些基础控件,比如按钮、文本框、文本域、单选框、复选框等;‘
  • 针对窗体各个区域中的控件进行初始化工作;

我们将界面相关的代码均放置在serial_port.py文件中,该文件主要包含了如下功能;

  • 界面初始化工作;
  • 打开串口;
  • 接收数据;
  • 发送数据。

3.1 初始化工作

3.1.1 初始化串口设置区域

串口设置区域主要由【串口】、【波特率】、【校验位】、【数据位】、【停止位】下拉列表以及【打开串口】按钮组成。

首先需要初始化【串口】、【波特率】、【校验位】、【数据位】、【停止位】下拉列表;

  • 串口:下拉列表加载系统当前可用的串口;
  • 波特率:下拉列表设置常见的波特率,比如1200240096004800,9600,19200,384000,57600,115200,460800,921600,230400,1500000等;
  • 校验位:下拉列表设置校验位为noneoddevenmarkspace
  • 数据位:下拉列表设置为5678
  • 停止位:下拉列表设置为12

设置【打开串口】按钮点击事件对应的槽函数为self.open_serial_connection,当点击【打开串口】按钮时将会执行该函数;

代码位于serial_port.py__init_serial_setting__,具体如下;

def __init_serial_setting__(self):
	"""
	初始化串口设置相关控件默认参数
	设置下拉列表自动补全  https://blog.csdn.net/xuleisdjn/article/details/51434118
	:return:
	"""
	# 加载可用串口
	self.ui.cbx_com.seteditable(false)
	self.ui.cbx_com.setmaxvisibleitems(10)  # 设置最大显示下列项 超过要使用滚动条拖拉
	self.ui.cbx_com.setinsertpolicy(qcombobox.insertaftercurrent)  # 设置插入方式\
	port_list = list(serial.tools.list_ports.comports())  # 获取当前的所有串口,得到一个列表
	for port in port_list:
		self.ui.cbx_com.additem(port.device)
 
	# 初始化波特率列表
	self.ui.cbx_baud_rate.seteditable(false)
	self.ui.cbx_baud_rate.setmaxvisibleitems(10)  # 设置最大显示下列项 超过要使用滚动条拖拉
	self.ui.cbx_baud_rate.setinsertpolicy(qcombobox.insertaftercurrent)  # 设置插入方式
	for baud_rate in [1200, 2400, 4800, 9600, 19200, 384000, 57600, 115200, 460800, 921600, 230400, 1500000]:
		self.ui.cbx_baud_rate.additem(str(baud_rate), baud_rate)
	# 设置默认值
	self.ui.cbx_baud_rate.setcurrentindex(7)
 
	# 初始化校验位列表
	self.ui.cbx_parity_bit.seteditable(false)
	self.ui.cbx_parity_bit.setmaxvisibleitems(10)  # 设置最大显示下列项 超过要使用滚动条拖拉
	self.ui.cbx_parity_bit.setinsertpolicy(qcombobox.insertaftercurrent)  # 设置插入方式
	for (key, value) in {'none': serial.parity_none, 'odd': serial.parity_odd,
						 'even': serial.parity_even, 'mark': serial.parity_mark,
						 'space': serial.parity_space}.items():
		self.ui.cbx_parity_bit.additem(key, value)
	# 设置默认值
	self.ui.cbx_parity_bit.setcurrentindex(0)
 
	# 初始化数据位列表
	self.ui.cbx_data_bit.seteditable(false)
	self.ui.cbx_data_bit.setmaxvisibleitems(10)  # 设置最大显示下列项 超过要使用滚动条拖拉
	self.ui.cbx_data_bit.setinsertpolicy(qcombobox.insertaftercurrent)  # 设置插入方式
	for data_bit in [serial.fivebits, serial.sixbits, serial.sevenbits, serial.eightbits]:
		self.ui.cbx_data_bit.additem(str(data_bit), data_bit)
	# 设置默认值
	self.ui.cbx_data_bit.setcurrentindex(3)
 
	# 初始化停止位列表
	self.ui.cbx_stop_bit.seteditable(false)
	self.ui.cbx_stop_bit.setmaxvisibleitems(10)  # 设置最大显示下列项 超过要使用滚动条拖拉
	self.ui.cbx_stop_bit.setinsertpolicy(qcombobox.insertaftercurrent)  # 设置插入方式
	for data_bit in [serial.stopbits_one, serial.stopbits_two]:
		self.ui.cbx_stop_bit.additem(str(data_bit), data_bit)
	# 设置默认值
	self.ui.cbx_stop_bit.setcurrentindex(0)
 
	# 设置点击打开串口按钮对应的槽函数
	self.ui.btx_start.clicked.connect(self.open_serial_connection)

3.1.2 初始化串口接收区域

串口接收区域主要由【hex】、【ascii】单选框以及【显示时间】复选框组成;

设置发送/接收的数据格式,支持hexasciihexascii是互斥的,默认选中hex,这里我们设置:

  • hex】单选框点击事件对应的槽函数为self.rbn_data_format_hex_clicked,当点击【hex】单选框时将会执行该函数,在该函数内会记录当前选中的是hex
  • ascii】单选框点击事件对应的槽函数为self.rbn_data_format_ascii_clicked,当点击【ascii】单选框时将会执行该函数,在该函数内出记录当前选中的是ascii

【显示时间】复选框用于设置串口接收到数据时,是否在接收区域输出当前时间;

代码位于serial_port.py__init_recv_setting__,具体如下;

def __init_recv_setting__(self):
	"""
	接收设置初始化
	:return:
	"""
	self.ui.rbn_data_format_hex.clicked.connect(self.rbn_data_format_hex_clicked)
	self.ui.rbn_data_format_ascii.clicked.connect(self.rbn_data_format_ascii_clicked)

其中rbn_data_format_hex_clicked函数;

def rbn_data_format_hex_clicked(self):
	"""
	接收数据格式发生变化
	:return:
	"""
	if self.ui.rbn_data_format_hex.ischecked():
		self.ui.rbn_data_format_ascii.setchecked(false)
		if self.serial_thread:
			self.serial_thread.date_format = 'hex'

其中rbn_data_format_ascii_clicked函数;

def rbn_data_format_ascii_clicked(self):
	"""
	接收数据格式发生变化
	:return:
	"""
	if self.ui.rbn_data_format_ascii.ischecked():
		self.ui.rbn_data_format_hex.setchecked(false)
		if self.serial_thread:
			self.serial_thread.date_format = 'ascii'

3.1.3 初始化串口数据接收区域

串口数据接收区域由一个【文本域控件】组成,用于存放串口接收到的数据,数据长度默认最长为5000字符,超过5000字符自动清空;

这里初始化【文本域控件】为只读,并且串口接收到数据时,自动将滚动条移动至【文本域控件】的最低端;

代码位于serial_port.py__init_recv_setting__,具体如下;

def __init_recv_data_viewer__(self):
	"""
	初始化串口数据接收区域
	:return:
	"""
	self.ui.txt_recv_data_viewer.setreadonly(true)
	self.ui.txt_recv_data_viewer.textchanged.connect(
		lambda: self.ui.txt_recv_data_viewer.movecursor(qtextcursor.end))

3.1.4 初始化串口数据发送区域

串口数据接收区域由一个【文本输入框】和一个【发送】按钮组成,文本输入框用于输入要发送的内容,点击发送按钮,会将文本输入框的内容通过当前打开的串口发送出去;

代码位于serial_port.py__init_send_data_viewer__,具体如下;

def __init_send_data_viewer__(self):
	"""
	初始化串口数据发送区域
	:return:
	"""
	self.ui.btn_send.clicked.connect(self.send_serial_data)

设置【发送】按钮点击事件对应的槽函数为self.send_serial_data,当点击【发送】按钮时将会执行该函数进行串口数据的发送。

3.1.5 初始化报文解析区域

报文解析区域主要由两部分组成;

  • 导航结算区域:该部分主要由【报文头】、【工作状态】、【参数状态】、【运行时间】等文本输入框组成,其与导航解算报文字段一一对应;
  • 原始信息区域:该部分主要由【报文头】、【imutime】、【gyrox】、【gyroz】等文本输入框组成,其与原始信息报文字段一一对应;

代码位于serial_port.py__init_package_setting__,具体如下;

def __init_package_setting__(self):
	self.ui.let_nav_header.setreadonly(true)
	self.ui.let_nav_work_state.setreadonly(true)
	self.ui.let_nav_param_state.setreadonly(true)
	self.ui.let_nav_run_time.setreadonly(true)
	self.ui.let_nav_latitude.setreadonly(true)
	self.ui.let_nav_longitude.setreadonly(true)
	self.ui.let_nav_heave.setreadonly(true)
	self.ui.let_nav_east_speed.setreadonly(true)
	self.ui.let_nav_north_speed.setreadonly(true)
	self.ui.let_nav_vertical_speed.setreadonly(true)
	self.ui.let_nav_attitude_angle1.setreadonly(true)
	self.ui.let_nav_attitude_angle2.setreadonly(true)
	self.ui.let_nav_attitude_angle3.setreadonly(true)
	self.ui.let_nav_attitude_velocity1.setreadonly(true)
	self.ui.let_nav_attitude_velocity2.setreadonly(true)
	self.ui.let_nav_attitude_velocity3.setreadonly(true)
	self.ui.let_nav_fault_code.setreadonly(true)
	self.ui.let_nav_imu_time.setreadonly(true)
	self.ui.let_nav_reserved.setreadonly(true)
	self.ui.let_nav_response_flag.setreadonly(true)
	self.ui.let_nav_check_sum.setreadonly(true)
	self.ui.let_raw_header.setreadonly(true)
	self.ui.let_raw__imu_time.setreadonly(true)
	self.ui.let_raw_gyrox.setreadonly(true)
	self.ui.let_raw_gyroy.setreadonly(true)
	self.ui.let_raw_gyroz.setreadonly(true)
	self.ui.let_raw_accex.setreadonly(true)
	self.ui.let_raw_accey.setreadonly(true)
	self.ui.let_raw_accez.setreadonly(true)
	self.ui.let_raw_reserved.setreadonly(true)
	self.ui.let_raw_turntable_angle1.setreadonly(true)
	self.ui.let_raw_turntable_angle2.setreadonly(true)
	self.ui.let_raw_longitude.setreadonly(true)
	self.ui.let_raw_latitude.setreadonly(true)
	self.ui.let_raw_para4.setreadonly(true)
	self.ui.let_raw_para2_5.setreadonly(true)
	self.ui.let_raw_para3_6.setreadonly(true)
	self.ui.let_raw_para7.setreadonly(true)
	self.ui.let_raw_para8.setreadonly(true)
	self.ui.let_raw_comdata_valid.setreadonly(true)
	self.ui.let_raw_check_sum.setreadonly(true)

这里我们仅仅是将上述这些控件设置为只读状态。

3.2 打开串口

根据硬件设备参数在串口设置区域进行配置【串口】、【波特率】、【校验位】、【数据位】、【停止位】,配置完成后,当点击【打开串口】按钮,将会执行open_serial_connection函数;

open_serial_connection函数内主要会进行如下工作;

  • 获取串口设置区域配置的参数,并对参数进行校验,具体是由__validate_setting__函数完成;
  • 调用serialthread创建串口线程,并将串口设置区域配置参数传递给线程;
  • 设置串口线程接收到数据时对应的槽函数为handle_data_received,即串口接收到数据时会执行handle_data_received函数;
  • 设置串口线程发生异常时对应的槽函数为handler_serial_error,即串口线程发生异常时,会将错误信息发送给主线程,主线程接收到错误信息,将会在界面显示;

核心代码如下:

def open_serial_connection(self):
	"""
	打开串口
	:return:
	"""
	if not self.serial_thread or not self.serial_thread.isrunning():
		# 参数校验
		if not self.__validate_setting__():
			return
 
		# 建立一个串口
		self.serial_thread = serialthread(self.ui.cbx_com.currenttext(),
												 self.ui.cbx_baud_rate.currentdata(),
												 self.ui.cbx_data_bit.currentdata(),
												 self.ui.cbx_parity_bit.currentdata(),
												 self.ui.cbx_stop_bit.currentdata(),
													   'hex' if self.ui.rbn_data_format_hex.ischecked() else 'ascii')
 
		self.serial_thread.data_received.connect(self.handle_data_received)
		self.serial_thread.serial_error.connect(self.handler_serial_error)
		self.serial_thread.start()
		self.ui.btx_start.settext("关闭串口")
	else:
		self.serial_thread.stop()
		# 打开串口操作
		self.ui.btx_start.settext("打开串口")

3.2.1 __validate_setting__

__validate_setting__函数主要就是校验【串口】、【波特率】、【校验位】、【数据位】、【停止位】下拉列表是否已经配置,如果未配置将会弹出警告提示信息;

def __validate_setting__(self):
	"""
	校验串口设置参数
	:return:
	"""
	# 参数校验
	if self.ui.cbx_com.currentindex() == -1:
		qmessagebox.warning(self, "warning", "请选择串口!")
		return false
	if self.ui.cbx_baud_rate.currentindex() == -1:
		qmessagebox.warning(self, "warning", "请选择波特率!")
		return false
	if self.ui.cbx_parity_bit.currentindex() == -1:
		qmessagebox.warning(self, "warning", "请选择校验位!")
		return false
	if self.ui.cbx_data_bit.currentindex() == -1:
		qmessagebox.warning(self, "warning", "请选择数据位!")
		return false
	if self.ui.cbx_data_bit.currentindex() == -1:
		qmessagebox.warning(self, "warning", "请选择停止位!")
		return false
 
	return true

3.2.2 serialthread

serialthread类的实现位于serial_thread.py文件中,这块我们后面单独接收。

3.2.3 handle_data_received

当串口接收到数据时会执行handle_data_received函数,这块我们后面单独接收。

3.2.4 handler_serial_error

串口线程发生异常时,会将错误信息发送给主线程,主线程接收到错误信息,将会在界面显示;具体实现函数为handler_serial_error

def handler_serial_error(self, error):
	"""
	串口接收线程异常
	:param error:
	:return:
	"""
	qmessagebox.critical(self, '错误', error)

3.3 接收数据

上面我们提到,当串口线程接收到数据时,会将数据通过信号与槽机制传递给主线程,将会由主线程handle_data_received函数进行处理;

handle_data_received函数内主要会进行如下工作;

  • 获取当前时间;
  • 串口数据接收区域【文本域控件】内容超过5000个字符将会清空;
  • 如果配置了显示当前时间,将会在接收到的数据前面 插入当前时间,并将数据追加到串口数据接收区域【文本域控件】中;
  • 如果串口接收区域配置的数据格式为hex,将会对接收到的报文信息进行解析;
    • 解析由packageparse类完成,解析类会根据报文数据长度、报文头、以及校验和等参数判定当前报文格式为导航结算还是原始信息;
    • 如果为导航结算报文,保存报文到nav_data.csv,同时将报文解析到的字段一一填入界面导航结算区域对应的控件中;
    • 如果为原始信息报文,保存报文到raw_data.csv,同时将报文解析到的字段一一填入界面原始信息区域对应的控件中;
  • 移除已经解析的报文,截取未解析的报文,重复报文解析的流程,直至报文内容不满足导航结算、原始信息报文格式;

handle_data_received代码如下:

def handle_data_received(self, data):
	"""
	接收数据
	:param data:接收到的数据
	:return:
	"""
	# 获取时间
	current_time = qdatetime.currentdatetime().tostring("yyyy-mm-dd hh:mm:ss")
 
	# 超过5000字符清空
	if len(self.ui.txt_recv_data_viewer.toplaintext()) > 5000:
		self.ui.txt_recv_data_viewer.clear()
	# 更新显示区域中的数据
	if self.ui.cbx_show_time.ischecked():
		self.ui.txt_recv_data_viewer.insertplaintext(f"[{current_time}] {data}")
	else:
		self.ui.txt_recv_data_viewer.insertplaintext(f"[{data}")
 
	# 必须是hex格式
	if self.ui.rbn_data_format_ascii.ischecked():
		return
 
	# 如果报文最后两位是换行符,则移除
	data = data.replace('0d 0a', '')
	data = bytes.fromhex(data.replace(' ', ''))
	# 解析data
	while data is not none:
		package_parse = packageparse(data)
		data_format = package_parse.get_data_format()
		if data_format == packageparse.type.nal_sol:
			# 解析数据
			dict_msg = package_parse.get_nav_sol()
			# 写入当前时间,并保存到文件
			dict_msg["current_time"] = current_time
			save_csv('nav_data.csv', dict_msg)
			# 界面显示
			self.ui.let_nav_header.settext(dict_msg["header"])
			self.ui.let_nav_work_state.settext(dict_msg["work_state"])
			self.ui.let_nav_param_state.settext(dict_msg["param_state"])
			self.ui.let_nav_run_time.settext(str(dict_msg["run_time"]))
			self.ui.let_nav_latitude.settext(str(dict_msg["latitude"]))
			self.ui.let_nav_longitude.settext(str(dict_msg["longitude"]))
			self.ui.let_nav_heave.settext(str(dict_msg["heave"]))
			self.ui.let_nav_east_speed.settext(str(dict_msg["east_speed"]))
			self.ui.let_nav_north_speed.settext(str(dict_msg["north_speed"]))
			self.ui.let_nav_vertical_speed.settext(str(dict_msg["vertical_speed"]))
			self.ui.let_nav_attitude_angle1.settext(str(dict_msg["attitude_angle1"]))
			self.ui.let_nav_attitude_angle2.settext(str(dict_msg["attitude_angle2"]))
			self.ui.let_nav_attitude_angle3.settext(str(dict_msg["attitude_angle3"]))
			self.ui.let_nav_attitude_velocity1.settext(str(dict_msg["attitude_velocity1"]))
			self.ui.let_nav_attitude_velocity2.settext(str(dict_msg["attitude_velocity2"]))
			self.ui.let_nav_attitude_velocity3.settext(str(dict_msg["attitude_velocity3"]))
			self.ui.let_nav_fault_code.settext(dict_msg["fault_code"])
			self.ui.let_nav_imu_time.settext(dict_msg["imu_time"])
			self.ui.let_nav_reserved.settext(dict_msg["reserved"])
			self.ui.let_nav_response_flag.settext(dict_msg["response_flag"])
			self.ui.let_nav_check_sum.settext(dict_msg["check_sum"])
		if data_format == packageparse.type.raw:
			# 解析数据
			data = package_parse.get_raw()
			# 写入当前时间,并保存到文件
			data["current_time"] = current_time
			save_csv('raw_data.csv', data)
			# 界面显示
			self.ui.let_raw_header.settext(dict_msg["header"])
			self.ui.let_raw__imu_time.settext(dict_msg["imu_time"])
			self.ui.let_raw_gyrox.settext(dict_msg["gyrox"])
			self.ui.let_raw_gyroy.settext(dict_msg["gyroy"])
			self.ui.let_raw_gyroz.settext(dict_msg["gyroz"])
			self.ui.let_raw_accex.settext(dict_msg["accex"])
			self.ui.let_raw_accey.settext(dict_msg["accey"])
			self.ui.let_raw_accez.settext(dict_msg["accez"])
			self.ui.let_raw_reserved.settext(dict_msg["reserved"])
			self.ui.let_raw_turntable_angle1.settext(dict_msg["turntable_angle1"])
			self.ui.let_raw_turntable_angle2.settext(dict_msg["turntable_angle2"])
			self.ui.let_raw_longitude.settext(dict_msg["longitude"])
			self.ui.let_raw_latitude.settext(dict_msg["latitude"])
			self.ui.let_raw_para4.settext(dict_msg["para4"])
			self.ui.let_raw_para2_5.settext(dict_msg["para2_5"])
			self.ui.let_raw_para3_6.settext(dict_msg["para3_6"])
			self.ui.let_raw_para7.settext(dict_msg["para7"])
			self.ui.let_raw_para8.settext(dict_msg["para8"])
			self.ui.let_raw_comdata_valid.settext(dict_msg["comdata_valid"])
			self.ui.let_raw_check_sum.settext(dict_msg["check_sum"])
		# 一次收到若干个数据包
		if data_format != packageparse.type.unknown and len(data) > data_format.value:
			data = data[data_format.value:]
		else:
			data = none

文件保存函数save_csv

def save_csv(file_name, dict: {}):
    """
    保存到csv文件
    :param file_name:文件名
    :param dict: 数据,字典格式
    :return:
    """
    file_exists = os.path.exists(file_name)
    # save data to csv file
    with open(file_name, 'a', newline='') as csvfile:
        writer = csv.writer(csvfile)
        # 如果文件不存在,则写入列名
        if not file_exists:
            writer.writerow(dict.keys())
        # 写入数据
        writer.writerow(dict.values())

3.4 报文解析

报文解析是由packageparse类实现的,代码位于package_parse.py,在该类内部主要实现了一下功能;

判断当前报文格式是导航结算还是原始信息,由get_data_format函数实现;实现导航结算报文的解析,由get_nav_sol函数实现;实现原始信息报文的解析,由get_raw函数实现;

3.4.1 get_data_format

这里判断报文格式的方法很简单;

  • 首先就是长度判定,如果是导航结算报文,长度至少为49个字符;如果是原始信息报文,长度至少是58个字符;
  • 接着判断报文前两个字节是否为0x5a 0x5a
  • 然后判定第58个字节是否为3~57个字节的累加和,如果满足条件就是原始信息报文;
  • 最后判定第49个字节是否为3~48个字节的累加和,如果满足条件就是导航结算报文;

具体代码如下:

def validate_nal_sol(self):
	"""
	校验和
		导航结算:第49个字节为3-48字节累加和
	"""
	# 取出第3到第48字节
	selected_bytes = self.__byte_array[2:48]
 
	# 计算累加和并只保留低两位数字
	checksum = sum(selected_bytes) & 0xff
 
	return checksum == self.__byte_array[48]
 
def validate_raw(self):
	"""
	校验和
		原始信息:第58个字节为3-57字节累加和
	"""
	# 取出第3到第48字节
	selected_bytes = self.__byte_array[2:57]
 
	# 计算累加和并只保留低两位数字
	checksum = sum(selected_bytes) & 0xff
 
	return checksum == self.__byte_array[57]
 
def get_data_format(self):
	"""
	返回数据类型:在惯性装置的axs31接口,里面有两路数据,一路称为导航解算,一路称为原始数
	:return: 0: 导航解算   1:原始信息  2:非法数据
	"""
	# 原始信息报文
	if (len(self.__byte_array) >= 58 and self.__byte_array[0] == 0x5a and self.__byte_array[1] == 0x5a
			and self.validate_raw()):
		return packageparse.type.raw
	# 导航解算的报文内容
	if (len(self.__byte_array) >= 49 and self.__byte_array[0] == 0x5a and self.__byte_array[1] == 0x5a
			and self.validate_nal_sol()):
		return packageparse.type.nal_sol
	return packageparse.type.unknown

3.4.2 get_nav_sol

如果报文格式是导航结算,那么调用该方法实现导航结算报文的解析,在函数内部根据字段单位、量纲对原始字节进行了转换;

def translate(hex_array: [], unit=none):
    """
    将字节数组转换为16进制字符串  [0x0a,0x04]-> 0x0a 0x04
    :param hex_array: 字节数组
    :param unit 量纲
    :return:
    """
    if unit is not none:
        # 将字节数组按照小端格式转换为数字
        num = int.from_bytes(hex_array, byteorder='little')
        return num / unit
    return ' '.join(f'0x{val:02x}' for val in hex_array)
    
def get_nav_sol(self):
	"""
	获取导航解算数据
	:return:
	"""
	if self.get_data_format() != packageparse.type.nal_sol:
		print("数据格式错误,非导航解算报文!")
		return none
 
	message_dict = {}
 
	# 解析报文头
	message_dict['header'] = translate(self.__byte_array[0:2])
 
	# 解析工作状态
	message_dict['work_state'] = translate(self.__byte_array[2:3])
 
	# 解析参数状态
	message_dict['param_state'] = translate(self.__byte_array[3:4])
 
	# 解析运行时间
	message_dict['run_time'] = translate(self.__byte_array[4:8], 20)
 
	# 解析纬度
	message_dict['latitude'] = translate(self.__byte_array[8:11], 93206.75556)
 
	# 解析经度
	message_dict['longitude'] = translate(self.__byte_array[11:14], 46603.37778)
 
	# 解析升沉
	message_dict['heave'] = translate(self.__byte_array[14:16], 100)
 
	# 解析东速
	message_dict['east_speed'] = translate(self.__byte_array[16:18], 100)
 
	# 解析北速
	message_dict['north_speed'] = translate(self.__byte_array[18:20], 100)
 
	# 解析垂速
	message_dict['vertical_speed'] = translate(self.__byte_array[20:22], 100)
 
	# 解析姿态角1
	message_dict['attitude_angle1'] = translate(self.__byte_array[22:25], 0.25 * 93206.75556)
 
	# 解析姿态角2
	message_dict['attitude_angle2'] = translate(self.__byte_array[25:28], 93206.75556)
 
	# 解析姿态角3
	message_dict['attitude_angle3'] = translate(self.__byte_array[28:31], 93206.75556)
 
	# 姿态角速率1
	message_dict['attitude_velocity1'] = translate(self.__byte_array[31:34], 93206.75556)
 
	# 姿态角速率2
	message_dict['attitude_velocity2'] = translate(self.__byte_array[34:37], 93206.75556)
 
	# 姿态角速率3
	message_dict['attitude_velocity3'] = translate(self.__byte_array[37:40], 93206.75556)
 
	# 故障码
	message_dict['fault_code'] = translate(self.__byte_array[40:41])
 
	# imu时间
	message_dict['imu_time'] = translate(self.__byte_array[41:43])
 
	# 备用
	message_dict['reserved'] = translate(self.__byte_array[43:47])
 
	# 应答标志
	message_dict['response_flag'] = translate(self.__byte_array[47:48])
 
	# 校验和
	message_dict['check_sum'] = translate(self.__byte_array[48:49])
 
	return message_dict

3.4.3 get_raw

如果报文格式是原始信息,那么调用该方法实现原始信息报文的解析;

def get_raw(self):
	"""
	获取原始信息报文
	:return:
	"""
	if self.get_data_format() != packageparse.type.raw:
		print("数据格式错误,非原始信息报文!")
		return none
 
	message_dict = {}
 
	# 解析报文头
	message_dict['header'] = translate(self.__byte_array[0:2])
 
	# imutime
	message_dict['imu_time'] = translate(self.__byte_array[2:6])
 
	# gyrox
	message_dict['gyrox'] = translate(self.__byte_array[6:10])
 
	# gyroy
	message_dict['gyroy'] = translate(self.__byte_array[10:14])
 
	# gyroz
	message_dict['gyroz'] = translate(self.__byte_array[14:18])
 
	# accex
	message_dict['accex'] = translate(self.__byte_array[18:22])
 
	# accey
	message_dict['accey'] = translate(self.__byte_array[22:26])
 
	# accez
	message_dict['accez'] = translate(self.__byte_array[26:30])
 
	# 备用
	message_dict['reserved'] = translate(self.__byte_array[30:34])
 
	# 转台角1
	message_dict['turntable_angle1'] = translate(self.__byte_array[34:37])
 
	# 转台角2
	message_dict['turntable_angle2'] = translate(self.__byte_array[37:40])
 
	# gps经度
	message_dict['longitude'] = translate(self.__byte_array[40:43])
 
	# gps纬度
	message_dict['latitude'] = translate(self.__byte_array[43:46])
 
	# para[4](电磁/牵引时为航向)
	message_dict['para4'] = translate(self.__byte_array[46:48])
 
	# para[2]/para[5]
	message_dict['para2_5'] = translate(self.__byte_array[48:50])
 
	# para[3]/para[6]
	message_dict['para3_6'] = translate(self.__byte_array[50:52])
 
	# para[7]
	message_dict['para7'] = translate(self.__byte_array[52:54])
 
	# para[8]
	message_dict['para8'] = translate(self.__byte_array[54:56])
 
	# comdatavalid
	message_dict['comdata_valid'] = translate(self.__byte_array[56:57])
 
	# 校验和
	message_dict['check_sum'] = translate(self.__byte_array[57:58])
 
	return message_dict

3.5 发送数据

当用户在串口数据接收区域的【文本输入框】录入内容,点击【发送】按钮时,会调用send_serial_data方法:

def send_serial_data(self, data: str):
	"""
	发送数据
	:return:
	"""
	if not self.serial_thread or not self.serial_thread.isrunning():
		qmessagebox.warning(self, "warning", "请先打开串口!")
		return
 
	data = self.ui.let_send_data_viewer.text()
	if data != "":
		self.serial_thread.send_data(data)
		self.ui.let_send_data_viewer.clear()

函数内部首先校验串口是否已经打开,如果串口已经打开并且【文本输入框】输入了内容,将调用串口线程的send_data方法来进行数据发送。

3.6 串口线程

pyqt已经为我们提供了串口控件,控件名称为qtserialport,使用方法比较简单,主要是两个模块:qserialport, qserialportinfo,但是这个控件提供的能力有限。

这里我们使用另一个串口包pyserial来实现:

pip install pyserial -i https://pypi.tuna.tsinghua.edu.cn/simple

为了实现串口数据的读取和发送,我们创建了一个继承自qthreadserialthread类;

from pyqt5.qtcore import qthread,pyqtsignal
import serial
 
class serialthread(qthread):
    """
    创建一个继承自qthread的serialthread类,实现串口数据的读取/发送
    """
 
    # 用于发送串口数据接收信号
    data_received = pyqtsignal(str)
    # 串口打开/接收异常
    serial_error = pyqtsignal(str)
    
    .......

3.6.1 初始化

在首次点击【打开串口】按钮时会创建serialthread实例,即调用串口线程初始化方法;

def __init__(self, port, baud_rate, data_bits, parity_bits, stop_bits, data_format):
	"""
	初始化
	:param port: 串口号
	:param baud_rate: 波特率
	:param data_bits: 数据位
	:param stop_bits: 停止位
	:param parity_bits: 奇偶校验位
	"""
	super().__init__()
	self.port = port
	self.baud_rate = baud_rate
	self.data_bits = data_bits
	self.stop_bits = stop_bits
	self.parity_bits = parity_bits
 
	# 串口已经运行标志位
	self.running = false
	# 串口
	self.serial = none
	# 数据格式
	self.__date_format = data_format
	# 发送开启追加换行
	self.__auto_line = true
    
@property
def date_format(self):
	"""
	#把一个getter方法变成属性
	:return:
	"""
	return self.__date_format
 
@date_format.setter
def date_format(self, value):
	"""
	# 负责把一个setter方法变成属性赋值
	:param value:
	:return:
	"""
	if not isinstance(value, str):
		raise valueerror('date_format must be an str')
	if value not in ['hex', 'ascii']:
		raise valueerror('date_format must in [hex,ascii]')
	self.__date_format = value

在这段代码中,主要就是保存构造函数传递过来的参数,比如串口号、波特率等。

3.6.2 线程运行

串口线程初始化完成后,调用start方法,将会执行run函数;

def run(self):
	"""
	打开串口
	:return:
	"""
	# 串口已经运行
	if self.running:
		return
 
	try:
		# 建立一个串口
		with serial.serial(port=self.port,
						   baudrate=self.baud_rate,
						   parity=self.parity_bits,
						   bytesize=self.data_bits,
						   stopbits=self.stop_bits,
						   timeout=2) as self.serial:
			self.running = true
			while self.running:
				data = self.__read_data__()
				if data:
					self.data_received.emit(data)
	except exception as e:
		print("打开串口时失败:", e)
		self.serial_error.emit("打开串口时失败!")

在函数内部我们实际上就是调用了pyserial提供的serial类去实现串口的打开,并在while循环中,调用__read_data__获取串口发送过来的数据,如果接受到数据将串口数据通过信号与槽机制推送给主线程。

3.6.3 接受数据

串口数据的读取是由__read_data__函数完成的,在函数内部我们调用self.serial.readline()依次读取一行的数据,这里读取到的是字节数组,我们根据我们串口接收区域设置的数据格式来进行解析;

def __read_data__(self):
	"""
	按行接收数据
	:return:
	"""
	if self.serial is none or not self.serial.isopen():
		return
 
	try:
		# 读取串口数据 例如:b'ddr v1.12 52218f4949 cym 23/07/0'
		byte_array = self.serial.readline()
		if len(byte_array) == 0:
			return none
		# ascii显示
		if self.__date_format == 'ascii':
			# 串口接收到的字符串为b'abc',要转化成unicode字符串才能输出到窗口中去
			data_str = byte_array.decode('utf-8')
		else:
			# 串口接收到的字符串为b'zz\x02\x03z',要转换成16进制字符串显示
			data_str = ' '.join(format(x, '02x') for x in byte_array)
			if self.__auto_line:
				data_str += '\r\n'
		return data_str
	except exception as e:
		print("接收数据异常:", e)
		self.serial_error.emit("接收数据异常!")

3.6.4 发送数据

当用户在串口数据接收区域的【文本输入框】录入内容,点击【发送】按钮时,最终调用的就是串口线程的send_data方法;我们根据我们串口接收区域设置的数据格式处理将要发送的数据,然后调用self.serial.write实现串口数据的发送;

def send_data(self, data: str):
	"""
	发送数据
	:return:
	"""
	if not self.running:
		self.serial_error.emit("请先打开串口!")
		return
 
	# hex发送 比如:5a 5a 02 03 5a -> b'zz\x02\x03z'
	if self.__date_format == 'hex':
		data_str = data.strip()
		send_list = []
		while data_str != '':
			try:
				num = int(data_str[0:2], 16)
			except valueerror:
				self.serial_error.emit('请输入十六进制数据,以空格分开!')
				return
			data_str = data_str[2:].strip()
			send_list.append(num)
		if self.__auto_line:
			send_list.append(0x0d)
			send_list.append(0x0a)
		byte_array = bytes(send_list)
	else:
		if self.__auto_line:
			data += '\r\n'
		# ascii发送 比如:'abc' -> b'abc'
		byte_array = data.encode('utf-8')
 
	try:
		self.serial.write(byte_array)
	except exception as e:
		print("发送失败", e)
		self.serial_error.emit('发送失败!')

以上就是基于python+pyqt5实现串口数据采集和显示的详细内容,更多关于python pyqt5数据采集和显示的资料请关注代码网其它相关文章!

发布于 2025-05-07 22:10:16
分享
海报
112
上一篇:pandas DataFrame keys的使用小结 下一篇:使用Python和PaddleOCR实现图文识别的代码和步骤
目录

    推荐阅读

    忘记密码?

    图形验证码