Martin Richard, alwaysdata
asynctest
Presentation of asynctest and how it helps you test asyncio code.
TestCase and related features
TestCase and the loop
asynctest.TestCase overrides unittest.TestCase
self.loop
TestCase support of coroutines
import asynctest, piedpiper
class Test_MyFeature(asynctest.TestCase):
    """Example test case whose fixtures and test method are all coroutines."""

    async def setUp(self):
        """Coroutine fixture run before the test (body elided in the example)."""
        ...

    async def tearDown(self):
        """Coroutine fixture run after the test (body elided in the example)."""
        ...

    async def test_a_case(self):
        """Start a piedpiper server and register its shutdown as a cleanup."""
        server = await piedpiper.start_server()
        self.server = server
        # The cleanup is registered right after start-up so the server is
        # closed even when a later statement of the test fails.
        self.addCleanup(server.close_and_wait_closed)
TestCase: advice
setUp() and tearDown(),
setUpClass() and tearDownClass() can't be coroutines
import asyncio, asynctest, piedpiper
class Test_MyFeature(asynctest.TestCase):
    """Example test case that runs its tests on a customized default loop."""

    # Don't issue a new loop for each test, use the result of
    # asyncio.get_event_loop()
    use_default_loop = True

    @classmethod
    def setUpClass(cls):
        """Install the customized piedpiper loop as the default event loop.

        unittest calls ``setUpClass()`` on the class itself, so the hook must
        be a classmethod; without the decorator the call fails because no
        ``cls`` argument is supplied.
        """
        # set your customized loop
        asyncio.set_event_loop(piedpiper.get_loop())
import asynctest, asyncio, piedpiper
class Test_MyFeature(asynctest.TestCase):
    """Example test case mixing a generator-based coroutine and a plain test."""

    # NOTE: asyncio.coroutine is the legacy generator-based decorator; it was
    # deprecated in Python 3.8 and removed in 3.11. It is kept here because
    # the example is specifically about old-style coroutines.
    @asyncio.coroutine
    def test_old_style_coroutine(self):
        """Await piedpiper.compress() with ``yield from`` and check the result."""
        data = yield from piedpiper.compress()
        self.assertIsSmall(data)

    # This test is synchronous, so the unused-loop check is switched off.
    @asynctest.fail_on(unused_loop=False)
    def test_a_feature_without_async(self):
        """Check the value returned by piedpiper.method()."""
        # assertTrue(a, b) only tests the truthiness of ``a`` and treats ``b``
        # as the failure message, so it could never fail with a non-empty
        # string; assertEqual actually compares the two values.
        self.assertEqual("middleOut", piedpiper.method())
ClockedTestCase
import asynctest, piedpiper
class Test_PeriodicRefresh(asynctest.ClockedTestCase):
    """Example of testing a periodic task by advancing the loop clock.

    The class derives from ``ClockedTestCase`` because ``self.advance()`` is
    provided by that class, not by the plain ``asynctest.TestCase``.
    """

    async def test_refreshed(self):
        """Check that the downloader data is updated as the clock advances."""
        downloader = piedpiper.Downloader()
        self.assertEqual("nb_calls: 0", downloader.data)

        downloader.refresh(5)
        await self.advance(5)
        # 5 seconds after refresh(5) was set, data is updated
        self.assertEqual("nb_calls: 1", downloader.data)

        await self.advance(10)
        # Updated two more times.
        self.assertEqual("nb_calls: 3", downloader.data)
import asynctest, asyncio, piedpiper
@asynctest.fail_on(active_handles=True)
class Test_MyFeature(asynctest.TestCase):
    """Example of the ``active_handles`` check applied to a whole test case."""

    async def test_with_a_callback(self):
        """Schedule a callback that is still pending when the test ends."""
        # this test will fail, as this callback
        # will not run during the test
        self.loop.call_later(1, piedpiper.callback)

    async def test_with_a_cancelled_callback(self):
        """Schedule a callback and cancel it before the test ends."""
        # this test will not fail
        self.loop.call_later(1, piedpiper.callback).cancel()
async def download(self):
    """Fetch the resource over a stream connection and return its payload.

    Opens a connection to the host and port given by ``get_parsed_url()``,
    sends the request, reads the response headers and then the payload.
    The payload is stored in ``self.data`` and returned.

    Raises:
        RuntimeError: if the response code is not 200.
    """
    host, port, query, ssl = self.get_parsed_url()
    reader, writer = await asyncio.open_connection(host, port, ssl=ssl)
    try:
        writer.write(self._build_request(host, query))
        # The header section ends with an empty line, i.e. CRLF CRLF. The
        # separator must use backslash escapes; b"/r/n/r/n" is a literal
        # run of slashes and letters that never appears in a response.
        response_headers = await reader.readuntil(b"\r\n\r\n")
        code, payload_size = self._parse_response_headers(response_headers)
        if code != 200:
            raise RuntimeError(
                "Server answered with unsupported code {}".format(code))
        self.data = await reader.read(payload_size)
    finally:
        # Always release the connection, even when the request fails.
        writer.close()
    return self.data


def create_mocks(*args, **kwargs):
    """Build a mocked ``(reader, writer)`` pair for stream-based tests.

    The function is meant to be used as the ``side_effect`` of a patched
    ``asyncio.open_connection``. A side effect is called with the same
    arguments as the mocked call, so any positional and keyword arguments
    are accepted and ignored.

    Returns:
        A ``(reader, writer)`` tuple of mocks specced on
        ``asyncio.StreamReader`` and ``asyncio.StreamWriter``.
    """
    reader = asynctest.mock.Mock(asyncio.StreamReader)
    writer = asynctest.mock.Mock(asyncio.StreamWriter)
    reader.read.return_value = b"MiddleOut"
    reader.readuntil.return_value = b"HTTP/1.1 200 OK\r\n..."
    return reader, writer
# asynctest.mock.Mock() uses its spec and detects coroutines,
# asynctest.mock.CoroutineMock()
# asynctest.mock.patch
@patch("asyncio.open_connection", side_effect=create_mocks)
async def test_download_resource(self):
    """Download through the patched open_connection and check the payload."""
    resource = ResourceDownloader("http://piedpiper.com/compression")
    self.assertEqual(await resource.download(), b"MiddleOut")
# @patch decorating a coroutine
async def test_download_resource(self):
    """Download while open_connection is patched by a context manager."""
    with patch("asyncio.open_connection", side_effect=create_mocks):
        resource = ResourceDownloader("http://piedpiper.com/compression")
        payload = await resource.download()
    self.assertEqual(payload, b"MiddleOut")
But what if a concurrent task must not be affected by the patch?
# @patch decorating a coroutine
@patch(
    "asyncio.open_connection",
    side_effect=create_mocks,
    scope=asynctest.LIMITED,
)
async def test_download_resource(self):
    """Download with open_connection patched using the LIMITED scope."""
    resource = ResourceDownloader("http://piedpiper.com/compression")
    self.assertEqual(await resource.download(), b"MiddleOut")
The patch is disabled when the coroutine (task) yields to the scheduler.
import asyncio, asynctest
class Test_LowLevel(asynctest.TestCase):
    """Example of driving a reader callback through a mocked socket."""

    async def test_using_selector(self):
        """Mark a mocked socket readable and wait for the reader callback."""
        sock = asynctest.selector.SocketMock()
        readable = asyncio.Event()

        # The reader callback sets the event, which the test then waits on.
        self.loop.add_reader(sock, readable.set)
        asynctest.selector.set_read_ready(sock, self.loop)

        try:
            await asyncio.wait_for(readable.wait(), timeout=1)
        finally:
            # Unregister the reader whether or not the wait timed out.
            self.loop.remove_reader(sock)
# TestSelector
# FileMock(), SSLSocketMock(), ...
# TestCase (but not with proactor)
@asynctest.fail_on(active_selector_callbacks=True)
async def test_using_selector(self):
    """Mark a mocked socket readable and wait for the reader callback.

    The function awaits, so it must be declared with ``async def``; a plain
    ``def`` containing ``await`` is a SyntaxError.
    """
    mock_socket = asynctest.selector.SocketMock()
    event = asyncio.Event()
    # The reader callback sets the event, which the test then waits on.
    self.loop.add_reader(mock_socket, event.set)
    asynctest.selector.set_read_ready(mock_socket, self.loop)
    try:
        await asyncio.wait_for(event.wait(), timeout=1)
    finally:
        # Unregister the reader whether or not the wait timed out.
        self.loop.remove_reader(mock_socket)
asynctest
pytest-asyncio: fewer features, but you can use asynctest with it
README or CONTRIBUTING file
$ git clone
$ python -m unittest (or nose, pytest, tox, ...)
martius@martiusweb.net
https://marti.us
Martiusweb