Skip to content

hololinked.core.events.EventDispatcher

The actual worker which pushes the event. The separation is necessary between Event and EventDispatcher to allow class level definitions of the Event

Source code in .venv/lib/python3.13/site-packages/hololinked/core/events.py
class EventDispatcher:
    """
    The actual worker which pushes the event. The separation is necessary between `Event` and 
    `EventDispatcher` to allow class level definitions of the `Event` 
    """

    __slots__ = ['_unique_identifier', '_publisher', '_owner_inst', '_descriptor']

    def __init__(self, unique_identifier: str, publisher: "EventPublisher", owner_inst: ParameterizedMetaclass, descriptor: Event) -> None:
        self._unique_identifier = unique_identifier
        self._owner_inst = owner_inst
        self._descriptor = descriptor
        self.publisher = publisher

    @property
    def publisher(self) -> "EventPublisher": 
        """
        Event publishing PUB socket owning object.
        """
        return self._publisher

    @publisher.setter
    def publisher(self, value: "EventPublisher") -> None:
        if not hasattr(self, '_publisher'):
            self._publisher = value
        elif not isinstance(value, EventPublisher):
            raise AttributeError("Publisher must be of type EventPublisher. Given type: " + str(type(value)))
        if self._publisher is not None:
            self._publisher.register(self)

    def push(self, data: typing.Any) -> None:
        """
        publish the event. 

        Parameters
        ----------
        data: Any
            payload of the event
        """
        self.publisher.publish(self, data=data)

    def receive_acknowledgement(self, timeout : typing.Union[float, int, None]) -> bool:
        """
        Receive acknowlegement for event receive. When the timeout argument is present and not None, 
        it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
        """
        raise NotImplementedError("Event acknowledgement is not implemented yet.")
        return self._synchronize_event.wait(timeout=timeout)

    def _set_acknowledgement(self, *args, **kwargs) -> None:
        """
        Method to be called by RPC server when an acknowledgement is received. Not for user to be set.
        """
        raise NotImplementedError("Event acknowledgement is not implemented yet.")
        self._synchronize_event.set()

Attributes

publisher property writable

publisher: EventPublisher

Event publishing PUB socket owning object.

Functions

push

push(data: Any) -> None

publish the event.

Parameters:

Name Type Description Default

data

Any

payload of the event

required
Source code in .venv/lib/python3.13/site-packages/hololinked/core/events.py
def push(self, data: typing.Any) -> None:
    """
    publish the event. 

    Parameters
    ----------
    data: Any
        payload of the event
    """
    self.publisher.publish(self, data=data)

receive_acknowledgement

receive_acknowledgement(timeout: Union[float, int, None]) -> bool

Receive acknowlegement for event receive. When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).

Source code in .venv/lib/python3.13/site-packages/hololinked/core/events.py
def receive_acknowledgement(self, timeout : typing.Union[float, int, None]) -> bool:
    """
    Receive acknowlegement for event receive. When the timeout argument is present and not None, 
    it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
    """
    raise NotImplementedError("Event acknowledgement is not implemented yet.")
    return self._synchronize_event.wait(timeout=timeout)