import socket
import threading
from typing import Union, Dict, Any
positional_arguments = ["threshold", "target_phase", "target_frequency", "trigger"]
def decode(msg: bytes) -> Union[Dict[str, int], None]:
"parse the byte message from LuckyClient"
status = msg.decode("ascii")
if status[0] == "[" and status[-1] == "]":
status = status[1:-1]
items = status.strip().split(":")
if items[0] in ["send_status", "shutdown"]:
return {"meta": items[0]}
else:
items = items[1].split(",")
cond = dict()
if len(items) != len(positional_arguments):
raise ValueError(
f"{status} has the wrong number of arguments. should be {len(positional_arguments)}"
)
for k, v in zip(positional_arguments, items):
if v == "=":
cond[k.lower()] = v
else:
cond[k.lower()] = int(v)
return cond
else:
return None
def read_msg(client: socket.socket) -> Union[Dict[str, int], None]:
"""parse the message until it is a valid Payload and return the first"""
msg = bytearray(b"")
while True:
try:
prt = client.recv(1)
msg += prt
status = decode(msg)
if status is not None:
return status
except Exception as e: # pragma no cover
print(e)
return None
def encode(status: Dict[str, int]) -> bytearray:
msg = ""
for k, v in status.items():
msg += f"{k}: {v} "
return msg.encode("ascii")
def kill(host: str = "127.0.0.1", port: int = 1219):
from luckyloop.client import LuckyClient
LuckyClient(host, port).shutdown()
class LuckyServer(threading.Thread):
"A mock version of the server running on LuckyLoop for testing and development purposes"
status = {
"threshold": 0,
"target_phase": 0,
"target_frequency": 20,
"trigger": 0,
}
def __init__(self, host: str = "127.0.0.1", port: int = 1219):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.is_running = threading.Event()
def await_running(self):
"wait until the server has started"
while not self.is_running.is_set():
pass
def kill(self):
kill(self.host, self.port)
def detrigger(self):
self.status["trigger"] = 0
def update_status(self, payload: Dict[str, Any]):
print(f"Current status {self.status}", end="")
for k, v in payload.items():
if k in self.status.keys():
if v == "=":
continue
else:
self.status[k] = v
else: # pragma no cover
raise ValueError(f"Unknown status update {payload}")
print(f" updated to {self.status}")
def run(self):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind((self.host, self.port))
listener.listen(1) # two unaccepted client is allowed
self.is_running.set()
print(f"Mock-LuckyServer at {self.host}:{self.port} started")
while self.is_running.is_set():
try:
client, address = listener.accept()
payload = read_msg(client)
print(f"LuckyServer received: {payload} from {address}")
if not payload:
continue
cmd = payload.get("meta", None)
if cmd is not None:
if cmd == "shutdown":
self.is_running.clear()
break
elif cmd == "send_status":
pass
else:
self.update_status(payload)
parcel = encode(self.status)
print(f"LuckyServer sends {parcel.decode()}")
client.sendall(parcel)
self.detrigger()
except Exception as e: # pragma no cover
print(e)
finally:
client.shutdown(socket.SHUT_RDWR)
client.close()
print("Shutting Mock-LuckyServer down")
def mock():
"command-line entry to start a mock LuckyServer"
import argparse
parser = argparse.ArgumentParser(prog="luckyloop")
parser.add_argument("--host", type=str, default="127.0.0.1")
parser.add_argument("--port", type=int, default=1219)
args, unknown = parser.parse_known_args()
server = LuckyServer(host=args.host, port=args.port)
server.start()
server.await_running()
return server
|