Tagc Tagc - 3 months ago 17
Python Question

How can I notify RxPY observers on separate threads using asyncio?

(Note: The background for this problem is pretty verbose, but there's an SSCCE at the bottom that you can skip to.)

Background



I'm trying to develop a Python-based CLI to interact with a web service. In my codebase I have a CommunicationService class that handles all direct communication with the web service. It exposes a received_response property that returns an Observable (from RxPY) that other objects can subscribe to in order to be notified when responses are received back from the web service.

I've based my CLI logic on the click library, where one of my subcommands is implemented as below:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
    self._generate_request(request)
    if response_handler is None:
        return None

    while True:
        response = await self.on_response
        success, value = response_handler(response)
        print(success, value)
        if success:
            return value


What's happening here (in the case that response_handler is not None) is that the subcommand is behaving as a coroutine that awaits responses from the web service (self.on_response == CommunicationService.received_response) and returns some processed value from the first response it can handle.

I'm trying to test the behaviour of my CLI by creating test cases in which CommunicationService is completely mocked; a fake Subject is created (which can act as an Observable) and CommunicationService.received_response is mocked to return it. As part of the test, the subject's on_next method is invoked to pass mock web service responses back to the production code:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    context.mock_received_response_subject.on_next(context.text)


I use a click 'result callback' function that gets invoked at the end of the CLI invocation and blocks until the coroutine (the subcommand) is done:

@cli.resultcallback()
def _handle_command_task(task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        loop.close()
        print('RESULT:', result)


Problem



At the start of the test, I run CliRunner.invoke to fire off the whole shebang. The problem is that this is a blocking call and will block the thread until the CLI has finished and returned a result, which isn't helpful if I need my test thread to carry on so it can produce mock web service responses concurrently with it.

What I guess I need to do is run CliRunner.invoke on a new thread using ThreadPoolExecutor. This allows the test logic to continue on the original thread and execute the @when step posted above. However, notifications published with mock_received_response_subject.on_next do not seem to trigger execution to continue within the subcommand.
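
To make the threading idea concrete, here is a minimal standard-library sketch of running a blocking call on a ThreadPoolExecutor while the test thread carries on. The blocking_cli function is a hypothetical stand-in for CliRunner.invoke, and the hand-off uses a threading.Event rather than an RxPY Subject, so it shows only the threading shape and not the asyncio interaction that causes the hang.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_cli(response_ready: threading.Event, responses: list) -> str:
    """Hypothetical stand-in for CliRunner.invoke: blocks until a response arrives."""
    response_ready.wait(timeout=5)
    return responses[0] if responses else 'timed out'


def run_cli_in_background() -> str:
    response_ready = threading.Event()
    responses = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        # submit() returns immediately, so the test thread is free to continue.
        future = pool.submit(blocking_cli, response_ready, responses)
        # The test thread now plays the part of the web service.
        responses.append('foo')
        response_ready.set()
        # Future.result() blocks until the worker finishes or the timeout expires.
        return future.result(timeout=5)


print(run_cli_in_background())
```

A threading.Event is safe to set from any thread, which is why this version does not hang; an asyncio future awaited on another thread's event loop offers no such guarantee.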

I believe the solution would involve making use of RxPY's AsyncIOScheduler, but I'm finding the documentation on this a little sparse and unhelpful.

SSCCE



The snippet below captures what I hope is the essence of the problem. If it can be modified to work, I should be able to apply the same solution to my actual code to get it to behave as I want.

import asyncio
import logging
import sys
import time

import click
from click.testing import CliRunner
from rx.subjects import Subject

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()

thread_loop = asyncio.new_event_loop()


@click.group()
def cli():
    asyncio.set_event_loop(thread_loop)


@cli.resultcallback()
def result_handler(task, **_):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task)  # Should block until subject publishes value
    loop.close()

    print(result)


@cli.command()
async def get_web_response():
    return await web_response_observable


def test():
    runner = CliRunner()
    future = thread_loop.run_in_executor(None, runner.invoke, cli, ['get_web_response'])
    time.sleep(1)
    web_response_subject.on_next('foo')  # Simulate reception of web response.
    time.sleep(1)
    result = future.result()
    print(result.output)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(threadName)10s %(name)18s: %(message)s',
    stream=sys.stderr,
)

test()


Current Behaviour

The program hangs when run, blocking at result = loop.run_until_complete(task).

Acceptance Criteria

The program terminates and prints foo on stdout.

Update 1



Based on Vincent's help I've made some changes to my code.

Relay.enabled (the subcommand that awaits responses from the web service in order to process them) is now implemented like this:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
    self._generate_request(request)

    if response_handler is None:
        return None

    return await self.on_response \
        .select(response_handler) \
        .where(lambda result, i: result[0]) \
        .select(lambda result, index: result[1]) \
        .first()


I wasn't quite sure how await would behave with RxPY observables: would it return execution to the caller on each element generated, or only when the observable has completed (or errored)? I now know it's the latter, which honestly feels like the more natural choice and has allowed me to make the implementation of this function feel a lot more elegant and reactive.
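
As a rough illustration of that behaviour, the toy class below mimics the "resolve on completion with the last value" semantics using only asyncio. ToySubject is a hypothetical stand-in written for this sketch; it is not RxPY's implementation and models nothing beyond that one behaviour.

```python
import asyncio


class ToySubject:
    """Hypothetical stand-in for an RxPY Subject; not the library's code."""

    def __init__(self):
        self._future = None
        self._last = None

    def on_next(self, value):
        # Emitted values are only remembered; nothing is resolved yet.
        self._last = value

    def on_completed(self):
        # Completion is what finally resolves the awaiting coroutine.
        if self._future is not None and not self._future.done():
            self._future.set_result(self._last)

    def __await__(self):
        self._future = asyncio.get_running_loop().create_future()
        return self._future.__await__()


async def demo():
    subject = ToySubject()
    loop = asyncio.get_running_loop()
    # All three calls are scheduled on the loop's own thread.
    loop.call_later(0.01, subject.on_next, 'first')
    loop.call_later(0.02, subject.on_next, 'second')
    loop.call_later(0.03, subject.on_completed)
    return await subject


print(asyncio.run(demo()))
```

In this model the await yields only the last emitted value, and only once on_completed has been called, which is why an operator such as first() (which completes after one item) lets the awaiting code resume after a single response.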

I've also modified the test step that generates mock web service responses:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = asyncio.get_event_loop()
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)


Unfortunately, this will not work as it stands, since the CLI is being invoked in its own thread...

@when('the CLI is run with "{arguments}"')
def step_impl(context, arguments):
    loop = asyncio.get_event_loop()
    if 'async.cli' in context.tags:
        context.async_result = loop.run_in_executor(None, context.cli_runner.invoke, testcube.cli, arguments.split())
    else:
        ...


And the CLI creates its own thread-private event loop when invoked...

def cli(context, hostname, port):
    _initialize_logging(context.meta['click_log.core.logger']['level'])

    # Create a new event loop for processing commands asynchronously on.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    ...


What I think I need is a way to allow my test steps to invoke the CLI on a new thread and then fetch the event loop it's using:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = _get_cli_event_loop()  # Needs to be implemented.
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)


Update 2



There doesn't seem to be an easy way to get the event loop that a particular thread creates and uses for itself, so instead I took Victor's advice and mocked asyncio.new_event_loop to return an event loop that my test code creates and stores:

def _apply_mock_event_loop_patch(context):
    # Close any already-existing exit stacks.
    if hasattr(context, 'mock_event_loop_exit_stack'):
        context.mock_event_loop_exit_stack.close()

    context.test_loop = asyncio.new_event_loop()
    print(context.test_loop)
    context.mock_event_loop_exit_stack = ExitStack()
    context.mock_event_loop_exit_stack.enter_context(
        patch.object(asyncio, 'new_event_loop', spec=True, return_value=context.test_loop))
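
The same patching idea can be tried in isolation. The sketch below assumes a hypothetical cli_entry_point function standing in for the real CLI, which creates "its own" loop on a worker thread; because asyncio.new_event_loop is patched, the loop it receives is the one the test created, so the test can schedule callbacks on it.

```python
import asyncio
import threading
from unittest.mock import patch


def cli_entry_point(loops_seen: list) -> None:
    """Hypothetical stand-in for the CLI, which creates its own event loop."""
    loop = asyncio.new_event_loop()  # returns the test's loop while patched
    loops_seen.append(loop)
    loop.run_forever()  # runs until a callback stops the loop
    loop.close()


def run_with_patched_loop() -> list:
    test_loop = asyncio.new_event_loop()  # created before the patch is applied
    received, loops_seen = [], []

    def deliver(value):
        received.append(value)
        test_loop.stop()

    with patch.object(asyncio, 'new_event_loop', return_value=test_loop):
        thread = threading.Thread(target=cli_entry_point, args=(loops_seen,))
        thread.start()
        # Safe to call from the test thread, even before the loop is running.
        test_loop.call_soon_threadsafe(deliver, 'foo')
        thread.join(timeout=5)

    assert loops_seen == [test_loop]
    return received


print(run_with_patched_loop())
```

Keeping the join inside the with block ensures the patch is still active when the worker thread asks for its loop.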


I change my 'mock web response received' test step to do the following:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = context.test_loop
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)


The great news is that I'm actually getting the Relay.enabled coroutine to trigger when this step gets executed!

The only problem now is the final test step, in which I await the future I got from executing the CLI in its own thread and validate that the CLI prints the expected output on stdout:

@then('the CLI should print "{output}"')
def step_impl(context, output):
    if 'async.cli' in context.tags:
        loop = asyncio.get_event_loop()  # main loop, not test loop
        result = loop.run_until_complete(context.async_result)
    else:
        result = context.result
    assert_that(result.output, equal_to(output))


I've tried playing around with this but I can't seem to get context.async_result (which stores the future from loop.run_in_executor) to transition nicely to done and return the result. With the current implementation, I get an error for the first test (1.1) and indefinite hanging for the second (1.2):

@mock.comms @async.cli @wip
Scenario Outline: Querying relay enable state -- @1.1 # testcube/tests/features/relay.feature:45
When the user queries the enable state of relay 0 # testcube/tests/features/steps/relay.py:17 0.003s
Then the CLI should query the web service about the enable state of relay 0 # testcube/tests/features/steps/relay.py:48 0.000s
When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
"""
{'module':'relays','path':'relays[0].enabled','data':[True]}'
"""
Then the CLI should print "True" # testcube/tests/features/steps/core.py:94 0.003s
Traceback (most recent call last):
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1456, in run
match.run(runner.context)
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1903, in run
self.func(context, *args, **kwargs)
File "testcube/tests/features/steps/core.py", line 99, in step_impl
result = loop.run_until_complete(context.async_result)
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/concurrent/futures/thread.py", line 55, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/click/testing.py", line 299, in invoke
output = out.getvalue()
ValueError: I/O operation on closed file.

Captured stdout:
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[0].enabled','data':[True]}'
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py:431]>

@mock.comms @async.cli @wip
Scenario Outline: Querying relay enable state -- @1.2 # testcube/tests/features/relay.feature:46
When the user queries the enable state of relay 1 # testcube/tests/features/steps/relay.py:17 0.005s
Then the CLI should query the web service about the enable state of relay 1 # testcube/tests/features/steps/relay.py:48 0.001s
When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
"""
{'module':'relays','path':'relays[1].enabled','data':[False]}'
"""
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[1].enabled','data':[False]}'
Then the CLI should print "False" # testcube/tests/features/steps/core.py:94


Chapter 3: Finale



Screw all this asynchronous multi-threaded stuff, I'm too dumb for it.

First off, instead of describing the scenario like this...

When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
When the communications service receives a response from TestCube Web Service:
"""
{"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
"""
Then the CLI should print "<relay_enabled>"


We describe it like this:

Given the communications service will respond to requests:
"""
{"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
"""
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
And the CLI should print "<relay_enabled>"


Implement the new given step:

@given('the communications service will respond to requests')
def step_impl(context):
    response = context.text

    def publish_mock_response(_):
        loop = context.test_loop
        loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, response)

    # Configure the mock comms service to publish a mock response when a request is made.
    instance = context.mock_comms.return_value
    instance.send_request.on_next.side_effect = publish_mock_response
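
Stripped of behave and the event loop, the side_effect mechanism in that step can be sketched with unittest.mock alone. The build_mock_comms helper and the request and response strings are made up for illustration; only the send_request.on_next attribute chain comes from the step above.

```python
from unittest.mock import MagicMock


def build_mock_comms(response: str, published: list) -> MagicMock:
    """Hypothetical helper: returns a mock that 'responds' whenever a request is sent."""

    def publish_mock_response(_request):
        # In the real step this would hand the response to the event loop.
        published.append(response)

    mock_comms = MagicMock()
    # side_effect runs each time the mocked method is called.
    mock_comms.send_request.on_next.side_effect = publish_mock_response
    return mock_comms


published = []
comms = build_mock_comms('{"data":[true]}', published)
comms.send_request.on_next('query relay 0')  # the code under test sends a request
print(published)
```

Because the response is triggered by the request itself, the test no longer has to interleave its own steps with a CLI that is still running.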


BOOM

2 features passed, 0 failed, 0 skipped
22 scenarios passed, 0 failed, 0 skipped
58 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.111s

Answer

I can see two problems with your code:

  • asyncio is not thread-safe unless you use call_soon_threadsafe or run_coroutine_threadsafe. RxPY doesn't use either of those in Observable.to_future, so you have to access RxPY objects in the same thread that runs the asyncio event loop.
  • RxPY sets the result of the future when on_completed is called, so awaiting an observable returns the last object emitted. This means you have to call both on_next and on_completed to get await to return.
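
The first point can be illustrated with the standard library alone. In this sketch (all names are made up for the example) a worker thread runs an event loop that waits on a future, and the main thread resolves it through call_soon_threadsafe, which is the documented way to schedule a callback on a loop from another thread.

```python
import asyncio
import threading


def await_in_worker_thread() -> str:
    loop = asyncio.new_event_loop()
    future = loop.create_future()
    results = []

    def worker():
        # Blocks this thread until the future is resolved on the loop.
        results.append(loop.run_until_complete(future))
        loop.close()

    thread = threading.Thread(target=worker)
    thread.start()
    # Calling future.set_result('foo') directly from this thread is not
    # thread-safe; call_soon_threadsafe schedules it on the loop's own
    # thread and wakes the loop up.
    loop.call_soon_threadsafe(future.set_result, 'foo')
    thread.join(timeout=5)
    return results[0]


print(await_in_worker_thread())
```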

Here is a working example:

import click
import asyncio
from rx.subjects import Subject
from click.testing import CliRunner

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
main_loop = asyncio.get_event_loop()

@click.group()
def cli():
    pass

@cli.resultcallback()
def result_handler(task, **_):
    future = asyncio.run_coroutine_threadsafe(task, main_loop)
    print(future.result())

@cli.command()
async def get_web_response():
    return await web_response_observable

def test():
    runner = CliRunner()
    future = main_loop.run_in_executor(
        None, runner.invoke, cli, ['get_web_response'])
    main_loop.call_later(1, web_response_subject.on_next, 'foo')
    main_loop.call_later(2, web_response_subject.on_completed)
    result = main_loop.run_until_complete(future)
    print(result.output, end='')

if __name__ == '__main__':
    test()
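
For readers without click or RxPY installed, the threading pattern used above can be reduced to a standard-library sketch. Here blocking_caller is a hypothetical stand-in for the result callback running under runner.invoke, and a plain asyncio future takes the place of the observable, so this shows only the run_in_executor and run_coroutine_threadsafe hand-off.

```python
import asyncio


async def get_web_response(response: asyncio.Future) -> str:
    return await response


def blocking_caller(loop: asyncio.AbstractEventLoop, response: asyncio.Future) -> str:
    """Runs in a worker thread, like the result callback under runner.invoke."""
    # Hand the coroutine to the main loop and block this thread on its result.
    future = asyncio.run_coroutine_threadsafe(get_web_response(response), loop)
    return future.result(timeout=5)


async def main() -> str:
    loop = asyncio.get_running_loop()
    response = loop.create_future()
    # The blocking call goes to the default executor; the loop stays free.
    worker = loop.run_in_executor(None, blocking_caller, loop, response)
    # The response is produced on the loop's own thread, so no extra locking is needed.
    loop.call_later(0.05, response.set_result, 'foo')
    return await worker


print(asyncio.run(main()))
```

The coroutine always executes on the main loop; the worker thread only waits on a concurrent.futures.Future, which is safe to block on from any thread.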