こんにちは。技術部の池田です。
この記事では、AWSを使っているプロジェクトではありがちなAmazon S3からGoogle BigQueryにデータを投入するためのツールを書いた話をします。
BQin - BigQuery data importer with AWS S3 and SQS messaging.
名前からお察しの方もいらっしゃるとは思いますが、BQinは弊社藤原のRinから着想を得ています。 このツールは一言で表すと、データ投入先がRedshiftからBigQueryに変更されたRinです。 プロダクションに投入し1ヶ月以上になりますが、深刻な問題は発生せず動いております。
開発動機的な話
とあるプロジェクトでAWS S3にデータが投入されるから、GCP BigQueryへデータを転送したいという話がありました。 はじめのうちは、Cloud ComposerやBigQuery Data Transfer Serviceなどの利用を検討していました。
- GCPは現状BigQueryしか使っていない。
- 何かしらの動くものは外形監視などが整っているAWSに寄せたい。
等々の大小細々とした理由から、S3のからの通知でデータをBigQueryに転送するなにかを作ることになりました。 初期構想では、Google Cloud Platformのbqコマンドやgsutilコマンドをlambdaで実行しようとしてました。 しかし、
『 要するにRinのbq版なんだよねこれ… 』by 藤原
の一言で、時間に余裕もあり作ってしまいました。
アーキテクチャー的な話
基本的なアーキテクチャーはRinと同じような感じです。
動作の流れとしては以下のようになっております。
- S3に(何者かが) データを保存する
- SQS に S3のpath等が記述されたメッセージ(s3:CreateObject:*)が通知される。
BQinがSQSメッセージを受信して次の処理を行う。
- S3からデータを読み込み、Google Cloud Storageの一時バケットに転送する。
- BigQueryのLoad Jobを作成し、データのLoadが完了するのを待つ。
- Google Cloud Storageに転送したデータを消す。
SQSのメッセージを削除し、データの処理を完了したことにする。
Go言語で制作されているので、バイナリダウンロードしS3とSQSの設定をした上で以下のような config を用意すれば利用できます。
config.yaml
queue_name: my_queue_name # SQS queue name cloud: aws: #awsのcredentialは省略した場合、インスタンス等から読み取るのもOK。 region: ap-northeast-1 access_key_id: {{ must_env "ACCESSS_KEY_ID" }} secret_access_key: {{ must_env "SECRET_ACCESS_KEY" }} gcp: #gcpのクレデンシャルはOfficalのGOOGLE_APPLICATION_CREDENTIALSの環境変数で与える形でもOK base64_credential: {{ must_env "GCP_CREDENTIAL_BASE64_JSON" }} s3: bucket: bqin.bucket.test region: ap-northeast-1 big_query: project_id: bqin-test dataset: test option: temporary_bucket: my_bucket_name # データの一時的な置き場 gzip: true source_format: json # [csv, json, parquet] から選択 auto_detect: true # works only csv or json rules: - big_query: table: foo s3: key_prefix: test/foo - big_query: project_id: hoge dataset: bqin_test table: role s3: bucket: bqin.bucket.test key_regexp: test/([a-z]+)/([a-z]+)/.+\.csv option: gzip: false source_format: csv
起動コマンドとしては、以下で動きます。
$ bqin run -config config.yaml
Fargateで使いやすいように、Credential周りの設定は以下のような工夫があります。
- AWSはconfigで省略した場合はEC2のinstance-roleやECSのtask-roleをから利用するようになってます。
- GCPはSSMのパラメータストアからECSで取り扱いやすいようにcredentialのjsonをbase64エンコードしたものを取り扱えるようになっています。
どんなときにBQinを利用すると嬉しい?
GCPのサービスにはAWS S3からのデータ転送をサポートしてくれる物があるので、BQinの出番はあまり多くないような気もします。 しかし、以下のような場合はBQinが役に立つかもしれません。
S3のデータ投入のタイミングで取り込み開始したい場合
GCPのサービスの多くは、スケジュール実行のものが多いです。BQinはSQSを使っているのでS3からのPush型でデータの取り込みができます。 先日発表されたAmazon AppFlowなどもありますし、dataを貯めるのはS3で高度な分析等だけはBigQueryを利用したいというケースはあるかもしれません。 弊社の場合、Mobile AppのログはFirebaseに、Server AppのログはAmazon S3に保管されるケースがありがちです。FirebaseからはログをBigQueryへエクスポートできるので、Server AppのログもS3からBigQueryに転送したいというニーズがあります。
GCPはそれほど利用していない。
Google Cloud Platformを多用している場合は、terraformなどでの管理体制が確立していることがあります。その場合、Cloud ComposerやGKEなどを新しく利用することに腰の重さは感じないでしょう。しかし、『 BigQueryだけ使いたい 』『Spreadsheetと連携したい』等の局所利用にとどめてるケースではAWS側でデータ投入機構を管理できるのはメリットだと思います。
おわりに
データ分析基盤を整える周りの話でAWS-GCP間のデータ転送の話はよくある問題だと思います。 何を使うのが良いのかはケースバイケースだと思いますが、選択肢の一つにBQinを考えてみるのはいかがでしょうか? 今回は、Amazon S3からGoogle BigQueryへの転送を助けるツールを作ってみましたという話でした。