蓄電池関連のデータを python で取得して、influxdb2 に放り込む。

  •  
  •  
こちらも同様に 5分に1回、influxdbにデータを格納。
#
# https://echonet.jp/wp/wp-content/uploads/pdf/General/Standard/Release/Release_R/Appendix_Release_R_r2.pdf
#
#
# 蓄電池クラス規定: 2-236
#   クラスグループコード: 0x02
#   クラスコード: 0x7D
#   インスタンスコード: 0x01-0x7F
#
#  データ取得対象を、以下の epcにする。
#  0x80 動作状態
#  0x89 異常状態
#  0x97 現在時刻
#  0x98 現在年月日
#
#  0xA4 AC充電可能量
#  0xA5 AC放電可能量
#
#  0xA8 AC積算充電電力量計測値
#  0xA9 AC積算放電電力量計測値
#
#  0xCF 運転動作状態
#
#  0xD3 瞬時充放電電力計測値(プラス:充電、マイナス:放電?)
#
#  0xDA 運転モード設定
#
#  0xE2 蓄電残量1
#  0xE4 蓄電残量3
#
#
import asyncio
import netifaces
import socket
import sys
#
# for influxdb
#
from influxdb_client import InfluxDBClient,Point,WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
#
UDP_RESES = []
UDP_PORT = 3610  # for UDP
GWIP     = "xxx.xxx.xxx.xxx" # Gateway IP address
INTERVAL = 0.03 # udp 送信のインターバル (sec)
# for influxdb2
org = "homemetrics"
bucket = "environment"
token = "api key"
client = InfluxDBClient(url="http://xxx.xxx.xxx.xxx:8086",token=token,org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
#
async def main():
    # 自身のipアドレスを探索
    local_ip = UdpSender.find_local_ip_addr()
    if local_ip is None:
        print("ローカルIPアドレスが見つかりませんでした。")
        return
    #print(f"ローカルIPアドレス: {local_ip}")
    # echonetのコマンドをudp送信
    sender = UdpSender(local_ip)
    # echonetのコマンド受信用
    recv = UdpReceiver()
    await recv.start()
    echonet_cmd = echonet_cmd_pro("80") # 0x80: 動作状態 :unsigned char: 1 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("89") # 89: 異常内容 :unsigend short: 2 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("97") # 97: 現在時刻設定 :unsigend char: 2 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("98") # 98: 現在年月日設定 :unsigend char: 4 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("A4") # A4: AC充電可能量 :unsigend long: 4 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("A5") # A5: AC放電可能量 :unsigend long: 4 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("A8") # A8: AC積算充電電力量計測値 :unsigend long: 4 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("A9") # A9: AC積算放電電力量計測値 :unsigend long: 4 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("CF") # CF: 運転動作状態 :unsigend char: 1 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("D3") # D3: 瞬時充放電電力計測値 :sigend long : 4 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("DA") # DA: 運転モード設定 :unsigend char: 1 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("E2") # E2: 蓄電残量1 :unsigend long : 4 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    echonet_cmd = echonet_cmd_pro("E4") # E4: 蓄電残量3 :unsigend char : 1 Byte
    await sender.send_msg(GWIP, echonet_cmd) # 動作状態を確認
    await asyncio.sleep(INTERVAL)
    #
    #print("メッセージ送信完了")
    sender.close()
    #await asyncio.sleep(10)
    await asyncio.sleep(7)
    #recv.stop() # これは awaitはつけずに実効する。、
    # UDP_RESES に各機器からのレスポンスが、echonet電文とIPで入ってます
    for udp_res in UDP_RESES:
        echonet_res = parse_echonet_res(udp_res[0])
        #print(udp_res)
        #print(echonet_res)
    await recv.stop()
    #print("end")
    for udp_res in UDP_RESES:
        echonet_res = parse_echonet_res(udp_res[0]) #
        #print(echonet_res[6]) # [6] is epc, [8] is raw hexdata
        match echonet_res[6]:
            case "80": # 動作状態: 0x30 on,  0x31 off
                print("80 start")
                data80 = echonet_res[8]
                print(data80)
            case "89": # 異常状態: 上位1バイト 異常内容小分類,下位1バイト 異常内容大分類
                print("89 start")
                data89 = echonet_res[8]
                print(data89)
            case "97": # 現在時刻:
                print("97 start")
                data97 = str(int(echonet_res[8][:2],16)).zfill(2) + ':' + str(int(echonet_res[8][2:],16)).zfill(2)
                print(data97)
            case "98": # 現在年月日:
                print("98 start")
                data98 = str(int(echonet_res[8][:4],16)) + '-' + str(int(echonet_res[8][4:6],16)).zfill(2) + '-' + str(int(echonet_res
[8][6:],16)).zfill(2)
                print(data98)
            case "a4": # AC充電可能量(Wh): 0x00000000-0x03B9AC9FF = 0 - 999,999,999 Wh
                print("a4:")
                dataa4 = int(echonet_res[8],16)
                print(dataa4)
            case "a5": # AC放電可能量(Wh): 0x00000000-0x03B9AC9FF = 0 - 999,999,999 Wh
                print("a5:")
                dataa5 = int(echonet_res[8],16)
                print(dataa5)
            case "a8": # AC積算充電電力量計測値(0.001kWh): 0x00000000-0x3b9AC9FF = 0 - 999,999.999 kWh)
                print("a8:")
                dataa8 = float(int(echonet_res[8],16)/1000)
                print(dataa8)
            case "a9": # AC積算放電電力量計測値(0.001kWh): 0x00000000-0x3b9AC9FF = 0 - 999,999.999 kWh)
                print("a9:")
                dataa9 = float(int(echonet_res[8],16)/1000)
                print(dataa9)
            case "cf": # 運転動作状態:0x41 急速充電,0x42 充電,0x43 放電,0x44 待機,0x45 テスト,0x46 自動,0x48 再起動,0x49 実効容量再計算処理,0x40 その他
                print("cf:")
                datacf = echonet_res[8]
                print(datacf)
            case "d3": # 瞬時充放電電力計測値:0x0000001-0x3B9AC9FF 1-999,999,999 W(充電時 プラス値), 0xFFFFFFFF - 0xC4653601 - 1 - -999,999,999 W(放電時 マイナス値)
                print("d3:")
                datad3 = int(echonet_res[8],16)
                if datad3 >= 0x8000000:
                    datad3 -= 0x100000000
                print(datad3)
            case "da": # 運転モード設定;0x41 急速充電,0x42 充電,0x43 放電,0x44 待機,0x45 テスト,0x46 自動,0x48 再起動,0x49 実効容量再計算処理,0x40 その他
                print("da:")
                datada = echonet_res[8]
                print(datada)
            case "e2": # 蓄電残量1(Wh):0x00000000 - 0x3B9AC9FF  0-999,999,999 Wh
                print("e2")
                datae2 = int(echonet_res[8],16)
                print(datae2)
            case "e4": # 蓄電残量3(%) :0x00-0x64  0-100 %
                print("e4")
                datae4 = int(echonet_res[8],16)
                print(datae4)
    # for influxdb
    p = Point("hems_027d1f")\
        .tag("location","home")\
        .field("data80",data80)\
        .field("data89",data89)\
        .field("data97",data97)\
        .field("data98",data98)\
        .field("dataa4",dataa4)\
        .field("dataa5",dataa5)\
        .field("dataa8",dataa8)\
        .field("dataa9",dataa9)\
        .field("datacf",datacf)\
        .field("datad3",datad3)\
        .field("datada",datada)\
        .field("datae2",datae2)\
        .field("datae4",datae4)
    write_api.write(bucket=bucket,record=p,write_precision=WritePrecision.S)
def parse_echonet_res(echonet_res):
    res_cols = [echonet_res[0:4],  # echonetであることの宣言
                echonet_res[4:8],  # 自由欄
                echonet_res[8:14],  # SEOJ(送信元機器) 0ef001=ノード
                echonet_res[14:20],  # DEOJ(送信先機器) 05ff01=コントローラ
                echonet_res[20:22],  # 応答code. 71=set 72=get
                echonet_res[22:24],  # 処理プロパティ数
                echonet_res[24:26],  # プロパティ名 d6=自ノードlist.
                echonet_res[26:28],  # PDC. 後のbyte数
                #echonet_res[28:36]]  # 0105ff01 = 1個(01)x05ff01
                echonet_res[28:]   # 0105ff01 = 1個(01)x05ff01
                ]
    return res_cols
def echonet_cmd_pro(property):
    cmd_cols = ["1081",   # echonetであることの宣言
                "0000",   # 自由欄
                "05ff01",  # SEOJ(送信元機器) 05ff01=コントローラ
                #"0ef001",  # DEOJ(送信先機器) 0ef001=ノード
                #"02791f",  # DEOJ(送信先機器) 02791f=太陽光発電?
                #"02877f",  # DEOJ(送信先機器)  0287 =分電盤メータリングクラス。 さて、インスタンスコードは何?
                "027d1f",  # DEOJ(送信先機器)  027d =蓄電池 インスタンスは 1fっぽい
                "62",     # 60=set, 61=set(要:応答), 62=get
                "01",     # 処理プロパティ数
                #"d6",     # プロパティ名 d6=自ノードlist.
                property, #
                "00"]
    echonet_cmd = "".join(cmd_cols)
    return echonet_cmd
class UdpSender():
    def __init__(self, local_ip):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.IPPROTO_IP,
                             socket.IP_MULTICAST_IF,
                             socket.inet_aton(local_ip))
    async def send_msg(self, ip, message):
        msg = bytes.fromhex(message)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.sock.sendto, msg, (ip, UDP_PORT))
    def close(self):
        self.sock.close()
    @staticmethod
    def find_local_ip_addr(find_iface_name=None):
        for iface_name in netifaces.interfaces():
            iface_data = netifaces.ifaddresses(iface_name)
            af_inet = iface_data.get(netifaces.AF_INET)
            if not af_inet:
                continue
            ip_addr = af_inet[0]["addr"]
            if find_iface_name is None:
                return ip_addr
            elif iface_name == find_iface_name:
                return ip_addr
        return None
class UdpReceiver():
    def __init__(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(("0.0.0.0", UDP_PORT))
        self.sock.settimeout(5.0)
        self.task = asyncio.create_task(self.receive_loop())
        self.running = True
    async def start(self):
        loop = asyncio.get_running_loop()
        self.task = loop.create_task(self.receive_loop())
    async def receive_loop(self):
        loop = asyncio.get_running_loop()
        while self.running:
            try:
                data, addr = await asyncio.get_running_loop().run_in_executor(None, self.sock.recvfrom, 4096)
                global UDP_RESES
                UDP_RESES.append([data.hex(), addr[0]])
                #print(f"データ受信: {data.hex()}, 送信元アドレス: {addr[0]}")
            except socket.timeout:
                print("socket timeout")
                continue
            except (OSError, asyncio.CancelledError):
                break
    async def stop(self):
        self.running = False
        self.sock.close()
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancellerdError:
            pass
if __name__ == '__main__':
    asyncio.run(main())

コメントする