找回密码
 注册
查看: 2053|回复: 4

CT-3 under Linux

[复制链接]

1

主题

0

回帖

15

积分

Newbie

积分
15
发表于: 23-03-2023 04:28:02
| 显示全部楼层 |阅读模式
Hello,
We would like to use a serial communication protocol to interface with the CT-3 from Linux. Do you have an API or libraries for Linux? Or could you detail the serial protocol used?

回复

使用道具 举报

0

主题

1

回帖

16

积分

Newbie

积分
16
发表于: 22-05-2024 16:26:03
| 显示全部楼层
Same here. Is there a Linux version of the Shizuku System Box?
请问有Linux的版本的连接PC的软件吗?
回复

使用道具 举报

29

主题

62

回帖

2863

积分

Administrator

积分
2863
发表于: 23-05-2024 03:05:18
| 显示全部楼层
Hello customer,
We only develop software that supports windows.
回复

使用道具 举报

0

主题

2

回帖

26

积分

Newbie

积分
26
发表于: 21-06-2024 10:59:45
| 显示全部楼层
I just rewrote a Python script for the CT-2 I found here for the CT-3:

  1. #!/usr/bin/env python
  2. from serial import Serial as Serial

  3. from struct import unpack as unpack
  4. from struct import pack_into as pack_into
  5. from argparse import ArgumentParser as ArgumentParser
  6. from argparse import FileType as FileType

  7. from time import sleep as sleep

  8. from sys import exit as exit

  9. # actions along with their description
  10. __actions = ["list", "read"]
  11. __action_descriptions = {
  12.     "list": "Show this list and exit",
  13.     "read": "Reads energy value from the meter",
  14. }

  15. # values for read operation
  16. __gets = ["all", "voltage", "current", "power", "voltageDP", "voltageDM", "energy"]


  17. class AVHzY_CT2:

  18.     def __init__(self, device, action, repeat, time, reads, output):
  19.         self.__action = action
  20.         self.__repeat = repeat
  21.         self.__time = time
  22.         self.__output = output
  23.         self.__ser = Serial(device)

  24.         self.__first_exec = True
  25.         self.__timestamp = 0
  26.         self.__energy = 0
  27.         self.__prev_timestamp = 0

  28.         if reads == "all":
  29.             self.__reads = [
  30.                 "voltage",
  31.                 "current",
  32.                 "power",
  33.                 "voltageDP",
  34.                 "voltageDM",
  35.                 "energy",
  36.             ]
  37.         else:
  38.             self.__reads = reads

  39.         # action handlers in a dictionary
  40.         self.__action_handlers = {
  41.             "list": self.__action_list,
  42.             "read": self.__action_read,
  43.         }

  44.     def __del__(self):
  45.         self.__ser.close()
  46.         self.__output.close()

  47.     def __action_list(self):
  48.         print("Actions list:")
  49.         for action in __action_descriptions:
  50.             print("  ", action, "\t", __action_descriptions[action])

  51.     def xor_checksum(self, byte_array, start, end):
  52.         checksum = 0
  53.         for byte in byte_array[start : end + 1]:
  54.             checksum ^= byte
  55.         return checksum

  56.     def __action_read(self):

  57.         if self.__first_exec:
  58.             self.__output.write("time")
  59.             for r in self.__reads:
  60.                 self.__output.write(",{0}".format(r))
  61.             self.__output.write("\n")
  62.             self.__first_exec = False

  63.         packet = self.__ser.read(35)
  64.         packetData = unpack("ffffQ", packet[9:33])

  65.         voltage = packetData[0]
  66.         current = abs(packetData[1])
  67.         power = packetData[1] * packetData[0]
  68.         voltageDP = packetData[2]
  69.         voltageDM = packetData[3]
  70.         self.__timestamp = packetData[4]

  71.         self.__output.write("{0}".format(self.__timestamp))
  72.         if "voltage" in self.__reads:
  73.             self.__output.write(",{0}".format(voltage))
  74.         if "current" in self.__reads:
  75.             self.__output.write(",{0}".format(current))
  76.         if "power" in self.__reads:
  77.             self.__output.write(",{0}".format(power))
  78.         if "voltageDP" in self.__reads:
  79.             self.__output.write(",{0}".format(voltageDP))
  80.         if "voltageDM" in self.__reads:
  81.             self.__output.write(",{0}".format(voltageDM))
  82.         if "energy" in self.__reads:
  83.             if self.__prev_timestamp == 0:
  84.                 self.__output.write(",0")
  85.             else:
  86.                 self.__energy += (
  87.                     (self.__timestamp - self.__prev_timestamp) * power
  88.                 ) / 3600000000
  89.                 self.__output.write(",{0}".format(self.__energy))
  90.         self.__output.write("\n")
  91.         self.__output.flush()

  92.         self.__prev_timestamp = self.__timestamp

  93.     def perform_action(self):
  94.         if self.__action == "list":
  95.             self.__action_list()
  96.             return

  97.         # Clear input buffer
  98.         self.__ser.reset_input_buffer()

  99.         # Reset device
  100.         resetCommand = bytearray.fromhex("A5 04 00 00 00 01 07 00 00 06 5A")
  101.         self.__ser.write(resetCommand)
  102.         self.__ser.read(11)  # Clear response to command

  103.         # Reset record timer
  104.         resetRecordTimerCommand = bytearray.fromhex("A5 04 00 00 00 01 0C 0A 00 07 5A")
  105.         self.__ser.write(resetRecordTimerCommand)
  106.         self.__ser.read(11)  # Clear response to command

  107.         # Start reading
  108.         startReadCommand = bytearray.fromhex(
  109.             "A5 08 00 00 00 01 09 0B 00 00 00 00 00 00 5A"
  110.         )
  111.         pack_into("I", startReadCommand, 9, self.__time)
  112.         startReadCommand[13] = self.xor_checksum(startReadCommand, 5, 12)
  113.         self.__ser.write(startReadCommand)
  114.         self.__ser.read(11)  # Clear response to command

  115.         count = 0
  116.         while count != self.__repeat:

  117.             try:
  118.                 self.__action_handlers[self.__action]()
  119.             except KeyboardInterrupt:
  120.                 break

  121.             if self.__repeat != -1:
  122.                 count += 1

  123.     def change_output(self, output):
  124.         self.__output.close()
  125.         self.__output = output


  126. def main():

  127.     # -------------------------------------- OPTION PARSING
  128.     parser = ArgumentParser(
  129.         description="Program to interact with the AVHzY CT-2 power meter"
  130.     )
  131.     parser.add_argument(
  132.         "action",
  133.         metavar="action",
  134.         choices=__actions,
  135.         help="The action to perform [choices: %(choices)s]",
  136.     )
  137.     parser.add_argument(
  138.         "-d",
  139.         "--device",
  140.         default="/dev/ttyACM0",
  141.         help="Path to the device [default: %(default)s]",
  142.     )
  143.     parser.add_argument(
  144.         "-r",
  145.         "--repeat",
  146.         type=int,
  147.         default=-1,
  148.         help="How many times to repeat the operation. Must be in [-1, inf[ (-1: infinite) [default: %(default)s]",
  149.     )
  150.     parser.add_argument(
  151.         "-t",
  152.         "--time",
  153.         type=int,
  154.         default=100,
  155.         help="The time (in miliseconds) to wait between each action iteration.",
  156.     )
  157.     parser.add_argument(
  158.         "-o",
  159.         "--output",
  160.         default="-",
  161.         type=FileType("w"),
  162.         help="Where to write output of the action [default: stdout]",
  163.     )
  164.     parser.add_argument(
  165.         "-g",
  166.         "--get",
  167.         default="all",
  168.         choices=__gets,
  169.         nargs="+",
  170.         help="For read operation: what to get from power meter [choices: %(choices)s default: %(default)s",
  171.     )
  172.     args = parser.parse_args()

  173.     if args.repeat < -1:
  174.         print("ERROR: repeat must be in >= -1")
  175.         parser.print_usage()
  176.         exit(1)

  177.     if args.time < 1:
  178.         print("ERROR: time must be > 1")
  179.         parser.print_usage()
  180.         exit(1)

  181.     AVHzY_CT2(
  182.         args.device, args.action, args.repeat, args.time, args.get, args.output
  183.     ).perform_action()
  184.     exit(0)


  185. if __name__ == "__main__":
  186.     main()
