タスク(バッチ)の二重起動や重複実行を防ぐ方法はいくつかありますが、Redisロックを使ったGem(SidekiqUniqueJobs)を見つけたので試してみました。

SidekiqUniqueJobs導入

GitHub – mhenrixon/sidekiq-unique-jobs: Prevents duplicate Sidekiq jobs

Sidekiq-Cronは導入済みの想定です。
GitHub – sidekiq-cron/sidekiq-cron: Scheduler / Cron for Sidekiq jobs

Gemfile

# Use Sidekiq
gem 'sidekiq'
gem 'sidekiq-cron'
+ gem 'sidekiq-unique-jobs'
% bundle install

Gemfile.lock

    sidekiq (7.3.9)
    sidekiq-cron (2.3.0)
    sidekiq-unique-jobs (8.0.11)

config/initializers/sidekiq.rb

+ require 'sidekiq-unique-jobs'

Sidekiq.configure_server do |config|
  config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1') }

+  config.client_middleware do |chain|
+    chain.add SidekiqUniqueJobs::Middleware::Client
+  end
+  config.server_middleware do |chain|
+    chain.add SidekiqUniqueJobs::Middleware::Server
+  end
+  SidekiqUniqueJobs::Server.configure(config)

  config.on(:startup) do
    schedule_file = "config/schedule/#{Rails.env}.yml"
    if File.exist?(schedule_file)
      schedule_hash = YAML.load_file(schedule_file)
      Sidekiq::Cron::Job.load_from_hash! schedule_hash

      # NOTE: load_from_hash!は追加・変更のみで削除されない為
      destroy_jobs = Sidekiq::Cron::Job.all.map(&:name) - schedule_hash.keys
      Sidekiq.logger.info "delete destroy_jobs: #{destroy_jobs.join(',')}"
      destroy_jobs.each do |destroy_job|
        Sidekiq::Cron::Job.find(destroy_job)&.destroy
      end
    end

    ActiveRecord::Base.connection_pool.disconnect!
    Rails.application.load_tasks # NOTE: Rake::Taskを実行する為
  end
end

Sidekiq.configure_client do |config|
  config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1') }

+  config.client_middleware do |chain|
+    chain.add SidekiqUniqueJobs::Middleware::Client
+  end
end

※ymlから削除やキー名を変更しても、Cronジョブが消えなかったので、序でに対応しています。

SidekiqUniqueJobsを使う

class RakeTaskExecuteWorker
  include Sidekiq::Worker
  sidekiq_options queue: :schedule,
                  retry: 0, # NOTE: cronから実行する為、リトライしない
+                  lock: :until_and_while_executing, # NOTE: 二重起動防止(キュー登録と実行排他)
+                  lock_ttl: 60 * 60 # 60分 # NOTE: 途中でインスタンスが落ちた時に解除される。超えると二重起動可能になってしまう

lockはどれを使うのがいいのか?

lock_ttl未設定で、同じジョブを2つ実行した場合、
:until_executing ←両方とも実行中に入る。2つ目は1つ目が終わってから実行される。
:until_executed ←キュー〜実行前まで。1つ目のみ実行中に入る。2つ目は実行されない。
:while_executing ←実行中のみ。1つ目のみ実行中に入る。2つ目は実行されない。
:until_and_while_executing ←キュー〜実行中。1つ目のみ実行中に入る。2つ目は実行されない。

Sidekiq-Cronで一番安全なのは:until_and_while_executingだと思う。

lock_ttlはいくつにするのがいいのか?

未設定の場合は下記が自動設定される。retryhが0の場合は、1時間。

(retry_count + 1) * 60 * 60

想定最大実行時間よりも長くする必要がありますが難しい。。。
1時間毎のCron実行を想定して、60分にしましたが、状況に応じて伸ばした方が良さそう。

実行中にインスタンスが落ちたらどうなる?

ttlが切れるまで、実行されなくなる。
正常終了したらロックが解除されますが、以上終了時はロックが残る。

メモリ使い過ぎ等で、落ちる事は良くあるので、対策したいところ。

起動時にRedisロックを削除するようにする

複数台構成の場合は、構成に合わせて変更した方が良いと思いますが、
1インスタンスなら起動時に雑に削除するで問題ないので、下記を追加しました。

config/initializers/sidekiq.rb

  config.on(:startup) do
