BatGeek 发表于 23-03-2023 04:28:02

CT-3 under Linux

Hello, we would like to use a serial communication protocol to interface with the CT-3 under Linux. Do you have an API or libraries for Linux? Or could you detail the serial protocol used?

zhoudada 发表于 22-05-2024 16:26:03

Same here. Is there a Linux version of the Shizuku System Box?
请问有Linux的版本的连接PC的软件吗?

admin 发表于 23-05-2024 03:05:18

Hello customer,
We only develop software that supports Windows.

TheNetStriker 发表于 21-06-2024 10:59:45

I just rewrote a Python script for the CT-2 I found here for the CT-3:

#!/usr/bin/env python
from serial import Serial as Serial

from struct import unpack as unpack
from struct import pack_into as pack_into
from argparse import ArgumentParser as ArgumentParser
from argparse import FileType as FileType

from time import sleep as sleep

from sys import exit as exit

# Help text for every supported action, keyed by the action name.
__action_descriptions = {
    "list": "Show this list and exit",
    "read": "Reads energy value from the meter",
}
# Action names, in the same order as the table above.
__actions = list(__action_descriptions)

# Values accepted for the read operation.
__gets = [
    "all",
    "voltage",
    "current",
    "power",
    "voltageDP",
    "voltageDM",
    "energy",
]


