data:image/s3,"s3://crabby-images/33620/33620b0ec24f990e4f59653fd7e8249037b1138f" alt="見出し画像"
[Python]Raspberry Pico wで自動餌やり機作って見た2[Slack通信機能追加]
前回作成したコードに追加した機能とコードの説明をしようと思います。
追加した機能は餌を投入する際、slackに通知を入れるようにしました。
slack通知はIncoming Webhooksを使いました。
ソースコード
main.py
コードの概要
ほぼ前回と同じですが追加したのは6.のslack通信処理です。
Incoming Webhooksで作ったURLとtext_messageを渡すだけで送信できます。
Raspberry Pi Pico wでnetに接続(接続できるまでループ)
ntpサーバから時刻を取得
取得した時刻を変数に年月日時分秒でa~fに格納
時刻取得から変数iで40秒間ループ
ループ中は時刻の判定(指定した時刻、正午)
notifier.send_slack(url, text_message)でSlackへメッセージ送信 New
ループ中は変数の時刻をプログラム内でカウント、繰り上げ(1時間でntpサーバから取得し直す)
from machine import PWM, Pin
import utime
import network
import ntptime
import urequests
import slack
from ssid2 import SSID, PASS,url2
led = machine.Pin('LED',machine.Pin.OUT)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID,PASS)
text_message = '123'
servo1 = PWM(Pin(0))
servo1.freq(50)
max_duty = 65025
dig_0 = 0.0725 #0°
dig_30 = 0.0883 #30°
dig_60 = 0.1042 #60°
dig_90 = 0.12 #90°
dig_m30 = 0.0567 #-30°
dig_m60 = 0.0408 #-60°
dig_m90 = 0.025 #-90°
# Slack通知用の設定
# Wi-FiのSSIDとパスワード
ssid = SSID
password = PASS
notifier = slack.SlackNotifier(ssid, password)
# SlackのWebhook URL
url = url2
while(not wlan.isconnected()):
print('接続中…')
utime.sleep(1.0)
print('接続済')
i=0
_jst = ntptime.time() + 3600 * 9
_tm = utime.localtime(_jst)
a = int(_tm [0])
b = int(_tm [1])
c = int(_tm [2])
d = int(_tm [3])
e = int(_tm [4])
f = int(_tm [5])
while(i<30):
try:
print(str(a) +'年'+ str(b) + '月' + str(c) + '日' + str(d) + '時' + str(e) + '分' + str(f) + '秒')
servo1.duty_u16(int(max_duty*dig_m90))
if d == 2 and e == 00 and f == 00:
print("指定時刻")
servo1.duty_u16(int(max_duty*dig_60))
text_message = 'message=' + 'Rasberry Pi Pico W NOWTIME:' + str(a) +'年'+ str(b) + '月' + str(c) + '日' + str(d) + '時' + str(e) + '分' + str(f) + '秒'
print(text_message)
notifier.send_slack(url, text_message)
if d == 12 and e == 00 and f == 00:
print("正午なら")
servo1.duty_u16(int(max_duty*dig_60))
text_message = 'message=' + 'Rasberry Pi Pico W NOWTIME:' + str(a) +'年'+ str(b) + '月' + str(c) + '日' + str(d) + '時' + str(e) + '分' + str(f) + '秒'
print(text_message)
notifier.send_slack(url, text_message)
led.value(1)
utime.sleep(0.5)
led.value(0)
utime.sleep(0.5)
i += 1
f += 1
if f >=60:
print("秒繰り上げ")
f = 00
e +=1
if e>= 60:
print("時間を校正します")
_jst = ntptime.time() + 3600 * 9
_tm = utime.localtime(_jst)
a = int(_tm [0])
b = int(_tm [1])
c = int(_tm [2])
d = int(_tm [3])
e = int(_tm [4])
f = int(_tm [5])
except:
machine.idle()
slack.py
main.pyのnotifier.send_slack(url, text_message)で使用します。
このコードは参考文献のslack.pyをそのまま拝借しました。
(変更を加えたところはtimeをutimeに変更しただけです…)
import utime
import network
import machine
from machine import Pin
import urequests
import ujson
class SlackNotifier:
def __init__(self, ssid, password):
self.ssid = ssid
self.password = password
self.connected = self._connect_to_wifi(ssid, password)
def _connect_to_wifi(self, ssid, password):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Connecting to network...')
wlan.connect(ssid, password)
# 接続が完了するまで待機
while not wlan.isconnected():
utime.sleep(1)
print('Network config:', wlan.ifconfig())
return wlan.isconnected()
def _send_request_with_retry(self, url, message, headers, max_retries=3, timeout=10):
retries = 0
while retries < max_retries:
try:
res = urequests.post(
url,
data=ujson.dumps(message).encode("utf-8"),
headers=headers,
timeout=timeout
)
return res
except OSError as e:
print("Error:", e)
print("Retrying... (attempt {}/{})".format(retries + 1, max_retries))
retries += 1
utime.sleep(1)
return None
def send_slack(self, url, message_text):
if not self.connected:
# 途中で切断した場合は再接続を試みる
print("Trying to reconnect...")
self.connected = self._connect_to_wifi(self.ssid, self.password)
if not self.connected:
print("Failed to reconnect to Wi-Fi, cannot send Slack message.")
return
# jsonデータで送信するという事を明示的に宣言
header = {'Content-Type': 'application/json'}
# 送信するメッセージ
message = {"text": message_text}
# HTTPリクエストをPOSTとして送信
res = self._send_request_with_retry(url, message, header)
if res is not None:
print("Request successful:", res.status_code)
res.close()
else:
print("Request failed after retries.")
実際に動くところは前回と同じで、slackに通知を追加しています。
参考動画(前回のものです)
今後の課題
できたこと
とりあえず動いた(^_^;)
pythonで少し実用的なものができた
スラックで通知ができた New
改善したいこと
サーボモータの電圧を安定させるためにはコンデンサが必要
Raspberry Pi Pico w単体(PCなし)で稼働させる
追加したい機能
温度センサを取り付けるメールかスラックで通知する
カメラで確認できるようにする
時間ではなくスラックやメールの通知でサーボモータを動かせるようにする
あとがき
前回、コードの説明を今回でする予定でしたが、slackの通信ができそうだったので先に追加しました。(^_^;)
次回こそコードの説明回にしようとおもいます。(^_^;)