Eli Courtwright

Can SQLAlchemy events be used to update a denormalized data cache?

For performance reasons, I've got a denormalized database where some tables contain data which has been aggregated from many rows in other tables. I'd like to maintain this denormalized data cache by using SQLAlchemy events. As an example, suppose I was writing forum software and wanted each Thread to have a column tracking the combined word count of all comments in the thread in order to efficiently display that information:

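    import uuid

    from sqlalchemy import Column, ForeignKey, Integer, UnicodeText
    from sqlalchemy.dialects.postgresql import UUID  # assumes PostgreSQL's UUID column type
    from sqlalchemy.orm import declarative_base, relationship

    Base = declarative_base()

    class Thread(Base):
        __tablename__ = 'thread'
        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
        title = Column(UnicodeText(), nullable=False)
        word_count = Column(Integer, nullable=False, default=0)

    class Comment(Base):
        __tablename__ = 'comment'
        id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
        thread_id = Column(UUID(as_uuid=True), ForeignKey('thread.id', ondelete='CASCADE'), nullable=False)
        thread = relationship('Thread', backref='comments')
        message = Column(UnicodeText(), nullable=False)

        @property
        def word_count(self):
            return len(self.message.split())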


So every time a comment is inserted (for the sake of simplicity let's say that comments are never edited or deleted), we want to update the word_count attribute on the associated Thread object. So I'd want to do something like:

    from sqlalchemy import event

    def after_insert(mapper, connection, target):
        thread = target.thread
        thread.word_count = sum(c.word_count for c in thread.comments)
        print("updated cached word count to", thread.word_count)

    event.listen(Comment, "after_insert", after_insert)


So when I insert a Comment, I can see the event firing and see that it has correctly calculated the word count, but that change is not saved to the Thread row in the database. I don't see any caveats about updating other tables in the after_insert documentation, though I do see some caveats in some of the others, such as after_delete.

So is there a supported way to do this with SQLAlchemy events? I'm already using SQLAlchemy events for lots of other things, so I'd like to do everything that way instead of having to write database triggers.

Answer

The after_insert() event is one way to do this, and you might notice that it is passed a SQLAlchemy Connection object rather than the Session that other flush-related events, such as before_flush(), receive. The mapper-level flush events are normally intended to be used to invoke SQL directly on the given Connection:

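    from sqlalchemy import event

    @event.listens_for(Comment, "after_insert")
    def after_insert(mapper, connection, target):
        thread_table = Thread.__table__
        thread = target.thread
        # compute the new aggregate once, from the in-memory collection
        word_count = sum(c.word_count for c in thread.comments)
        # emit the UPDATE on the flush's own Connection, bypassing the unit of work
        connection.execute(
            thread_table.update()
            .where(thread_table.c.id == thread.id)
            .values(word_count=word_count)
        )
        # the in-memory thread.word_count is NOT changed by this UPDATE, so print the computed value
        print("updated cached word count to", word_count)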

What is notable here is that invoking an UPDATE statement directly is also a lot more performant than running that attribute change through the whole unit of work process again.
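A minimal end-to-end sketch of the after_insert() approach, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database. The models use Integer primary keys instead of UUID purely so the sketch runs on SQLite (an assumption, not the question's schema). It checks that the UPDATE emitted on the flush's Connection really ends up in the thread row:

```python
from sqlalchemy import Column, ForeignKey, Integer, UnicodeText, create_engine, event
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Thread(Base):
    __tablename__ = "thread"
    id = Column(Integer, primary_key=True)  # Integer instead of UUID (assumption, for SQLite)
    title = Column(UnicodeText(), nullable=False)
    word_count = Column(Integer, nullable=False, default=0)


class Comment(Base):
    __tablename__ = "comment"
    id = Column(Integer, primary_key=True)
    thread_id = Column(Integer, ForeignKey("thread.id", ondelete="CASCADE"), nullable=False)
    thread = relationship("Thread", backref="comments")
    message = Column(UnicodeText(), nullable=False)

    @property
    def word_count(self):
        return len(self.message.split())


@event.listens_for(Comment, "after_insert")
def after_insert(mapper, connection, target):
    thread_table = Thread.__table__
    thread = target.thread
    word_count = sum(c.word_count for c in thread.comments)
    # plain SQL on the flush's Connection, so it runs in the same transaction
    connection.execute(
        thread_table.update()
        .where(thread_table.c.id == thread.id)
        .values(word_count=word_count)
    )


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    thread = Thread(title="Hello")
    session.add_all([
        thread,
        Comment(thread=thread, message="one two three"),
        Comment(thread=thread, message="four five"),
    ])
    session.commit()
    # commit() expired the instance, so this attribute access reloads the row
    stored_count = thread.word_count

print(stored_count)
```

The UPDATE bypasses the ORM, so the in-memory Thread.word_count stays stale until the instance is expired or refreshed; here commit() takes care of that, and the reload reads the value written by the listener.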

However, an event like after_insert() isn't really needed here, as we know the value of "word_count" before the flush even happens. We actually know it as soon as Comment and Thread objects are associated with each other, and we could just as well keep Thread.word_count completely fresh in memory at all times using attribute events:

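    from sqlalchemy import event

    def _word_count(msg):
        # None, or the NO_VALUE / NEVER_SET symbols passed as oldvalue, count as zero words
        if not isinstance(msg, str):
            return 0
        return len(msg.split())

    @event.listens_for(Comment.message, "set")
    def _message_set(target, value, oldvalue, initiator):
        thread = target.thread
        if thread is not None:
            # word_count is still None on a Thread that hasn't been flushed yet
            thread.word_count = (thread.word_count or 0) + _word_count(value) - _word_count(oldvalue)

    @event.listens_for(Comment.thread, "set")
    def _thread_set(target, value, oldvalue, initiator):
        words = _word_count(target.message)

        # the new Thread, if any
        if isinstance(value, Thread):
            value.word_count = (value.word_count or 0) + words

        # the old Thread, if any (oldvalue may also be None or a NO_VALUE / NEVER_SET symbol)
        if isinstance(oldvalue, Thread):
            oldvalue.word_count = (oldvalue.word_count or 0) - words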

The great advantage of this method is that there's also no need to iterate through thread.comments, which, for an unloaded collection, would emit another SELECT.
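A database-free sketch of the attribute-event approach, assuming SQLAlchemy 1.4 or later; the listener names are hypothetical and the models use Integer keys. It guards against the None / NO_VALUE / NEVER_SET values SQLAlchemy may pass as oldvalue, and against word_count still being None on a Thread that has not been flushed (the column default is only applied at INSERT time). It also moves a comment from one thread to another:

```python
from sqlalchemy import Column, ForeignKey, Integer, UnicodeText, event
from sqlalchemy.orm import configure_mappers, declarative_base, relationship

Base = declarative_base()


class Thread(Base):
    __tablename__ = "thread"
    id = Column(Integer, primary_key=True)
    title = Column(UnicodeText(), nullable=False)
    word_count = Column(Integer, nullable=False, default=0)


class Comment(Base):
    __tablename__ = "comment"
    id = Column(Integer, primary_key=True)
    thread_id = Column(Integer, ForeignKey("thread.id"), nullable=False)
    thread = relationship("Thread", backref="comments")
    message = Column(UnicodeText(), nullable=False)


def _word_count(msg):
    # None or a NO_VALUE / NEVER_SET symbol counts as zero words
    return len(msg.split()) if isinstance(msg, str) else 0


@event.listens_for(Comment.message, "set")
def _on_message_set(target, value, oldvalue, initiator):
    thread = target.thread
    if thread is not None:
        delta = _word_count(value) - _word_count(oldvalue)
        thread.word_count = (thread.word_count or 0) + delta


@event.listens_for(Comment.thread, "set")
def _on_thread_set(target, value, oldvalue, initiator):
    words = _word_count(target.message)
    if isinstance(value, Thread):  # the new Thread, if any
        value.word_count = (value.word_count or 0) + words
    if isinstance(oldvalue, Thread):  # the old Thread, if any
        oldvalue.word_count = (oldvalue.word_count or 0) - words


configure_mappers()

first = Thread(title="First")
second = Thread(title="Second")
moved = Comment(thread=first, message="one two three")
Comment(message="four five", thread=first)  # kwarg order doesn't matter
count_after_adding = first.word_count

moved.thread = second  # moving a comment shifts its words between threads
print(count_after_adding, first.word_count, second.word_count)
```

Everything happens in memory, so the counts are correct before any flush and no SELECT of thread.comments is needed.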

Still another method is to do it in before_flush(). Below is a quick and dirty version, which can be refined to more carefully analyze what has changed in order to determine whether the word_count needs to be updated:

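    from sqlalchemy import event
    from sqlalchemy.orm import Session

    @event.listens_for(Session, "before_flush")
    def before_flush(session, flush_context, instances):
        for obj in session.new | session.dirty:
            if isinstance(obj, Thread):
                obj.word_count = sum(c.word_count for c in obj.comments)
            elif isinstance(obj, Comment) and obj.thread is not None:
                # a Comment has no .comments collection; recompute through its Thread
                obj.thread.word_count = sum(c.word_count for c in obj.thread.comments)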
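A runnable sketch of the before_flush() approach, assuming SQLAlchemy 1.4 or later and in-memory SQLite with Integer keys instead of UUID. The Comment branch recomputes through obj.thread.comments. Because the listener is attached to the Session class, it applies to every session in the process:

```python
from sqlalchemy import Column, ForeignKey, Integer, UnicodeText, create_engine, event
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Thread(Base):
    __tablename__ = "thread"
    id = Column(Integer, primary_key=True)
    title = Column(UnicodeText(), nullable=False)
    word_count = Column(Integer, nullable=False, default=0)


class Comment(Base):
    __tablename__ = "comment"
    id = Column(Integer, primary_key=True)
    thread_id = Column(Integer, ForeignKey("thread.id", ondelete="CASCADE"), nullable=False)
    thread = relationship("Thread", backref="comments")
    message = Column(UnicodeText(), nullable=False)

    @property
    def word_count(self):
        return len(self.message.split())


@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    # recompute the aggregate for every new or modified Thread / Comment
    for obj in session.new | session.dirty:
        if isinstance(obj, Thread):
            obj.word_count = sum(c.word_count for c in obj.comments)
        elif isinstance(obj, Comment) and obj.thread is not None:
            thread = obj.thread
            thread.word_count = sum(c.word_count for c in thread.comments)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    thread = Thread(title="Hello")
    session.add_all([
        thread,
        Comment(thread=thread, message="one two three"),
        Comment(thread=thread, message="four five"),
    ])
    session.commit()
    count_after_first_flush = thread.word_count

    # a later comment on an already-persisted thread; inside the hook,
    # thread.comments is lazy-loaded (autoflush does not run during a flush)
    session.add(Comment(thread=thread, message="six"))
    session.commit()
    count_after_second_flush = thread.word_count

print(count_after_first_flush, count_after_second_flush)
```

The second commit shows the cost described above: for the already-persisted thread, reading thread.comments inside the hook emits a SELECT to load the collection.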

I'd go with the attribute event method, as it is the most performant and keeps the value up to date in memory at all times.