class AVHzY_CT2:
    """Stream measurements from an AVHzY USB meter over a serial port.

    Judging by the command literals sent in :meth:`perform_action`, a frame
    is laid out as::

        0xA5 | 4-byte little-endian payload length | payload | checksum | 0x5A

    where the checksum is the XOR of the payload bytes (``01 ^ 07 ^ 00 ^ 00``
    gives the ``06`` found in the reset command).

    NOTE(review): the layout of the 35-byte sample frames sent *by* the meter
    is not documented in this file.  The decoder assumes the same envelope
    with a 4-byte payload header, which leaves exactly 24 bytes (four floats
    and one 64-bit timestamp) at offsets 9..32 -- confirm on hardware.
    """

    # Column order shared by the CSV header and every data row.
    __COLUMNS = ("voltage", "current", "power", "voltageDP", "voltageDM", "energy")

    __RESET = "A5 04 00 00 00 01 07 00 00 06 5A"
    __RESET_RECORD_TIMER = "A5 04 00 00 00 01 0C 0A 00 07 5A"
    # Interval (bytes 9..12) and checksum (byte 13) are filled in at run time.
    __START_READ = "A5 08 00 00 00 01 09 0B 00 00 00 00 00 00 5A"

    __ACK_SIZE = 11
    __SAMPLE_SIZE = 35
    # Explicit little-endian, matching the length field of the commands.
    __SAMPLE_FORMAT = "<ffffQ"
    __SAMPLE_DATA = slice(9, 33)  # assumed position of the data, see class note

    # Class-level fallbacks so __del__ is safe on a partially built instance.
    __ser = None
    __output = None

    def __init__(self, device, action, repeat, time, reads, output):
        """Prepare a session with the meter.

        Args:
            device: Path of the serial device; not opened for ``"list"``.
            action: ``"list"`` or ``"read"``.
            repeat: Number of iterations, ``-1`` meaning "until interrupted".
            time: Sampling interval in milliseconds sent to the meter.
            reads: ``"all"`` or an iterable of column names; a selection that
                contains ``"all"`` selects every column.
            output: Writable text file object receiving the CSV data.  It is
                closed when this object is destroyed.
        """
        self.__action = action
        self.__repeat = repeat
        self.__time = time
        self.__output = output
        # Listing the actions must work without a meter attached.
        if action != "list":
            self.__ser = Serial(device)

        self.__first_exec = True
        self.__timestamp = 0
        self.__energy = 0
        self.__prev_timestamp = None

        if isinstance(reads, str):
            reads = [reads]
        if "all" in reads:
            self.__reads = list(self.__COLUMNS)
        else:
            # Canonical order keeps the header and the rows aligned.
            self.__reads = [name for name in self.__COLUMNS if name in reads]

        # action handlers in a dictionary
        self.__action_handlers = {
            "list": self.__action_list,
            "read": self.__action_read,
        }

    def __del__(self):
        """Close the serial port and the output file, if they were set."""
        if self.__ser is not None:
            self.__ser.close()
        if self.__output is not None:
            self.__output.close()

    def __action_list(self):
        """Print every known action together with its description."""
        # Names starting with two underscores are mangled inside a class
        # body, so the module-level table is fetched by its literal name.
        descriptions = globals().get("__action_descriptions", {})
        print("Actions list:")
        for action, description in descriptions.items():
            print("", action, "\t", description)

    def xor_checksum(self, byte_array, start, end):
        """Return the XOR of ``byte_array[start:end]`` (``end`` exclusive)."""
        checksum = 0
        for byte in byte_array[start:end]:
            checksum ^= byte
        return checksum

    def __send_command(self, command):
        """Write one command frame and discard the meter's response."""
        self.__ser.write(command)
        self.__ser.read(self.__ACK_SIZE)  # Clear response to command

    def __action_read(self):
        """Read one sample frame and append it to the output as a CSV row."""
        if self.__first_exec:
            self.__output.write(",".join(["time"] + self.__reads) + "\n")
            self.__first_exec = False

        packet = self.__ser.read(self.__SAMPLE_SIZE)
        voltage, current, voltage_dp, voltage_dm, self.__timestamp = unpack(
            self.__SAMPLE_FORMAT, packet[self.__SAMPLE_DATA]
        )
        # Power keeps the sign of the raw current; only the reported current
        # is made positive.
        power = voltage * current

        values = {
            "voltage": voltage,
            "current": abs(current),
            "power": power,
            "voltageDP": voltage_dp,
            "voltageDM": voltage_dm,
        }
        if "energy" in self.__reads:
            if self.__prev_timestamp is not None:
                # NOTE(review): the divisor presumably converts
                # microseconds * watts to watt-hours -- confirm the unit of
                # the meter's timestamp.
                elapsed = self.__timestamp - self.__prev_timestamp
                self.__energy += (elapsed * power) / 3600000000
            values["energy"] = self.__energy

        row = ["{0}".format(self.__timestamp)]
        row.extend("{0}".format(values[name]) for name in self.__reads)
        self.__output.write(",".join(row) + "\n")
        self.__output.flush()

        self.__prev_timestamp = self.__timestamp

    def perform_action(self):
        """Run the configured action.

        ``"list"`` prints the available actions and returns.  Any other
        action resets the meter, starts sampling at the configured interval
        and then calls the action handler ``repeat`` times (forever when
        ``repeat`` is -1) or until Ctrl-C is pressed.
        """
        if self.__action == "list":
            self.__action_list()
            return

        # Clear input buffer
        self.__ser.reset_input_buffer()

        self.__send_command(bytearray.fromhex(self.__RESET))
        self.__send_command(bytearray.fromhex(self.__RESET_RECORD_TIMER))

        # Start reading: patch the interval in, then recompute the checksum
        # over the whole 8-byte payload (bytes 5..12) and store it in the
        # checksum slot.
        start_read = bytearray.fromhex(self.__START_READ)
        pack_into("<I", start_read, 9, self.__time)
        start_read[13] = self.xor_checksum(start_read, 5, 13)
        self.__send_command(start_read)

        handler = self.__action_handlers[self.__action]
        count = 0
        while count != self.__repeat:
            try:
                handler()
            except KeyboardInterrupt:
                break

            if self.__repeat != -1:
                count += 1

    def change_output(self, output):
        """Close the current output file and write to ``output`` from now on."""
        self.__output.close()
        self.__output = output


