7839

雑草魂エンジニアブログ

【Rails】Sidekiq 〜ジョブがスタックしたので、複数プロセスで対応

自動復旧できるシステムが構築したくて、Railsで簡易的なアプリケーションを開発した。その際に、バックグラウンドジョブを実行するために、Sidekiqを使った。ただ、バックグラウンドジョブがスタックすることがあったので、調べたことを備忘録として残しておく。

Sidekiqとは

Rubyのシンプルで効率的なバックグラウンド処理。 Sidekiqはスレッドを使用して、同じプロセスで同時に多くのジョブを処理する。 Railsでは必須ではありませんが、Railsと緊密に統合され、バックグラウンド処理を非常に簡単に実現することができる。

Sidekiqは、非同期処理を実行する際に用いられる。キューのストレージには、Redisを用いる。

メリット

  • マルチスレッドで動くので、処理速度が早い(メモリを共有して、大量のJobを実行できる)
  • マルチスレッドのため、メモリに対するパフォーマンスがいい

懸念点

  • マルチスレッドのため、 メモリが肥大化することがある
  • マルチスレッドのため、利点を最大限に活かせるようスレッドセーフに実装する必要がある

設定などに関しては、前回書いた記事を参考までに載せておく。

ジョブのスタックが発生??

今回のシステムでは、以下のような仕組みで処理を実行していた。

f:id:serip39:20220124012411p:plain

  1. RailsアプリケーションをAPIサーバーとして起動し、エラー内容がPOSTされてくる
  2. エラー内容を確認し、自動復旧処理を実行するジョブをRedisに追加する
  3. Sidekiqが空いていれば、ジョブを受け取る
  4. 自動復旧処理を実行する

このとき、Sidekiqでは、1プロセス10スレッドを起動させていた。しかしながら、エラー通知が同じタイミングで複数入ってきた場合に、前の処理が終わるまで次の処理が実行されないという事象が発生した。(スレッドが空いているため、dequeueは実行され、Sidekiqのダッシュボードでは「実行中」になるものの、処理が実行されていなかった。)

マルチスレッドなので、10スレッドであれば、10個の処理を並列で実行できるもののと勝手に思い込んでいたが、無条件に何でもできるわけではないようであった。

Rubyのスレッドには、以下のような記載があった。

現在の実装では Ruby VM は Giant VM lock (GVL) を有しており、同時に実行されるネイティブスレッドは常にひとつです。ただし、IO 関連のブロックする可能性があるシステムコールを行う場合には GVL を解放します。その場合にはスレッドは同時に実行され得ます。

すなわち、IO 関連のブロックする可能性があるシステムコールを行わない場合は、スレッド処理は常に1つしか実行されない。

複数のジョブを実行した際に、スタックした原因はこれっぽい気がした。

今回のエラーに関しては、そんなに頻繁に発生しない、また発生しても同時に複数発生する頻度は少ないことから、一旦複数プロセスを立ち上げて現状のままの処理で対応するようにした。(複数のプロセスを立ち上げる場合、サーバーのリソースの問題もあるが、今回の処理はそんなに重くないので、問題ないと判断した。)

複数プロセスの起動

今回の各システムのバージョンは以下の通りである。

プロセス管理に関しては、開発環境では、Foreman、本番環境では、Supervisorを用いた。本番環境では、プロセスのデーモン化もしたかったので、Supervisorを採用した。今回は、例として開発環境でのFormanを用いた、複数プロセスの起動方法を説明する。(Foremanの使い方などは省略する。)

設定ファイルは、以下のようにした。

web: rm -f tmp/pids/server.pid && bundle exec rails server -p 3000 -b 0.0.0.0
redis: bundle exec redis-server
sidekiq1: bundle exec sidekiq -c 1 -q sidekiq1 -q default
sidekiq2: bundle exec sidekiq -c 1 -q sidekiq2 -q default
  • -cで、スレッドの数を設定する。Sidekiqはマルチスレッドが利点であるが、今回の処理においては、その利点を使えないので、スレッドを1に設定した。(そうすることで、キューにいれたジョブで待機中のジョブが何個あるか、きちんと可視化できるようにした。)
  • -qで、キューの優先順位を設定する。重み付けなどもできるが、今回は設定せず、順序のみを設定した。詳細は、こちらで確認してほしい。

Workerのキューの振り分け

HogeWorkerを実行する例で、キューの振り分け方法を紹介する。まずは、実装したコードを先に示す。

ps = Sidekiq::ProcessSet.new
empty_ps = ps.find{|p| p['busy'] == 0}
queue_name = empty_ps.nil? ? 'default' : empty_ps['queues'][0]
stats = Sidekiq::Stats.new
if 0 < stats.enqueued && queue_name == 'default'
  send_stack_message_to_slack #スタックしたことを通知
end
HogeWorker.set(queue: queue_name).perform_async(error_data)

ジョブであるWorkerは、キューのジョブをランダムな順番で実行する。そのため、基本的には空いているプロセス側で実行されるが、時折実行中のプロセスに連続でジョブが入ってしまうことがあった。そこで、明示的にキューの振り分けを実装してみた。(本来は不要かもなと思いながらも、明示的にしてみたw)

ps = Sidekiq::ProcessSet.new
ps.size # => 2
ps.each do |process|
  p process['busy']     # => 3
  p process['hostname'] # => 'localhost'
  p process['pid']      # => 16131
  process['queues'].map{|que| p que} # => 'sidekiq', 'default'
end

上記で、現在実行中のプロセス情報を取得することができる。(一度インスタンスを生成すると、データは5秒ごとに更新される。)busyが現在実行中のジョブの数である。ゆえに、busyがゼロであれば、そのプロセスは空いていて、処理が実行可能である。

HogeWorker.set(queue: queue_name).perform_async(error_data)

Workerが利用するキューの名前は、デフォルトでdefaultに設定されている。Worker側で設定も可能であるが、ジョブをプッシュする際に、 .set(queue: queue_name)で上書きすることができる。これで、プッシュする直前に空いているプロセス側に設定する。どちらも空いていない場合は、defaultに設定するようにした。

また、どちらも空いておらず、処理がスタックしてたまり続けると、自動化した意味がないので、待機状態のジョブが2個以上発生した場合に、Slackに通知するシステムとした。

stats = Sidekiq::Stats.new
stats.enqueued # => 5

enqueuedで待機状態のジョブを取得することができる。Sidekiq::ProcessSetと異なり、自動でデータが更新されないので、最新のデータを取得する場合は、インスタンスを生成し直す必要がある。

SidekiqのAPIの詳細は、以下のWikiを確認してほしい。

SidekiqのAPIを使えば、色々と詳細にプロセスやジョブなどを制御ができて、とても便利であると思えた。

まとめ

本来は、ジョブがスタックしないように、必要なときにSidekiqプロセスを立ち上げて、ジョブの実行が終わった時点で、プロセスを終了させるのが理想ではないかと思えた。

起動はこんな感じ。

timestamp = Time.now.to_i
system("bundle exec sidekiq -c 1 -q #{timestamp}") # サブシェルでSidekiqプロセスを起動
HogeWorker.set(queue: timestamp).perform_async(error_data)

ただこれだと、処理が煩雑になるのと、サブシェルを立ち上げすぎて大丈夫なのだろうかと思えたので、今回は実装を断念した。

もっと最適な方法があれば、是非教えて下さい。また、時間があるときにそもそもなぜスタックしたのか、詳細を調べてみたい。