# Source code for luckyloop.mock

"""
MockServer
----------
"""
import socket
import threading
from typing import Union, Dict, Any
import json

# Default state of the mock server.
# The key order is significant: ``positional_arguments`` is derived from it
# and fixes the order of the comma-separated values in a ``set_condition``
# message.
_status = {
    "threshold": 0,
    "target_phase": 0,
    "target_frequency": 20,
    "trigger": 0,
    "trigger_method": "passing",
    "session_id": "default",
    "version": "0.0.0",
}
# Names of the status fields, in transmission order.
positional_arguments = list(_status)


def decode(msg: bytes) -> Union[Dict[str, Any], None]:
    """Parse the byte message from LuckyClient.

    A complete message is ASCII text wrapped in square brackets, for example
    ``[send_status]`` or ``[set_condition:v1,v2,...]``.

    Args:
        msg: the raw bytes received so far.

    Returns:
        ``None`` as long as *msg* is not wrapped in ``[`` and ``]``.
        Otherwise a dict that always contains the key ``"cmd"``:

        * ``send_status``, ``shutdown`` and ``send_condition`` yield
          ``{"cmd": <name>}``,
        * ``set_condition`` additionally maps every name in
          ``positional_arguments`` to the transmitted integer, or to the
          literal string ``"="`` when the value was sent as ``=``,
        * anything else yields ``{"cmd": "Invalid"}``.

    Raises:
        IndexError: if *msg* is empty.
        ValueError: if a ``set_condition`` message has no argument list, has
            the wrong number of arguments, or contains a value that is
            neither ``=`` nor an integer. Non-ASCII input raises
            ``UnicodeDecodeError``, a ``ValueError`` subclass.
    """
    status = msg.decode("ascii")
    # Note: indexing an empty string raises IndexError. This is kept on
    # purpose; read_msg() treats any exception as "stop reading".
    if not (status[0] == "[" and status[-1] == "]"):
        return None

    body = status[1:-1]
    items = body.strip().split(":")
    cmd = items[0]
    if cmd in ("send_status", "shutdown", "send_condition"):
        return {"cmd": cmd}
    if cmd != "set_condition":
        return {"cmd": "Invalid"}

    if len(items) < 2:
        # "[set_condition]" without ":<values>" used to fail with a bare
        # IndexError; report it like every other malformed argument list.
        raise ValueError(f"{body} has no arguments")

    values = items[1].split(",")
    if len(values) != len(positional_arguments):
        raise ValueError(
            f"{body} has the wrong number of arguments. should be {len(positional_arguments)}"
        )

    cond: Dict[str, Any] = {"cmd": cmd}
    for k, v in zip(positional_arguments, values):
        # "=" is passed through verbatim, everything else must be an integer
        cond[k.lower()] = v if v == "=" else int(v)
    return cond


def read_msg(client: socket.socket) -> Union[Dict[str, Any], None]:
    """Read from *client* until the bytes form a valid payload and return it.

    The message is read one byte at a time and handed to ``decode`` after
    every byte, so reading stops right after the first complete message.

    Args:
        client: a connected socket to read from.

    Returns:
        The decoded payload, or ``None`` when the peer closes the connection
        before a complete message arrived or when receiving or decoding
        raises an exception (the exception is printed).
    """
    msg = bytearray()
    while True:
        try:
            prt = client.recv(1)
            if not prt:
                # recv() returns b"" once the peer has closed the connection.
                # Without this check a half-sent message would keep the loop
                # spinning forever, because decode() never sees a closing "]".
                return None
            msg += prt
            status = decode(msg)
            if status is not None:
                return status
        except Exception as e:  # pragma no cover
            print(e)
            return None


def encode(status: Dict[str, Any]) -> bytes:
    """Encode the status as JSON.

    Args:
        status: a JSON-serializable mapping.

    Returns:
        The JSON document as ASCII ``bytes``. ``json.dumps`` escapes
        non-ASCII characters by default, so the ASCII encoding cannot fail.
    """
    return json.dumps(status).encode("ascii")


def encode_condition(status: Dict[str, Any]) -> bytes:
    """Encode the condition as JSON.

    Only the fields named in ``positional_arguments`` are taken from
    *status*, in that order.

    Args:
        status: mapping that contains every name in ``positional_arguments``.

    Returns:
        The JSON document as ASCII ``bytes``.

    Raises:
        KeyError: if *status* lacks one of the expected fields.
    """
    condition = {name: status[name] for name in positional_arguments}
    return json.dumps(condition).encode("ascii")


def kill(host: str = "127.0.0.1", port: int = 1219):
    """Call ``shutdown()`` on a ``LuckyClient`` created for ``host:port``.

    Presumably this makes the client send the ``shutdown`` command to the
    server listening at that address; confirm against ``luckyloop.client``.
    """
    # Imported inside the function, so the client module is only loaded
    # when a shutdown is actually requested.
    from luckyloop.client import LuckyClient

    client = LuckyClient(host, port)
    client.shutdown()


