Energy Monitor с использованием Minimalmodbus и устройства PZEM 016

Я пытаюсь обнулить счетчик энергии для монитора энергии Peacefair AC PZEM 016. У меня есть небольшой симпатичный скрипт, который измеряет значения регистров, однако я не могу очистить энергию через библиотекуминимальногомодбуса и продолжаю получать множество ошибок при попытке кода, но основная из них, которую я получаю, это «Подчиненное устройство сообщило о незаконном значении данных. " Ссылка на документацию находится здесь https://images-na.ssl-images-amazon.com/images/I/81GtkIOyZaL.pdf . «Сброс энергии» находится в разделе 2.5, но клянусь, это не должно быть так сложно.

      import minimalmodbus
import datetime
import time
import serial
import sys
import os
import threading
from crccheck.crc import Crc16

mb_address = 1 
clear_energy_command = 0x42
powark_sensor = minimalmodbus.Instrument('/dev/ttyUSB0',mb_address) 

powark_sensor.serial.baudrate = 9600
powark_sensor.serial.bytesize = 8
powark_sensor.serial.parity = minimalmodbus.serial.PARITY_NONE
powark_sensor.serial.stopbits = 1
powark_sensor.serial.timeout = 0.5
powark_sensor.mode = minimalmodbus.MODE_RTU

file_path = "/home/josiahferguson/Documents/powark/mb_powark_data.txt"

powark_sensor.clear_buffers_before_each_transaction = True
powark_sensor.close_port_after_each_call = True

print("")
print("Requesting Data From Powark...")
print("")
#single_data = powark.sensor.read_register(1, 1, 3, False)
#print(f"Single register data = {single_data}")

#function to read powark data
def read_sensor_data():
    data = powark_sensor.read_registers(0, 10, 4)

    volts = data[0] / 10
    ampslow = int(data[1]) / 1000
    ampshigh = int(data[2]) / 1000
    wattslow = int(data[3]) / 10
    wattshigh = int(data[4]) / 10
    watthourslow = int(data[5])
    watthourshigh = int(data[6])
    hertz = int(data[7]) / 10
    powerfactor = int(data[8]) / 100
    alarm = data[9]

    processed_data = {
        "volts": volts,
        "ampslow": ampslow,
        "ampshigh": ampshigh,
        "wattslow": wattslow,
        "wattshigh": wattshigh,
        "watthourslow": watthourslow,
        "watthourshigh": watthourshigh,
        "hertz": hertz,
        "powerfactor": powerfactor,
        "alarm": alarm,
    }

    return processed_data

powark_sensor_data = read_sensor_data()

#function to write sensor data to file
def write_data_to_file(powark_sensor_data, file_path):
    with open (file_path, "a") as file:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        file.write(f"{timestamp}: {powark_sensor_data}\n")

def low_byte(value):
    return value & 0xFF

def high_byte(value):
    return (value >> 8) & 0xFF

powark_sensor.write_long(mb_address, 0x42, False, 2)

# ~ powark_sensor.serial.open()
# ~ time.sleep(1)
# ~ # Calculate the CRC-16 using the CRC library
# ~ crc = Crc16.calc(bytearray([mb_address, clear_energy_command]))

# ~ # Send the slave address, reset command, and CRC-16
# ~ powark_sensor.serial.write(bytearray([mb_address, clear_energy_command, low_byte(crc), high_byte(crc)]))

# ~ # Wait for the energy monitor to process the command
# ~ time.sleep(0.26)

# ~ response = powark_sensor.serial.read(4)  # The expected response length is 4 bytes
# Check if the response is correct or an error reply
# ~ if len(response) == 0:
    # ~ print("No Response Received")
    # ~ print("")
# ~ elif response[0] == mb_address and response[1] == clear_energy_command:
    # ~ print("Energy Reset")
    # ~ print("")
