Skip to content

hololinked.core.zmq.rpc_server.AsyncScheduler

Bases: Scheduler

Scheduler class to schedule the operations of a thing in an async loop.

Source code in .venv/lib/python3.13/site-packages/hololinked/core/zmq/rpc_server.py
class AsyncScheduler(Scheduler):
    """
    Scheduler class to schedule the operations of a thing in an async loop.
    """
    def __init__(self, instance: Thing, rpc_server: RPCServer) -> None:
        super().__init__(instance, rpc_server)
        self._job = None
        self._one_shot = True 
        self._operation_execution_ready_event = asyncio.Event()
        self._operation_execution_complete_event = asyncio.Event()

    @property 
    def has_job(self) -> bool:
        return self._job is not None

    @property
    def next_job(self) -> Scheduler.JobInvokationType:
        if self._job is None:
            raise RuntimeError("No job to execute")
        return self._job

    def dispatch_job(self, job: Scheduler.JobInvokationType) -> None:
        self._job = job
        eventloop = get_current_async_loop()
        eventloop.call_soon(lambda: asyncio.create_task(self.rpc_server.tunnel_message_to_things(self)))
        eventloop.call_soon(lambda: asyncio.create_task(self.rpc_server.run_thing_instance(self.instance, self)))
        self._job_queued_event.set()