Snowflake Taskを駆使してデータを処理する
Snowflakeは、柔軟性と拡張性に優れたデータウェアハウスを構築し管理することができるサービスです。
Taskは、Snowflake上で事前にSQLステートメントを登録し実行できる機能です。
複数のTaskを集合させてDAGを組むことができ、外部のクラウドサービスや自前のワークフローを用意したりせずとも、複数の工程を必要とするデータ処理を実行できることが大きな魅力です。
今回は、サンプルデータを使用したデータを抽出・集計処理を例にTask機能の利便性を紹介したいと思います。
目次
はじめに
こんにちは。
モバイルソリューショングループのnakada.rです。
今回はSnowflakeで用意されているサンプルデータを抽出して、
集計したデータをテーブルに保存する処理をTask機能を用いて実現してみます。
使用するデータ
本手順では、Snowflakeにあらかじめ用意されているデータセットを使用します。
SNOWFLAKE_SAMPLE_DATA
というデータベース名でデータシェアリングされており、
トライアルアカウントからでも読み取り専用で参照できるようになっています。
今回は、TPCH_SF1
スキーマで提供されている商品の購買情報のデータセットを使用して
- データシェアリングされているサンプルデータを用意したデータベースに複製
- どの商品がどれだけ購入されたかを集計したテーブルを作成
する一連の処理をTaskでDAGを組むことにより実現します。
(TPCベンチマークH標準仕様より一部抜粋)
処理の流れ
単一の処理を行うTaskを複数作成し、事前に定義した処理順序でTaskを実行していくDAGを組みます。
DAGとは、有向非巡回グラフ(Directed Acyclic Graph)のことで、
SnowflakeでのDAGは、単一のルートTaskと追加のTaskから構成される、依存関係によって編成された一連のタスクのことを指します。
(SnowflakeでのDAGに関しての詳細は公式リファレンスを参照してください)
最終的に作成されるDAGは以下のような流れになります。
TASK_ROOT
は複数の後続Taskを呼び出すための、処理を行わない空のTask。
TASK_EXTRACT_[テーブル名]
はサンプルデータから必要なテーブルを複製するTask。
TASK_SALES_BY_PRODUCT
は複製したテーブルから、商品毎の販売数を集計してテーブルに保存するTaskとなっています。
Taskの作成
はじめに、複数の後続Taskを呼び出すためのルートTask作成します。
Taskを作成する際の詳細なオプションに関してはSnowflakeの公式リファレンスを参照してください。
create task test_db.public.task_root
warehouse = 'COMPUTE_WH'
as
select 1;
今回は、Taskを実行するWAREHOUSEとしてCOMPUTE_WHを指定しています。
また、ルートTaskとして複数のTaskは指定できないため、実質処理を行わない内容のSQLをダミーとして設定しています。
次にサンプルデータからテーブルを複製する4つの後続Taskを作成します。
create task task_extract_orders
warehouse = 'COMPUTE_WH'
after test_db.public.task_root
as
create or replace table test_db.public.orders as
select
*
from
snowflake_sample_data.tpch_sf1.orders;
create task test_db.public.task_extract_lineitem
warehouse = 'COMPUTE_WH'
after test_db.public.task_root
as
create or replace table test_db.public.lineitem as
select
*
from
snowflake_sample_data.tpch_sf1.lineitem;
create task test_db.public.task_extract_partsupp
warehouse = 'COMPUTE_WH'
after test_db.public.task_root
as
create or replace table test_db.public.partsupp as
select
*
from
snowflake_sample_data.tpch_sf1.partsupp;
create task test_db.public.task_extract_part
warehouse = 'COMPUTE_WH'
after test_db.public.task_root
as
create or replace table test_db.public.part as
select
*
from
snowflake_sample_data.tpch_sf1.part;
ポイントとしてはそれぞれのTaskで、after
オプションを使用してTASK_ROOTが実行された後にそれぞれのTaskが起動するよう設定している点です。
このオプションをそれぞれ設定することでTASK_ROOTの実行完了後にテーブルを複製する4つのTaskが起動するようになります。
最後に複製したテーブルを参照し、それぞれの商品毎の販売数を集計して新規にテーブルを作成するTaskを作成します。
create task test_db.public.task_sales_by_product
warehouse = 'COMPUTE_WH'
after test_db.public.task_extract_orders, test_db.public.task_extract_lineitem, test_db.public.task_extract_partsupp, test_db.public.task_extract_part
as
create table test_db.public.sales_by_product as
select
p.p_name as name,
p.p_brand as brand,
sum(l.l_quantity) as quantity
from
test_db.public.orders as o
left join
test_db.public.lineitem as l
on
o.o_orderkey = l.l_orderkey
left join
test_db.public.partsupp as ps
on
l.l_partkey = ps.ps_partkey
and
l.l_suppkey = ps.ps_suppkey
left join
test_db.public.part as p
on
ps.ps_partkey = p.p_partkey
group by p.p_name, p.p_brand;
;
ここでは先行Taskとして前述のテーブルを複製する4つのTaskを指定しています。
これにより、クエリの実行時に参照するテーブルが作成された後に集計処理が実行されるようになります。
Taskを作成した直後はステータスが中断状態になっているため、手動で再開します。
ルートTaskはスケジュール実行の設定をしない限りは再開できない仕様になっているため、
後続Taskのみステータスを変更します。
alter task test_db.public.task_extract_orders resume;
alter task test_db.public.task_extract_lineitem resume;
alter task test_db.public.task_extract_partsupp resume;
alter task test_db.public.task_extract_part resume;
alter task test_db.public.task_sales_by_product resume;
ここまでの作業が完了すると、Snowflakeコンソールからも下図のようなDAGになっていることが確認できると思います。
以上で準備完了です。
手動でルートTaskを実行してみましょう。
execute task task_root;
TASK_SALES_BY_PRODUC テーブルが作成されて、商品毎の販売数が確認できたら成功です!
おわりに
今回は手動でタスクを実行しましたが、Taskはスケジュール実行の設定も可能なため、日次でデータ抽出や加工処理にも利用できます。
またTaskのSQLステートメントからストアドプロシージャを呼び出すことも可能なため、変数などを用いた複雑な処理も実現可能です。
Task機能の可能性に夢が膨らみますね。