def main():
    """Parse the command line, validate it and run the requested action.

    Exits with status 1 (after printing the usage line) when ``--repeat`` is
    below -1 or ``--time`` is below 1, and with status 0 once the action has
    finished.
    """

    # -------------------------------------- OPTION PARSING
    parser = ArgumentParser(
        description="Program to interact with the AVHzY CT-2 power meter"
    )
    parser.add_argument(
        "action",
        metavar="action",
        choices=__actions,
        help="The action to perform ",
    )
    parser.add_argument(
        "-d",
        "--device",
        default="/dev/ttyACM0",
        help="Path to the device ",
    )
    parser.add_argument(
        "-r",
        "--repeat",
        type=int,
        default=-1,
        help="How many times to repeat the operation. Must be in [-1, inf[ (-1: infinite) ",
    )
    parser.add_argument(
        "-t",
        "--time",
        type=int,
        default=100,
        help="The time (in milliseconds) to wait between each action iteration.",
    )
    parser.add_argument(
        "-o",
        "--output",
        default="-",
        type=FileType("w"),
        help="Where to write output of the action ",
    )
    parser.add_argument(
        "-g",
        "--get",
        default="all",
        choices=__gets,
        nargs="+",
        help="For read operation: what to get from power meter [choices: %(choices)s default: %(default)s]",
    )
    args = parser.parse_args()

    if args.repeat < -1:
        print("ERROR: repeat must be >= -1")
        parser.print_usage()
        exit(1)

    if args.time < 1:
        print("ERROR: time must be >= 1")
        parser.print_usage()
        exit(1)

    # "-g" produces a list whereas the default is the bare string "all";
    # collapse any selection containing "all" to that bare string so that an
    # explicit "-g all" behaves exactly like the default.
    reads = "all" if "all" in args.get else args.get

    AVHzY_CT2(
        args.device, args.action, args.repeat, args.time, reads, args.output
    ).perform_action()
    exit(0)


if __name__ == "__main__":
    main()

joadswe 发表于 17-07-2024 00:30:35

Looking promising! @TheNetStriker, do you have any more info on the commands sent, or any more decoded data?
It works OK for me, but I didn't get any timestamp, so I updated the code to use the power value from the CT-3.

Does anyone have more information about the data sent by the CT-3, e.g. temperature, energy (mWh, mAh) or timestamp? I wasn't able to find any more useful data.

from serial import Serial
from struct import unpack as unpack
from struct import pack_into as pack_into
from argparse import ArgumentParser, FileType
from time import time, sleep, gmtime, localtime, mktime
from sys import exit

# Help text for every supported action, keyed by the action name
# (this variant has no "list" action).
__action_descriptions = {
    "read": "Reads energy value from the meter",
}
# Action names, in the same order as the table above.
__actions = list(__action_descriptions)

# Values accepted for the read operation.
__gets = [
    "all",
    "voltage",
    "current",
    "power",
    "voltageDP",
    "voltageDM",
    "energy",
]

