summaryrefslogtreecommitdiffstats
path: root/venv/lib/python3.9/site-packages/trio/tests/test_channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.9/site-packages/trio/tests/test_channel.py')
-rw-r--r--venv/lib/python3.9/site-packages/trio/tests/test_channel.py407
1 files changed, 407 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/trio/tests/test_channel.py b/venv/lib/python3.9/site-packages/trio/tests/test_channel.py
new file mode 100644
index 00000000..fd990fb3
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/trio/tests/test_channel.py
@@ -0,0 +1,407 @@
+import pytest
+
+from ..testing import wait_all_tasks_blocked, assert_checkpoints
+import trio
+from trio import open_memory_channel, EndOfChannel
+
+
+async def test_channel():
+ with pytest.raises(TypeError):
+ open_memory_channel(1.0)
+ with pytest.raises(ValueError):
+ open_memory_channel(-1)
+
+ s, r = open_memory_channel(2)
+ repr(s) # smoke test
+ repr(r) # smoke test
+
+ s.send_nowait(1)
+ with assert_checkpoints():
+ await s.send(2)
+ with pytest.raises(trio.WouldBlock):
+ s.send_nowait(None)
+
+ with assert_checkpoints():
+ assert await r.receive() == 1
+ assert r.receive_nowait() == 2
+ with pytest.raises(trio.WouldBlock):
+ r.receive_nowait()
+
+ s.send_nowait("last")
+ await s.aclose()
+ with pytest.raises(trio.ClosedResourceError):
+ await s.send("too late")
+ with pytest.raises(trio.ClosedResourceError):
+ s.send_nowait("too late")
+ with pytest.raises(trio.ClosedResourceError):
+ s.clone()
+ await s.aclose()
+
+ assert r.receive_nowait() == "last"
+ with pytest.raises(EndOfChannel):
+ await r.receive()
+ await r.aclose()
+ with pytest.raises(trio.ClosedResourceError):
+ await r.receive()
+ with pytest.raises(trio.ClosedResourceError):
+ await r.receive_nowait()
+ await r.aclose()
+
+
+async def test_553(autojump_clock):
+ s, r = open_memory_channel(1)
+ with trio.move_on_after(10) as timeout_scope:
+ await r.receive()
+ assert timeout_scope.cancelled_caught
+ await s.send("Test for PR #553")
+
+
+async def test_channel_multiple_producers():
+ async def producer(send_channel, i):
+ # We close our handle when we're done with it
+ async with send_channel:
+ for j in range(3 * i, 3 * (i + 1)):
+ await send_channel.send(j)
+
+ send_channel, receive_channel = open_memory_channel(0)
+ async with trio.open_nursery() as nursery:
+ # We hand out clones to all the new producers, and then close the
+ # original.
+ async with send_channel:
+ for i in range(10):
+ nursery.start_soon(producer, send_channel.clone(), i)
+
+ got = []
+ async for value in receive_channel:
+ got.append(value)
+
+ got.sort()
+ assert got == list(range(30))
+
+
+async def test_channel_multiple_consumers():
+ successful_receivers = set()
+ received = []
+
+ async def consumer(receive_channel, i):
+ async for value in receive_channel:
+ successful_receivers.add(i)
+ received.append(value)
+
+ async with trio.open_nursery() as nursery:
+ send_channel, receive_channel = trio.open_memory_channel(1)
+ async with send_channel:
+ for i in range(5):
+ nursery.start_soon(consumer, receive_channel, i)
+ await wait_all_tasks_blocked()
+ for i in range(10):
+ await send_channel.send(i)
+
+ assert successful_receivers == set(range(5))
+ assert len(received) == 10
+ assert set(received) == set(range(10))
+
+
+async def test_close_basics():
+ async def send_block(s, expect):
+ with pytest.raises(expect):
+ await s.send(None)
+
+ # closing send -> other send gets ClosedResourceError
+ s, r = open_memory_channel(0)
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(send_block, s, trio.ClosedResourceError)
+ await wait_all_tasks_blocked()
+ await s.aclose()
+
+ # and it's persistent
+ with pytest.raises(trio.ClosedResourceError):
+ s.send_nowait(None)
+ with pytest.raises(trio.ClosedResourceError):
+ await s.send(None)
+
+ # and receive gets EndOfChannel
+ with pytest.raises(EndOfChannel):
+ r.receive_nowait()
+ with pytest.raises(EndOfChannel):
+ await r.receive()
+
+ # closing receive -> send gets BrokenResourceError
+ s, r = open_memory_channel(0)
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(send_block, s, trio.BrokenResourceError)
+ await wait_all_tasks_blocked()
+ await r.aclose()
+
+ # and it's persistent
+ with pytest.raises(trio.BrokenResourceError):
+ s.send_nowait(None)
+ with pytest.raises(trio.BrokenResourceError):
+ await s.send(None)
+
+ # closing receive -> other receive gets ClosedResourceError
+ async def receive_block(r):
+ with pytest.raises(trio.ClosedResourceError):
+ await r.receive()
+
+ s, r = open_memory_channel(0)
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(receive_block, r)
+ await wait_all_tasks_blocked()
+ await r.aclose()
+
+ # and it's persistent
+ with pytest.raises(trio.ClosedResourceError):
+ r.receive_nowait()
+ with pytest.raises(trio.ClosedResourceError):
+ await r.receive()
+
+
+async def test_close_sync():
+ async def send_block(s, expect):
+ with pytest.raises(expect):
+ await s.send(None)
+
+ # closing send -> other send gets ClosedResourceError
+ s, r = open_memory_channel(0)
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(send_block, s, trio.ClosedResourceError)
+ await wait_all_tasks_blocked()
+ s.close()
+
+ # and it's persistent
+ with pytest.raises(trio.ClosedResourceError):
+ s.send_nowait(None)
+ with pytest.raises(trio.ClosedResourceError):
+ await s.send(None)
+
+ # and receive gets EndOfChannel
+ with pytest.raises(EndOfChannel):
+ r.receive_nowait()
+ with pytest.raises(EndOfChannel):
+ await r.receive()
+
+ # closing receive -> send gets BrokenResourceError
+ s, r = open_memory_channel(0)
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(send_block, s, trio.BrokenResourceError)
+ await wait_all_tasks_blocked()
+ r.close()
+
+ # and it's persistent
+ with pytest.raises(trio.BrokenResourceError):
+ s.send_nowait(None)
+ with pytest.raises(trio.BrokenResourceError):
+ await s.send(None)
+
+ # closing receive -> other receive gets ClosedResourceError
+ async def receive_block(r):
+ with pytest.raises(trio.ClosedResourceError):
+ await r.receive()
+
+ s, r = open_memory_channel(0)
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(receive_block, r)
+ await wait_all_tasks_blocked()
+ r.close()
+
+ # and it's persistent
+ with pytest.raises(trio.ClosedResourceError):
+ r.receive_nowait()
+ with pytest.raises(trio.ClosedResourceError):
+ await r.receive()
+
+
+async def test_receive_channel_clone_and_close():
+ s, r = open_memory_channel(10)
+
+ r2 = r.clone()
+ r3 = r.clone()
+
+ s.send_nowait(None)
+ await r.aclose()
+ with r2:
+ pass
+
+ with pytest.raises(trio.ClosedResourceError):
+ r.clone()
+
+ with pytest.raises(trio.ClosedResourceError):
+ r2.clone()
+
+ # Can still send, r3 is still open
+ s.send_nowait(None)
+
+ await r3.aclose()
+
+ # But now the receiver is really closed
+ with pytest.raises(trio.BrokenResourceError):
+ s.send_nowait(None)
+
+
+async def test_close_multiple_send_handles():
+ # With multiple send handles, closing one handle only wakes senders on
+ # that handle, but others can continue just fine
+ s1, r = open_memory_channel(0)
+ s2 = s1.clone()
+
+ async def send_will_close():
+ with pytest.raises(trio.ClosedResourceError):
+ await s1.send("nope")
+
+ async def send_will_succeed():
+ await s2.send("ok")
+
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(send_will_close)
+ nursery.start_soon(send_will_succeed)
+ await wait_all_tasks_blocked()
+ await s1.aclose()
+ assert await r.receive() == "ok"
+
+
+async def test_close_multiple_receive_handles():
+ # With multiple receive handles, closing one handle only wakes receivers on
+ # that handle, but others can continue just fine
+ s, r1 = open_memory_channel(0)
+ r2 = r1.clone()
+
+ async def receive_will_close():
+ with pytest.raises(trio.ClosedResourceError):
+ await r1.receive()
+
+ async def receive_will_succeed():
+ assert await r2.receive() == "ok"
+
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(receive_will_close)
+ nursery.start_soon(receive_will_succeed)
+ await wait_all_tasks_blocked()
+ await r1.aclose()
+ await s.send("ok")
+
+
+async def test_inf_capacity():
+ s, r = open_memory_channel(float("inf"))
+
+ # It's accepted, and we can send all day without blocking
+ with s:
+ for i in range(10):
+ s.send_nowait(i)
+
+ got = []
+ async for i in r:
+ got.append(i)
+ assert got == list(range(10))
+
+
+async def test_statistics():
+ s, r = open_memory_channel(2)
+
+ assert s.statistics() == r.statistics()
+ stats = s.statistics()
+ assert stats.current_buffer_used == 0
+ assert stats.max_buffer_size == 2
+ assert stats.open_send_channels == 1
+ assert stats.open_receive_channels == 1
+ assert stats.tasks_waiting_send == 0
+ assert stats.tasks_waiting_receive == 0
+
+ s.send_nowait(None)
+ assert s.statistics().current_buffer_used == 1
+
+ s2 = s.clone()
+ assert s.statistics().open_send_channels == 2
+ await s.aclose()
+ assert s2.statistics().open_send_channels == 1
+
+ r2 = r.clone()
+ assert s2.statistics().open_receive_channels == 2
+ await r2.aclose()
+ assert s2.statistics().open_receive_channels == 1
+
+ async with trio.open_nursery() as nursery:
+ s2.send_nowait(None) # fill up the buffer
+ assert s.statistics().current_buffer_used == 2
+ nursery.start_soon(s2.send, None)
+ nursery.start_soon(s2.send, None)
+ await wait_all_tasks_blocked()
+ assert s.statistics().tasks_waiting_send == 2
+ nursery.cancel_scope.cancel()
+ assert s.statistics().tasks_waiting_send == 0
+
+ # empty out the buffer again
+ try:
+ while True:
+ r.receive_nowait()
+ except trio.WouldBlock:
+ pass
+
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(r.receive)
+ await wait_all_tasks_blocked()
+ assert s.statistics().tasks_waiting_receive == 1
+ nursery.cancel_scope.cancel()
+ assert s.statistics().tasks_waiting_receive == 0
+
+
+async def test_channel_fairness():
+
+ # We can remove an item we just sent, and send an item back in after, if
+ # no-one else is waiting.
+ s, r = open_memory_channel(1)
+ s.send_nowait(1)
+ assert r.receive_nowait() == 1
+ s.send_nowait(2)
+ assert r.receive_nowait() == 2
+
+ # But if someone else is waiting to receive, then they "own" the item we
+ # send, so we can't receive it (even though we run first):
+
+ result = None
+
+ async def do_receive(r):
+ nonlocal result
+ result = await r.receive()
+
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(do_receive, r)
+ await wait_all_tasks_blocked()
+ s.send_nowait(2)
+ with pytest.raises(trio.WouldBlock):
+ r.receive_nowait()
+ assert result == 2
+
+ # And the analogous situation for send: if we free up a space, we can't
+ # immediately send something in it if someone is already waiting to do
+ # that
+ s, r = open_memory_channel(1)
+ s.send_nowait(1)
+ with pytest.raises(trio.WouldBlock):
+ s.send_nowait(None)
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(s.send, 2)
+ await wait_all_tasks_blocked()
+ assert r.receive_nowait() == 1
+ with pytest.raises(trio.WouldBlock):
+ s.send_nowait(3)
+ assert (await r.receive()) == 2
+
+
+async def test_unbuffered():
+ s, r = open_memory_channel(0)
+ with pytest.raises(trio.WouldBlock):
+ r.receive_nowait()
+ with pytest.raises(trio.WouldBlock):
+ s.send_nowait(1)
+
+ async def do_send(s, v):
+ with assert_checkpoints():
+ await s.send(v)
+
+ async with trio.open_nursery() as nursery:
+ nursery.start_soon(do_send, s, 1)
+ with assert_checkpoints():
+ assert await r.receive() == 1
+ with pytest.raises(trio.WouldBlock):
+ r.receive_nowait()