ourlord ourlord - 5 months ago 41
Python Question

Python asyncio task got bad yield

I am confused about how to play around with the

asyncio
module in Python 3.4. I have a
searching
API for a search engine, and want to each search request to be run either parallel, or asynchronously, so that I don't have to wait for one search finish to start another.

Here is my high-level searching API to build some objects with the raw search results. The search engine itself is using some kind of asyncio mechanism, so I won't bother with that.

# No asyncio module used here now
class search(object):
...
self.s = some_search_engine()
...
def searching(self, *args, **kwargs):
ret = {}
# do some raw searching according to args and kwargs and build the wrapped results
...
return ret


To try to async the requests, I wrote following test case to test how I can interact my stuff with the
asyncio
module.

# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
r = yield from f(*args, **kwargs)
return r

s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()


By running with pytest, it will return a
RuntimeError: Task got bad yield : {results from searching...}
, when it hits the line
r = yield from ...
.

I also tried another way.

# same handle as above
def handle(..):
....
s = search()
loop = asyncio.get_event_loop()
tasks = [
asyncio.async(handle(s.searching, arg11, arg12, ...)),
asyncio.async(handle(s.searching, arg21, arg22, ...)),
...
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


By running this test case by pytest, it passes but some weird exception from the search engine will raise. And it says
Future/Task exception was never retrieved
.

Things I wish to ask:


  1. For my 1st try, is that the right way to use
    yield from
    , by returning the actual result from a function call?

  2. I think I need to add some sleep to my 2nd test case to wait for the task finish, but how should I do that? And how can I get my function calls to return in my 2nd test case?

  3. Is that a good way to implement asyncio with an existing module, by creating an async handler to handle requests?

  4. If the answer to question 2 is NO, does every client calls to the class
    search
    needs to include
    loop = get_event_loop()
    this kind of stuffs to async the requests?


Answer

The problem is that you can't just call existing synchronous code as if it was an asyncio.coroutine and get asynchronous behavior. When you call yield from searching(...), you're only going to get asynchronous behavior if searching itself is actually an asyncio.coroutine, or at least returns an asyncio.Future. Right now, searching is just a regular synchronous function, so calling yield from searching(...) is just going to throw an error, because it doesn't return a Future or coroutine.

To get the behavior you want, you'll need to have an asynchronous version of searching in addition to a synchronous version (or just drop the synchronous version altogether if you don't need it). You have a few options to support both:

  1. Rewrite searching as an asyncio.coroutine that it uses asyncio-compatible calls to do its I/O, rather than blocking I/O. This will make it work in an asyncio context, but it means you won't be able to call it directly in a synchronous context anymore. Instead, you'd need to also provide an alternative synchronous searching method that starts an asyncio event loop and calls return loop.run_until_complete(self.searching(...)). See this question for more details on that.
  2. Keep your synchronous implementation of searching, and provide an alternative asynchronous API that uses BaseEventLoop.run_in_executor to run your the searching method in a background thread:

    class search(object):
      ...
      self.s = some_search_engine()
      ...
      def searching(self, *args, **kwargs):
        ret = {}
        ...
        return ret
    
       @asyncio.coroutine
       def searching_async(self, *args, **kwargs):
          loop = kwargs.get('loop', asyncio.get_event_loop())
          try:
              del kwargs['loop']  # assuming searching doesn't take loop as an arg
          except KeyError:
              pass
          r = yield from loop.run_in_executor(None, self.searching, *args)  # Passing None tells asyncio to use the default ThreadPoolExecutor
          return r
    

    Testing script:

    s = search()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s.searching_async(arg1, arg2, ...))
    loop.close()
    

    This way, you can keep your synchronous code as is, and at least provide methods that can be used in asyncio code without blocking the event loop. It's not as clean a solution as it would be if you actually used asynchronous I/O in your code, but its better than nothing.

  3. Provide two completely separate versions of searching, one that uses blocking I/O, and one that's asyncio-compatible. This gives ideal implementations for both contexts, but requires twice the work.