class LuckyServer(threading.Thread):
    """Mock LuckyServer for testing and development purposes

    The server runs in its own thread. It accepts one TCP connection at a
    time, reads a single bracketed command from it, answers with a JSON
    document and closes the connection again.
    """

    # Class-level default status. Every instance takes its own copy in
    # ``__init__`` so that several servers do not share mutable state.
    status = _status.copy()

    def __init__(self, host: str = "127.0.0.1", port: int = 1219):
        """Prepare a server for ``host:port``; call ``start()`` to run it."""
        super().__init__()
        self.host = host
        self.port = port
        # Per-instance copy: updates and detrigger() on one server must not
        # leak into other servers or into the class-level default.
        self.status = self.status.copy()
        self.is_running = threading.Event()

    def await_running(self):
        "wait until the server has started"
        # Block on the event instead of spinning in a busy loop.
        # The event is only set once the listener is bound, so this blocks
        # indefinitely if the server never comes up.
        self.is_running.wait()

    def kill(self):
        """Shut this server down through the module-level ``kill`` helper."""
        kill(self.host, self.port)

    def detrigger(self):
        """Reset the ``trigger`` field of the status to 0."""
        self.status["trigger"] = 0

    def update_status(self, payload: Dict[str, Any]):
        """Copy the values of *payload* into the status.

        A value of ``"="`` leaves the corresponding field untouched.

        Raises:
            ValueError: if *payload* contains a key that is not a status
                field. Fields processed before the offending key stay
                updated.
        """
        print(f"Current status {self.status}", end="")
        for k, v in payload.items():
            if k not in self.status:  # pragma no cover
                raise ValueError(f"Unknown status update {k}:{v}")
            if v == "=":
                continue
            self.status[k] = v
        print(f" updated to {self.status}")

    def _handle(self, client: socket.socket, address: Any) -> None:
        """Read one command from *client* and send the matching reply."""
        payload = read_msg(client)
        print(f"LuckyServer received: {payload} from {address}")
        if not payload:
            return

        cmd = payload.pop("cmd", None)
        if cmd == "shutdown":
            # Clear the flag first so the accept loop ends even if the
            # reply cannot be delivered.
            self.is_running.clear()
            parcel = encode(self.status)
        elif cmd == "send_status":
            parcel = encode(self.status)
        elif cmd is None or cmd == "send_condition":
            parcel = encode_condition(self.status)
        elif cmd == "set_condition":
            print("command set cond")
            self.update_status(payload)
            parcel = encode_condition(self.status)
        else:
            # Unknown command: there is no reply defined for it. Answering
            # would mean re-sending whatever was sent to the previous client.
            print(f"LuckyServer ignores unknown command {cmd}")
            return

        print(f"LuckyServer sends {parcel.decode()}")
        client.sendall(parcel)

        if cmd != "shutdown":
            self.detrigger()

    def run(self):
        """Serve clients until a ``shutdown`` command is received.

        Binds a listening socket to ``host:port``, sets ``is_running`` and
        then handles one connection after the other. Errors while handling a
        client are printed and do not stop the server. A failing ``accept``
        stops it.
        """
        # The context manager closes the listener when the server stops.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind((self.host, self.port))
            listener.listen(1)  # backlog of 1 for not-yet-accepted clients
            self.is_running.set()
            print(f"Mock-LuckyServer at {self.host}:{self.port} started")

            while self.is_running.is_set():
                try:
                    client, address = listener.accept()
                except OSError as e:  # pragma no cover
                    # No client socket exists here, so there is nothing to
                    # clean up; without a working listener the server stops.
                    print(e)
                    self.is_running.clear()
                    break

                with client:
                    try:
                        self._handle(client, address)
                    except Exception as e:  # pragma no cover
                        print(e)
                    finally:
                        try:
                            client.shutdown(socket.SHUT_RDWR)
                        except OSError:
                            # The peer is already gone; closing is enough.
                            pass
        print("Shutting Mock-LuckyServer down")


def mock():
    """start a mock LuckyServer for testing and development

    either from the command line with

    .. code-block:: bash

        luckymock

    or from within python with

    .. code-block:: python

        from luckyloop.mock import mock
        server = mock()
        server.await_running()
        # kill it later with
        server.kill()

    The optional ``--host`` and ``--port`` command line arguments select the
    address (default ``127.0.0.1:1219``). Other arguments are ignored.

    Returns:
        The started ``LuckyServer``. The call returns only after the server
        reports that it is running.
    """
    import argparse

    parser = argparse.ArgumentParser(prog="luckyloop")
    parser.add_argument("--host", type=str, default="127.0.0.1")
    parser.add_argument("--port", type=int, default=1219)
    # parse_known_args tolerates foreign arguments, e.g. those of a test
    # runner when mock() is called from within python.
    args, _ = parser.parse_known_args()
    server = LuckyServer(host=args.host, port=args.port)
    server.start()
    server.await_running()
    return server
# Start the mock server when this module is executed as a script.
if __name__ == "__main__":
    mock()