Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

import socket 

import threading 

from typing import Union, Dict, Any 

 

positional_arguments = ["threshold", "target_phase", "target_frequency", "trigger"] 

 

 

def decode(msg: bytes) -> Union[Dict[str, int], None]: 

"parse the byte message from LuckyClient" 

status = msg.decode("ascii") 

if status[0] == "[" and status[-1] == "]": 

status = status[1:-1] 

items = status.strip().split(":") 

if items[0] in ["send_status", "shutdown"]: 

return {"meta": items[0]} 

else: 

items = items[1].split(",") 

cond = dict() 

if len(items) != len(positional_arguments): 

raise ValueError( 

f"{status} has the wrong number of arguments. should be {len(positional_arguments)}" 

) 

for k, v in zip(positional_arguments, items): 

if v == "=": 

cond[k.lower()] = v 

else: 

cond[k.lower()] = int(v) 

return cond 

else: 

return None 

 

 

def read_msg(client: socket.socket) -> Union[Dict[str, int], None]: 

"""parse the message until it is a valid Payload and return the first""" 

msg = bytearray(b"") 

while True: 

try: 

prt = client.recv(1) 

msg += prt 

status = decode(msg) 

if status is not None: 

return status 

except Exception as e: # pragma no cover 

print(e) 

return None 

 

 

def encode(status: Dict[str, int]) -> bytearray: 

msg = "" 

for k, v in status.items(): 

msg += f"{k}: {v} " 

return msg.encode("ascii") 

 

 

def kill(host: str = "127.0.0.1", port: int = 1219): 

from luckyloop.client import LuckyClient 

 

LuckyClient(host, port).shutdown() 

 

 

class LuckyServer(threading.Thread): 

"A mock version of the server running on LuckyLoop for testing and development purposes" 

 

status = { 

"threshold": 0, 

"target_phase": 0, 

"target_frequency": 20, 

"trigger": 0, 

} 

 

def __init__(self, host: str = "127.0.0.1", port: int = 1219): 

threading.Thread.__init__(self) 

self.host = host 

self.port = port 

self.is_running = threading.Event() 

 

def await_running(self): 

"wait until the server has started" 

while not self.is_running.is_set(): 

pass 

 

def kill(self): 

kill(self.host, self.port) 

 

def detrigger(self): 

self.status["trigger"] = 0 

 

def update_status(self, payload: Dict[str, Any]): 

print(f"Current status {self.status}", end="") 

for k, v in payload.items(): 

if k in self.status.keys(): 

if v == "=": 

continue 

else: 

self.status[k] = v 

else: # pragma no cover 

raise ValueError(f"Unknown status update {payload}") 

print(f" updated to {self.status}") 

 

def run(self): 

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

listener.bind((self.host, self.port)) 

listener.listen(1) # two unaccepted client is allowed 

self.is_running.set() 

print(f"Mock-LuckyServer at {self.host}:{self.port} started") 

 

while self.is_running.is_set(): 

try: 

client, address = listener.accept() 

payload = read_msg(client) 

print(f"LuckyServer received: {payload} from {address}") 

if not payload: 

continue 

 

cmd = payload.get("meta", None) 

if cmd is not None: 

if cmd == "shutdown": 

self.is_running.clear() 

break 

elif cmd == "send_status": 

pass 

else: 

self.update_status(payload) 

 

parcel = encode(self.status) 

print(f"LuckyServer sends {parcel.decode()}") 

client.sendall(parcel) 

self.detrigger() 

 

except Exception as e: # pragma no cover 

print(e) 

finally: 

client.shutdown(socket.SHUT_RDWR) 

client.close() 

print("Shutting Mock-LuckyServer down") 

 

 

def mock(): 

"command-line entry to start a mock LuckyServer" 

import argparse 

 

parser = argparse.ArgumentParser(prog="luckyloop") 

parser.add_argument("--host", type=str, default="127.0.0.1") 

parser.add_argument("--port", type=int, default=1219) 

args, unknown = parser.parse_known_args() 

 

server = LuckyServer(host=args.host, port=args.port) 

server.start() 

server.await_running() 

return server