Amazon SQSを利用してAmazon S3からGoogle BigQueryにデータ投入するBQinというツールを書いた

こんにちは。技術部の池田です。

この記事では、AWSを使っているプロジェクトではありがちなAmazon S3からGoogle BigQueryにデータを投入するためのツールを書いた話をします。

BQin - BigQuery data importer with AWS S3 and SQS messaging.

名前からお察しの方もいらっしゃるとは思いますが、BQinは弊社藤原Rinから着想を得ています。 このツールは一言で表すと、データ投入先がRedshiftからBigQueryに変更されたRinです。 プロダクションに投入し1ヶ月以上になりますが、深刻な問題は発生せず動いております。

開発動機的な話

とあるプロジェクトでAWS S3にデータが投入されるから、GCP BigQueryへデータを転送したいという話がありました。 はじめのうちは、Cloud ComposerBigQuery Data Transfer Serviceなどの利用を検討していました。

  • GCPは現状BigQueryしか使っていない。
  • 何かしらの動くものは外形監視などが整っているAWSに寄せたい。

等々の大小細々とした理由から、S3のからの通知でデータをBigQueryに転送するなにかを作ることになりました。 初期構想では、Google Cloud Platformのbqコマンドやgsutilコマンドをlambdaで実行しようとしてました。 しかし、

『 要するにRinのbq版なんだよねこれ… 』by 藤原

の一言で、時間に余裕もあり作ってしまいました。

アーキテクチャー的な話

基本的なアーキテクチャーはRinと同じような感じです。

f:id:ikeda-masashi:20200601113925p:plain
BQinアーキテクチャー

動作の流れとしては以下のようになっております。

  1. S3に(何者かが) データを保存する
  2. SQS に S3のpath等が記述されたメッセージ(s3:CreateObject:*)が通知される。
  3. BQinがSQSメッセージを受信して次の処理を行う。

    • S3からデータを読み込み、Google Cloud Storageの一時バケットに転送する。
    • BigQueryのLoad Jobを作成し、データのLoadが完了するのを待つ。
    • Google Cloud Storageに転送したデータを消す。
  4. SQSのメッセージを削除し、データの処理を完了したことにする。

Go言語で制作されているので、バイナリダウンロードしS3とSQSの設定をした上で以下のような config を用意すれば利用できます。

config.yaml

queue_name: my_queue_name    # SQS queue name

cloud:
  aws: #awsのcredentialは省略した場合、インスタンス等から読み取るのもOK。
    region: ap-northeast-1
    access_key_id: {{ must_env "ACCESSS_KEY_ID" }}
    secret_access_key: {{ must_env "SECRET_ACCESS_KEY" }}
  gcp: #gcpのクレデンシャルはOfficalのGOOGLE_APPLICATION_CREDENTIALSの環境変数で与える形でもOK
    base64_credential: {{ must_env "GCP_CREDENTIAL_BASE64_JSON" }}

s3:
  bucket: bqin.bucket.test
  region: ap-northeast-1

big_query:
  project_id: bqin-test
  dataset: test

option:
  temporary_bucket: my_bucket_name # データの一時的な置き場
  gzip: true
  source_format: json # [csv, json, parquet] から選択
  auto_detect: true # works only csv or json

rules:
  - big_query:
      table: foo
    s3:
      key_prefix: test/foo

  - big_query:
      project_id: hoge
      dataset: bqin_test
      table: role
    s3:
      bucket: bqin.bucket.test
      key_regexp: test/([a-z]+)/([a-z]+)/.+\.csv
    option:
      gzip: false
      source_format: csv

起動コマンドとしては、以下で動きます。

$ bqin run -config config.yaml

Fargateで使いやすいように、Credential周りの設定は以下のような工夫があります。

  • AWSはconfigで省略した場合はEC2のinstance-roleやECSのtask-roleをから利用するようになってます。
  • GCPはSSMのパラメータストアからECSで取り扱いやすいようにcredentialのjsonをbase64エンコードしたものを取り扱えるようになっています。

どんなときにBQinを利用すると嬉しい?

GCPのサービスにはAWS S3からのデータ転送をサポートしてくれる物があるので、BQinの出番はあまり多くないような気もします。 しかし、以下のような場合はBQinが役に立つかもしれません。

S3のデータ投入のタイミングで取り込み開始したい場合

GCPのサービスの多くは、スケジュール実行のものが多いです。BQinはSQSを使っているのでS3からのPush型でデータの取り込みができます。 先日発表されたAmazon AppFlowなどもありますし、dataを貯めるのはS3で高度な分析等だけはBigQueryを利用したいというケースはあるかもしれません。 弊社の場合、Mobile AppのログはFirebaseに、Server AppのログはAmazon S3に保管されるケースがありがちです。FirebaseからはログをBigQueryへエクスポートできるので、Server AppのログもS3からBigQueryに転送したいというニーズがあります。

GCPはそれほど利用していない。

Google Cloud Platformを多用している場合は、terraformなどでの管理体制が確立していることがあります。その場合、Cloud ComposerやGKEなどを新しく利用することに腰の重さは感じないでしょう。しかし、『 BigQueryだけ使いたい 』『Spreadsheetと連携したい』等の局所利用にとどめてるケースではAWS側でデータ投入機構を管理できるのはメリットだと思います。

おわりに

データ分析基盤を整える周りの話でAWS-GCP間のデータ転送の話はよくある問題だと思います。 何を使うのが良いのかはケースバイケースだと思いますが、選択肢の一つにBQinを考えてみるのはいかがでしょうか? 今回は、Amazon S3からGoogle BigQueryへの転送を助けるツールを作ってみましたという話でした。

カヤックでは、ちょっと困った事が起きたときにツールを作るエンジニアも募集しています