As I Please

MTのいんすとーるの練習と、その他びぼうろく・・・

タグ「東電」が付けられているもの


スマートメーター HEMS デバイススキャン for python3

以前書いた、HEMS 機器からのデータの取り出し関連で、とても参考にさせてもらった スマートメーターの情報を最安ハードウェアで引っこ抜くをベースにプログラムを書いた。
このサンプルプログラムはpython2 でのプログラムだったので、今後のためにもpython3 用に書き直しておいたほうが良かったので、作成してみた。例示してあるプログラムはデバイスを探索してデータを取得するのだが、定期的にデータを取得して時系列データを作成するのであれば、デバイス探索は毎回繰り返さなくても良い(スマートメーターのサイトローカルのIPv6アドレスで特定できる)ので、(1)デバイスを探索し、アクセスするための諸元データを取得する。(2)データを定期的に取得する。の2つにわけてしまうほうが良さそうで、それを実施してみた。 まずは (1)の python3 に自分なりに書き換えたものを。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import serial
import time
# Bルート認証ID(東京電力パワーグリッドから郵送で送られてくる)
rbid  = "AAAABBBBCCCCDDDD"
# Bルート認証パスワード(東京電力パワーグリッドからメールで送られてくる)
rbpwd = "PASSWORDDDDDDD"
# シリアルポートデバイス名
#serialPortDev = 'COM3'  # Windows の場合
serialPortDev = '/dev/ttyUSB0'  # Linux(ラズパイなど)の場合
#serialPortDev = '/dev/cu.usbserial-A103BTPR'    # Mac の場合
#
ser = serial.Serial(serialPortDev, 115200)
#
wcom = "SKVER\r\n"
ser.write(str.encode(wcom))
print(ser.readline().decode('utf-8'), end="",flush=True) # エコーバック
print(ser.readline().decode('utf-8'), end="",flush=True) # バージョン
#
# Bルート認証パスワード設定
wcom = "SKSETPWD C " + rbpwd + "\r\n"
ser.write(str.encode(wcom))
ser.readline() # エコーバック
# Bルート認証ID設定
wcom = "SKSETRBID " + rbid + "\r\n"
ser.write(str.encode(wcom))
print(ser.readline().decode('utf-8'), end="",flush=True) # エコーバック
ser.readline()
print(ser.readline().decode('utf-8'), end="",flush=True) # OKが来るはず(チェック無し)
ser.readline()
scanDuration = 4;   # スキャン時間。サンプルでは6なんだけど、4でも行けるので。(ダメなら増やして再試行)
scanRes = {} # スキャン結果の入れ物
# スキャンのリトライループ(何か見つかるまで)
#while not scanRes.has_key("Channel") :  python2
while not 'Channel' in scanRes :
        # アクティブスキャン(IE あり)を行う
        # 時間かかります。10秒ぐらい?
        #ser.write(str.encode("SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"))
        wcom = "SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"
        ser.write(str.encode(wcom))
        # スキャン1回について、スキャン終了までのループ
        scanEnd = False
        while not scanEnd :
            line = ser.readline()
            print(line.decode(), end="",flush=True)
#
            line = line.decode()
            if line.startswith("EVENT 22") :
#                # スキャン終わったよ(見つかったかどうかは関係なく)
                scanEnd = True
            elif line.startswith("  ") :
                # スキャンして見つかったらスペース2個あけてデータがやってくる
                # 例
                #  Channel:39
                #  Channel Page:09
                #  Pan ID:FFFF
                #  Addr:FFFFFFFFFFFFFFFF
                #  LQI:A7
                #  PairID:FFFFFFFF
                cols = line.strip().split(':')
                scanRes[cols[0]] = cols[1]
        scanDuration+=1
        if 7 < scanDuration and not scanRes.has_key("Channel"):
            # 引数としては14まで指定できるが、7で失敗したらそれ以上は無駄っぽい
            print("スキャンリトライオーバー")
            sys.exit()  #### 糸冬了 ####
これを実行すると次のような結果が出てくる。
raspberrypi:~/smartmeter3# ./test-bs_scan.py
SKVER
EVER 1.2.8
SKSETPWD C PASSWORDDDDDDD
SKSETRBID AAAABBBBCCCCDDDD
SKSCAN 2 FFFFFFFF 4
OK
EVENT 20 FE80:0000:0000:0000:1234:5678:9ABC:DEF0
EPANDESC
  Channel:33
  Channel Page:09
  Pan ID:24EF
  Addr:00FFEEDDCCBBAA99
  LQI:32
  PairID:12345678
EVENT 22 FE80:0000:0000:0000:1234:5678:9ABC:DEF0
Addr は64bit、これを IPv6のリンクローカルアドレスに変換するには SKLL64 コマンドを使う、ようだが 単純に、MACアドレス 48bit を、EUI-64で変換する場合の、'FF:FE'の挿入をやめただけのよう。先頭の7bit目を反転させて、後半64bit の IPv6 アドレスにしてしまえば良いみたい。例の 'Addr:00FFEEDDCCBBAA99' なら、IPv6アドレスは 'FE80:0000:0000:0000:02FF:EEDD:CCBB:AA99' になる、と思われる。
これで取得できた Channel, Channel Page, Pan ID,PairID, 生成した IPv6アドレスを利用して定期的なデータ取得 scriptを動かす。

スマートメーター HEMS デバイススキャン for python3

以前書いた、HEMS 機器からのデータの取り出し関連で、とても参考にさせてもらった スマートメーターの情報を最安ハードウェアで引っこ抜くをベースにプログラムを書いた。
このサンプルプログラムはpython2 でのプログラムだったので、今後のためにもpython3 用に書き直しておいたほうが良かったので、作成してみた。例示してあるプログラムはデバイスを探索してデータを取得するのだが、定期的にデータを取得して時系列データを作成するのであれば、デバイス探索は毎回繰り返さなくても良い(スマートメーターのサイトローカルのIPv6アドレスで特定できる)ので、(1)デバイスを探索し、アクセスするための諸元データを取得する。(2)データを定期的に取得する。の2つにわけてしまうほうが良さそうで、それを実施してみた。 まずは (1)の python3 に自分なりに書き換えたものを。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import serial
import time
# Bルート認証ID(東京電力パワーグリッドから郵送で送られてくる)
rbid  = "AAAABBBBCCCCDDDD"
# Bルート認証パスワード(東京電力パワーグリッドからメールで送られてくる)
rbpwd = "PASSWORDDDDDDD"
# シリアルポートデバイス名
#serialPortDev = 'COM3'  # Windows の場合
serialPortDev = '/dev/ttyUSB0'  # Linux(ラズパイなど)の場合
#serialPortDev = '/dev/cu.usbserial-A103BTPR'    # Mac の場合
#
ser = serial.Serial(serialPortDev, 115200)
#
wcom = "SKVER\r\n"
ser.write(str.encode(wcom))
print(ser.readline().decode('utf-8'), end="",flush=True) # エコーバック
print(ser.readline().decode('utf-8'), end="",flush=True) # バージョン
#
# Bルート認証パスワード設定
wcom = "SKSETPWD C " + rbpwd + "\r\n"
ser.write(str.encode(wcom))
ser.readline() # エコーバック
# Bルート認証ID設定
wcom = "SKSETRBID " + rbid + "\r\n"
ser.write(str.encode(wcom))
print(ser.readline().decode('utf-8'), end="",flush=True) # エコーバック
ser.readline()
print(ser.readline().decode('utf-8'), end="",flush=True) # OKが来るはず(チェック無し)
ser.readline()
scanDuration = 4;   # スキャン時間。サンプルでは6なんだけど、4でも行けるので。(ダメなら増やして再試行)
scanRes = {} # スキャン結果の入れ物
# スキャンのリトライループ(何か見つかるまで)
#while not scanRes.has_key("Channel") :  python2
while not 'Channel' in scanRes :
        # アクティブスキャン(IE あり)を行う
        # 時間かかります。10秒ぐらい?
        #ser.write(str.encode("SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"))
        wcom = "SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"
        ser.write(str.encode(wcom))
        # スキャン1回について、スキャン終了までのループ
        scanEnd = False
        while not scanEnd :
            line = ser.readline()
            print(line.decode(), end="",flush=True)
#
            line = line.decode()
            if line.startswith("EVENT 22") :
#                # スキャン終わったよ(見つかったかどうかは関係なく)
                scanEnd = True
            elif line.startswith("  ") :
                # スキャンして見つかったらスペース2個あけてデータがやってくる
                # 例
                #  Channel:39
                #  Channel Page:09
                #  Pan ID:FFFF
                #  Addr:FFFFFFFFFFFFFFFF
                #  LQI:A7
                #  PairID:FFFFFFFF
                cols = line.strip().split(':')
                scanRes[cols[0]] = cols[1]
        scanDuration+=1
        if 7 < scanDuration and not scanRes.has_key("Channel"):
            # 引数としては14まで指定できるが、7で失敗したらそれ以上は無駄っぽい
            print("スキャンリトライオーバー")
            sys.exit()  #### 糸冬了 ####
これを実行すると次のような結果が出てくる。
raspberrypi:~/smartmeter3# ./test-bs_scan.py
SKVER
EVER 1.2.8
SKSETPWD C PASSWORDDDDDDD
SKSETRBID AAAABBBBCCCCDDDD
SKSCAN 2 FFFFFFFF 4
OK
EVENT 20 FE80:0000:0000:0000:1234:5678:9ABC:DEF0
EPANDESC
  Channel:33
  Channel Page:09
  Pan ID:24EF
  Addr:00FFEEDDCCBBAA99
  LQI:32
  PairID:12345678
EVENT 22 FE80:0000:0000:0000:1234:5678:9ABC:DEF0
Addr は64bit、これを IPv6のリンクローカルアドレスに変換するには SKLL64 コマンドを使う、ようだが 単純に、MACアドレス 48bit を、EUI-64で変換する場合の、'FF:FE'の挿入をやめただけのよう。先頭の7bit目を反転させて、後半64bit の IPv6 アドレスにしてしまえば良いみたい。例の 'Addr:00FFEEDDCCBBAA99' なら、IPv6アドレスは 'FE80:0000:0000:0000:02FF:EEDD:CCBB:AA99' になる、と思われる。
これで取得できた Channel, Channel Page, Pan ID,PairID, 生成した IPv6アドレスを利用して定期的なデータ取得 scriptを動かす。