+    # NOTE: 実行中にインスタンスが落ちるとRedisロックが残り、TTLが切れるまで起動できなくなる為(1インスタンスの場合のみ)
+    Sidekiq.redis do |redis_connection|
+      redis_lock_keys = redis_connection.keys('uniquejobs:*:LOCKED')
+      Sidekiq.logger.info "delete redis_lock_keys: #{redis_lock_keys.join(',')}"
+      redis_lock_keys.each do |redis_lock_key|
+        redis_connection.del(redis_lock_key)
+      end
+    end

閾値を超えたら気付けるようにしたい

on_conflictで、二重起動したら通知したりも出来そうだけど、超える前に気付けるようにしたい。
監視と同じようにWarningとCriticalの閾値をTask毎に設定したい。Workerに実装してみました。

config/schedule/development.rb, production.rb

_test_sleep:
  status: 'disabled' # MEMO: 本番も手動実行
  cron: '0 10 * * 1' # (月曜日 10:00)
  class: 'RakeTaskExecuteWorker'
  queue: 'high' # NOTE: 手動実行の為
  args:
    task_name: 'test:sleep'
    execution_interval: 420 # 7分
    warnning_run_time: 360 # 6分
  description: 'Sleepテスト'
_test_sleep_warning:
  status: 'disabled' # MEMO: 本番も手動実行
  cron: '0 10 * * 1' # (月曜日 10:00)
  class: 'RakeTaskExecuteWorker'
  queue: 'high' # NOTE: 手動実行の為
  args:
    task_name: 'test:sleep'
    execution_interval: 360 # 6分
    warnning_run_time: 300 # 5分
  description: 'Sleepテスト(warning)'
_test_sleep_critical:
  status: 'disabled' # MEMO: 本番も手動実行
  cron: '0 10 * * 1' # (月曜日 10:00)
  class: 'RakeTaskExecuteWorker'
  queue: 'high' # NOTE: 手動実行の為
  args:
    task_name: 'test:sleep'
    execution_interval: 300 # 5分
    warnning_run_time: 240 # 4分
  description: 'Sleepテスト(critical)'

app/workers/rake_task_execute_worker.rb

class RakeTaskExecuteWorker
  include Sidekiq::Worker
  sidekiq_options queue: :schedule,
                  retry: 0, # NOTE: cronから実行する為、リトライしない
                  lock: :until_and_while_executing, # NOTE: 二重起動防止(キュー登録と実行排他)
                  lock_ttl: 60 * 60 # 60分 # NOTE: 途中でインスタンスが落ちた時に解除される。超えると二重起動可能になってしまう

  # RakeTaskを実行
  def perform(*args)
    task_name = args[0]['task_name']
    Rails.logger.info("=== START #{self.class.name}.perform(#{task_name}) ===")

    start_time = Time.current
    Rake::Task[task_name].execute

    check_run_time(start_time, args, args[0]['execution_interval'], args[0]['warnning_run_time'])
    Rails.logger.info("=== END #{self.class.name}.perform(#{task_name}) ===")
  end

  private

  def check_run_time(start_time, args, execution_interval, warnning_run_time)
    current_run_time = Time.current - start_time

    if execution_interval.present? && current_run_time > execution_interval.to_i
      message = "[Critical] RakeTaskの実行時間(#{current_run_time.to_i}秒)がexecution_interval(#{execution_interval}秒)を超えました。二重起動されて問題が発生してないか確認してください。args: #{args}"
      Rails.logger.fatal(message)
      Sentry.capture_exception(StandardError.new(message))
    elsif warnning_run_time.present? && current_run_time > warnning_run_time.to_i
      message = "[Warning] RakeTaskの実行時間(#{current_run_time.to_i}秒)がwarnning_run_time(#{warnning_run_time}秒)を超えました。パフォーマンス改善や実行間隔見直を検討してください。args: #{args}"
      Rails.logger.warn(message)
      Sentry.capture_exception(StandardError.new(message))
    end
  end
end

lib/tasks/test/sleep.rake

# :nocov:
namespace :test do
  desc 'Sleepテスト'
  task sleep: :environment do |task|
    Rails.logger.info "=== START #{task.name} ==="

    max_count = 300 # 約5分
    max_count.times do |count|
      Rails.logger.info "#{count + 1}/#{max_count}" if (count + 1) % 5 == 0
      sleep 1
    end

    Rails.logger.info "=== END #{task.name} ==="
  end
end
# :nocov:

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です