Energy Monitor с использованием Minimalmodbus и устройства PZEM 016
Я пытаюсь обнулить счетчик энергии для монитора энергии Peacefair AC PZEM 016. У меня есть небольшой симпатичный скрипт, который измеряет значения регистров, однако я не могу очистить энергию через библиотекуминимальногомодбуса и продолжаю получать множество ошибок при попытке кода, но основная из них, которую я получаю, это «Подчиненное устройство сообщило о незаконном значении данных. " Ссылка на документацию находится здесь https://images-na.ssl-images-amazon.com/images/I/81GtkIOyZaL.pdf . «Сброс энергии» находится в разделе 2.5, но клянусь, это не должно быть так сложно.
import minimalmodbus
import datetime
import time
import serial
import sys
import os
import threading
from crccheck.crc import Crc16
mb_address = 1
clear_energy_command = 0x42
powark_sensor = minimalmodbus.Instrument('/dev/ttyUSB0',mb_address)
powark_sensor.serial.baudrate = 9600
powark_sensor.serial.bytesize = 8
powark_sensor.serial.parity = minimalmodbus.serial.PARITY_NONE
powark_sensor.serial.stopbits = 1
powark_sensor.serial.timeout = 0.5
powark_sensor.mode = minimalmodbus.MODE_RTU
file_path = "/home/josiahferguson/Documents/powark/mb_powark_data.txt"
powark_sensor.clear_buffers_before_each_transaction = True
powark_sensor.close_port_after_each_call = True
print("")
print("Requesting Data From Powark...")
print("")
#single_data = powark.sensor.read_register(1, 1, 3, False)
#print(f"Single register data = {single_data}")
#function to read powark data
def read_sensor_data():
data = powark_sensor.read_registers(0, 10, 4)
volts = data[0] / 10
ampslow = int(data[1]) / 1000
ampshigh = int(data[2]) / 1000
wattslow = int(data[3]) / 10
wattshigh = int(data[4]) / 10
watthourslow = int(data[5])
watthourshigh = int(data[6])
hertz = int(data[7]) / 10
powerfactor = int(data[8]) / 100
alarm = data[9]
processed_data = {
"volts": volts,
"ampslow": ampslow,
"ampshigh": ampshigh,
"wattslow": wattslow,
"wattshigh": wattshigh,
"watthourslow": watthourslow,
"watthourshigh": watthourshigh,
"hertz": hertz,
"powerfactor": powerfactor,
"alarm": alarm,
}
return processed_data
powark_sensor_data = read_sensor_data()
#function to write sensor data to file
def write_data_to_file(powark_sensor_data, file_path):
with open (file_path, "a") as file:
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
file.write(f"{timestamp}: {powark_sensor_data}\n")
def low_byte(value):
return value & 0xFF
def high_byte(value):
return (value >> 8) & 0xFF
powark_sensor.write_long(mb_address, 0x42, False, 2)
# ~ powark_sensor.serial.open()
# ~ time.sleep(1)
# ~ # Calculate the CRC-16 using the CRC library
# ~ crc = Crc16.calc(bytearray([mb_address, clear_energy_command]))
# ~ # Send the slave address, reset command, and CRC-16
# ~ powark_sensor.serial.write(bytearray([mb_address, clear_energy_command, low_byte(crc), high_byte(crc)]))
# ~ # Wait for the energy monitor to process the command
# ~ time.sleep(0.26)
# ~ response = powark_sensor.serial.read(4) # The expected response length is 4 bytes
# Check if the response is correct or an error reply
# ~ if len(response) == 0:
# ~ print("No Response Received")
# ~ print("")
# ~ elif response[0] == mb_address and response[1] == clear_energy_command:
# ~ print("Energy Reset")
# ~ print("")
# ~ elif response[0] == mb_address and response[1] == 0xC2:
# ~ print("Error resetting energy. Abnormal code:", response[2])
# ~ print("")
# ~ else:
# ~ print("Unexpected response:", response)
# ~ print("")
#fucntion to clear the energy data from the powark sensor
def clear_energy_used(port, mb_adrress):
try:
powark_sensor.write_register(clear_energy_command, 6)
print("Energy data cleared successfully.")
except Exception as e:
print(f"Error clearing energy data: {e}")
#function to display sensor values to the console
def print_powark_values():
print(f"Voltage: {powark_sensor_data['volts']} Volts")
print("-------------------------")
print(f"Current: {powark_sensor_data['ampslow']} Amps")
print("-------------------------")
#converts Watts to Kilowatts if the value exceeds 999.99
if powark_sensor_data['wattslow'] >= 1000:
print(f"Charging Rate: {powark_sensor_data['wattslow']/1000} Kilowatts")
else:
print(f"Charging Rate: {powark_sensor_data['wattslow']} Watts")
print("-------------------------")
#converts energy (watt hours) to Kilowatts if the value exceeds 999.99
if powark_sensor_data['watthourslow'] >= 1000:
print(f"Energy Added: {powark_sensor_data['watthourslow']/1000} Kilowatts")
else:
print(f"Energy Added: {powark_sensor_data['watthourslow']} Watts")
print("-------------------------")
print(f"Frequency: {powark_sensor_data['hertz']} Hz")
print("")
print(f"{powark_sensor_data['powerfactor']} Powerfactor")
print("")
print(f"{powark_sensor_data['alarm']}")
print("")
print_powark_values()
print("Writing Data to mb_powark_data.txt File...")
write_data_to_file(powark_sensor_data, file_path)
print("")
print("Now Closing Serial Port")
powark_sensor.serial.close()
print("")
#
#user input handling to start logging data to the file
entered_value = input('Would you like to start a session?: y or n ')
print("")
if entered_value == 'y':
time.sleep(1)
print("Your charging session has started!")
print("")
max_iterations = int(input("How many iterations would you like this to run?: "))
print("")
iteration_count = 0
while iteration_count <= max_iterations:
# Read sensor data
powark_sensor_data = read_sensor_data()
# write_data_to_file(powark_sensor_data, file_path)
write_data_to_file(powark_sensor_data, file_path)
#print values to console
print_powark_values()
time.sleep(1)
# Close the serial port
powark_sensor.serial.close()
iteration_count += 1
if iteration_count >= max_iterations:
print("")
print("Ports now closed")
print("")
#print("Clearing Data")
#clear_energy_used('/dev/ttyUSB0', mb_address)
print("Thanks for using Powark!")
break
# Wait for the specified delay before the next iteration
else:
print("Ports now closed")
print("Thanks for using Powark!")
2 ответа
Наконец-то получил! Оказывается, правильная команда для сброса энергии для PZEM 016 — это ._perform_command(0x42, '').
По мануалу для регистрации ничего не надо писать , я бы попробовал что-то вроде этого:
powark_sensor.write_register(clear_energy_command, None, 0, 6, False) # address, value, number of decimals, function code, signed)
Я не уверен, что минимальный способен справиться с письмомNone
в регистр. И даже если это так, я думаю, ваше устройство не ожидает, что вы напишете для регистрации.0x42
но просто записываю 4 байта, которые вы видите в руководстве (я понял это, прочитав этот фрагмент кода
Имея это в виду, я бы посоветовал вам использовать pyserial для записи этих 4 необработанных байтов в порт. Чтобы вычислить CRC, вы можете попробовать использовать внутреннюю функцию минимума:
minimalmodbus._calculate_crc_string('\x01\x42')
И прикрепите их после\x42
.