satnam - 4 months ago
Ruby Question

DB update script skips rows to update if I use multithreading

I have a table that has about 40 million rows (about 120 million IDs) and I need to update some of the rows using a script that I wrote in Ruby. My DB is a PostgreSQL DB.

The problem I'm having is that a single-threaded script runs very slowly, since it has to update each row separately and time is lost opening and closing connections. On the positive side, it successfully updates all the rows.

So to speed it up, I wrote a multithreaded script that can use multiple connections. But now some of the rows are silently skipped: they are never updated, and nothing tells me why (my script has exception handling).

Here is my script that uses multithreading.

require 'sequel'
require 'logger'
......

DB = Sequel.connect('DB Credentials') # max connections = 20

starting_id = 1
total_objects = 120061827

queue = Queue.new
Thread.abort_on_exception = true

collector = Thread.new {
  (starting_id..total_objects).step(100000) do |id|
    lower_limit = id
    upper_limit = id + 100000
    if upper_limit > total_objects
      upper_limit = total_objects + 1
    end

    objects = DB["
      SELECT id, extra_fields
      FROM table
      WHERE json_typeof(extra_fields->'field') = 'object'
        AND id >= #{lower_limit}
        AND id < #{upper_limit}
    "]

    while queue.length >= 100
      progress_logger.info("Queue length limit reached. Sleeping Collector")
      sleep(5)
    end

    queue.push(objects)
    progress_logger.info("Pushing to Queue. FROM: #{lower_limit} TO: #{upper_limit}")
  end
  Thread.exit
}

def update_extra_fields(object, extra_fields, updater_logger, error_logger)
  begin
    update_query = DB["
      UPDATE table
      SET extra_fields = '#{JSON.generate(extra_fields)}'
      WHERE id = #{object[:id]}
    "]

    update_result = update_query.update
    updater_logger.info("Updated Photo ID: #{object[:id]}") if update_result == 1
  rescue Sequel::Error => e
    error_logger.info("Could not update 'extra_fields' for #{object[:id]} because #{e.message}")
  end
end

def construct_new_tags(tags)
  # logic
  return new_tags
end

def update_objects(objects, updater_logger, error_logger)
  objects.each do |object|
    extra_fields = JSON.parse(object[:extra_fields])
    tags = extra_fields["field"]
    new_tags = construct_new_tags(tags)
    extra_fields["new_field"] = new_tags
    update_extra_fields(object, extra_fields, updater_logger, error_logger)
  end
end

num_threads = 0
all_threads = []
consumer = Thread.new {
  while queue.length > 0
    if num_threads <= 15
      objects_block = queue.shift

      all_threads << Thread.new {
        num_threads += 1
        update_objects(objects_block, updater_logger, error_logger)
        num_threads -= 1
      }
    else
      progress_logger.info("Threads limit reached. Sleeping Updater")
      sleep(1)
    end
  end
}

collector.join
consumer.join
all_threads.each do |thread|
  thread.join
end

puts "Queue END reached: EXIT"



A few observations:

  1. Nothing ever gets logged in the error_logger.

  2. The number of rows updated differs each time I run the script, so I'm confident there is no specific pattern to which rows get skipped.

  3. A single-threaded script that follows the same logic updates all the rows successfully.


Answer

I think you have an error in your collector.

When the queue length is 100 or more you are simply throwing that batch of data away, and queue.push(photos) should be queue.push(objects).

        if queue.length >= 100
            progress_logger.info("Queue length limit reached. Sleeping Collector")
            sleep(5)
        else
            queue.push(photos)
            progress_logger.info("Pushing to Queue. FROM: #{lower_limit} TO: #{upper_limit}")
        end

You probably want something like this:

queue = Queue.new
...
        while queue.length >= 100
            progress_logger.info("Queue length limit reached. Sleeping Collector")
            sleep(5)
        end

        queue.push(objects)
        progress_logger.info("Pushing to Queue. FROM: #{lower_limit} TO: #{upper_limit}")

...

all_threads = []

consumer = Thread.new {
  loop {
    if num_threads <= 15
      objects_block = queue.shift

      all_threads << Thread.new {
        num_threads += 1
        update_objects(objects_block, updater_logger, error_logger)
        num_threads -= 1
      }
    else
      progress_logger.info("Threads limit reached. Sleeping Updater")
      sleep(1)
    end
  }
}

collector.join

while !queue.empty?
  sleep(10)
end

all_threads.each do |thread|
  thread.join
end
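Even with the collector fixed, this consumer still busy-waits and increments num_threads from several threads without synchronization (+= is not atomic). An alternative worth considering is a fixed pool of long-lived workers that block on queue.pop, with one nil sentinel per worker to signal shutdown; a SizedQueue then provides the back-pressure that the sleep(5) loop approximates. A minimal, self-contained sketch (the integer batches here stand in for the real object batches):

```ruby
NUM_WORKERS = 4
queue   = SizedQueue.new(100) # push blocks the producer when the queue is full
results = Queue.new           # thread-safe collector, standing in for the real updates

# Fixed pool: each worker blocks on pop instead of the consumer polling
# queue.length, and the nil sentinel cleanly ends its loop.
workers = NUM_WORKERS.times.map do
  Thread.new do
    while (batch = queue.pop)                 # pop blocks until a batch (or sentinel) arrives
      batch.each { |id| results << id }       # update_objects(batch, ...) in the real script
    end
  end
end

# Producer: push batches, then one sentinel per worker so every worker exits.
(1..20).each_slice(5) { |batch| queue.push(batch) }
NUM_WORKERS.times { queue.push(nil) }

workers.each(&:join)
```

Because every batch is either processed or still in the queue, nothing can be dropped the way the original if/else collector dropped batches, and there is no shared counter to race on.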