diff options
Diffstat (limited to 'venv/lib/python3.9/site-packages/trio/tests/test_channel.py')
-rw-r--r-- | venv/lib/python3.9/site-packages/trio/tests/test_channel.py | 407 |
1 files changed, 407 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/trio/tests/test_channel.py b/venv/lib/python3.9/site-packages/trio/tests/test_channel.py new file mode 100644 index 00000000..fd990fb3 --- /dev/null +++ b/venv/lib/python3.9/site-packages/trio/tests/test_channel.py @@ -0,0 +1,407 @@ +import pytest + +from ..testing import wait_all_tasks_blocked, assert_checkpoints +import trio +from trio import open_memory_channel, EndOfChannel + + +async def test_channel(): + with pytest.raises(TypeError): + open_memory_channel(1.0) + with pytest.raises(ValueError): + open_memory_channel(-1) + + s, r = open_memory_channel(2) + repr(s) # smoke test + repr(r) # smoke test + + s.send_nowait(1) + with assert_checkpoints(): + await s.send(2) + with pytest.raises(trio.WouldBlock): + s.send_nowait(None) + + with assert_checkpoints(): + assert await r.receive() == 1 + assert r.receive_nowait() == 2 + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + + s.send_nowait("last") + await s.aclose() + with pytest.raises(trio.ClosedResourceError): + await s.send("too late") + with pytest.raises(trio.ClosedResourceError): + s.send_nowait("too late") + with pytest.raises(trio.ClosedResourceError): + s.clone() + await s.aclose() + + assert r.receive_nowait() == "last" + with pytest.raises(EndOfChannel): + await r.receive() + await r.aclose() + with pytest.raises(trio.ClosedResourceError): + await r.receive() + with pytest.raises(trio.ClosedResourceError): + await r.receive_nowait() + await r.aclose() + + +async def test_553(autojump_clock): + s, r = open_memory_channel(1) + with trio.move_on_after(10) as timeout_scope: + await r.receive() + assert timeout_scope.cancelled_caught + await s.send("Test for PR #553") + + +async def test_channel_multiple_producers(): + async def producer(send_channel, i): + # We close our handle when we're done with it + async with send_channel: + for j in range(3 * i, 3 * (i + 1)): + await send_channel.send(j) + + send_channel, receive_channel = open_memory_channel(0) + async with trio.open_nursery() as nursery: + # We hand out clones to all the new producers, and then close the + # original. + async with send_channel: + for i in range(10): + nursery.start_soon(producer, send_channel.clone(), i) + + got = [] + async for value in receive_channel: + got.append(value) + + got.sort() + assert got == list(range(30)) + + +async def test_channel_multiple_consumers(): + successful_receivers = set() + received = [] + + async def consumer(receive_channel, i): + async for value in receive_channel: + successful_receivers.add(i) + received.append(value) + + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(1) + async with send_channel: + for i in range(5): + nursery.start_soon(consumer, receive_channel, i) + await wait_all_tasks_blocked() + for i in range(10): + await send_channel.send(i) + + assert successful_receivers == set(range(5)) + assert len(received) == 10 + assert set(received) == set(range(10)) + + +async def test_close_basics(): + async def send_block(s, expect): + with pytest.raises(expect): + await s.send(None) + + # closing send -> other send gets ClosedResourceError + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.ClosedResourceError) + await wait_all_tasks_blocked() + await s.aclose() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + s.send_nowait(None) + with pytest.raises(trio.ClosedResourceError): + await s.send(None) + + # and receive gets EndOfChannel + with pytest.raises(EndOfChannel): + r.receive_nowait() + with pytest.raises(EndOfChannel): + await r.receive() + + # closing receive -> send gets BrokenResourceError + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.BrokenResourceError) + await wait_all_tasks_blocked() + await r.aclose() + + # and it's persistent + with pytest.raises(trio.BrokenResourceError): + s.send_nowait(None) + with pytest.raises(trio.BrokenResourceError): + await s.send(None) + + # closing receive -> other receive gets ClosedResourceError + async def receive_block(r): + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(receive_block, r) + await wait_all_tasks_blocked() + await r.aclose() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + r.receive_nowait() + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + +async def test_close_sync(): + async def send_block(s, expect): + with pytest.raises(expect): + await s.send(None) + + # closing send -> other send gets ClosedResourceError + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.ClosedResourceError) + await wait_all_tasks_blocked() + s.close() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + s.send_nowait(None) + with pytest.raises(trio.ClosedResourceError): + await s.send(None) + + # and receive gets EndOfChannel + with pytest.raises(EndOfChannel): + r.receive_nowait() + with pytest.raises(EndOfChannel): + await r.receive() + + # closing receive -> send gets BrokenResourceError + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.BrokenResourceError) + await wait_all_tasks_blocked() + r.close() + + # and it's persistent + with pytest.raises(trio.BrokenResourceError): + s.send_nowait(None) + with pytest.raises(trio.BrokenResourceError): + await s.send(None) + + # closing receive -> other receive gets ClosedResourceError + async def receive_block(r): + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + s, r = open_memory_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(receive_block, r) + await wait_all_tasks_blocked() + r.close() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + r.receive_nowait() + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + +async def test_receive_channel_clone_and_close(): + s, r = open_memory_channel(10) + + r2 = r.clone() + r3 = r.clone() + + s.send_nowait(None) + await r.aclose() + with r2: + pass + + with pytest.raises(trio.ClosedResourceError): + r.clone() + + with pytest.raises(trio.ClosedResourceError): + r2.clone() + + # Can still send, r3 is still open + s.send_nowait(None) + + await r3.aclose() + + # But now the receiver is really closed + with pytest.raises(trio.BrokenResourceError): + s.send_nowait(None) + + +async def test_close_multiple_send_handles(): + # With multiple send handles, closing one handle only wakes senders on + # that handle, but others can continue just fine + s1, r = open_memory_channel(0) + s2 = s1.clone() + + async def send_will_close(): + with pytest.raises(trio.ClosedResourceError): + await s1.send("nope") + + async def send_will_succeed(): + await s2.send("ok") + + async with trio.open_nursery() as nursery: + nursery.start_soon(send_will_close) + nursery.start_soon(send_will_succeed) + await wait_all_tasks_blocked() + await s1.aclose() + assert await r.receive() == "ok" + + +async def test_close_multiple_receive_handles(): + # With multiple receive handles, closing one handle only wakes receivers on + # that handle, but others can continue just fine + s, r1 = open_memory_channel(0) + r2 = r1.clone() + + async def receive_will_close(): + with pytest.raises(trio.ClosedResourceError): + await r1.receive() + + async def receive_will_succeed(): + assert await r2.receive() == "ok" + + async with trio.open_nursery() as nursery: + nursery.start_soon(receive_will_close) + nursery.start_soon(receive_will_succeed) + await wait_all_tasks_blocked() + await r1.aclose() + await s.send("ok") + + +async def test_inf_capacity(): + s, r = open_memory_channel(float("inf")) + + # It's accepted, and we can send all day without blocking + with s: + for i in range(10): + s.send_nowait(i) + + got = [] + async for i in r: + got.append(i) + assert got == list(range(10)) + + +async def test_statistics(): + s, r = open_memory_channel(2) + + assert s.statistics() == r.statistics() + stats = s.statistics() + assert stats.current_buffer_used == 0 + assert stats.max_buffer_size == 2 + assert stats.open_send_channels == 1 + assert stats.open_receive_channels == 1 + assert stats.tasks_waiting_send == 0 + assert stats.tasks_waiting_receive == 0 + + s.send_nowait(None) + assert s.statistics().current_buffer_used == 1 + + s2 = s.clone() + assert s.statistics().open_send_channels == 2 + await s.aclose() + assert s2.statistics().open_send_channels == 1 + + r2 = r.clone() + assert s2.statistics().open_receive_channels == 2 + await r2.aclose() + assert s2.statistics().open_receive_channels == 1 + + async with trio.open_nursery() as nursery: + s2.send_nowait(None) # fill up the buffer + assert s.statistics().current_buffer_used == 2 + nursery.start_soon(s2.send, None) + nursery.start_soon(s2.send, None) + await wait_all_tasks_blocked() + assert s.statistics().tasks_waiting_send == 2 + nursery.cancel_scope.cancel() + assert s.statistics().tasks_waiting_send == 0 + + # empty out the buffer again + try: + while True: + r.receive_nowait() + except trio.WouldBlock: + pass + + async with trio.open_nursery() as nursery: + nursery.start_soon(r.receive) + await wait_all_tasks_blocked() + assert s.statistics().tasks_waiting_receive == 1 + nursery.cancel_scope.cancel() + assert s.statistics().tasks_waiting_receive == 0 + + +async def test_channel_fairness(): + + # We can remove an item we just sent, and send an item back in after, if + # no-one else is waiting. + s, r = open_memory_channel(1) + s.send_nowait(1) + assert r.receive_nowait() == 1 + s.send_nowait(2) + assert r.receive_nowait() == 2 + + # But if someone else is waiting to receive, then they "own" the item we + # send, so we can't receive it (even though we run first): + + result = None + + async def do_receive(r): + nonlocal result + result = await r.receive() + + async with trio.open_nursery() as nursery: + nursery.start_soon(do_receive, r) + await wait_all_tasks_blocked() + s.send_nowait(2) + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + assert result == 2 + + # And the analogous situation for send: if we free up a space, we can't + # immediately send something in it if someone is already waiting to do + # that + s, r = open_memory_channel(1) + s.send_nowait(1) + with pytest.raises(trio.WouldBlock): + s.send_nowait(None) + async with trio.open_nursery() as nursery: + nursery.start_soon(s.send, 2) + await wait_all_tasks_blocked() + assert r.receive_nowait() == 1 + with pytest.raises(trio.WouldBlock): + s.send_nowait(3) + assert (await r.receive()) == 2 + + +async def test_unbuffered(): + s, r = open_memory_channel(0) + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + with pytest.raises(trio.WouldBlock): + s.send_nowait(1) + + async def do_send(s, v): + with assert_checkpoints(): + await s.send(v) + + async with trio.open_nursery() as nursery: + nursery.start_soon(do_send, s, 1) + with assert_checkpoints(): + assert await r.receive() == 1 + with pytest.raises(trio.WouldBlock): + r.receive_nowait() |