Sparkでパーティション単位で上書きする

TL;DR

confでspark.sql.sources.partitionOverwriteModeをdynamicにする

spark.conf.set('spark.sql.sources.partitionOverwriteMode','dynamic')
(
   df.write
   .partitionBy(parts)
   .mode('ovewrite')
   .parquet(path)
)

目的

基本的には直近データだけ追記なんだけど、過去分と重複しているものもあって、そこは上書きしたいってときにどうするのが楽か。直近N日分の全実績を連携します、みたいなインターフェースのときにつかう。(パーティション単位で必ず全量であるという前提)
一緒に仕事をしてたりしてなかったりする山pさんのblogへのアンサーです。
PySparkで日付毎にデータを蓄積する際のdf.write.modeについて

テストデータ作成

df_all=(
   spark.range(100)
   .withColumn('day',f.floor(f.rand()*5+1).cast('integer'))
   .withColumn('time',f.floor(f.rand()*24).cast('integer')*100+f.floor(f.rand()*60).cast('integer'))
   .withColumn('v1',f.rand())
   .drop('id')
   .orderBy('day','time')
)
df_all.show(30,truncate=False)

+---+----+-------------------+
|day|time|v1                 |
+---+----+-------------------+
|1  |8   |0.5513654872922117 |
|1  |55  |0.4698297534046909 |
|1  |117 |0.4880556139436356 |
|1  |628 |0.15474645262402442|
|1  |906 |0.9056653443007471 |
|1  |1033|0.7766742995309728 |
|1  |1452|0.46746331127877827|
|1  |1728|0.9332141872301741 |
|1  |2027|0.0700844044477612 |
|1  |2124|0.45203191820354394|
|1  |2228|0.1636419591263497 |
|1  |2238|0.10166083725635766|
|1  |2241|0.3942152875235496 |
|2  |47  |0.6058353045541233 |
|2  |150 |0.7311324869007179 |
|2  |256 |0.9041655665371766 |
|2  |358 |0.5004425230868429 |
|2  |515 |0.2466091429945031 |
|2  |748 |0.4396610834058198 |
|2  |824 |0.9852832769634179 |
|2  |1006|0.285056338565408  |
|2  |1236|0.29963280724141605|
|2  |1531|0.6377686944419767 |
|2  |1637|0.8979489141252349 |
|2  |1724|0.5868945276164151 |
|2  |1748|0.9726782645403069 |
|2  |1952|0.916742675542807  |
|2  |2005|0.4853788825216445 |
|2  |2128|0.17529075099325742|
|2  |2201|0.4119881594370449 |
+---+----+-------------------+
only showing top 30 rows

全体の件数はこんなかんじ。

(
   df_all
   .groupBy('day')
   .count()
   .orderBy('day')
).show(10,truncate=False)

+---+-----+
|day|count|
+---+-----+
|1  |19   |
|2  |21   |
|3  |19   |
|4  |28   |
|5  |13   |
+---+-----+

きのうのデータ連携では1日から4日の12:30までのデータが、きょうのデータ連携では2日から5日までのデータが連携されたとする。

df_1=(
   df_all
   .filter((f.col('day')<=3)|((f.col('day')==4))&(f.col('time')<=1230))
)
(
   df_1
   .groupBy('day')
   .count()
   .orderBy('day')
).show(10,truncate=False)

+---+-----+
|day|count|
+---+-----+
|1  |19   |
|2  |21   |
|3  |19   |
|4  |14   |
+---+-----+


df_2=(
   df_all
   .filter(f.col('day')>=2)
)
(
   df_2
   .groupBy('day')
   .count()
   .orderBy('day')
).show(10,truncate=False)

+---+-----+
|day|count|
+---+-----+
|2  |21   |
|3  |19   |
|4  |28   |
|5  |13   |
+---+-----+

(ふつうの)Overwriteで書き込むとこうなる

#case 1 overwrite
case1_path='/home/jovyan/work/case1'
(
   df_1.write
   .mode('overwrite')
   .partitionBy('day')
   .parquet(case1_path)
)
(
   df_2.write
   .mode('overwrite')
   .partitionBy('day')
   .parquet(case1_path)
)
(
   spark.read.parquet(case1_path)
   .groupBy('day')
   .count()
   .orderBy('day')
).show(10,truncate=False)

+---+-----+
|day|count|
+---+-----+
|2  |21   |
|3  |19   |
|4  |28   |
|5  |13   |
+---+-----+

きのう書き込んだはずの1日のデータは全て消えている。
(ふつうの)Overwriteはパーティション関係なく基準ディレクトリ以下全て完全に置き換えてしまうため。

Appendで書き込むとこうなる

#case 2 append
case2_path='/home/jovyan/work/case2'
(
   df_1.write
   .mode('append')
   .partitionBy('day')
   .parquet(case2_path)
)
(
   df_2.write
   .mode('append')
   .partitionBy('day')
   .parquet(case2_path)
)
(
   spark.read.parquet(case2_path)
   .groupBy('day')
   .count()
   .orderBy('day')
).show(10,truncate=False)

+---+-----+
|day|count|
+---+-----+
|1  |19   |
|2  |42   |
|3  |38   |
|4  |42   |
|5  |13   |
+---+-----+

きのうときょうの連携で重複していた2日から4日午前のデータが重複してしまう。

Overwrite (dynamic)

#case 3 overwrite(dynamic)
case3_path='/home/jovyan/work/case3'
overwritemode_save=spark.conf.get('spark.sql.sources.partitionOverwriteMode')
spark.conf.set('spark.sql.sources.partitionOverwriteMode','dynamic')
(
   df_1.write
   .mode('overwrite')
   .partitionBy('day')
   .parquet(case3_path)
)
(
   df_2.write
   .mode('overwrite')
   .partitionBy('day')
   .parquet(case3_path)
)
(
   spark.read.parquet(case3_path)
   .groupBy('day')
   .count()
   .orderBy('day')
).show(10,truncate=False)
spark.conf.set('spark.sql.sources.partitionOverwriteMode',overwritemode_save)

+---+-----+
|day|count|
+---+-----+
|1  |19   |
|2  |21   |
|3  |19   |
|4  |28   |
|5  |13   |
+---+-----+

スバラシイ!


(以前のoverwritemodeをいちいち退避して復元しているのは、エンタープライズなSIをやっていた性か。。。)
confはセッション単位なので、セッションを共有するなんて酔狂なことをしている人じゃない限り、やらなくて大丈夫です。

大事なお知らせ

保存先がS3の方は、S3コネクタの実装次第でものすごく遅くなることがあり得るから気をつけてねというお話があります。

ご参考

Use Dynamic Partition Overwrite for ETL with Apache Spark
https://paulstaab.de/blog/2020/spark-dynamic-partition-overwrite.html

Spark Dynamic Partition Inserts — Part 1
https://medium.com/nmc-techblog/spark-dynamic-partition-inserts-part-1-5b66a145974f

Spark Dynamic Partition Inserts and AWS S3 — Part 2
https://medium.com/nmc-techblog/spark-dynamic-partition-inserts-and-aws-s3-part-2-9ba0c97ad2c0

この記事が気に入ったらサポートをしてみませんか?