見出し画像

Apache Sparkプログラミングを勉強してみる

今回はApache Sparkについてブログを書きます。クラウドでデータ活用基盤を構築していくのに必須の技術となっています。私自身も数年前に始めましたが、復習のつもりで勉強したことを書いてみました。今回は私が経験のあるAWS Glueを使ってサンプルを作成してみました。Sparkを学びたい人だけでなく、これからAWSでデータ活用基盤を構築する人にも有用になっています。それでは始めます。

1. Apache Sparkの概要

Sparkとは何か

Apache Sparkは、ビッグデータを効率的に処理するためのオープンソースの分散コンピューティングフレームワークです。大量のデータを分散処理することで、処理時間を大幅に短縮し、リアルタイムの分析や機械学習の実行を可能にします。SparkはAWS GlueやDatabricks、Amazon EMRやSnowflakeなどさまざまなサービスで利用、連携可能となっています。

Sparkの重要性と用途

Sparkは、ビッグデータの分析やETL(抽出、変換、ロード)処理、機械学習、ストリーミングデータ処理など、様々な用途で利用されています。特に、データの処理速度が速く、スケーラビリティに優れている点が特徴です。また、Databricksではバックグラウンドで自動的にセットアップされていたり、AWS GlueではGlueContextにラップされていたりして、比較的簡単に扱うことが出来ます。

Sparkの用途

ブログの目的と学習目標

本ブログでは、Pythonでの開発経験はあるが、Apache Sparkは初めてという方を対象に、Sparkの基礎から始め、AWS Glueを使用した実践的な開発方法までを学んでいきます。また、単体テストとカバレッジ取得の方法についても触れていきます。

2. Sparkのアーキテクチャと基本概念

主要コンポーネント

  • ドライバー: アプリケーション全体を管理し、タスクの分散を行います。

  • エグゼキューター: ドライバーから受け取ったタスクを実行し、結果を返します。

  • クラスターマネージャー: クラスター全体を管理し、リソースの割り当てを行います。

主要コンポーネント

RDDの概念

RDD(Resilient Distributed Dataset)は、Sparkの基本的なデータ構造で、分散環境での耐障害性と並列処理を実現します。

DataFrameとDatasetの紹介

  • DataFrame: 構造化データを扱うための高レベルなAPIで、SQLのような操作が可能です。

  • Dataset: 型安全な操作を提供するDataFrameの拡張です。特に、強い型チェックが必要な場合に使用します。

DataFrameとDataset

3. Spark開発環境のセットアップ

AWS Glueを利用したSpark環境の構築

AWS Glueはサーバーレスのデータ統合サービスで、簡単にSpark環境を構築できます。以下の手順でセットアップを行います。

  1. AWS Glue用のIAM Roleを作成します。

  • S3からのデータの読み書き、CloudWatchLogsへのログの出力ができるようにします。

IAM設定例
  1. AWS Management ConsoleでGlue Studioを開きます。

  2. ジョブの作成からSparkを選択し、PySparkコードを実行します。

Glue Studio


Script editorからEngineはSparkを選択
Glue Script editorの初期状態(デフォルトのPythonコード)
Glue Scriptの「Job details」より実行IAM Roleを今回作成したものに変更する
  • デフォルトのPythonコード

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job.commit()
  • 新たなツールやライブラリのインストールは一旦不要です。

4. データ取り込みの実践(抽出)

AWS GlueでPySparkを使ったCSVファイルの読み込み

Glueのデータカタログを作成し、データソースを定義します。次に、PySparkコードでCSVファイルを読み込みます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# CSVファイルの読み込み
df = spark.read.csv("s3://spark-test-XXXX-bucket/sample_spark_data.csv", header=True, inferSchema=True)

# データフレームの表示
df.show()

job.commit()
  • 初期状態から以下の部分のコードを追加しただけである

# CSVファイルの読み込み
df = spark.read.csv("s3://spark-test-XXXX-bucket/sample_spark_data.csv", header=True, inferSchema=True)

# データフレームの表示
df.show()

説明:

  • SparkSession: Sparkで作業する際に必須のエントリーポイントです。このセッションを使って、データの読み書きやクエリの実行を行います。AWS GlueではGlueContextにラップしているのでそれを利用します。

  • spark.read.csv: csvメソッドを使用してCSVファイルを読み込みます。header=TrueでCSVの最初の行をカラム名として扱い、inferSchema=Trueでデータ型を自動推定します。

5. データクレンジングの手法(変換)

欠損値の処理

データに欠損値が含まれている場合、これを処理するための方法を紹介します。

# 欠損値を特定の値で埋める
df_clean = df.na.fill({"column_name": "default_value"})

# 欠損値を持つ行を削除する
df_clean = df_clean.na.drop()

説明:

  • na.fill: 欠損値(null)を指定した値で置き換えます。辞書形式で列名とその値を指定します。

  • na.drop: 欠損値を含む行を削除します。

