トレタのAPI開発を担当している芹沢です。
トレタでは、長時間かかるバッチ処理を複数台のサーバ上で処理させて短時間で処理できるバッチ処理基盤をAWS上で構築しました。この仕組みについて説明します。
目的
短期的には以下の課題を解決するため、長期的には似たような要件が再度発生した時に、同じ手法で解決できることを目的に作りました。
- 非同期でDBをデータソースとしたデータを加工してCSVファイルとして出力してS3にputしたい
- データソースはDBに入っているリアルタイムのデータであることが求められる
- CSVファイルの作成は決められた時間内に完了する必要がある
- 対象となるデータソースの量は日々増加し続けるが、常に決められた時間内にCSV作成が完了している必要がある
難点
今回の要件で技術的に難しい点は以下の2点です。
DBを直接参照しながら大量のデータを処理する
例えば、データソースとしてDBからHDFSやRedshiftに同期されたデータを使えるのならば、AWS EMRなどのサービスを活用することで大量のデータを処理することは比較的容易です。しかし、今回の要件は、更新頻度が高いデータソースをできるだけリアルタイムに近い状態で取得してCSVを作成する必要がありました。
一定のスループットを担保し続ける
今回の要件上、1回のCSV処理作成にかかる時間を15分以内に抑える必要がありました。inputとなるデータ量が常に一定であれば一度15分以内に終わるように構築すれば、あとはそれを延々と動かし続ければ良いのですが、データ量が増加し続けるという事情があるため、データ量が増えても一定のスループットを担保し続ける仕組みを用意する必要がありました。
バッチ処理専用の環境を作ることにしました
これらの課題を解決するために以下の様な基盤を構築し、その上にデータ処理用のバッチを実装しました。
処理の流れは図の通りです。
バッチの実装について
いわゆるJob Queueですが、以下2つのJobが介在します。前者はSidekiq、後者はShoryukenを使って実装されています。
IDをグルーピングしてCSV作成Jobを作るJob
- sidekiq-cronによって15分に1回起動します
- 処理対象のレコードのIDを取得し、100IDずつにグルーピングします
- グルーピングしたIDを引数としたCSV作成Jobを処理対象のレコード数分だけ作成します
CSVを作成するJob
- Jobに含まれるIDを元にCSVを作成します
- 作成したCSVをS3にpushします
1st release時点では、処理対象のデータのIDが800個ほどあったので、1回の処理につき8個のCSV作成Jobが生成され、8個のShoryuken workerが8並列で処理します。
個々のCSV作成Jobの処理時間は概ね10分〜11分に収まっているので、結果的に15分以内に処理を完了させるという目標を達成できました。処理対象のIDが増加してもShoryuken orkerの数を増やすことでスループットを増加させることが可能になっており、スケールアウトしやすい構造になっています。
Shoryukenについて
あまり日本語の記事を見かけなかったので簡単に紹介します。
Sidekiqとほぼ同じような使い方で使えるworker gemです。Sidekiqと異なる点として、Queue storeにAWS SQSを使います。元々トレタでは非同期処理にはSidekiq + redisを使っていましたが、今回のようにJobの数がスケールしていくことが想定されるケースにおいてはQueueのサイズや数に融通が効くSQSの方が適しているという判断の元、SQSを使うためにShoryukenを採用しました。
Tips: 各Jobのロギングについて
ShoryukenにはJobの実行時に前処理・後処理を挟むなどの目的に使えるMiddleware機構があります。 今回はJob実行時のパフォーマンス計測とエラー時の原因調査のために各Jobのログをfluentdで拾ってBigQueryに入れておきたかったので、以下のようなMiddlewareを自作してログをテキストファイルで残すようにしました。
module Shoryuken module Middleware module CustomMiddleware module Server class ProcessingTimeMeasure def call(worker, queue, sqs_msg, body) shoryuken_log_path = Rails.root.join('log', 'shoryuken_worker.log') @logger = ::Logger.new(shoryuken_log_path) @logger.formatter = proc do |_, _, _, message| message.to_json + "\n" end @success = true @messages = [] started_at = Time.zone.now # ここでJob本体の処理が実行されます yield ended_at = Time.zone.now elapsed_time = (ended_at - started_at) * 1000 rescue => e @success = false @messages << e.to_s Bugsnag.auto_notify(e) ensure body = { timestamp: Time.zone.now.to_i, started_at: started_at, ended_at: ended_at, elapsed_time: elapsed_time, success: @success, messages: @messages, worker: worker.class.to_s, } @logger.info(body) end end end end end end
このMiddlewareをShoryukenに積むにはconfig/initializes
配下に適当なファイルを置いて以下のように追記します。
Shoryuken.configure_server do |config| config.server_middleware do |chain| chain.add Shoryuken::Middleware::CustomMiddleware::Server::ProcessingTimeMeasure end end
まとめ
今のところ安定して稼働していますが、基盤としてはまだ汎用的なつくりにはなっていないので、汎用基盤として稼働させていくためには以下のような機能が必要になりそうだと考えています。
- 処理時間に制限があるJobの時間が経過した場合に通知する(Shoryuken workerの数を増やすタイミングを見極めるための何か)
- Job管理(workflow系ツールの導入など?)
- Shoryukenのプロセス監視と自動復旧
今後も開発を続けながら、何かシェアできる知見が得られたらまた開発者ブログに書きたいと思います。
お約束
トレタはエンジニアを募集しています。iPadアプリやAPIの開発以外に、こういった基盤開発にも興味があるエンジニアはぜひご応募ください。