リファクタリング的な作業(3)
(Python学習初心者の試行錯誤・備忘録です)
前回
で、AIのCopilot先生からは、
・SQLでテーブル名や要素名を指定するのにプレースホルダは使えない
・「(pythonの)f-stringを使ってテーブル名をSQL文に直接埋め込」むことはできるが、「SQLインジェクションのリスクを高める可能性がある」
とアドバイスがありました。
ならば無理せず、テーブル名は固定でいい、と昨日の段階では思っていました。が、実際に始めて見るとSQL文中に「 t_cards 」と直書きのテーブル名が何度も出てくることになって、どうも嫌だなと感じます。後でちょっと変えたくなったとき大変でしょう。(それを言っちゃうと要素名はどうするんだとも思いますが、例えば要素名はほぼ共通ながらテーブル名だけ変えるようなケースもあるかもしれないので・・)
これって、「ユーザーの入力がSQL文に直接組み込まれるような場合」がまずいのであって、「自分で定義したテーブル名が入るような場合は問題ないよね?」と思うに至り、あらためて試してみることにしました。
mvctest_model.py
import sqlite3
class Cards:
#データベースのt_cardテーブル
def __init__(self, mydb="mydb.sqlite3",table="t_cards") -> None:
self.mydb = mydb
self.table = table
#テーブルがなければ作成する。
self.createsql = f""" CREATE TABLE IF NOT EXISTS {self.table} (
id INTEGER PRIMARY KEY,
face TEXT NOT NULL,
back TEXT,
level INTEGER DEFAULT 0,
timestamp TEXT DEFAULT CURRENT_TIMESTAMP)
"""
self._create_table()
def _create_table(self):
with sqlite3.connect(self.mydb) as con:
cur = con.cursor()
cur.execute(self.createsql)
con.commit()
改めてテスト
#テストコード
def main():
mycards1 = Cards()
mycards2 = Cards("db2.sqlite3")
mycards3 = Cards("db3.sqlite3","t_anothercards")
if __name__ == "__main__":
main()
db2, db3のファイルファイル、t_anothercards テーブル、などなど自動生成されました。大丈夫そうです。( anothercards 単・複が変かな?(^_^;) )
ではDB操作の機能を追加していきます。
データの取り込み
まず、DBは最初にデータを入れておかなければ話が始まりません。
・リスト形式でデータを追加する関数 loadfromlist(datalist:list)
・CSVファイルからデータを読み込んで追加する関数 loadfromcsv(file)
・テーブルをクリアする関数cleartable() も用意します。
(抜粋)
def loadfromcsv(self, csvfilename) -> None:
with sqlite3.connect(self.mydb) as con:
cur = con.cursor()
with open(csvfilename, newline='', encoding='utf-8') as csvfile:
csvreader = csv.reader(csvfile, delimiter = ",")
for row in csvreader:
con.execute(f"INSERT INTO {self.table}(face, back) VALUES(?,?)",
(row[0],row[1]))
con.commit()
def loadfromlist(self, datalist:list) -> None:
with sqlite3.connect(self.mydb) as con:
cur = con.cursor()
for row in datalist:
con.execute(f"INSERT INTO {self.table}(face, back) VALUES(?,?)",
(row[0],row[1]))
con.commit()
def cleartable(self)-> None:
with sqlite3.connect(self.mydb) as con:
cur = con.cursor()
cur.execute(f"DELETE FROM {self.table}")
con.commit()
テストします。
#テストコード
def main():
mycards1 = Cards()
mylist =[("opinion","意見"),("environment","環境"),("tax","税"),("degree","程度"),("response","回答"),]
mycards1.loadfromlist(mylist)
mycards2 = Cards("mydb2.sqlite3")
mycards2.loadfromcsv("eng0430.csv")
if __name__ == "__main__":
main()
リストでもCSVも読み込めていることが確認できました。デフォルトと違うDBファイル名(mydb2.sqlite3)を指定することもできています。
mycards2 = Cards("mydb2.sqlite3")
mycards2.cleartable()
テーブルの中身のクリアもできました。
SELECTの基本
データのセットが取り込めたところで、次はデータを抽出する関数ですが、ここでまたCopilotに聞いてみました。(複数の結果を取得する場合)「辞書型で結果を取れるか」と。そしたらこんな情報が。
import sqlite3
# データベースに接続
con = sqlite3.connect('mydb.sqlite3')
con.row_factory = sqlite3.Row # 行を辞書のように扱うための設定
# カーソルを作成
cur = con.cursor()
# SELECT文を実行
cur.execute("SELECT * FROM my_table WHERE id = ?", (1,))
# 1行のデータを取得
row = cur.fetchone()
# rowがNoneでないことを確認(結果がある場合)
if row is not None:
# 辞書型としてデータにアクセス
data_dict = dict(row)
print(data_dict) # 例: {'id': 1, 'name': 'Alice', 'age': 30}
# データベース接続を閉じる
con.close()
・・・だそうです。いいことを聞きました。一行読んではいちいち
self.id, self.hanzi, self.pynyin, self.level, self.timestamp = temprow
みたいな面倒なことをやらなくていいわけですね。
Cardsクラスに新しいメンバ関数を追加
def select(self):
with sqlite3.connect(self.mydb) as con:
con.row_factory = sqlite3.Row # 行を辞書のように扱うための設定
cur = con.cursor()
cur.execute(f"SELECT * FROM {self.table} LIMIT 1")
row = cur.fetchone()
#rowがNoneのときdict(row)はNoneではなく空の辞書型を返すので
#Noneを返したければ次のようにする必要がある。
if row is not None:
self.topcard = dict(row)
else:
self.topcard = None
テストします。
#テストコード
def main():
mycards = Cards()
mycards.select()
if mycards.topcard is not None:
print(mycards.topcard)
if __name__ == "__main__":
main()
結果
{'id': 1, 'face': 'opinion', 'back': '意見', 'level': 0, 'timestamp': '2024-05-06 08:50:18'}
ちゃんと取れています。この仕様はイイですね。要素名もそのまま取れる。すごく扱いが楽になります。
なお、ここで作ったtopcard という辞書型のメンバ変数は、何かの条件でならび変えた際に一番上にきたカード、といったイメージです。
SELECT応用
元のコード(リファクタリング的なことを始める前のコード)を見ていくと、SQLのSELECT文を使っているところは
SELECT COUNT(*)
・トータルのデータ数を求める。
・レベル、0,1,2,3のデータ数を求める
SELECT 要素
・WHEREとORDER BYで優先順位を設定して最優先の要素を持ってくる
ですね。
"SELECT COUNT(*) FROM t_shengci"
"SELECT COUNT(*) FROM t_shengci WHERE level = 0"
"SELECT COUNT(*) FROM t_shengci WHERE level = 1"
"SELECT COUNT(*) FROM t_shengci WHERE level = 2"
"SELECT COUNT(*) FROM t_shengci WHERE level = 3"
"SELECT id, hanzi, pinyin, level, timestamp FROM t_shengci \
ORDER BY level ASC, timestamp ASC"
といった内容です。COUNTに関してはぱっと見、WHERE以外全部同じなのに、「くどい」なと思ってCopilotに改善策を尋ねたら、こんなコードをくれました。
def select_where(self, condition, *params):
with sqlite3.connect(self.mydb) as con:
cur = con.cursor()
query = f"SELECT * FROM {self.table} WHERE {condition}"
cur.execute(query, *params)
return cur.fetchall()
呼び出し方の例を聞くと
cards = Cards()
result = cards.select_where("level >= ?", (1,))
result = cards.select_where("face = ?", ("King",))
といった例を出してくれました。パラメータはタプル渡し。AIほんとすごいですね。Cardsクラスだから、トランプカードだと思われて "King"とか出てきたのでしょうね。単語カードなんですけどね。
さて、自分の用途を考えるとfetchall より fetchone が良さそう。まず、カウントを求める関数から。
def select_count_where(self,condition, *params):
with sqlite3.connect(self.mydb) as con:
#SELECT COUNT(*)なので返るのは1行、 LIMIT不要。
#結果は必ずあるからNoneの場合分けも不要
cur = con.cursor()
query = f"SELECT COUNT(*) FROM {self.table} WHERE {condition}"
cur.execute(query, *params)
return(cur.fetchone()[0])
データベースファイルが無い状態からのテスト
#テストコード
def main():
mylist =[("opinion","意見"),("environment","環境"),("tax","税"),("degree","程度"),("response","回答"),]
mycard = Cards()
mycard.loadfromlist(mylist)
print(mycard.select_count_where("level = 0"))
print(mycard.select_count_where("level = 1"))
print(mycard.select_count_where("level = ?",(0,)))
print(mycard.select_count_where("level = ?",(1,)))
if __name__ == "__main__":
main()
結果は
5
0
5
0
直書きでもプレースホルダでもばっちりです。
いいですね、気分が乗ってきました。リファクタリングハイ(?(笑))