重複データの削除

データに重複がある場合、その削除方法を示します。

# 重複行を削除
df_clean = df_clean.dropDuplicates()

説明:

  • dropDuplicates: データフレーム内の重複行を削除します。

列名の変更や型変換

Sparkでは、列名の変更やデータ型の変換も簡単に行えます。

# 列名の変更
df_clean = df_clean.withColumnRenamed("old_column_name", "new_column_name")

# 型変換
from pyspark.sql.functions import col
df_clean = df_clean.withColumn("column_name", col("column_name").cast("Integer"))

説明:

  • withColumnRenamed: 特定の列の名前を変更します。

  • cast: 指定した型に変換する際に使用します。この例ではInteger型に変換しています。

集計の実施

# カラムの平均を計算
df_clean.groupBy("group_column").agg({"value_column": "avg"}).show()

説明:

  • groupBy: 特定の列を基にグループ化し、集計関数を適用できます。この例では、value_columnの平均値を計算しています。

6. データ出力(ロード)の方法

処理済みデータをCSV形式で保存

AWS Glueを使用してデータをS3に出力します。

df_clean.write.csv("s3://your-bucket/cleaned-data.csv", header=True)

説明:

  • write.csv: SparkデータフレームをCSV形式でS3に保存します。header=Trueでヘッダー行を含めます。

7. まとめ AWS GlueでCSVファイルを読み込んで、少し加工して、CSVファイルとしてデータ出力(ロード)を行う

それでは今までの知識を組み合わせて、AWS GlueよりS3バケットからCSVファイルを読み込んで、少し加工(クレンジング)して、S3バケットの違うCSVファイルにデータ出力を行うプログラムを組んでみましょう。

  • ソースコードはこうなります

  • CSVを読み込み、データを加工して、他のファイルに出力するソースコード

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit, col

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# CSVファイルの読み込み
df = spark.read.csv("s3://spark-test-XXXX-bucket/sample_spark_data.csv", header=True, inferSchema=True)

# データフレームの表示
df.show()

# 列の値の変更
df_clean = df.withColumn("Department", lit('会社員'))

# 列名の変更
df_clean = df_clean.withColumnRenamed("Department", "Salaryman")

# 列の挿入
df_clean = df_clean.withColumn("Nationary", lit('アメリカ人'))

# 型変換
df_clean = df_clean.withColumn("Salary", col("Salary").cast("Integer"))

# カラムの平均を計算
df_clean.groupBy("Occupation").agg({"Salary": "avg"}).show()

# データロード
df_clean.write.csv("s3://spark-test-XXXX-bucket/cleaned-sample_spark_data", header=True)

job.commit()

lit関数が必要な理由

  • SparkのDataFrame操作は、通常、列を基にした操作に特化しています。litを使わずに定数を挿入しようとするとエラーが発生することがあります。litを使用することで、Sparkが定数をDataFrameの操作対象として認識できるようになります。

  • データ加工前のCSVはこちらです

Name,Age,Occupation,Salary,Department
Alice,25,Engineer,70000,IT
Bob,30,Doctor,120000,Healthcare
Charlie,35,Artist,50000,Arts
David,40,Lawyer,90000,Law
Eva,28,Scientist,85000,Research
Peter,28,Engineer,100000,IT
Haidi,32,Engineer,130000,IT


S3の所定のバケットにCSVファイルを設置
  • 実行すると次のようになります。

Glue Scriptを実行「Run」


  • データ加工後のCSVはこうなります

S3バケットの指定ディレクトリにCSVファイルが出力
Name,Age,Occupation,Salary,Salaryman,Nationary
Alice,25,Engineer,70000,会社員,アメリカ人
Bob,30,Doctor,120000,会社員,アメリカ人
Charlie,35,Artist,50000,会社員,アメリカ人
David,40,Lawyer,90000,会社員,アメリカ人
Eva,28,Scientist,85000,会社員,アメリカ人
Peter,28,Engineer,100000,会社員,アメリカ人
Haidi,32,Engineer,130000,会社員,アメリカ人

各セクションで使用したコードは、PySparkを使ってデータを処理するための基本的な操作に焦点を当てています。特に、データフレームを利用した操作やテストの実施方法は、実際のプロジェクトにおいて非常に重要です。データのクレンジングや集計操作は、ほぼすべてのデータ処理プロジェクトで不可欠な作業であり、AWS Glueのサーバーレス機能を活用することで効率的な開発が可能です。
このブログを通して、Sparkの基礎から実際のETL(抽出、変換、ロード)処理の一貫した学習の流れを作り、読者がプロジェクトにすぐに応用できる内容にしました。
実際のプロジェクトではこの方式の応用で進んでいきます。基本を押さえて、応用できるようにハンズオンを行っていきましょう!

いいなと思ったら応援しよう!