复制代码
回复

使用道具 举报

0

主题

1

回帖

20

积分

Newbie

积分
20
发表于: 17-07-2024 00:30:35
| 显示全部楼层
Looking promising! @TheNetStriker do you have any more info on the commands sent or any more decoded data?
Works OK for me, but I didn't get any timestamp, so I updated the code to use the power value from the CT-3.

Is there anyone who has more information regarding the data sent from the CT-3, e.g. temperature, energy (mWh, mAh), timestamp? I wasn't able to find any more useful data.

  1. from serial import Serial
  2. from struct import unpack as unpack
  3. from struct import pack_into as pack_into
  4. from argparse import ArgumentParser, FileType
  5. from time import time, sleep, gmtime, localtime, mktime
  6. from sys import exit

  7. # actions along with their description, no list function
  8. __actions = ["read"]
  9. __action_descriptions = {
  10.     "read": "Reads energy value from the meter"
  11. }

  12. # values for read operation
  13. __gets = ["all", "voltage", "current", "power", "voltageDP", "voltageDM", "energy"]

  14. class AVHzY_CT3:
  15.     def __init__(self, device, action, repeat, time, reads, output):
  16.         self.__action = action
  17.         self.__repeat = repeat
  18.         self.__time = time
  19.         self.__output = output
  20.         self.__ser = Serial(device)

  21.         self.__first_exec = True
  22.         self.__timestamp = 0
  23.         self.__energy = 0
  24.         self.__prev_timestamp = 0

  25.         if reads == "all":
  26.             self.__reads = ["voltage", "current", "power", "voltageDP", "voltageDM", "energy"]
  27.         else:
  28.             self.__reads = reads

  29.         # action handlers in a dictionary
  30.         self.__action_handlers = {
  31.             "list": self.__action_list,
  32.             "read": self.__action_read,
  33.         }
  34.    
  35.     def __del__(self):
  36.         self.__ser.close()
  37.         self.__output.close()
  38.    
  39.     def __action_list(self):
  40.         print("Actions list:")
  41.         for action in __action_descriptions:
  42.             print("  ", action, "\t", __action_descriptions[action])

  43.     def xor_checksum(self, byte_array, start, end):
  44.         checksum = 0
  45.         for byte in byte_array[start : end + 1]:
  46.             checksum ^= byte
  47.         return checksum

  48.     def __action_read(self):
  49.         if self.__first_exec:
  50.             self.__output.write("time")
  51.             for r in self.__reads:
  52.                 self.__output.write(",{0}".format(r))
  53.             self.__output.write("\n")
  54.             self.__first_exec = False
  55.         
  56.         self.__ser.flushInput()
  57.         while True:
  58.             packet = self.__ser.read(30)
  59.             if len(packet) == 30:
  60.                 break

  61.         packetData = unpack("<fffff", packet[9:29])  # Adjust byte order if necessary
  62.         
  63.         voltage = packetData[0]
  64.         current = packetData[1]
  65.         power = packetData[2]
  66.         voltageDP = packetData[3]
  67.         voltageDM = packetData[4]      
  68.         self.__timestamp = int(time()*1000)
  69.         
  70.         self.__output.write("Timestamp: {0}".format(self.__timestamp))
  71.         if "voltage" in self.__reads:
  72.             self.__output.write(", Voltage: {0:.6f}".format(voltage))
  73.         if "current" in self.__reads:
  74.             self.__output.write(", Current: {0:.6f}".format(current))
  75.         if "power" in self.__reads:
  76.             self.__output.write(", Power: {0:.6f}".format(power))
  77.         if "voltageDP" in self.__reads:
  78.             self.__output.write(", D+: {0:.6f}".format(voltageDP))
  79.         if "voltageDM" in self.__reads:
  80.             self.__output.write(", D-: {0:.6f}".format(voltageDM))
  81.         
  82.         if "energy" in self.__reads:
  83.             if self.__prev_timestamp == 0:
  84.                 self.__output.write(",0")
  85.             else:
  86.                 self.__energy += ((self.__timestamp - self.__prev_timestamp) * power) / 3600000000
  87.                 self.__output.write(", Energy: {0:.6f}".format(self.__energy))
  88.         
  89.         self.__output.write("\n")
  90.         self.__output.flush()

  91.         self.__prev_timestamp = self.__timestamp

  92.     def perform_action(self):

  93.         self.__ser.reset_input_buffer()

  94.         resetCommand = bytearray.fromhex("A5 04 00 00 00 01 07 00 00 06 5A")
  95.         self.__ser.write(resetCommand)
  96.         self.__ser.read(11)

  97.         resetRecordTimerCommand = bytearray.fromhex("A5 04 00 00 00 01 0C 0A 00 07 5A")
  98.         self.__ser.write(resetRecordTimerCommand)
  99.         self.__ser.read(11)

  100.         startReadCommand = bytearray.fromhex(
  101.             "A5 08 00 00 00 01 09 0B 00 00 00 00 00 00 5A"
  102.         )
  103.         pack_into("I", startReadCommand, 9, self.__time)
  104.         startReadCommand[13] = self.xor_checksum(startReadCommand, 5, 12)
  105.         self.__ser.write(startReadCommand)
  106.         self.__ser.read(11)

  107.         count = 0
  108.         while count != self.__repeat:
  109.             try:
  110.                 self.__action_handlers[self.__action]()
  111.             except KeyboardInterrupt:
  112.                 break

  113.             if self.__repeat != -1:
  114.                 count += 1
  115.             sleep(self.__time / 1000.0)

  116.     def change_output(self, output):
  117.         self.__output.close()
  118.         self.__output = output


  119. def main():
  120.     parser = ArgumentParser(description="Program to interact with the AVHzY CT-2 power meter")
  121.     parser.add_argument("action", metavar="action", choices=__actions, help="The action to perform [choices: %(choices)s]")
  122.     parser.add_argument("-d", "--device", default="/dev/ttyACM0", help="Path to the device [default: %(default)s]")
  123.     parser.add_argument("-r", "--repeat", type=int, default=-1, help="How many times to repeat the operation. Must be in [-1, inf[ (-1: infinite) [default: %(default)s]")
  124.     parser.add_argument("-t", "--time", type=int, default=100, help="The time (in milliseconds) to wait between each action iteration.")
  125.     parser.add_argument("-o", "--output", default="-", type=FileType("w"), help="Where to write output of the action [default: stdout]")
  126.     parser.add_argument("-g", "--get", default="all", choices=__gets, nargs="+", help="For read operation: what to get from power meter [choices: %(choices)s default: %(default)s")
  127.     args = parser.parse_args()

  128.     if args.repeat < -1:
  129.         print("ERROR: repeat must be in >= -1")
  130.         parser.print_usage()
  131.         exit(1)

  132.     if args.time < 1:
  133.         print("ERROR: time must be > 1")
  134.         parser.print_usage()
  135.         exit(1)

  136.     AVHzY_CT3(args.device, args.action, args.repeat, args.time, args.get, args.output).perform_action()
  137.     exit(0)

  138. if __name__ == '__main__':
  139.     main()
复制代码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

Archive|手机版|小黑屋|AVHzY Forum

GMT-8, 03-04-2025 17:35 , Processed in 0.095039 sec., 17 queries .

Powered by Discuz! X3.5

© 2001-2025, Tencent Cloud.

快速回复 返回顶部 返回列表