# Source code for luckyloop.client

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Python API
----------

LuckyClient is the python interface to remote control LuckyLoop over TCP/IP

    Example
    -------

    .. code-block:: python

        from luckyloop.client import LuckyClient

        lucky = LuckyClient()
        lucky.frequency = 20  # sets the target frequency to 20
        lucky.phase = 2 # sets the target phase to 2
        lucky.trigger() 
        # triggers as soon as the target phase for the target frequency is met


"""

from typing import Dict, Any, Union
import socket
import json
from luckyloop.mock import positional_arguments

# Placeholder written into a set_condition message for every positional field
# that is not being set by the caller (see LuckyClient._encode).
# NOTE(review): presumably the server leaves such fields unchanged -- confirm.
no_argument_sign = "="


class LuckyClient:
    """Allows to remote control the LuckyLoop

    Every request opens a fresh TCP connection, sends one message, reads one
    reply and closes the connection again.

    args
    ----
    host: str
        defaults to 127.0.0.1, but should be set in production to the
        ip-address of the LuckyLoop device
    port: int
        defaults to 1219 because LuckyServer has the acronym LS and L is the
        12th character in the alphabet and S is the 19th.

    raises
    ------
    ConnectionError
        if the device refuses the connection during initialization


    .. note::

        For safety reasons, parameters like target frequency and phase are
        not set during initialization of the client, as this could interrupt
        an ongoing process on LuckyLoop when a user tries to reconnect. All
        parameters have to be set explicitly after initialization.

    """

    __slots__ = [
        "_host",
        "_port",
        "_socket",
    ]
    _buffer_size = 8192

    def __init__(self, host: str = "127.0.0.1", port: int = 1219):
        self._host = host
        self._port = port
        # _connect() creates a new socket for every query, so nothing is
        # allocated here; creating one up front would leave it unclosed
        self._socket = None
        # query the device once so an unreachable device is reported at once
        self._enquire()

    @property
    def phase(self) -> int:
        """the phase at which LuckyLoop triggers.

        set the corresponding target frequency with :meth:`~.frequency`

        args
        ----
        target_phase
            an integer between -180 and +180

        raises
        ------
        ValueError
            when set to a non-integer or to a value outside -180..180

        """
        # NOTE(review): read from the status reply, whereas the other
        # condition fields are read from the condition reply -- confirm
        status = self._enquire()
        return status["target_phase"]

    @phase.setter
    def phase(self, target_phase: int):
        # check the type first, otherwise comparing e.g. a str against an int
        # raises TypeError instead of the documented ValueError; the exact
        # type check deliberately rejects bool and float
        if type(target_phase) is not int:
            raise ValueError("Target phase must be an integer")
        if target_phase < -180 or target_phase > 180:
            raise ValueError("Target phase should be between -180 and 180")
        self._set_condition({"target_phase": target_phase})

    @property
    def trigger_method(self) -> str:
        """the trigger_method according to which LuckyLoop would trigger

        args
        ----
        trigger_method
            a str with one of the following values:
                - "passing"
                - 'now'
                - 'epsilon(<float>)'

        """
        status = self._enquire_condition()
        return status["trigger_method"]

    @trigger_method.setter
    def trigger_method(self, trigger_method: str = "passing"):
        self._set_condition({"trigger_method": trigger_method})

    @property
    def threshold(self) -> int:
        """the power threshold for which LuckyLoop would trigger

        args
        ----
        threshold
            an integer larger or equal to 0

        raises
        ------
        ValueError
            when set to a non-integer or to a negative value


        .. note::

            Currently not being used by LuckyLoop

        """
        status = self._enquire_condition()
        return status["threshold"]

    @threshold.setter
    def threshold(self, threshold: int):
        if type(threshold) is not int or threshold < 0:
            raise ValueError(
                "threshold must be an integer larger or equal to 0"
            )
        self._set_condition({"threshold": threshold})

    @property
    def frequency(self) -> int:
        """The frequency for which LuckyLoop calculates phase and power

        LuckyLoop calculates phase and amplitude continually for a specific
        frequency. Change this target_frequency

        args
        ----
        target_frequency
            an integer of at least 1 Hz

        raises
        ------
        ValueError
            when set to a non-integer or to a value below 1

        """
        status = self._enquire_condition()
        return status["target_frequency"]

    @frequency.setter
    def frequency(self, target_frequency: int):
        # type check first so that non-numeric input raises ValueError
        if type(target_frequency) is not int:
            raise ValueError("Target frequency must be an integer")
        if target_frequency <= 0:
            raise ValueError("Target frequency must be higher than 0 Hz")
        self._set_condition({"target_frequency": target_frequency})

    @property
    def session_id(self) -> str:
        """the lsl session ID

        args
        ----
        session_ID
            a string with less than 50 characters

        raises
        ------
        ValueError
            when set to a non-string or to a string of 50 or more characters

        """
        status = self._enquire()
        return status["session_id"]

    @session_id.setter
    def session_id(self, session_id: str = "default"):
        "sets the LSL session ID in a config file. Change will apply after restart"
        # the former chained comparison `0 < len(x) >= 50` reduces to this
        if type(session_id) is not str or len(session_id) >= 50:
            raise ValueError(
                "Session_ID must be an string shorter than 50 characters"
            )
        message = f"[set_lsl_session_ID: {session_id}]\0".encode("ascii")
        self._query(message)
        # only report the change once the device has answered
        print(
            f"Set LSL session-ID to {session_id}. Restart the device to apply change!"
        )

    def trigger(self, method: Union[str, None] = None):
        """LuckyLoop will trigger once the conditions are met

        While set, LuckyLoop will, once the required conditions are met, pull
        OUT1 to 5V for one cycle (i.e. 1ms). After LuckyLoop has triggered,
        trigger status field is set to 0.

        args
        ----
        method: str or None
            optional trigger method sent along with the trigger request; when
            None, the trigger method field is left at the placeholder

        returns
        -------
        reply: Dict[str, Any]
            the decoded reply of LuckyLoop


        .. note::

            The effective target frequency and phase can be changed with
            (:meth:`~.frequency` & :meth:`~phase`) anytime. To prevent a race
            condition, set them prior to calling :meth:`~trigger`.

        .. tip::

            You can see whether LuckyLoop has triggered by calling the
            private :meth:`~._enquire` continually until the trigger status
            field is 0, e.g. with :code:`client._enquire()["trigger"] == 0`

        """
        condition = {"trigger": 1}  # trigger PIN 1
        if method is not None:
            condition["trigger_method"] = method
        return self._set_condition(condition)

    def _set_condition(self, condition: Dict[str, Any]) -> Dict[str, Any]:
        """send a set_condition message and return the decoded reply

        Fields missing from ``condition`` are transmitted as the
        ``no_argument_sign`` placeholder by :meth:`~._encode`.
        """
        return self._query(self._encode(condition))

    def shutdown(self):
        "shuts LuckyLoop down gracefully"
        message = "[shutdown:]\0".encode("ascii")
        self._query(message)

    def _enquire(self) -> Dict[str, Any]:
        "enquire about the current status and version of LuckyLoop"
        message = "[send_status:]\0".encode("ascii")
        return self._query(message)

    def _enquire_condition(self) -> Dict[str, Any]:
        "enquire about the current condition of LuckyLoop"
        message = "[send_condition:]\0".encode("ascii")
        return self._query(message)

    def _query(self, message: bytes) -> Dict[str, Any]:
        "send a message to LuckyLoop and return the decoded reply"
        self._connect()
        try:
            self._socket.sendall(message)
            status = self._socket.recv(self._buffer_size)
        finally:
            # release the socket even if sending or receiving failed
            self._close()
        return self._decode(status)

    def _connect(self):
        """connect with LuckyLoop

        raises
        ------
        ConnectionError
            if the device refuses the connection
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect((self._host, self._port))
            self._socket.settimeout(1)
        except ConnectionRefusedError as error:
            self._close()
            raise ConnectionError(
                "It is not possible to reach LuckyLoop. Restart the device and make sure that IP and port are correct."
            ) from error
        except OSError:
            # any other failure: do not leave the half-open socket behind
            self._close()
            raise

    def _close(self):
        "close the connection to LuckyLoop, if there is one"
        if self._socket is not None:
            self._socket.close()
            self._socket = None

    @staticmethod
    def _encode(condition: Dict[str, Any]) -> bytes:
        """serialize a condition dictionary for sending

        The values are written comma-separated in the order given by
        ``positional_arguments``; keys missing from ``condition`` are
        replaced by ``no_argument_sign``.
        """
        # NOTE(review): unlike the other messages of this class, this one is
        # not terminated with "\0" -- confirm against the server
        fields = ",".join(
            f"{condition.get(key, no_argument_sign)}"
            for key in positional_arguments
        )
        return f"[set_condition:{fields}]".encode("ascii")

    @staticmethod
    def _decode(status: bytes) -> Dict[str, Any]:
        """deserialize a message from LuckyServer and return it as a dictionary

        Single quotes are replaced by double quotes before the message is
        parsed as JSON. If parsing fails, the message is printed and the
        json.JSONDecodeError is re-raised.
        """
        message = status.decode("ascii").replace("'", '"')
        try:
            return json.loads(message)
        except json.JSONDecodeError:
            print("Message:", message)
            print("Raw Message:", status)
            raise