import errno
import select
import sys
from contextlib import contextmanager
from typing import TYPE_CHECKING

import attr
import outcome

from .. import _core
from ._run import _public
from ._wakeup_socketpair import WakeupSocketpair

assert not TYPE_CHECKING or (sys.platform != "linux" and sys.platform != "win32")


@attr.s(slots=True, eq=False, frozen=True)
class _KqueueStatistics:
    """Immutable snapshot of the registrations held by a KqueueIOManager."""

    # Number of registered receivers that are tasks.
    tasks_waiting = attr.ib()
    # Number of registered receivers that are not tasks (monitor queues).
    monitors = attr.ib()
    backend = attr.ib(default="kqueue")


@attr.s(slots=True, eq=False)
class KqueueIOManager:
    """Dispatch kqueue events to the task or queue registered for them.

    Receivers are stored in ``_registered`` keyed by ``(ident, filter)``; a
    receiver is either a task (woken once) or an unbounded queue created by
    ``monitor_kevent`` (fed every matching event).
    """

    _kqueue = attr.ib(factory=select.kqueue)
    # {(ident, filter): Task or UnboundedQueue}
    _registered = attr.ib(factory=dict)
    _force_wakeup = attr.ib(factory=WakeupSocketpair)
    _force_wakeup_fd = attr.ib(default=None)

    def __attrs_post_init__(self):
        """Register the wakeup socket for readability and remember its fd."""
        force_wakeup_event = select.kevent(
            self._force_wakeup.wakeup_sock, select.KQ_FILTER_READ, select.KQ_EV_ADD
        )
        self._kqueue.control([force_wakeup_event], 0)
        self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno()

    def statistics(self):
        """Return a _KqueueStatistics counting task and non-task receivers."""
        tasks_waiting = 0
        monitors = 0
        for receiver in self._registered.values():
            if type(receiver) is _core.Task:
                tasks_waiting += 1
            else:
                monitors += 1
        return _KqueueStatistics(tasks_waiting=tasks_waiting, monitors=monitors)

    def close(self):
        """Close the kqueue and the wakeup socketpair."""
        # try/finally so that the socketpair is still released if closing the
        # kqueue raises.
        try:
            self._kqueue.close()
        finally:
            self._force_wakeup.close()

    def force_wakeup(self):
        """Make the wakeup socket readable so a pending get_events returns."""
        self._force_wakeup.wakeup_thread_and_signal_safe()

    def get_events(self, timeout):
        """Collect all currently pending kevents.

        ``timeout`` is passed to the first ``kqueue.control`` call only; any
        follow-up calls needed to drain the queue use a timeout of 0.

        Returns a list of the kevents received.
        """
        # max_events must be > 0 or kqueue gets cranky
        # and we generally want this to be strictly larger than the actual
        # number of events we get, so that we can tell that we've gotten
        # all the events in just 1 call.
        max_events = len(self._registered) + 1
        events = []
        while True:
            batch = self._kqueue.control([], max_events, timeout)
            events += batch
            if len(batch) < max_events:
                break
            else:
                timeout = 0
                # and loop back to the start
        return events

    def process_events(self, events):
        """Deliver each kevent in ``events`` to its registered receiver.

        Events on the wakeup socket only drain that socket. For any other
        event the receiver is looked up by ``(ident, filter)``; a missing
        registration raises KeyError. Tasks are rescheduled with the event as
        their value, other receivers get the event via ``put_nowait``.
        """
        for event in events:
            key = (event.ident, event.filter)
            if event.ident == self._force_wakeup_fd:
                self._force_wakeup.drain()
                continue
            receiver = self._registered[key]
            # A one-shot event will not fire again, so drop our bookkeeping
            # for it before handing the event over.
            if event.flags & select.KQ_EV_ONESHOT:
                del self._registered[key]
            if type(receiver) is _core.Task:
                _core.reschedule(receiver, outcome.Value(event))
            else:
                receiver.put_nowait(event)

    # kevent registration is complicated -- e.g. aio submission can
    # implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
    # automatically register filters for child processes. So our lowlevel
    # API is *very* low-level: we expose the kqueue itself for adding
    # events or sticking into AIO submission structs, and split waiting
    # off into separate methods. It's your responsibility to make sure
    # that handle_io never receives an event without a corresponding
    # registration! This may be challenging if you want to be careful
    # about e.g. KeyboardInterrupt. Possibly this API could be improved to
    # be more ergonomic...

    @_public
    def current_kqueue(self):
        """Return the underlying ``select.kqueue`` object."""
        return self._kqueue

    @contextmanager
    @_public
    def monitor_kevent(self, ident, filter):
        """Context manager yielding a queue that receives matching kevents.

        The queue is registered under ``(ident, filter)`` for the duration of
        the ``with`` block and unregistered on exit. This does not add
        anything to the kqueue itself.

        Raises:
            BusyResourceError: if ``(ident, filter)`` already has a listener.
        """
        key = (ident, filter)
        if key in self._registered:
            raise _core.BusyResourceError(
                "attempt to register multiple listeners for same ident/filter pair"
            )
        q = _core.UnboundedQueue()
        self._registered[key] = q
        try:
            yield q
        finally:
            del self._registered[key]

    @_public
    async def wait_kevent(self, ident, filter, abort_func):
        """Block the current task until it is rescheduled for this kevent.

        The current task is registered under ``(ident, filter)``.
        ``abort_func`` is called with ``raise_cancel`` if the wait is aborted;
        when it returns ``Abort.SUCCEEDED`` the registration is removed.

        Returns whatever ``wait_task_rescheduled`` returns (process_events
        reschedules the task with the kevent as its value).

        Raises:
            BusyResourceError: if ``(ident, filter)`` already has a listener.
        """
        key = (ident, filter)
        if key in self._registered:
            raise _core.BusyResourceError(
                "attempt to register multiple listeners for same ident/filter pair"
            )
        self._registered[key] = _core.current_task()

        def abort(raise_cancel):
            r = abort_func(raise_cancel)
            if r is _core.Abort.SUCCEEDED:
                del self._registered[key]
            return r

        return await _core.wait_task_rescheduled(abort)

    async def _wait_common(self, fd, filter):
        """Wait for a one-shot kevent with the given ``filter`` on ``fd``.

        ``fd`` may be an int or any object with a ``fileno()`` method.

        Raises:
            BusyResourceError: if ``(fd, filter)`` already has a listener.
        """
        if not isinstance(fd, int):
            fd = fd.fileno()
        # Refuse *before* touching the kqueue: re-adding an existing kevent
        # modifies it, so issuing EV_ADD | EV_ONESHOT first could turn another
        # listener's registration (e.g. a monitor_kevent one) into a one-shot
        # event right before wait_kevent rejected us anyway.
        if (fd, filter) in self._registered:
            raise _core.BusyResourceError(
                "attempt to register multiple listeners for same ident/filter pair"
            )
        flags = select.KQ_EV_ADD | select.KQ_EV_ONESHOT
        event = select.kevent(fd, filter, flags)
        self._kqueue.control([event], 0)

        def abort(_):
            event = select.kevent(fd, filter, select.KQ_EV_DELETE)
            try:
                self._kqueue.control([event], 0)
            except OSError as exc:
                # kqueue tracks individual fds (*not* the underlying file
                # object, see _io_epoll.py for a long discussion of why this
                # distinction matters), and automatically deregisters an event
                # if the fd is closed. So if kqueue.control says that it
                # doesn't know about this event, then probably it's because
                # the fd was closed behind our backs. (Too bad we can't ask it
                # to wake us up when this happens, versus discovering it after
                # the fact... oh well, you can't have everything.)
                #
                # FreeBSD reports this using EBADF. macOS uses ENOENT.
                if exc.errno in (errno.EBADF, errno.ENOENT):  # pragma: no branch
                    pass
                else:  # pragma: no cover
                    # As far as we know, this branch can't happen.
                    raise
            return _core.Abort.SUCCEEDED

        await self.wait_kevent(fd, filter, abort)

    @_public
    async def wait_readable(self, fd):
        """Wait for a KQ_FILTER_READ event on ``fd`` (int or has ``fileno()``)."""
        await self._wait_common(fd, select.KQ_FILTER_READ)

    @_public
    async def wait_writable(self, fd):
        """Wait for a KQ_FILTER_WRITE event on ``fd`` (int or has ``fileno()``)."""
        await self._wait_common(fd, select.KQ_FILTER_WRITE)

    @_public
    def notify_closing(self, fd):
        """Wake any task waiting on ``fd`` with ClosedResourceError.

        For both the read and write filters: a waiting task has its kevent
        deleted from the kqueue, is rescheduled with ``ClosedResourceError``,
        and is unregistered.

        Raises:
            NotImplementedError: if a ``monitor_kevent`` queue is registered
                for ``fd``.
        """
        if not isinstance(fd, int):
            fd = fd.fileno()

        for filter in [select.KQ_FILTER_READ, select.KQ_FILTER_WRITE]:
            key = (fd, filter)
            receiver = self._registered.get(key)

            if receiver is None:
                continue

            if type(receiver) is _core.Task:
                event = select.kevent(fd, filter, select.KQ_EV_DELETE)
                self._kqueue.control([event], 0)
                exc = _core.ClosedResourceError("another task closed this fd")
                _core.reschedule(receiver, outcome.Error(exc))
                del self._registered[key]
            else:
                # XX this is an interesting example of a case where being able
                # to close a queue would be useful...
                raise NotImplementedError(
                    "can't close an fd that monitor_kevent is using"
                )