Te-jé Rodgers - 1 month ago
Python Question

How can I asynchronously map/filter an asynchronous iterable?

Let's say I have an asynchronous iterable that I can iterate over using async for. How can I then map and filter it into a new asynchronous iterator? The following code, an adaptation of how I'd do the same thing with a synchronous iterable, doesn't work, since yield isn't allowed inside async def functions.

async def mapfilter(aiterable, p, func):
    async for payload in aiterable:
        if p(payload):
            # This part isn't allowed, but hopefully it should be clear
            # what I'm trying to accomplish.
            yield func(payload)
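For comparison, the synchronous version that the snippet above adapts presumably looks like the sketch below. The name mapfilter_sync is an assumption for illustration and does not appear in the original code. An ordinary generator function can yield freely:

```python
def mapfilter_sync(iterable, p, func):
    # Plain generator: yield is allowed inside a regular def.
    for payload in iterable:
        if p(payload):
            yield func(payload)


# Keep values greater than 2, then double them.
result = list(mapfilter_sync(range(5), lambda x: x > 2, lambda x: x * 2))
print(result)  # [6, 8]
```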

Answer

You can't use yield inside coroutines in Python 3.5 (async generators were only added in Python 3.6). To implement your idea there, the only way I see is to implement an asynchronous iterator. If I'm right, something like this:

class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    def __aiter__(self):
        # Must be a plain method (not async def) on Python 3.5.2 and later.
        return self

    async def __anext__(self):
        while True:
            payload = await self.aiterable.__anext__()  # raises StopAsyncIteration when there are no more values
            if self.p(payload):
                return self.func(payload)
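One caveat: the class above calls __anext__() directly on aiterable, which assumes the argument is its own iterator. A minimal sketch of a variant that asks for the iterator first is shown below. The names MapFilterAny, Countdown and _CountdownIterator are hypothetical, __aiter__ is written as a plain method as Python 3.7+ requires, and asyncio.run also needs Python 3.7+:

```python
import asyncio


class MapFilterAny:
    """Hypothetical variant of MapFilter that accepts any async iterable."""

    def __init__(self, aiterable, p, func):
        # Ask the iterable for its iterator instead of assuming it is one.
        self.aiterator = aiterable.__aiter__()
        self.p = p
        self.func = func

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            # StopAsyncIteration propagates from here when the source is exhausted.
            payload = await self.aiterator.__anext__()
            if self.p(payload):
                return self.func(payload)


class Countdown:
    """Hypothetical async iterable that is NOT its own iterator."""

    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return _CountdownIterator(self.n)


class _CountdownIterator:
    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n <= 0:
            raise StopAsyncIteration
        self.n -= 1
        await asyncio.sleep(0)  # yield point
        return self.n + 1


async def collect():
    out = []
    source = Countdown(4)  # produces 4, 3, 2, 1
    async for value in MapFilterAny(source, lambda x: x % 2 == 0, lambda x: x * 10):
        out.append(value)
    return out


result = asyncio.run(collect())
print(result)  # [40, 20]
```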

Let's test it. Here's a complete example with a helper arange class (I took it from here):

import asyncio


class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        # Plain method: Python 3.7+ rejects an awaitable from __aiter__.
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration


class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    def __aiter__(self):
        # Plain method: Python 3.7+ rejects an awaitable from __aiter__.
        return self

    async def __anext__(self):
        while True:
            payload = await self.aiterable.__anext__()
            if self.p(payload):
                return self.func(payload)


async def main():
    aiterable = arange(5)
    p = lambda x: bool(x>2)
    func = lambda x: x*2

    async for i in MapFilter(aiterable, p, func):
        print(i)

if __name__ == "__main__":
    # asyncio.run() needs Python 3.7+; on older versions use
    # asyncio.get_event_loop().run_until_complete(main()).
    asyncio.run(main())

Output:

6
8
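On Python 3.6 and later, asynchronous generators (PEP 525) allow yield inside async def, so the function from the question works essentially as written. The sketch below assumes Python 3.7+ (for asyncio.run) and replaces the arange helper class with an async generator of the same name:

```python
import asyncio


async def arange(n):
    # Async generator standing in for the arange helper class.
    for i in range(n):
        await asyncio.sleep(0)  # yield point, as in the class version
        yield i


async def mapfilter(aiterable, p, func):
    # Async generator (Python 3.6+): yield inside async def is allowed.
    async for payload in aiterable:
        if p(payload):
            yield func(payload)


async def main():
    # An async comprehension (also Python 3.6+) collects the results.
    return [i async for i in mapfilter(arange(5), lambda x: x > 2, lambda x: x * 2)]


result = asyncio.run(main())
print(result)  # [6, 8]
```

With this approach no iterator class is needed, since the generator handles StopAsyncIteration itself.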