CT-3 under Linux
Hello, We wouldlike to use serial communication protocol to interface with CT-3 with Linux. Doyou have API or Libraries for linux? Or could you detail the serial protocol used?Same here. Is there a Linux version of the Shizuku System Box?
请问有Linux的版本的连接PC的软件吗? Hello customer,
We only develop software that supports windows. I just rewrote a Python script for the CT-2 I found here for the CT-3:
#!/usr/bin/env python
from serial import Serial as Serial
from struct import unpack as unpack
from struct import pack_into as pack_into
from argparse import ArgumentParser as ArgumentParser
from argparse import FileType as FileType
from time import sleep as sleep
from sys import exit as exit
# actions along with their description
__actions = ["list", "read"]
__action_descriptions = {
"list": "Show this list and exit",
"read": "Reads energy value from the meter",
}
# values for read operation
__gets = ["all", "voltage", "current", "power", "voltageDP", "voltageDM", "energy"]
class AVHzY_CT2:
def __init__(self, device, action, repeat, time, reads, output):
self.__action = action
self.__repeat = repeat
self.__time = time
self.__output = output
self.__ser = Serial(device)
self.__first_exec = True
self.__timestamp = 0
self.__energy = 0
self.__prev_timestamp = 0
if reads == "all":
self.__reads = [
"voltage",
"current",
"power",
"voltageDP",
"voltageDM",
"energy",
]
else:
self.__reads = reads
# action handlers in a dictionary
self.__action_handlers = {
"list": self.__action_list,
"read": self.__action_read,
}
def __del__(self):
self.__ser.close()
self.__output.close()
def __action_list(self):
print("Actions list:")
for action in __action_descriptions:
print("", action, "\t", __action_descriptions)
def xor_checksum(self, byte_array, start, end):
checksum = 0
for byte in byte_array:
checksum ^= byte
return checksum
def __action_read(self):
if self.__first_exec:
self.__output.write("time")
for r in self.__reads:
self.__output.write(",{0}".format(r))
self.__output.write("\n")
self.__first_exec = False
packet = self.__ser.read(35)
packetData = unpack("ffffQ", packet)
voltage = packetData
current = abs(packetData)
power = packetData * packetData
voltageDP = packetData
voltageDM = packetData
self.__timestamp = packetData
self.__output.write("{0}".format(self.__timestamp))
if "voltage" in self.__reads:
self.__output.write(",{0}".format(voltage))
if "current" in self.__reads:
self.__output.write(",{0}".format(current))
if "power" in self.__reads:
self.__output.write(",{0}".format(power))
if "voltageDP" in self.__reads:
self.__output.write(",{0}".format(voltageDP))
if "voltageDM" in self.__reads:
self.__output.write(",{0}".format(voltageDM))
if "energy" in self.__reads:
if self.__prev_timestamp == 0:
self.__output.write(",0")
else:
self.__energy += (
(self.__timestamp - self.__prev_timestamp) * power
) / 3600000000
self.__output.write(",{0}".format(self.__energy))
self.__output.write("\n")
self.__output.flush()
self.__prev_timestamp = self.__timestamp
def perform_action(self):
if self.__action == "list":
self.__action_list()
return
# Clear input buffer
self.__ser.reset_input_buffer()
# Reset device
resetCommand = bytearray.fromhex("A5 04 00 00 00 01 07 00 00 06 5A")
self.__ser.write(resetCommand)
self.__ser.read(11)# Clear response to command
# Reset record timer
resetRecordTimerCommand = bytearray.fromhex("A5 04 00 00 00 01 0C 0A 00 07 5A")
self.__ser.write(resetRecordTimerCommand)
self.__ser.read(11)# Clear response to command
# Start reading
startReadCommand = bytearray.fromhex(
"A5 08 00 00 00 01 09 0B 00 00 00 00 00 00 5A"
)
pack_into("I", startReadCommand, 9, self.__time)
startReadCommand = self.xor_checksum(startReadCommand, 5, 12)
self.__ser.write(startReadCommand)
self.__ser.read(11)# Clear response to command
count = 0
while count != self.__repeat:
try:
self.__action_handlers()
except KeyboardInterrupt:
break
if self.__repeat != -1:
count += 1
def change_output(self, output):
self.__output.close()
self.__output = output
def main():
# -------------------------------------- OPTION PARSING
parser = ArgumentParser(
description="Program to interact with the AVHzY CT-2 power meter"
)
parser.add_argument(
"action",
metavar="action",
choices=__actions,
help="The action to perform ",
)
parser.add_argument(
"-d",
"--device",
default="/dev/ttyACM0",
help="Path to the device ",
)
parser.add_argument(
"-r",
"--repeat",
type=int,
default=-1,
help="How many times to repeat the operation. Must be in [-1, inf[ (-1: infinite) ",
)
parser.add_argument(
"-t",
"--time",
type=int,
default=100,
help="The time (in miliseconds) to wait between each action iteration.",
)
parser.add_argument(
"-o",
"--output",
default="-",
type=FileType("w"),
help="Where to write output of the action ",
)
parser.add_argument(
"-g",
"--get",
default="all",
choices=__gets,
nargs="+",
help="For read operation: what to get from power meter [choices: %(choices)s default: %(default)s",
)
args = parser.parse_args()
if args.repeat < -1:
print("ERROR: repeat must be in >= -1")
parser.print_usage()
exit(1)
if args.time < 1:
print("ERROR: time must be > 1")
parser.print_usage()
exit(1)
AVHzY_CT2(
args.device, args.action, args.repeat, args.time, args.get, args.output
).perform_action()
exit(0)
if __name__ == "__main__":
main()
Looking promising! @TheNetStriker do u have any more info on the commands sent or any more decoded data?
Works ok for me but a didn't get any timestamp and updated the code to use the power value from the CT-3.
Is there any one that has more information regarding the data sent from the CT-3, eg temperature, energy (mWh, mAh), timestamp. I wasn't able to find any more useful data.
from serial import Serial
from struct import unpack as unpack
from struct import pack_into as pack_into
from argparse import ArgumentParser, FileType
from time import time, sleep, gmtime, localtime, mktime
from sys import exit
# actions along with their description, no list function
__actions = ["read"]
__action_descriptions = {
"read": "Reads energy value from the meter"
}
# values for read operation
__gets = ["all", "voltage", "current", "power", "voltageDP", "voltageDM", "energy"]
class AVHzY_CT3:
def __init__(self, device, action, repeat, time, reads, output):
self.__action = action
self.__repeat = repeat
self.__time = time
self.__output = output
self.__ser = Serial(device)
self.__first_exec = True
self.__timestamp = 0
self.__energy = 0
self.__prev_timestamp = 0
if reads == "all":
self.__reads = ["voltage", "current", "power", "voltageDP", "voltageDM", "energy"]
else:
self.__reads = reads
# action handlers in a dictionary
self.__action_handlers = {
"list": self.__action_list,
"read": self.__action_read,
}
def __del__(self):
self.__ser.close()
self.__output.close()
def __action_list(self):
print("Actions list:")
for action in __action_descriptions:
print("", action, "\t", __action_descriptions)
def xor_checksum(self, byte_array, start, end):
checksum = 0
for byte in byte_array:
checksum ^= byte
return checksum
def __action_read(self):
if self.__first_exec:
self.__output.write("time")
for r in self.__reads:
self.__output.write(",{0}".format(r))
self.__output.write("\n")
self.__first_exec = False
self.__ser.flushInput()
while True:
packet = self.__ser.read(30)
if len(packet) == 30:
break
packetData = unpack("<fffff", packet)# Adjust byte order if necessary
voltage = packetData
current = packetData
power = packetData
voltageDP = packetData
voltageDM = packetData
self.__timestamp = int(time()*1000)
self.__output.write("Timestamp: {0}".format(self.__timestamp))
if "voltage" in self.__reads:
self.__output.write(", Voltage: {0:.6f}".format(voltage))
if "current" in self.__reads:
self.__output.write(", Current: {0:.6f}".format(current))
if "power" in self.__reads:
self.__output.write(", Power: {0:.6f}".format(power))
if "voltageDP" in self.__reads:
self.__output.write(", D+: {0:.6f}".format(voltageDP))
if "voltageDM" in self.__reads:
self.__output.write(", D-: {0:.6f}".format(voltageDM))
if "energy" in self.__reads:
if self.__prev_timestamp == 0:
self.__output.write(",0")
else:
self.__energy += ((self.__timestamp - self.__prev_timestamp) * power) / 3600000000
self.__output.write(", Energy: {0:.6f}".format(self.__energy))
self.__output.write("\n")
self.__output.flush()
self.__prev_timestamp = self.__timestamp
def perform_action(self):
self.__ser.reset_input_buffer()
resetCommand = bytearray.fromhex("A5 04 00 00 00 01 07 00 00 06 5A")
self.__ser.write(resetCommand)
self.__ser.read(11)
resetRecordTimerCommand = bytearray.fromhex("A5 04 00 00 00 01 0C 0A 00 07 5A")
self.__ser.write(resetRecordTimerCommand)
self.__ser.read(11)
startReadCommand = bytearray.fromhex(
"A5 08 00 00 00 01 09 0B 00 00 00 00 00 00 5A"
)
pack_into("I", startReadCommand, 9, self.__time)
startReadCommand = self.xor_checksum(startReadCommand, 5, 12)
self.__ser.write(startReadCommand)
self.__ser.read(11)
count = 0
while count != self.__repeat:
try:
self.__action_handlers()
except KeyboardInterrupt:
break
if self.__repeat != -1:
count += 1
sleep(self.__time / 1000.0)
def change_output(self, output):
self.__output.close()
self.__output = output
def main():
parser = ArgumentParser(description="Program to interact with the AVHzY CT-2 power meter")
parser.add_argument("action", metavar="action", choices=__actions, help="The action to perform ")
parser.add_argument("-d", "--device", default="/dev/ttyACM0", help="Path to the device ")
parser.add_argument("-r", "--repeat", type=int, default=-1, help="How many times to repeat the operation. Must be in [-1, inf[ (-1: infinite) ")
parser.add_argument("-t", "--time", type=int, default=100, help="The time (in milliseconds) to wait between each action iteration.")
parser.add_argument("-o", "--output", default="-", type=FileType("w"), help="Where to write output of the action ")
parser.add_argument("-g", "--get", default="all", choices=__gets, nargs="+", help="For read operation: what to get from power meter [choices: %(choices)s default: %(default)s")
args = parser.parse_args()
if args.repeat < -1:
print("ERROR: repeat must be in >= -1")
parser.print_usage()
exit(1)
if args.time < 1:
print("ERROR: time must be > 1")
parser.print_usage()
exit(1)
AVHzY_CT3(args.device, args.action, args.repeat, args.time, args.get, args.output).perform_action()
exit(0)
if __name__ == '__main__':
main()
页:
[1]