毎月恒例AWSでbot作成
2022年になって早、3か月?4か月?
毎月、更新されているこちらのサイトをみてtwitterのbotを作成しています。
今月もやっていきます。
参考URL
https://aws.amazon.com/jp/builders-flash/202204/aws-drill-twitter-bot-4/?awsf.filter-name=*all
今月は何をやるかというと…
①毎朝、10:00に、6:00~9:59時に呟かれた内容(「うめみ」)を含むツイートを検索
②検索結果をツイート
③検索結果の件数が10件を超えていたら@bot_umeという文字列をつけて通知が飛ぶようにする
④また、自身のメールアドレスに送信
今回勉強したこと
・TwitterAPI
何時~何時までの間につぶやかれた内容を検索
件数だったり、つぶやかれた内容を取得することができる
ex) 6:00~9:00の間に「うめみ」ということをつぶやいた結果を取得
・検索結果をツイート
以前からの内容を使えばできる
・通知を飛ばす
今回あたらしいことかな
・登録したメールアドレスにツイート内容を送信する
AmazonSNSって機能を使えばできる
以下ソースだけど、いろいろ設定しないといけないこと多いね。
import json
import os
from requests_oauthlib import OAuth1Session
from datetime import datetime, timedelta, timezone
import boto3
ssm_client = boto3.client('ssm')
sns_client = boto3.client('sns')
oauth = None
def lambda_handler(event, context):
init()
# 同じ文言のツイートができないので、その対策で年月日を入れておきます。
now_jst = datetime.now(timezone(timedelta(hours=+9), 'JST'))
now_jst_str = now_jst.strftime('%Y年%-m月%-d日')
tweet_text = 'おはようございます、今日は ' + now_jst_str + ' です。'
# ★追加(ここから)
# Twitter API には UTC で渡す必要があるので、UTC のままにしています。
now = datetime.now()
start_time = (datetime.now() + timedelta(hours=-1)).strftime('%Y-%m-%dT%H:00:00Z')
end_time = datetime.now().strftime('%Y-%m-%dT%H:00:00Z')
count = count_matching_tweets('うめみ', start_time, end_time)
tweet_text = tweet_text + '今朝の「うめみ」関連のツイートは ' + str(count) + ' 件でした。'
if 20 < count:
#tweet_text = tweet_text + 'いつもより多めなので気をつけてください。 @ketancho'
tweet_text = tweet_text + 'いつもより多めなので気をつけてください。 @bot_ume'
# ★追加(ここから)
sns_client.publish(
TopicArn='内緒…',
Subject='Twitter Bot からの連絡',
Message = tweet_text
)
# ★追加(ここまで)
tweet(tweet_text)
def init():
response = ssm_client.get_parameter(
Name='/credentials/twitter',
WithDecryption=True
)
twitter_parameters = json.loads(response['Parameter']['Value'])
consumer_key = twitter_parameters['consumer_key']
client_secret = twitter_parameters['client_secret']
access_token = twitter_parameters['access_token']
access_token_secret = twitter_parameters['access_token_secret']
global oauth
oauth = OAuth1Session(consumer_key, client_secret, access_token, access_token_secret)
def tweet(text):
payload = {'text': text}
response = oauth.post(
'https://api.twitter.com/2/tweets',
json=payload,
)
if response.status_code != 201:
raise Exception(
'[Error] {} {}'.format(response.status_code, response.text)
)
def count_matching_tweets(search_word, start_time, end_time):
url = 'https://api.twitter.com/2/tweets/search/recent'
params = {
'query': search_word,
'max_results': 100,
'start_time': start_time,
'end_time': end_time
}
response = oauth.get(
url, params=params
)
json_response = response.json()
print(json_response)
if response.status_code != 200:
raise Exception(
'[Error] {} {}'.format(response.status_code, response.text)
)
return json_response['meta']['result_count']