user4979733 user4979733 - 1 year ago 162
Python Question

Python: Using sqlite3 with multiprocessing

I have a SQLite3 DB. I need to parse 10000 files. I read some data from each file, and then query the DB with this data to get a result. My code works fine in a single process environment. But I get an error when trying to use the mulitprocessing Pool.

My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files:
foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB

My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4)
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB


I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.

My DB class is implemented as follows:

def open_db(sqlite_file):
"""Open SQLite database connection.

Args:
sqlite_file -- File path

Return:
Connection
"""

log.info('Open SQLite database %s', sqlite_file)
try:
conn = sqlite3.connect(sqlite_file)
except sqlite3.Error, e:
log.error('Unable to open SQLite database %s', e.args[0])
sys.exit(1)

return conn

def close_db(conn, sqlite_file):
"""Close SQLite database connection.

Args:
conn -- Connection
"""

if conn:
log.info('Close SQLite database %s', sqlite_file)
conn.close()

class MapDB:

def __init__(self, sqlite_file):
"""Initialize.

Args:
sqlite_file -- File path
"""

# 1. Open database.
# 2. Setup to receive data as dict().
# 3. Get cursor to execute queries.
self._sqlite_file = sqlite_file
self._conn = open_db(sqlite_file)
self._conn.row_factory = sqlite3.Row
self._cursor = self._conn.cursor()

def close(self):
"""Close DB connection."""

if self._cursor:
self._cursor.close()
close_db(self._conn, self._sqlite_file)

def check(self):
...

def get_driver_net(self, net):
...

def get_cell_id(self, net):
...


Function foo() looks like this:

def foo(f, x1, x2, db):

extract some data from file f
r1 = db.get_driver_net(...)
r2 = db.get_cell_id(...)


The overall not working implementation is as follows:

mapdb = MapDB(sqlite_file)

log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)
pool.close()
mapdb.close()


To fix this, I think I need to create the MapDB() object inside each pool worker (so have 4 parallel/independent connections). But I'm not sure how to do this. Can someone show me an example of how to accomplish this with Pool?

Answer Source

What about defining foo like this:

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)
    ... open mapdb
    ... process data ...
    ... close mapdb

and then change your pool.map call to:

pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)    

Update

Another option is to handle the worker threads yourself and distribute work via a Queue.

from Queue import Queue
from threading import Thread

q = Queue()

def worker():
  mapdb = ...open the sqlite database
  while True:
    item = q.get()
    if item[0] == "file":
      file = item[1]
      ... process file ...
      q.task_done()
    else:
      break
  ...close sqlite connection...

# Start up the workers

nworkers = 4

for i in range(nworkers):
  worker = Thread(target=worker)
  worker.daemon = True
  worker.start

# Place work on the Queue

for x in ...list of files...:
  q.put(("file",x))

# Place termination tokens onto the Queue

for i in range(nworkers):
  q.put(("end",))

# Wait for all work to be done.

q.join()

The termination tokens are used to ensure that the sqlite connections are closed - in case that matters.