タスク(バッチ)の二重起動や重複実行を防ぐ方法はいくつかありますが、Redisロックを使ったGem(SidekiqUniqueJobs)を見つけたので試してみました。
SidekiqUniqueJobs導入
GitHub – mhenrixon/sidekiq-unique-jobs: Prevents duplicate Sidekiq jobs
Sidekiq-Cronは導入済みの想定です。
GitHub – sidekiq-cron/sidekiq-cron: Scheduler / Cron for Sidekiq jobs
Gemfile
# Use Sidekiq
gem 'sidekiq'
gem 'sidekiq-cron'
+ gem 'sidekiq-unique-jobs'
% bundle install
Gemfile.lock
sidekiq (7.3.9) sidekiq-cron (2.3.0) sidekiq-unique-jobs (8.0.11)
config/initializers/sidekiq.rb
+ require 'sidekiq-unique-jobs'
Sidekiq.configure_server do |config|
config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1') }
+ config.client_middleware do |chain|
+ chain.add SidekiqUniqueJobs::Middleware::Client
+ end
+ config.server_middleware do |chain|
+ chain.add SidekiqUniqueJobs::Middleware::Server
+ end
+ SidekiqUniqueJobs::Server.configure(config)
config.on(:startup) do
schedule_file = "config/schedule/#{Rails.env}.yml"
if File.exist?(schedule_file)
schedule_hash = YAML.load_file(schedule_file)
Sidekiq::Cron::Job.load_from_hash! schedule_hash
# NOTE: load_from_hash!は追加・変更のみで削除されない為
destroy_jobs = Sidekiq::Cron::Job.all.map(&:name) - schedule_hash.keys
Sidekiq.logger.info "delete destroy_jobs: #{destroy_jobs.join(',')}"
destroy_jobs.each do |destroy_job|
Sidekiq::Cron::Job.find(destroy_job)&.destroy
end
end
ActiveRecord::Base.connection_pool.disconnect!
Rails.application.load_tasks # NOTE: Rake::Taskを実行する為
end
end
Sidekiq.configure_client do |config|
config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1') }
+ config.client_middleware do |chain|
+ chain.add SidekiqUniqueJobs::Middleware::Client
+ end
end
※ymlから削除やキー名を変更しても、Cronジョブが消えなかったので、序でに対応しています。
SidekiqUniqueJobsを使う
class RakeTaskExecuteWorker
include Sidekiq::Worker
sidekiq_options queue: :schedule,
retry: 0, # NOTE: cronから実行する為、リトライしない
+ lock: :until_and_while_executing, # NOTE: 二重起動防止(キュー登録と実行排他)
+ lock_ttl: 60 * 60 # 60分 # NOTE: 途中でインスタンスが落ちた時に解除される。超えると二重起動可能になってしまう
lockはどれを使うのがいいのか?
lock_ttl未設定で、同じジョブを2つ実行した場合、
:until_executing
←両方とも実行中に入る。2つ目は1つ目が終わってから実行される。
:until_executed
←キュー〜実行前まで。1つ目のみ実行中に入る。2つ目は実行されない。
:while_executing
←実行中のみ。1つ目のみ実行中に入る。2つ目は実行されない。
:until_and_while_executing
←キュー〜実行中。1つ目のみ実行中に入る。2つ目は実行されない。
Sidekiq-Cronで一番安全なのは:until_and_while_executing
だと思う。
lock_ttlはいくつにするのがいいのか?
未設定の場合は下記が自動設定される。retryhが0の場合は、1時間。
(retry_count + 1) * 60 * 60
想定最大実行時間よりも長くする必要がありますが難しい。。。
1時間毎のCron実行を想定して、60分にしましたが、状況に応じて伸ばした方が良さそう。
実行中にインスタンスが落ちたらどうなる?
ttlが切れるまで、実行されなくなる。
正常終了したらロックが解除されますが、以上終了時はロックが残る。
メモリ使い過ぎ等で、落ちる事は良くあるので、対策したいところ。
起動時にRedisロックを削除するようにする
複数台構成の場合は、構成に合わせて変更した方が良いと思いますが、
1インスタンスなら起動時に雑に削除するで問題ないので、下記を追加しました。
config/initializers/sidekiq.rb
config.on(:startup) do
+ # NOTE: 実行中にインスタンスが落ちるとRedisロックが残り、TTLが切れるまで起動できなくなる為(1インスタンスの場合のみ)
+ Sidekiq.redis do |redis_connection|
+ redis_lock_keys = redis_connection.keys('uniquejobs:*:LOCKED')
+ Sidekiq.logger.info "delete redis_lock_keys: #{redis_lock_keys.join(',')}"
+ redis_lock_keys.each do |redis_lock_key|
+ redis_connection.del(redis_lock_key)
+ end
+ end
閾値を超えたら気付けるようにしたい
on_conflict
で、二重起動したら通知したりも出来そうだけど、超える前に気付けるようにしたい。
監視と同じようにWarningとCriticalの閾値をTask毎に設定したい。Workerに実装してみました。
config/schedule/development.rb, production.rb
_test_sleep:
status: 'disabled' # MEMO: 本番も手動実行
cron: '0 10 * * 1' # (月曜日 10:00)
class: 'RakeTaskExecuteWorker'
queue: 'high' # NOTE: 手動実行の為
args:
task_name: 'test:sleep'
execution_interval: 420 # 7分
warnning_run_time: 360 # 6分
description: 'Sleepテスト'
_test_sleep_warning:
status: 'disabled' # MEMO: 本番も手動実行
cron: '0 10 * * 1' # (月曜日 10:00)
class: 'RakeTaskExecuteWorker'
queue: 'high' # NOTE: 手動実行の為
args:
task_name: 'test:sleep'
execution_interval: 360 # 6分
warnning_run_time: 300 # 5分
description: 'Sleepテスト(warning)'
_test_sleep_critical:
status: 'disabled' # MEMO: 本番も手動実行
cron: '0 10 * * 1' # (月曜日 10:00)
class: 'RakeTaskExecuteWorker'
queue: 'high' # NOTE: 手動実行の為
args:
task_name: 'test:sleep'
execution_interval: 300 # 5分
warnning_run_time: 240 # 4分
description: 'Sleepテスト(critical)'
app/workers/rake_task_execute_worker.rb
class RakeTaskExecuteWorker
include Sidekiq::Worker
sidekiq_options queue: :schedule,
retry: 0, # NOTE: cronから実行する為、リトライしない
lock: :until_and_while_executing, # NOTE: 二重起動防止(キュー登録と実行排他)
lock_ttl: 60 * 60 # 60分 # NOTE: 途中でインスタンスが落ちた時に解除される。超えると二重起動可能になってしまう
# RakeTaskを実行
def perform(*args)
task_name = args[0]['task_name']
Rails.logger.info("=== START #{self.class.name}.perform(#{task_name}) ===")
start_time = Time.current
Rake::Task[task_name].execute
check_run_time(start_time, args, args[0]['execution_interval'], args[0]['warnning_run_time'])
Rails.logger.info("=== END #{self.class.name}.perform(#{task_name}) ===")
end
private
def check_run_time(start_time, args, execution_interval, warnning_run_time)
current_run_time = Time.current - start_time
if execution_interval.present? && current_run_time > execution_interval.to_i
message = "[Critical] RakeTaskの実行時間(#{current_run_time.to_i}秒)がexecution_interval(#{execution_interval}秒)を超えました。二重起動されて問題が発生してないか確認してください。args: #{args}"
Rails.logger.fatal(message)
Sentry.capture_exception(StandardError.new(message))
elsif warnning_run_time.present? && current_run_time > warnning_run_time.to_i
message = "[Warning] RakeTaskの実行時間(#{current_run_time.to_i}秒)がwarnning_run_time(#{warnning_run_time}秒)を超えました。パフォーマンス改善や実行間隔見直を検討してください。args: #{args}"
Rails.logger.warn(message)
Sentry.capture_exception(StandardError.new(message))
end
end
end
lib/tasks/test/sleep.rake
# :nocov:
namespace :test do
desc 'Sleepテスト'
task sleep: :environment do |task|
Rails.logger.info "=== START #{task.name} ==="
max_count = 300 # 約5分
max_count.times do |count|
Rails.logger.info "#{count + 1}/#{max_count}" if (count + 1) % 5 == 0
sleep 1
end
Rails.logger.info "=== END #{task.name} ==="
end
end
# :nocov: