
PythonとAccessを接続するクラス
クラス化のメリット
・メインプログラムを簡素化できる
クラスの作成に、DBのエンジニアを割り当てる
・他のプログラムでクラスを流用できる
エラーの無いプログラムを利用できる
メインプログラムの標準化・可読性が向上する
・複数のデータを同じ手法で取り扱いできる
別のインスタンスとして取り扱える
・メモリ(インスタンス)上で取り扱いできる
データベースへのアクセスが減る
操作内容
内容
1)テーブルの削除
2)テーブルの作成
3)データの追加
4)データの削除
5)データの更新
6)データの検索
ソースコード
import pyodbc as mydb
# import os
#db_path = os.getenv("DB_PATH", r"Z:\\Database_Master\\eng001_Ver7.accdb")
class DbAccessLib:
"""メンバー変数の登録"""
# db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb" # Path to your database
db_path = r"\\\\landisk-0010\\Youtube\\Database_Master\\eng001_Ver7.accdb" # Path to your database
db_name = "Access"
tb_name = "顧客マスタ"
data_field = []
data_array = []
def __init__(self, db_path=None,db_name=None,tb_name=None):
""" DbAccessLib クラスのコンストラクタ。 """
# デフォルトのデータベースパスを設定
if db_path is None:
db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb"
if db_name is None:
db_name = "Access"
if tb_name is None:
tb_name = "顧客マスタ"
# データベース接続文字列の構築
self.connection_string = (
"Driver={Microsoft Access Driver (*.mdb, *.accdb)};"
f"DBQ={db_path};"
)
self.db_path = db_path
self.db_name = db_name
self.tb_name = tb_name
# Function to get data array from the database
def get_table_data(self, tb_name):
query = "SELECT * FROM " + tb_name + " ;" # SQL query to fetch data
try:
# Connecting to the Access database
with mydb.connect(self.connection_string) as connection:
with connection.cursor() as cursor:
cursor.execute(query)
if cursor.description: # If field names exist
self.data_field = [desc[0] for desc in cursor.description] # Get field names
self.data_array = cursor.fetchall() # Fetch results
return self.data_field, self.data_array, False, "クエリが成功しました" # Return success, no error message
else:
return None, None, False, "結果セットがありません。"
except mydb.Error as err:
return None, None, True, f"データベースエラー: {err}"
except Exception as e:
return None, None, True , f"その他のエラー: {e}"
def dorop_tabale(self,tb_name):
sqlstr = f"DROP TABLE {tb_name} ; "
success, error = self.execute_sql( sqlstr )
return success, error
# Function to add a new record to the database
def add_record(self, Record_fields, Record_data, table_name=None ):
try:
placeholders = ", ".join(["?" for _ in Record_fields])
query = f"INSERT INTO {table_name} ({', '.join(Record_fields)}) VALUES ({placeholders})"
print(query)
# データベース接続
with mydb.connect(self.connection_string) as connection:
with connection.cursor() as cursor:
cursor.execute(query, Record_data) # 実際の値を渡す
connection.commit()
return True, None
except mydb.Error as err:
return False, f"データベースエラー: {err}"
except Exception as e:
return False, f"その他のエラー: {e}"
def delete_record(self, Record_name, Record_id ,tb_name):
# Confirm deletion
try:
query = f"DELETE FROM {tb_name} WHERE {Record_name} = ?"
# Execute the delete query safely using parameters
with mydb.connect(self.connection_string) as connection:
with connection.cursor() as cursor:
cursor.execute(query, (Record_id,)) # Use parameterized query
connection.commit()
return True, None
except mydb.Error as err:
return False, f"データベースエラー: {err}"
except Exception as e:
return False, f"レコードの削除中にエラーが発生しました: {e}"
# Function to update the record in the database
def update_record(self, tb_name, data_fields, data_values):
try:
field_name = data_fields[0]
Record_id = int(data_values[0])
# Create an UPDATE SQL query to modify the record
set_clause = ", ".join([f"{field} = ?" for field in data_fields])
query = f"UPDATE { tb_name } SET {set_clause} WHERE {field_name} = ?"
print(query)
# Connecting to the database
with mydb.connect(self.connection_string) as connection:
with connection.cursor() as cursor:
cursor.execute(query, (*data_values, Record_id)) # Execute the query with updated values
connection.commit()
return True, None
except mydb.Error as err:
return False, f"データベースエラー: {err}"
except Exception as e:
return False, f"その他のエラー: {e}"
def execute_sql(self, sqlstr, params=None):
""" クエリを実行する """
try: # エラー処理
with mydb.connect(self.connection_string) as connection:
with connection.cursor() as cursor:
cursor.execute(sqlstr, params or [])
return True, "SQL成功"
except mydb.ProgrammingError as prog_err:
return False, f"プログラミングエラー: {prog_err}"
except mydb.DatabaseError as db_err:
return False, f"データベースエラー: {db_err}"
except mydb.Error as odbc_err:
return False, f"ODBCエラー: {odbc_err}"
except Exception as e:
return False, f"ODBCエラー: {e}"
def execute_query(self, query, params=None):
""" クエリを実行する """
try: # エラー処理
with mydb.connect(self.connection_string) as connection:
with connection.cursor() as cursor:
cursor.execute(query, params or [])
if cursor.description: # If field names exist
self.data_field = [desc[0] for desc in cursor.description] # Get field names
self.data_array = cursor.fetchall() # Fetch results
return self.data_field, self.data_array, True, "クエリが成功しました" # Return success, no error message
else:
return None, None, True, "結果セットがありません。"
except mydb.ProgrammingError as prog_err:
return None, None, False ,f"プログラミングエラー: {prog_err}"
except mydb.DatabaseError as db_err:
return None, None, False ,f"データベースエラー: {db_err}"
except mydb.Error as odbc_err:
return None, None, False ,f"ODBCエラー: {odbc_err}"
except Exception as e:
return None, None, False ,f"エラー: {e}"
def print_Data_Array(self, data_field=None, data_array=None ):
if data_field == None :
data_field = self.data_field
if data_array == None :
data_array = self.data_array
for item in data_field:
print(item, ":", end="")
print("") # 改行
for row in data_array :
for item in row:
print(item, ":", end="") # 各行を表示
print("") # 改行
""" メインプログラム """
def Data_customer():
data = [
['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号'],
[2001, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'],
[2002, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'],
[2003, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'],
[2004, '町井秀人', '155-0033', '東京都', '練馬区氷川台X-X-X', '', '03-54XX-XXXX'],
[2005, '三井雅人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'],
[2006, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX'],
[2007, '須田秀樹', '244-0817', '東京都', '大田区石川町X-X-X', '', '045-82X-XXXX'],
[2008, '駒井よし子', '145-0061', '神奈川県', '横浜市神奈川区新子安X-X-X', '', '03-98XX-XXXX'],
[2009, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'],
[2010, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'],
[2011, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'],
[2012, '町井秀人', '155-0033', '東京都', '練馬区氷川台X-X-X', '', '03-54XX-XXXX'],
[2013, '三井雅人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX'],
[2014, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX'],
[2015, '須田秀樹', '244-0817', '東京都', '大田区石川町X-X-X', '', '045-82X-XXXX'],
[2016, '駒井よし子', '145-0061', '神奈川県', '横浜市神奈川区新子安X-X-X', '', '03-98XX-XXXX'],
[2017, '三田沙也加', '125-0031', '東京都', '練馬区富士見台X-X-X', '', '03-36XX-XXXX'],
[2018, '竹原由美', '177-0034', '神奈川県', '横浜市港北区下田町X-X-X', '下田ビル2F', '03-38XX-XXXX'],
[2019, '林香奈子', '223-0064', '東京都', '世田谷区代田X-X-X', '城田ビル', '045-56X-XXXX'],
[2020, '牧野卓', '252-0331', '東京都', '品川区大井町', '大井ビル', '042-32X-XXXX']
]
data_field = data[0]
data_array = data[1:]
return data_field, data_array
def drop_tabale(db_Lib, tb_name ):
success, error = db_Lib.dorop_tabale(tb_name)
if success:
print(f"{tb_name} を削除しました。")
else:
print(f"エラー: {error}")
def create_tabale_custmer( db_Lib, tb_name ):
# テーブル作成のSQL文
sqlstr = """
CREATE TABLE 顧客マスタ (
id AUTOINCREMENT PRIMARY KEY,
顧客番号 LONG NOT NULL,
顧客名 TEXT,
郵便番号 TEXT,
住所1 TEXT,
住所2 TEXT,
住所3 TEXT,
電話番号 TEXT
);
"""
success, error = db_Lib.execute_sql( sqlstr )
if success:
print("顧客マスタ 作成しました。")
else:
print(f"エラー: {error}")
def insert_data( db_Lib, tb_name ):
data_field, data_values = Data_customer()
for data_value in data_values:
success, error =db_Lib.add_record( data_field, data_value , tb_name )
if success:
print("更新に成功しました。")
else:
print(f"更新にエラー: {error}")
db_Lib.print_Data_Array()
def update_data( db_Lib, tb_name ):
data_fields= ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号']
data_values = [2028, '三井正人', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX']
success, error = db_Lib.update_record( tb_name , data_fields, data_values )
if success:
print("更新に成功しました。")
else:
print(f"更新にエラー: {error}")
def delete_data( db_Lib, tb_name ):
Record_id = '2029'
Record_field = '顧客番号'
success, error = db_Lib.delete_record( Record_field,Record_id, tb_name )
if success:
print("削除に成功しました。")
else:
print(f"削除エラー: {error}")
def add_data( db_Lib, tb_name ):
# 新しいレコードを追加(IDは自動生成されるため指定しない)
data_array = ['2029', '三井かずと', '179-0084', '神奈川県', '相模原市南区大野台X-X-X', '', '03-66XX-XXXX']
data_field= ['顧客番号', '顧客名', '郵便番号', '住所1', '住所2', '住所3', '電話番号']
success, error = db_Lib.add_record( data_field, data_array,tb_name, )
if success:
print("追加に成功しました。")
else:
print(f"追加エラー: {error}")
def select_data( db_Lib, tb_name):
sqlstr = f"SELECT * FROM {tb_name} WHERE 顧客番号 = ? ; "
param = 2016
data_field, data_array, success, error = db_Lib.execute_query(sqlstr, param )
if success:
print("取得に成功しました。")
else:
print(f"取得エラー: {error}")
def get_table_data(db_Lib, tb_name):
data_field, data_array, success, error = db_Lib.get_table_data(tb_name)
if success:
print("取得に成功しました。")
else:
print(f"取得エラー: {error}")
def Main():
db_path = r"Z:\\Database_Master\\eng001_Ver7.accdb" # Path to your database
db_name = "Access"
tb_name = "顧客マスタ"
db_Lib = DbAccessLib(db_path,db_name,tb_name)
# テーブル削除
drop_tabale( db_Lib, tb_name )
# テーブル作成
create_tabale_custmer( db_Lib, tb_name )
# データの追加(初期)
insert_data( db_Lib, tb_name )
# データの追加
add_data( db_Lib, tb_name )
# データの削除
delete_data( db_Lib, tb_name )
# データの更新
update_data( db_Lib, tb_name )
# データの検索
select_data( db_Lib, tb_name)
db_Lib.print_Data_Array()
if __name__ == "__main__":
Main()
解説資料
動画で使用した解説資料をダウンロードできます。
ここから先は
0字
/
1ファイル
¥ 300
この記事が気に入ったらチップで応援してみませんか?