class AVHzY_CT3:
    """Stream measurements from an AVHzY CT-3 USB meter over a serial port.

    Judging by the command literals sent in :meth:`perform_action`, a frame
    is laid out as::

        0xA5 | 4-byte little-endian payload length | payload | checksum | 0x5A

    where the checksum is the XOR of the payload bytes (``01 ^ 07 ^ 00 ^ 00``
    gives the ``06`` found in the reset command).

    NOTE(review): where the five floats sit inside a sample frame is not
    documented in this file.  The default ``payload_offset`` of 9 assumes the
    sample frames share the envelope above plus a 4-byte payload header --
    confirm on hardware and adjust the argument if the values look wrong.
    """

    # Column order shared by the header line and every data row.
    __COLUMNS = ("voltage", "current", "power", "voltageDP", "voltageDM", "energy")
    # Row label and float format for each measured column.
    __LABELS = {
        "voltage": ", Voltage: {0:.6f}",
        "current": ", Current: {0:.6f}",
        "power": ", Power: {0:.6f}",
        "voltageDP": ", D+: {0:.6f}",
        "voltageDM": ", D-: {0:.6f}",
        "energy": ", Energy: {0:.6f}",
    }

    __RESET = "A5 04 00 00 00 01 07 00 00 06 5A"
    __RESET_RECORD_TIMER = "A5 04 00 00 00 01 0C 0A 00 07 5A"
    # Interval (bytes 9..12) and checksum (byte 13) are filled in at run time.
    __START_READ = "A5 08 00 00 00 01 09 0B 00 00 00 00 00 00 5A"

    __ACK_SIZE = 11
    __READ_SIZE = 30
    __SAMPLE_FORMAT = "<fffff"
    __SAMPLE_BYTES = 20  # size of five 32-bit floats

    # Class-level fallbacks so __del__ is safe on a partially built instance.
    __ser = None
    __output = None

    def __init__(self, device, action, repeat, time, reads, output, payload_offset=9):
        """Prepare a session with the meter.

        Args:
            device: Path of the serial device; not opened for ``"list"``.
            action: ``"list"`` or ``"read"``.
            repeat: Number of iterations, ``-1`` meaning "until interrupted".
            time: Sampling interval in milliseconds sent to the meter; the
                read loop also sleeps this long between iterations.
            reads: ``"all"`` or an iterable of column names; a selection that
                contains ``"all"`` selects every column.
            output: Writable text file object receiving the data.  It is
                closed when this object is destroyed.
            payload_offset: Index of the first measurement byte inside the
                30 bytes read per sample (unverified default, see class note).

        Raises:
            ValueError: If the five floats would not fit in the bytes read.
        """
        if not 0 <= payload_offset <= self.__READ_SIZE - self.__SAMPLE_BYTES:
            raise ValueError("payload_offset must be in [0, 10]")

        self.__action = action
        self.__repeat = repeat
        self.__time = time
        self.__output = output
        self.__payload_offset = payload_offset
        # Listing the actions must work without a meter attached.
        if action != "list":
            self.__ser = Serial(device)

        self.__first_exec = True
        self.__timestamp = 0
        self.__energy = 0
        self.__prev_timestamp = None

        if isinstance(reads, str):
            reads = [reads]
        if "all" in reads:
            self.__reads = list(self.__COLUMNS)
        else:
            # Canonical order keeps the header and the rows aligned.
            self.__reads = [name for name in self.__COLUMNS if name in reads]

        # action handlers in a dictionary
        self.__action_handlers = {
            "list": self.__action_list,
            "read": self.__action_read,
        }

    def __del__(self):
        """Close the serial port and the output file, if they were set."""
        if self.__ser is not None:
            self.__ser.close()
        if self.__output is not None:
            self.__output.close()

    def __action_list(self):
        """Print every known action together with its description."""
        # Names starting with two underscores are mangled inside a class
        # body, so the module-level table is fetched by its literal name.
        descriptions = globals().get("__action_descriptions", {})
        print("Actions list:")
        for action, description in descriptions.items():
            print("", action, "\t", description)

    def xor_checksum(self, byte_array, start, end):
        """Return the XOR of ``byte_array[start:end]`` (``end`` exclusive)."""
        checksum = 0
        for byte in byte_array[start:end]:
            checksum ^= byte
        return checksum

    def __send_command(self, command):
        """Write one command frame and discard the meter's 11-byte reply."""
        self.__ser.write(command)
        self.__ser.read(self.__ACK_SIZE)

    def __action_read(self):
        """Read one sample and append it to the output as a labelled row."""
        if self.__first_exec:
            self.__output.write(",".join(["time"] + self.__reads) + "\n")
            self.__first_exec = False

        # Drop anything buffered so the next bytes read are fresh.
        self.__ser.reset_input_buffer()
        while True:
            packet = self.__ser.read(self.__READ_SIZE)
            if len(packet) == self.__READ_SIZE:
                break

        first = self.__payload_offset
        voltage, current, power, voltage_dp, voltage_dm = unpack(
            self.__SAMPLE_FORMAT, packet[first:first + self.__SAMPLE_BYTES]
        )
        # Host clock in milliseconds; no timestamp is taken from the frame.
        self.__timestamp = int(time() * 1000)

        values = {
            "voltage": voltage,
            "current": current,
            "power": power,
            "voltageDP": voltage_dp,
            "voltageDM": voltage_dm,
        }
        if "energy" in self.__reads:
            if self.__prev_timestamp is not None:
                # NOTE(review): timestamps are in milliseconds here, so this
                # divisor yields watt-hours only if power is in milliwatts --
                # confirm the unit of the meter's power value.
                elapsed = self.__timestamp - self.__prev_timestamp
                self.__energy += (elapsed * power) / 3600000000
            values["energy"] = self.__energy

        row = ["Timestamp: {0}".format(self.__timestamp)]
        row.extend(self.__LABELS[name].format(values[name]) for name in self.__reads)
        self.__output.write("".join(row) + "\n")
        self.__output.flush()

        self.__prev_timestamp = self.__timestamp

    def perform_action(self):
        """Run the configured action.

        ``"list"`` prints the available actions and returns.  Any other
        action resets the meter, starts sampling at the configured interval
        and then calls the action handler ``repeat`` times (forever when
        ``repeat`` is -1), sleeping ``time`` milliseconds after each call,
        until done or until Ctrl-C is pressed.
        """
        if self.__action == "list":
            self.__action_list()
            return

        self.__ser.reset_input_buffer()

        self.__send_command(bytearray.fromhex(self.__RESET))
        self.__send_command(bytearray.fromhex(self.__RESET_RECORD_TIMER))

        # Patch the interval in, then recompute the checksum over the whole
        # 8-byte payload (bytes 5..12) and store it in the checksum slot.
        start_read = bytearray.fromhex(self.__START_READ)
        pack_into("<I", start_read, 9, self.__time)
        start_read[13] = self.xor_checksum(start_read, 5, 13)
        self.__send_command(start_read)

        handler = self.__action_handlers[self.__action]
        pause = self.__time / 1000.0
        count = 0
        while count != self.__repeat:
            if self.__repeat != -1:
                count += 1
            try:
                handler()
                # Sleeping inside the try lets Ctrl-C end the loop cleanly
                # while waiting as well.
                sleep(pause)
            except KeyboardInterrupt:
                break

    def change_output(self, output):
        """Close the current output file and write to ``output`` from now on."""
        self.__output.close()
        self.__output = output


