
SQLAlchemy mass deletion from related tables

A couple of days ago I posted a question about a Python script hogging a lot of memory: Understanding Python memory usage

After a bit of trial and error I have managed to localise the memory growth to a function that does a database cleanup. My simplified schema (ORM) is as follows:

from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class A(Base):
    __tablename__ = "A"
    id = Column(Integer, primary_key=True)
    foo = Column(String)
    bar = Column(DateTime)
    replication_confirmed = Column(Boolean, default=False)

class B(Base):
    __tablename__ = "B"
    id = Column(Integer, primary_key=True)
    xyzzy = Column(Integer)
    barf = Column(String)
    replication_confirmed = Column(Boolean, default=False)
    aref = Column(Integer, ForeignKey("A.id"))


To keep this simple: the script acts as a kind of replication server between computers. Another program sends it RabbitMQ messages requesting updates, and receives new and updated rows from A and B as JSON messages. Several other processes actually modify A and B; every time they update something they set replication_confirmed to False, and every time data is sent to the "main" server the flag is set back to True.
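Roughly, the flow looks like this (a simplified sketch, not my real code; send_to_main_server() is a hypothetical stand-in for the actual RabbitMQ publishing):

def mark_dirty(session, row):
    # Writer processes flip the flag off whenever they touch a row.
    row.replication_confirmed = False
    session.commit()

def replicate_pending(session):
    # The replication side sends pending rows and flips the flag back on.
    pending = session.query(A).filter(A.replication_confirmed == False).all()
    for row in pending:
        send_to_main_server(row)  # hypothetical: publish the row as JSON
        row.replication_confirmed = True
    session.commit()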

To avoid clutter in this purely transitional database, I do a cleanup now and then: I want to delete all rows from both tables that have replication_confirmed = True, while preserving referential integrity, as replication is likely to be out of sync between the tables.

This is my deletion code (simplified; the real version has error handling etc.). It works, but I assume it loads a lot of data into memory from tables that have grown massively since the last cleanup, which causes the process to request a lot of memory that it then never frees. As there are many processes operating on the same tables, I use with_for_update() to lock the to-be-affected rows and prevent the writer processes from modifying them during the cleanup.

_todc = session.query(A).filter(A.replication_confirmed == True).\
    with_for_update().all()
_aids = [i.id for i in _todc]

_todb = session.query(B).filter(B.aref.in_(_aids),
                                B.replication_confirmed == True).all()
for _tb in _todb:
    session.delete(_tb)

_remaining = session.query(B).all()
_remaining_ids = [i.id for i in _remaining]
_adelete = session.query(A).filter(A.replication_confirmed == True,
                                   A.id.notin_(_remaining_ids)).all()
for _ta in _adelete:
    session.delete(_ta)
session.commit()


This all works, but there must be a better way of doing it. Due to the nature of the replication process the tables may not be entirely in sync: rows from B may have been replicated while the referenced rows from A are still pending, or vice versa.

What would be a more efficient way of doing this? This now reads everything from the affected tables into memory, but I could probably do it all at the database level without processing the data in memory at all. I just do not know how.

Any ideas? This is Python 2.7, SQLAlchemy and PostgreSQL 9.5.

Answer

If I've understood you correctly, two bulk deletes with correlated EXISTS subqueries should do:

In [11]: session.query(B).\
    ...:     filter(B.replication_confirmed,
    ...:            session.query(A).
    ...:                filter(A.replication_confirmed,
    ...:                       A.id == B.aref).
    ...:                exists()).\
    ...:     delete(synchronize_session=False)

followed by

In [12]: session.query(A).\
    ...:     filter(A.replication_confirmed,
    ...:            ~session.query(B).
    ...:                filter(B.aref == A.id).
    ...:                exists()).\
    ...:     delete(synchronize_session=False)

though here the rows in A have not been locked, and depending on your isolation level they might change between the two queries.
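If that's a concern, one option (a sketch, assuming you can afford to hold the locks for the duration of the transaction) is to take the FOR UPDATE locks on A up front and then run both bulk deletes in the same transaction:

# Sketch: lock the confirmed rows of A first, then run both bulk
# deletes inside the same transaction so the set cannot change under us.
session.query(A).\
    filter(A.replication_confirmed).\
    with_for_update().\
    all()  # we only want the locks; the returned objects are unused

session.query(B).\
    filter(B.replication_confirmed,
           session.query(A).
               filter(A.replication_confirmed, A.id == B.aref).
               exists()).\
    delete(synchronize_session=False)

session.query(A).\
    filter(A.replication_confirmed,
           ~session.query(B).filter(B.aref == A.id).exists()).\
    delete(synchronize_session=False)

session.commit()

Note that this still only locks rows in A; if your writers also touch the matching rows in B, you may want to lock those as well.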
