ChristofferJoergensen - 1 month ago
Ruby Question

How best to keep a job queue clean of retry/duplicate jobs (using sidekiq and redis-semaphore)

I have a rails app that fetches a lot of emails from multiple IMAP accounts.


  • I use sidekiq to handle the jobs.

  • I use sidetiq to schedule the jobs.

  • I use redis-semaphore to ensure that recurring jobs for the same user don't trip over each other.



2 issues though:


  • 1: When a job hits "if s.lock", redis-semaphore blocks it until all previous jobs for that user have finished. I need the job to be cancelled instead of queued.

  • 2: If during a job an exception is raised, resulting in a crash, sidekiq will put the job back into the queue for a retry. I need the job to be cancelled instead of being queued. Putting "sidekiq_options :retry => false" into the code does not seem to make a difference.



My code:

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|
      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end

Answer

1. Create a Redis subclass and override blpop to accept -1 for non-blocking use of lpop.

redis-semaphore calls @redis.blpop in Redis::Semaphore#lock. While you could override the lock method to use @redis.lpop instead, a much simpler approach is to pass a custom Redis instance to the semaphore.

Place the following class in the lib directory of your Rails app and require it in config/initializers/sidekiq.rb (or load it however you prefer).

class NonBlockingRedis < Redis
  def blpop(key, timeout)
    if timeout == -1
      # Non-blocking: attempt a single lpop and return immediately.
      result = lpop(key)
      return result if result.nil?
      # Mimic blpop's return shape of [key, value].
      [key, result]
    else
      super(key, timeout)
    end
  end
end
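To illustrate why overriding blpop alone is enough, here is a self-contained sketch built around a hypothetical FakeRedis stand-in (it only simulates a Redis list in memory; the real gem talks to a Redis server):

```ruby
# FakeRedis is a hypothetical in-memory stand-in, for illustration only.
class FakeRedis
  def initialize(items)
    @items = items # simulates the semaphore's "available" list
  end

  def lpop(_key)
    @items.shift # returns nil immediately when the list is empty
  end

  def blpop(key, _timeout)
    # the real client would block here until an item arrives or it times out
    [key, @items.shift]
  end
end

# The same override pattern as NonBlockingRedis above, applied to the stand-in.
class NonBlockingFakeRedis < FakeRedis
  def blpop(key, timeout)
    if timeout == -1
      result = lpop(key)
      return result if result.nil? # nil signals "lock not available"
      [key, result]                # mimic blpop's [key, value] return shape
    else
      super
    end
  end
end

NonBlockingFakeRedis.new(["token"]).blpop(:sem, -1) # => [:sem, "token"]
NonBlockingFakeRedis.new([]).blpop(:sem, -1)        # => nil, instead of blocking
```

With a timeout of -1 the caller gets nil straight away when the list is empty, which is exactly what lets the semaphore's lock return falsy instead of waiting.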

Whenever you call Redis::Semaphore.new, pass a :redis key with a new instance of the NonBlockingRedis class.

Call s.lock with -1 as an argument to use lpop instead of blpop.

s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock(-1)
  user = User.where(_id: user_id).first
  emails = ImapMails.receive_mails(user)
  s.unlock
end

2. Using sidekiq_options retry: false in your worker class should work; see the example below.

In your question, you didn't specify which worker's jobs were ending up in the retry queue. Since FetchMailsJobs enqueues ImapJob jobs, an exception in the former may make it look as if ImapJob is being re-queued.

With your semaphore lock, it would also be a good idea to wrap the work in a begin/rescue/ensure block so the lock is always released.
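The ensure clause is what guarantees the semaphore is released even when the work raises. A minimal, self-contained illustration of that pattern (the names here are illustrative, not part of either gem):

```ruby
# Illustrative only: `ensure` runs whether or not the body raises,
# so a lock taken before the work is always released afterwards.
def with_lock(lock)
  lock[:held] = true
  begin
    yield
  rescue
    # swallow the error so the job is not retried
  ensure
    lock[:held] = false
  end
end

lock = { held: false }
with_lock(lock) { raise "IMAP connection dropped" }
lock[:held] # => false, even though the block raised
```

The same shape applies below: lock the semaphore, do the IMAP work inside begin, swallow the exception in rescue, and unlock in ensure.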

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  sidekiq_options retry: false

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  sidekiq_options retry: false

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
    if s.lock(-1)
      begin
        user = User.where(_id: user_id).first
        emails = ImapMails.receive_mails(user)
      rescue => e
        # swallow the exception so Sidekiq does not retry the job; log e if needed
      ensure
        s.unlock
      end
    end
  end
end

See Sidekiq's Advanced Options documentation on workers for more information.
