M5StickC Plus + BP35A1 によるスマートメーターとの動作確認プログラム
BP35A1 によるスマートメーターとの接続シーケンスを理解するために、最低限のステップによる動作確認用のコードを作成しました。
@rukihenaさんの「スマートメーターの情報を最安ハードウェアで引っこ抜く」をベースにさせていただいています。
エラーチェックやタイムアウト処理等はほとんどないので、上手く動かないときは再起動してみてください。コメントアウトの方法さえ、検索しないとわからないレベルですので、稚拙なところもあろうかと思いますがご容赦のほど。
※ Bルートの認証ID(rbid)と認証パスワード(rbpwd)を記載してください
※ BP35A1 初回起動時は「【05】ERXUDPデータ表示形式の設定」
のコメントアウトを外して、ASCIIモードに変更してください。FLASHメモリの書き込み制限があるので、以降は再度コメントアウトしてください。
M5StickC PlusにとBP35A1をカッコよく合体させる Wi-SUN HAT については、こちらを参照してください。とても丁寧に説明されているので、ステップバイステップで組み立てから稼働までたどり着けます。
「M5StickCで家庭用スマートメーターをハックする!」 @norifumi5001 さん
'''
M5StickC Plus + BP35A1 :スマートメーターとの最低限の接続シーケンス
※ 参考:
「スマートメーターの情報を最安ハードウェアで引っこ抜く」@rukihenaさん
https://qiita.com/rukihena/items/82266ed3a43e4b652adb
※ Bルートの認証ID(rbid)と認証パスワード(rbpwd)を記載する
※ BP35A1 初回起動時は「【05】ERXUDPデータ表示形式の設定」
のコメントアウトを外して、ASCIIモードに変更する
FLASHメモリの書き込み制限があるので、以降は再度コメントアウトしておく
※ 動作確認用のため、エラーチェック等、甘めなので注意のこと
'''
# モジュールの読み込み
from m5stack import * # M5Stack標準関数
import sys # システム固有関数
import machine # ハードウェア関連関数
import utime # 時間関連関数
import ure # 簡素な正規表現関数
# 定数・変数
wait = 0 # ウエイト(秒)/初期値0秒/必要に応じて増やしてみてください
rbid = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # Bルート認証ID(東電から郵送)
rbpwd = 'xxxxxxxxxxxx' # Bルート認証パスワード(東電からメール)
line = None
# 関数定義
# OKが返ってくるまで待つ / OKなら True、FAIL なら False を戻す
def wait_for_ok() :
global line
line = None
utime.sleep(wait) # <<< ウエイト
while True : # OK か FAIL が返ってくるまでループ/タイムアウト処理なし
if bp35a1.any() != 0 :
line = bp35a1.readline()
print('<<', line.decode(), end = '')
if line is not None :
if line.decode().startswith('OK') :
return True
elif line.decode().startswith('FAIL') :
return False
# テキストLCD画面出力
def draw_text(text) :
(x, y, w, h) = (0, 61, 240, 20)
lcd.rect(x, y, w, h, 0x000000, 0x000000)
lcd.font(lcd.FONT_DejaVu18)
lcd.print(text, lcd.CENTER, lcd.CENTER, 0xffffff)
########## ここからメイン ##########
# 【00】LCD画面クリア
lcd.clear()
lcd.orient(lcd.LANDSCAPE)
draw_text('Connecting......')
print('【00】== LCD画面クリア OK!')
# 【01】UART 経由で BP35A1 との接続を初期化
bp35a1 = machine.UART(1, tx=0, rx=26)
bp35a1.init(115200, bits=8, parity=None, stop=1, timeout=2000)
print('【01】== BP35A1 接続初期化 OK!')
# 【02】バッファクリア
utime.sleep(wait) # <<< ウエイト
while bp35a1.any():
dust = bp35a1.read()
print('dust3 = ',dust)
bp35a1.write('\r\n')
utime.sleep(wait) # <<< ウエイト
print('【02】== バッファクリア OK!')
# 【03】プロトコル・スタックを初期化
bp35a1.write('SKRESET\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【03】== プロトコル・スタック初期化 OK!')
# 【04】コマンドエコーバックをオンにする
bp35a1.write('SKSREG SFE 1\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【04】== コマンドエコーバック設定 OK!')
# # 【05】ERXUDPデータ表示形式の設定
# # 初期値はバイナリ/FLASHメモリの書き込み制限があるので初回以降はコメントアウトしておく
# bp35a1.write('WOPT 01\r\n') # 00 = バイナリモード、 01 = ASCII モード
# if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
# print('【05】== ERXUDPデータ表示形式設定 OK!')
print('【05】== ERXUDPデータ表示形式設定 SKIP!')
# 【06】ERXUDPデータ表示形式の確認(ASCII or バイナリ)
bp35a1.write('ROPT\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
if ure.match('OK 00', line) :
print('\r\n【05】== echo mode = [Binary Mode]')
elif ure.match('OK 01', line) :
print('\r\n【05】== echo mode = [ASCII Mode]')
# 【07】ファームウェア・バージョン取得:動作確認用/特に必要ない
bp35a1.write('SKVER\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【07】== ファームウェア・バージョン取得 OK!')
# 【08】通信設定値の取得:動作確認用/特に必要ない
# 応答に技術資料アクセスキーが含まれる >> EINFO IDID:0000:0000:0000:PWPW:〜
# https://www.rohm.co.jp/products/wireless-communication/specified-low-power-radio-modules/bp35a1-product
bp35a1.write('SKINFO\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【08】== 通信設定値の取得 OK!')
# 【09】以前のPANAセッションを解除
bp35a1.write('SKTERM\r\n')
wait_for_ok() # OK でも FAIL でも先に進む
print('【09】== 以前のPANAセッションを解除 OK!')
# 【10】Bルート認証パスワード設定
bp35a1.write('SKSETPWD C ' + rbpwd + '\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【10】== Bルート認証パスワード設定 OK!')
# 【11】Bルート認証ID設定
bp35a1.write('SKSETRBID ' + rbid + '\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【11】== Bルート認証ID設定 OK!')
scanDuration = 4; # スキャン時間:0-14 / 4〜7くらいまでが有効値らしい
scanRes = {} # スキャンの結果 (空のdict)
# 【12】スキャンのリトライループ(何か見つかるまで)
while 'Channel' not in scanRes :
# アクティブスキャン(IE あり)を行う
# 時間かかります。10秒ぐらい?
print('>> scanDuration = ', scanDuration)
bp35a1.write('SKSCAN 2 FFFFFFFF ' + str(scanDuration) + '\r\n')
# スキャン1回について、スキャン終了までのループ
scanEnd = False
while not scanEnd :
line = bp35a1.readline()
if line is not None :
if line.decode().startswith('EVENT 22') :
print('>> ScanEnd')
# スキャン終わったよ(見つかったかどうかは関係なく)
scanEnd = True
elif line.decode().startswith(' ') :
# スキャンして見つかったらスペース2個あけてデータがやってくる
# 例
# Channel:39
# Channel Page:09
# Pan ID:FFFF
# Addr:FFFFFFFFFFFFFFFF
# LQI:A7
# PairID:FFFFFFFF
cols = line.decode().strip().split(':')
print('>> PAN status = ', cols)
scanRes[cols[0]] = cols[1]
scanDuration+=1
if (7 < scanDuration) and ('Channel' not in scanRes) :
# 引数としては14まで指定できるが、7で失敗したらそれ以上は無駄っぽい
print('スキャンリトライオーバー')
sys.exit() #### 終了 ####
print('【12】== アクティブスキャン OK!')
# 【13】スキャン結果からChannelを設定。
bp35a1.write('SKSREG S2 ' + scanRes['Channel'] + '\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【13】== Channel 設定 OK!')
# 【14】スキャン結果からPan IDを設定
bp35a1.write('SKSREG S3 ' + scanRes['Pan ID'] + '\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【14】== Pan ID 設定 OK!')
# 【15】MACアドレス(64bit)をIPV6リンクローカルアドレスに変換。
bp35a1.write('SKLL64 ' + scanRes['Addr'] + '\r\n')
echo_back = bp35a1.readline().decode() # エコーバック
ipv6Addr = bp35a1.readline().decode().strip()
print('【15】== IPV6リンクローカルアドレス取得 OK! :', ipv6Addr)
# 【16】PANA 接続シーケンスを開始
bp35a1.write('SKJOIN ' + ipv6Addr + '\r\n')
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【16】== 接続シーケンスを開始 OK!')
# 【17】PANA 接続完了待ち(10行ぐらいなんか返してくる)
bConnected = False
while not bConnected :
line = bp35a1.readline()
if line is not None :
if line.decode().startswith('EVENT 24') :
print('【17】== PANA 接続失敗')
sys.exit() #### 終了 ####
elif line.decode().startswith('EVENT 25') :
print('【17】== PANA 接続完了 OK!')
bConnected = True
# 【18】スマートメーターがインスタンスリスト通知を投げてくる
# (ECHONET-Lite_Ver.1.12_02.pdf p.4-16)
print('【18】== インスタンスリスト:', bp35a1.readline().decode(), end = '') # 無視
########## ここから無限ループ ##########
while True:
# ECHONET Lite フレーム作成
# 参考資料
# ・ECHONET-Lite_Ver.1.12_02.pdf (以下 EL)
# ・Appendix_H.pdf (以下 AppH)
# ・「ECHONET Liteの電文 作成方法」@miyazawa_shi さん
# https://qiita.com/miyazawa_shi/items/725bc5eb6590be72970d
echonetLiteFrame = b''
echonetLiteFrame += b'\x10\x81' # EHD (参考:EL p.3-2)
echonetLiteFrame += b'\x00\x01' # TID (参考:EL p.3-3)
# ここから EDATA
echonetLiteFrame += b'\x05\xFF\x01' # SEOJ (参考:EL p.3-3 AppH p.3-408~)
echonetLiteFrame += b'\x02\x88\x01' # DEOJ (参考:EL p.3-3 AppH p.3-274~)
echonetLiteFrame += b'\x62' # ESV(62:プロパティ値読み出し要求) (参考:EL p.3-5)
echonetLiteFrame += b'\x01' # OPC(1個)(参考:EL p.3-7)
echonetLiteFrame += b'\xE7' # EPC(参考:EL p.3-7 AppH p.3-275) <E7> 瞬時電力計測値
echonetLiteFrame += b'\x00' # PDC(参考:EL p.3-9)
# echonetLiteFrame = b'\x10\x81\x00\x01\x05\xFF\x01\x02\x88\x01\x62\x01\xE7\x00'
# 【19】コマンドデータ送信
command = 'SKSENDTO 1 {0} 0E1A 1 {1:04X} '.format(ipv6Addr, len(echonetLiteFrame))
bp35a1.write(command)
bp35a1.write(echonetLiteFrame)
if wait_for_ok() is False : sys.exit() # FAIL が返ってきたら ##### 終了 #####
print('【19】== コマンドデータ送信 OK!')
line = bp35a1.readline() # ERXUDPが返ってくるはず
# 受信データはたまに違うデータが来たり、
# 取りこぼしたりして変なデータを拾うことがあるので
# チェックを厳しめにしてます。
if line is not None :
print('>> line =', line.decode(), end = '')
if line.decode().strip().startswith('ERXUDP') :
cols = line.strip().decode().split(' ')
res = cols[8] # UDP受信データ部分
#tid = res[4:4+4];
seoj = res[8:8+6]
#deoj = res[14:14+6]
ESV = res[20:20+2]
#OPC = res[22:22+2]
if seoj == '028801' and ESV == '72' :
# スマートメーター(028801)から来た応答(72)なら
EPC = res[24:24+2]
if EPC == 'E7' :
# 内容が瞬時電力計測値(E7)だったら
hexPower = res[-8:] # 最後の4バイト(16進数で8文字)が瞬時電力計測値
intPower = int(hexPower, 16)
print('瞬時電力計測値:{:4d}[W]'.format(intPower))
draw_text('NowPower:{:4d} [W]'.format(intPower))
utime.sleep(5) # インターバル:初期値 = 5秒
# 無限ループなのでここには来ないけど念のため
bp35a1.close()