Alexis R.L. - 1 month ago
Python Question

Implementing multiprocessing in a loop scraper and appending the data

I am making a web scraper to build a database. The site I plan to use has index pages, each containing 50 links. The number of pages to be parsed is estimated at around 60K and up, which is why I want to implement multiprocessing.

Here is some pseudo-code of what I want to do:

def harvester(index):
    main = dict()
    ....
    links = foo.findAll('a')
    for link in links:
        main.append(worker(link))
        # or maybe something like: map_async(worker(link))

def worker(url):
    '''this function gathers the data from the given url'''
    return dictionary


What I want to do with that is to have a certain number of worker functions gathering data in parallel on different pages. That data would then either be appended to a big dictionary located in harvester or written directly to a csv file by the worker function.
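For reference, here is a rough, untested sketch of the map_async idea from the pseudo-code above, just to show what I have in mind (worker and the list of index pages are placeholders):

from multiprocessing import Pool

def worker(url):
    '''placeholder: fetch the page at url and return a dict of extracted data'''
    return {'url': url}

def harvester(index_urls):
    # index_urls is a placeholder for the list of pages to parse
    pool = Pool(processes=8)
    # map_async schedules worker(url) for every url in the list
    async_result = pool.map_async(worker, index_urls)
    pool.close()
    pool.join()
    # the results come back as a list of dicts, one per url
    return async_result.get()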


  • I'm wondering how I can implement parallelism. I have done a fair
    amount of research on using gevent, threading and multiprocessing, but
    I am not sure how to implement it.

  • I am also not sure if appending data to a large dictionary or writing
    directly to a csv using DictWriter will be stable with that many inputs at the same time (see the sketch below).
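To make the second point concrete, this is the kind of direct writing from the workers that I am worried about (untested; writer would be a csv.DictWriter shared between all the workers):

import threading

csv_lock = threading.Lock()

def worker(url, writer):
    '''placeholder: scrape url, then write the resulting dict through the shared writer'''
    row = {'url': url}
    # every worker has to hold the lock while it touches the shared writer
    with csv_lock:
        writer.writerow(row)
    return row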



Thanks

Answer

I propose you split your work into separate workers which communicate via Queues.

Here you mostly have IO wait time (crawling, csv writing).

So you can do the following (not tested, just to show the idea):

import csv
import threading
import Queue

class CsvWriter(threading.Thread):

    def __init__(self, resultq, fieldnames):
        super(CsvWriter, self).__init__()
        self.resultq = resultq
        # fieldnames must match the keys of the dicts returned by the crawlers
        self.writer = csv.DictWriter(open('results.csv', 'wb'), fieldnames=fieldnames)
        self.writer.writeheader()

    def run(self):
        done = False
        while not done:
            row = self.resultq.get()
            if row != -1:
                self.writer.writerow(row)
            else:
                done = True

class Crawler(threading.Thread):

    def __init__(self, inputq, resultq):
        super(Crawler, self).__init__()
        self.iq = inputq
        self.oq = resultq

    def run(self):
        done = False
        while not done:
            link = self.iq.get()
            if link != -1:
                result = self.extract_data(link)
                self.oq.put(result)
            else:
                done = True

    def extract_data(self, link):
        # crawl and extract what you need and return a dict
        pass

def main():
    linkq = Queue.Queue()
    for url in your_urls:
        linkq.put(url)
    resultq = Queue.Queue()
    # your_fieldnames is a placeholder: use the keys of the dicts returned by extract_data
    writer = CsvWriter(resultq, your_fieldnames)
    writer.start()
    crawlers = [Crawler(linkq, resultq) for _ in xrange(10)]
    [c.start() for c in crawlers]
    # one -1 sentinel per crawler tells it to stop once the real urls are consumed
    [linkq.put(-1) for _ in crawlers]
    [c.join() for c in crawlers]
    resultq.put(-1)
    writer.join()

This code should work (it is untested, so fix possible typos) and exits once all the urls are processed: each crawler stops when it reads a -1 sentinel from the link queue, and the writer stops when it reads the -1 put on the result queue after the crawlers are joined.
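If you prefer the pool-style API from your pseudo-code, the same IO-bound idea can also be sketched with a thread pool from multiprocessing.dummy (untested; your_urls, the fieldnames and extract_data are the same kind of placeholders as above):

import csv
from multiprocessing.dummy import Pool  # thread-based Pool with the same API as multiprocessing.Pool

def scrape_to_csv(your_urls, fieldnames, extract_data):
    pool = Pool(10)  # 10 worker threads are plenty for IO-bound crawling
    with open('results.csv', 'wb') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        # imap_unordered yields each dict as soon as its crawl finishes;
        # only this loop touches the csv writer, so no locking is needed
        for row in pool.imap_unordered(extract_data, your_urls):
            if row:
                writer.writerow(row)
    pool.close()
    pool.join()

Because these are threads, extract_data can be any callable returning a dict, and the single writing loop plays the same role as the CsvWriter thread above.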