# ~ elif response[0] == mb_address and response[1] == 0xC2:
    # ~ print("Error resetting energy. Abnormal code:", response[2])
    # ~ print("")
# ~ else:
    # ~ print("Unexpected response:", response)
    # ~ print("")


#fucntion to clear the energy data from the powark sensor
def clear_energy_used(port, mb_adrress):
    try:
        powark_sensor.write_register(clear_energy_command, 6)
        print("Energy data cleared successfully.")
    except Exception as e:
        print(f"Error clearing energy data: {e}")

#function to display sensor values to the console
def print_powark_values():
    print(f"Voltage: {powark_sensor_data['volts']} Volts")
    print("-------------------------")
    print(f"Current: {powark_sensor_data['ampslow']} Amps")
    print("-------------------------")

    #converts Watts to Kilowatts if the value exceeds 999.99
    if powark_sensor_data['wattslow'] >= 1000:
        print(f"Charging Rate: {powark_sensor_data['wattslow']/1000} Kilowatts")
    else:
        print(f"Charging Rate: {powark_sensor_data['wattslow']} Watts")
    print("-------------------------")

    #converts energy (watt hours) to Kilowatts if the value exceeds 999.99
    if powark_sensor_data['watthourslow'] >= 1000:
        print(f"Energy Added: {powark_sensor_data['watthourslow']/1000} Kilowatts")
    else:
        print(f"Energy Added: {powark_sensor_data['watthourslow']} Watts")

    print("-------------------------")
    print(f"Frequency: {powark_sensor_data['hertz']} Hz")
    print("")
    print(f"{powark_sensor_data['powerfactor']} Powerfactor")
    print("")
    print(f"{powark_sensor_data['alarm']}")
    print("")
    
print_powark_values()

print("Writing Data to mb_powark_data.txt File...")
write_data_to_file(powark_sensor_data, file_path)
print("")
print("Now Closing Serial Port")
powark_sensor.serial.close()
print("")

#
#user input handling to start logging data to the file
entered_value = input('Would you like to start a session?: y or n  ')
print("")

if entered_value == 'y':
    time.sleep(1)
    print("Your charging session has started!")
    print("")
    max_iterations = int(input("How many iterations would you like this to run?:  "))
    print("")
    iteration_count = 0

    while iteration_count <= max_iterations:
        # Read sensor data
        powark_sensor_data = read_sensor_data()
        # write_data_to_file(powark_sensor_data, file_path)
        write_data_to_file(powark_sensor_data, file_path)

    #print values to console
        print_powark_values()
        time.sleep(1)
    # Close the serial port
        powark_sensor.serial.close()
        iteration_count += 1
        if iteration_count >= max_iterations:
            print("")
            print("Ports now closed")
            print("")
            #print("Clearing Data")
            #clear_energy_used('/dev/ttyUSB0', mb_address)
            print("Thanks for using Powark!")
            break
    # Wait for the specified delay before the next iteration
else:
    print("Ports now closed")
    print("Thanks for using Powark!")

2 ответа

Наконец-то получил! Оказывается, правильная команда для сброса энергии для PZEM 016 — это ._perform_command(0x42, '').

По мануалу для регистрации ничего не надо писать , я бы попробовал что-то вроде этого:

      powark_sensor.write_register(clear_energy_command, None, 0, 6, False) # address, value, number of decimals, function code, signed)

Я не уверен, что минимальный способен справиться с письмомNoneв регистр. И даже если это так, я думаю, ваше устройство не ожидает, что вы напишете для регистрации.0x42но просто записываю 4 байта, которые вы видите в руководстве (я понял это, прочитав этот фрагмент кода

Имея это в виду, я бы посоветовал вам использовать pyserial для записи этих 4 необработанных байтов в порт. Чтобы вычислить CRC, вы можете попробовать использовать внутреннюю функцию минимума:

      minimalmodbus._calculate_crc_string('\x01\x42')

И прикрепите их после\x42.

Другие вопросы по тегам