def main():
    """Parse the command line, validate it and run the requested action.

    Exits with status 1 (after printing the usage line) when ``--repeat`` is
    below -1 or ``--time`` is below 1, and with status 0 once the action has
    finished.
    """
    parser = ArgumentParser(description="Program to interact with the AVHzY CT-3 power meter")
    parser.add_argument("action", metavar="action", choices=__actions, help="The action to perform ")
    parser.add_argument("-d", "--device", default="/dev/ttyACM0", help="Path to the device ")
    parser.add_argument("-r", "--repeat", type=int, default=-1, help="How many times to repeat the operation. Must be in [-1, inf[ (-1: infinite) ")
    parser.add_argument("-t", "--time", type=int, default=100, help="The time (in milliseconds) to wait between each action iteration.")
    parser.add_argument("-o", "--output", default="-", type=FileType("w"), help="Where to write output of the action ")
    parser.add_argument("-g", "--get", default="all", choices=__gets, nargs="+", help="For read operation: what to get from power meter [choices: %(choices)s default: %(default)s]")
    args = parser.parse_args()

    if args.repeat < -1:
        print("ERROR: repeat must be >= -1")
        parser.print_usage()
        exit(1)

    if args.time < 1:
        print("ERROR: time must be >= 1")
        parser.print_usage()
        exit(1)

    # "-g" produces a list whereas the default is the bare string "all";
    # collapse any selection containing "all" to that bare string so that an
    # explicit "-g all" behaves exactly like the default.
    reads = "all" if "all" in args.get else args.get

    AVHzY_CT3(args.device, args.action, args.repeat, args.time, reads, args.output).perform_action()
    exit(0)


if __name__ == '__main__':
    main()
页: [1]
查看完整版本: CT-3 under Linux