Apache Sparkプログラミングを勉強してみる
今回はApache Sparkについてブログを書きます。クラウドでデータ活用基盤を構築していくのに必須の技術となっています。私自身も数年前に始めましたが、復習のつもりで勉強したことを書いてみました。今回は私が経験のあるAWS Glueを使ってサンプルを作成してみました。Sparkを学びたい人だけでなく、これからAWSでデータ活用基盤を構築する人にも有用になっています。それでは始めます。
1. Apache Sparkの概要
Sparkとは何か
Apache Sparkは、ビッグデータを効率的に処理するためのオープンソースの分散コンピューティングフレームワークです。大量のデータを分散処理することで、処理時間を大幅に短縮し、リアルタイムの分析や機械学習の実行を可能にします。SparkはAWS GlueやDatabricks、Amazon EMRやSnowflakeなどさまざまなサービスで利用、連携可能となっています。
Sparkの重要性と用途
Sparkは、ビッグデータの分析やETL(抽出、変換、ロード)処理、機械学習、ストリーミングデータ処理など、様々な用途で利用されています。特に、データの処理速度が速く、スケーラビリティに優れている点が特徴です。また、Databricksではバックグラウンドで自動的にセットアップされていたり、AWS GlueではGlueContextにラップされていたりして、比較的簡単に扱うことが出来ます。
ブログの目的と学習目標
本ブログでは、Pythonでの開発経験はあるが、Apache Sparkは初めてという方を対象に、Sparkの基礎から始め、AWS Glueを使用した実践的な開発方法までを学んでいきます。また、単体テストとカバレッジ取得の方法についても触れていきます。
2. Sparkのアーキテクチャと基本概念
主要コンポーネント
ドライバー: アプリケーション全体を管理し、タスクの分散を行います。
エグゼキューター: ドライバーから受け取ったタスクを実行し、結果を返します。
クラスターマネージャー: クラスター全体を管理し、リソースの割り当てを行います。
RDDの概念
RDD(Resilient Distributed Dataset)は、Sparkの基本的なデータ構造で、分散環境での耐障害性と並列処理を実現します。
DataFrameとDatasetの紹介
DataFrame: 構造化データを扱うための高レベルなAPIで、SQLのような操作が可能です。
Dataset: 型安全な操作を提供するDataFrameの拡張です。特に、強い型チェックが必要な場合に使用します。
3. Spark開発環境のセットアップ
AWS Glueを利用したSpark環境の構築
AWS Glueはサーバーレスのデータ統合サービスで、簡単にSpark環境を構築できます。以下の手順でセットアップを行います。
AWS Glue用のIAM Roleを作成します。
S3からのデータの読み書き、CloudWatchLogsへのログの出力ができるようにします。
AWS Management ConsoleでGlue Studioを開きます。
ジョブの作成からSparkを選択し、PySparkコードを実行します。
デフォルトのPythonコード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job.commit()
新たなツールやライブラリのインストールは一旦不要です。
4. データ取り込みの実践(抽出)
AWS GlueでPySparkを使ったCSVファイルの読み込み
Glueのデータカタログを作成し、データソースを定義します。次に、PySparkコードでCSVファイルを読み込みます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# CSVファイルの読み込み
df = spark.read.csv("s3://spark-test-XXXX-bucket/sample_spark_data.csv", header=True, inferSchema=True)
# データフレームの表示
df.show()
job.commit()
初期状態から以下の部分のコードを追加しただけである
# CSVファイルの読み込み
df = spark.read.csv("s3://spark-test-XXXX-bucket/sample_spark_data.csv", header=True, inferSchema=True)
# データフレームの表示
df.show()
説明:
SparkSession: Sparkで作業する際に必須のエントリーポイントです。このセッションを使って、データの読み書きやクエリの実行を行います。AWS GlueではGlueContextにラップしているのでそれを利用します。
spark.read.csv: csvメソッドを使用してCSVファイルを読み込みます。header=TrueでCSVの最初の行をカラム名として扱い、inferSchema=Trueでデータ型を自動推定します。
5. データクレンジングの手法(変換)
欠損値の処理
データに欠損値が含まれている場合、これを処理するための方法を紹介します。
# 欠損値を特定の値で埋める
df_clean = df.na.fill({"column_name": "default_value"})
# 欠損値を持つ行を削除する
df_clean = df_clean.na.drop()
説明:
na.fill: 欠損値(null)を指定した値で置き換えます。辞書形式で列名とその値を指定します。
na.drop: 欠損値を含む行を削除します。
重複データの削除
データに重複がある場合、その削除方法を示します。
# 重複行を削除
df_clean = df_clean.dropDuplicates()
説明:
dropDuplicates: データフレーム内の重複行を削除します。
列名の変更や型変換
Sparkでは、列名の変更やデータ型の変換も簡単に行えます。
# 列名の変更
df_clean = df_clean.withColumnRenamed("old_column_name", "new_column_name")
# 型変換
from pyspark.sql.functions import col
df_clean = df_clean.withColumn("column_name", col("column_name").cast("Integer"))
説明:
withColumnRenamed: 特定の列の名前を変更します。
cast: 指定した型に変換する際に使用します。この例ではInteger型に変換しています。
集計の実施
# カラムの平均を計算
df_clean.groupBy("group_column").agg({"value_column": "avg"}).show()
説明:
groupBy: 特定の列を基にグループ化し、集計関数を適用できます。この例では、value_columnの平均値を計算しています。
6. データ出力(ロード)の方法
処理済みデータをCSV形式で保存
AWS Glueを使用してデータをS3に出力します。
df_clean.write.csv("s3://your-bucket/cleaned-data.csv", header=True)
説明:
write.csv: SparkデータフレームをCSV形式でS3に保存します。header=Trueでヘッダー行を含めます。
7. まとめ AWS GlueでCSVファイルを読み込んで、少し加工して、CSVファイルとしてデータ出力(ロード)を行う
それでは今までの知識を組み合わせて、AWS GlueよりS3バケットからCSVファイルを読み込んで、少し加工(クレンジング)して、S3バケットの違うCSVファイルにデータ出力を行うプログラムを組んでみましょう。
ソースコードはこうなります
CSVを読み込み、データを加工して、他のファイルに出力するソースコード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit, col
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# CSVファイルの読み込み
df = spark.read.csv("s3://spark-test-XXXX-bucket/sample_spark_data.csv", header=True, inferSchema=True)
# データフレームの表示
df.show()
# 列の値の変更
df_clean = df.withColumn("Department", lit('会社員'))
# 列名の変更
df_clean = df_clean.withColumnRenamed("Department", "Salaryman")
# 列の挿入
df_clean = df_clean.withColumn("Nationary", lit('アメリカ人'))
# 型変換
df_clean = df_clean.withColumn("Salary", col("Salary").cast("Integer"))
# カラムの平均を計算
df_clean.groupBy("Occupation").agg({"Salary": "avg"}).show()
# データロード
df_clean.write.csv("s3://spark-test-XXXX-bucket/cleaned-sample_spark_data", header=True)
job.commit()
lit関数が必要な理由
SparkのDataFrame操作は、通常、列を基にした操作に特化しています。litを使わずに定数を挿入しようとするとエラーが発生することがあります。litを使用することで、Sparkが定数をDataFrameの操作対象として認識できるようになります。
データ加工前のCSVはこちらです
Name,Age,Occupation,Salary,Department
Alice,25,Engineer,70000,IT
Bob,30,Doctor,120000,Healthcare
Charlie,35,Artist,50000,Arts
David,40,Lawyer,90000,Law
Eva,28,Scientist,85000,Research
Peter,28,Engineer,100000,IT
Haidi,32,Engineer,130000,IT
実行すると次のようになります。
データ加工後のCSVはこうなります
Name,Age,Occupation,Salary,Salaryman,Nationary
Alice,25,Engineer,70000,会社員,アメリカ人
Bob,30,Doctor,120000,会社員,アメリカ人
Charlie,35,Artist,50000,会社員,アメリカ人
David,40,Lawyer,90000,会社員,アメリカ人
Eva,28,Scientist,85000,会社員,アメリカ人
Peter,28,Engineer,100000,会社員,アメリカ人
Haidi,32,Engineer,130000,会社員,アメリカ人
各セクションで使用したコードは、PySparkを使ってデータを処理するための基本的な操作に焦点を当てています。特に、データフレームを利用した操作やテストの実施方法は、実際のプロジェクトにおいて非常に重要です。データのクレンジングや集計操作は、ほぼすべてのデータ処理プロジェクトで不可欠な作業であり、AWS Glueのサーバーレス機能を活用することで効率的な開発が可能です。
このブログを通して、Sparkの基礎から実際のETL(抽出、変換、ロード)処理の一貫した学習の流れを作り、読者がプロジェクトにすぐに応用できる内容にしました。
実際のプロジェクトではこの方式の応用で進んでいきます。基本を押さえて、応用できるようにハンズオンを行っていきましょう!