Snowflake用Snowpark・Python
こんにちは。コグラフSSD−2事業部のルイスです。
Snowflakeはクラウドベースのデータウェアハウスで、データサイエンティスト、エンジニア、アナリストの間で非常に人気です。
Snowparkは、開発者が好みの言語(Python、Scala、Java)でコードを書き、そのコードをSnowflake上で直接実行できるようにする、Snowflakeの新しい開発者エクスペリエンスです。
この記事では、Pythonを使ってSnowflake Snowparkに接続する方法を紹介します。この記事の指示に従うには、Snowflakeのアカウントが必要です。
事前準備
Python 3.8以上のPython環境がインストールされていることとSnowflakeのアカウントと認証情報が必要です。
インストール
Snowflake コネクタは、pip / conda を使用してインストールします。
pip install snowflake-snowpark-python
接続
インストールしたばかりの Snowflake コネクタライブラリをインポートします。また、Snowparkの関数を'F'としてインポートします。なぜ'F'が必要かというと、データに対するすべての変換はSnowflakeクラスターによって処理され、データセットに変換を適用するには、適切なSnowflake関数を呼び出す必要があるからです。
#!/usr/bin/env python
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
セッション作成
次にSnowparkのセッションオブジェクトを作成します。ユーザー名、パスワード、アカウント情報を適切な場所に挿入してください。Snowflakeアカウント情報は通常URLの一部になります。
def snowpark_session_create():
connection_params = {
"account":"insert-your-account-here",
"user":"insert-username-here",
"password":"insert-password-here",
"role":"insert-role-here",
"warehouse":"insert-warehouse-here"
}
session = Session.builder.configs(connection_params).create()
return session
demo_session = snowpark_session_create()
データ取得
接続が完了したら、(try/catchブロック内で)Snowparkステートメントを実行し、結果セットをpandasデータフレームに代入します。
ここでは、サンプル・データ・データベースのTPCHにあるCUSTOMERテーブルを使用します。この例はmediumというホームサイトから取られています。テーブルには、カラム名C_CUSTKEY、C_NAME、C_ADDRESS、C_NATIONALITY、C_PHONE、 C_ACCTBAL、C_MKTSEGMENT、 C_COMMENTがあります。
テーブルからすべての情報を選択し、別のデータフレームを作成します。
このデータフレームは、C_MKTSEGMENT列ですべての情報をグループ化し、C_ACCTBAL列の合計となるSUBTOTALという新しい列を作成します。
さらに、列のSUBTOTALの合計を作成し、後でパーセント計算するため利用します。それを処理した後、データをpandasのデータフレームに変換し、合計を変数として差し引きます。最後に、合計を変数として"percentage"という新しい列を作成し、データフレームの内容を表示し、コネクターを閉じます。
コードは以下の通りです。
try:
df = demo_session.sql("select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.CUSTOMER LIMIT 10000000")
df2 = df.group_by("C_MKTSEGMENT").agg(F.sum("C_ACCTBAL").alias("SUBTOTAL"))
acct_bal_total = df2.agg(F.sum("SUBTOTAL").alias("SUBTOTAL"))
pdf = acct_bal_total.toPandas()
total_sum = pdf.iloc[0,0]
df2 = df2.with_column("percentage",(df2["SUBTOTAL"]*100)/total_sum)
df2 = df2.sort(F.col("percentage").desc())
df2.show()
finally:
#closing the connection
demo_session.close()
作成されたデータフレームはこのようなテーブルです。
まとめ
Snowpark Pythonは、Snowflakeオブジェクトテーブルを使用し、異なるタイプのデータを処理するためにPandasオブジェクトに変換することができます。このツールを使えば、データ分析ができ、PythonでSnowflakeデータをつなげて様々なモデルを作成することもできます。