user1475412 - 2 months ago

SQLAlchemy ThreadPoolExecutor "Too many clients"

I wrote a script with this sort of logic in order to insert many records into a PostgreSQL table as they are generated.

#!/usr/bin/env python3
import asyncio
from concurrent.futures import ProcessPoolExecutor as pool
from functools import partial

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base


metadata = sa.MetaData(schema='stackoverflow')
Base = declarative_base(metadata=metadata)


class Example(Base):
    __tablename__ = 'example'
    pk = sa.Column(sa.Integer, primary_key=True)
    text = sa.Column(sa.Text)


sa.event.listen(Base.metadata, 'before_create',
                sa.DDL('CREATE SCHEMA IF NOT EXISTS stackoverflow'))

engine = sa.create_engine(
'postgresql+psycopg2://postgres:password@localhost:5432/stackoverflow'
)
Base.metadata.create_all(engine)
session = sa.orm.sessionmaker(bind=engine, autocommit=True)()


def task(value):
    engine.dispose()
    with session.begin():
        session.add(Example(text=value))


async def infinite_task(loop):
    spawn_task = partial(loop.run_in_executor, None, task)
    while True:
        await asyncio.wait([spawn_task(value) for value in range(10000)])


def main():
    loop = asyncio.get_event_loop()
    with pool() as executor:
        loop.set_default_executor(executor)
        asyncio.ensure_future(infinite_task(loop))
        loop.run_forever()
    loop.close()


if __name__ == '__main__':
main()


This code works just fine, creating a pool of as many processes as I have CPU cores, and happily chugging along forever. I wanted to see how threads would compare to processes, but I could not get a working example. Here are the changes I made:
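For reference, the executor-offload pattern the script relies on can be reduced to a stdlib-only sketch (illustrative names; here `task` is a stand-in for the blocking database insert, and the batch is tiny so it terminates):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def task(value):
    # stand-in for the blocking database insert
    return value * 2


async def run_batch(loop):
    # offload each blocking call to the loop's default executor,
    # like spawn_task = partial(loop.run_in_executor, None, task)
    futures = [loop.run_in_executor(None, task, value) for value in range(5)]
    return await asyncio.gather(*futures)


def main():
    loop = asyncio.new_event_loop()
    with ThreadPoolExecutor() as executor:
        loop.set_default_executor(executor)
        results = loop.run_until_complete(run_batch(loop))
    loop.close()
    return results
```

Passing `None` as the executor argument makes `run_in_executor` use whatever was registered via `set_default_executor`, which is how the original script swaps process and thread pools with a one-line import change.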

from concurrent.futures import ThreadPoolExecutor as pool

session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    engine.dispose()
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()


This version runs for a while before failing with a flood of "too many clients" exceptions:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already


What am I missing?

Answer

It turns out that the problem is engine.dispose(), which, in the words of Mike Bayer (zzzeek) "is leaving PG connections lying open to be garbage collected."

Source: https://groups.google.com/forum/#!topic/sqlalchemy/zhjCBNebnDY
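A toy model of the failure mode (illustrative only, not SQLAlchemy's internals): each call to `dispose()` swaps in a fresh pool, so no connection is ever reused, and the old connections stay open on the server until Python garbage-collects them.

```python
class ToyPool:
    """Illustrative stand-in for a connection pool."""

    def __init__(self, server):
        self.server = server   # shared counter of server-side connections
        self.idle = []         # connections available for reuse

    def connect(self):
        if self.idle:
            return self.idle.pop()   # reuse a pooled connection
        self.server["open"] += 1     # otherwise open a new one on the server
        return object()

    def release(self, conn):
        self.idle.append(conn)


class ToyEngine:
    def __init__(self):
        self.server = {"open": 0}
        self.pool = ToyPool(self.server)

    def dispose(self):
        # swap in a fresh pool; connections sitting in the old pool are
        # orphaned -- still open on the server until garbage collection
        self.pool = ToyPool(self.server)


def run(n, dispose_each_time):
    engine = ToyEngine()
    for _ in range(n):
        if dispose_each_time:
            engine.dispose()         # what the broken task() did
        conn = engine.pool.connect()
        engine.pool.release(conn)
    return engine.server["open"]


print(run(50, dispose_each_time=True))   # 50: a new server connection per call
print(run(50, dispose_each_time=False))  # 1: the pool reuses one connection
```

In the process-pool version each child process calls `dispose()` once and then keeps its pool, so this never snowballs; in the thread version every task repeated it, and PostgreSQL's connection limit was exhausted faster than the garbage collector could reclaim the orphans.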

So the updated task function looks like this:

def task(value):
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()
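The reason this is thread-safe without `dispose()` is that `scoped_session` keeps one session per thread in thread-local storage. The idea in stdlib form (a toy sketch, not SQLAlchemy's implementation):

```python
import threading


class ScopedRegistry:
    """Toy version of the thread-local registry behind scoped_session."""

    def __init__(self, factory):
        self.factory = factory
        self.local = threading.local()

    def __call__(self):
        # same thread -> same object; a new thread gets its own
        if not hasattr(self.local, "obj"):
            self.local.obj = self.factory()
        return self.local.obj

    def remove(self):
        # discard this thread's object, like Session.remove()
        if hasattr(self.local, "obj"):
            del self.local.obj


Session = ScopedRegistry(dict)  # dict stands in for a real Session class

a = Session()
results = {"same_thread": a is Session(), "other_thread": None}


def worker():
    results["other_thread"] = Session() is a


t = threading.Thread(target=worker)
t.start()
t.join()
```

Each worker thread gets its own session drawn from the engine's shared pool, and `Session.remove()` returns that session's connection to the pool instead of orphaning it.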