process_registry.py
4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""Runtime registry for managed process state."""
from __future__ import annotations
from typing import Any, Iterable
ProcessEntry = dict[str, Any]
ProcessTable = dict[str, ProcessEntry]
def build_default_process_table() -> ProcessTable:
"""Build the default runtime process state table."""
return {
"insight": {
"process": None,
"port": 8501,
"status": "stopped",
"output": [],
"log_file": None,
"healthcheck_started_at": None,
},
"media": {
"process": None,
"port": 8502,
"status": "stopped",
"output": [],
"log_file": None,
"healthcheck_started_at": None,
},
"query": {
"process": None,
"port": 8503,
"status": "stopped",
"output": [],
"log_file": None,
"healthcheck_started_at": None,
},
"forum": {
"process": None,
"port": None,
"status": "stopped",
"output": [],
"log_file": None,
},
}
def _clone_entry(entry: ProcessEntry) -> ProcessEntry:
cloned: ProcessEntry = {}
for key, value in entry.items():
if isinstance(value, list):
cloned[key] = list(value)
elif isinstance(value, dict):
cloned[key] = dict(value)
else:
cloned[key] = value
return cloned
class ProcessRuntimeRegistry:
"""Explicit wrapper around the managed process runtime table."""
def __init__(self, table: ProcessTable | None = None) -> None:
self._table = table if table is not None else build_default_process_table()
@property
def table(self) -> ProcessTable:
return self._table
def items(self):
return self._table.items()
def contains(self, app_name: str) -> bool:
return app_name in self._table
def get_entry(self, app_name: str) -> ProcessEntry:
return self._table[app_name]
def snapshot(self, app_name: str) -> ProcessEntry:
return _clone_entry(self.get_entry(app_name))
def snapshot_all(self) -> ProcessTable:
return {name: self.snapshot(name) for name in self._table}
def status_snapshot(self, *, include_output_lines: bool = False) -> dict[str, dict[str, Any]]:
snapshot: dict[str, dict[str, Any]] = {}
for app_name, info in self._table.items():
entry = {
"status": info.get("status"),
"port": info.get("port"),
}
if include_output_lines:
entry["output_lines"] = len(info.get("output", []))
snapshot[app_name] = entry
return snapshot
def get_process(self, app_name: str):
return self.get_entry(app_name).get("process")
def set_process(self, app_name: str, process: Any) -> None:
self.get_entry(app_name)["process"] = process
def clear_process(self, app_name: str) -> None:
self.get_entry(app_name)["process"] = None
def get_port(self, app_name: str) -> int | None:
port = self.get_entry(app_name).get("port")
return int(port) if port is not None else None
def get_status(self, app_name: str) -> str:
return str(self.get_entry(app_name).get("status", ""))
def set_status(self, app_name: str, status: str) -> None:
self.get_entry(app_name)["status"] = status
def set_output(self, app_name: str, output: list[str]) -> None:
self.get_entry(app_name)["output"] = list(output)
def append_output(self, app_name: str, line: str) -> None:
self.get_entry(app_name).setdefault("output", []).append(line)
def set_healthcheck_started_at(self, app_name: str, started_at: float | None) -> None:
self.get_entry(app_name)["healthcheck_started_at"] = started_at
def get_healthcheck_started_at(self, app_name: str) -> float | None:
value = self.get_entry(app_name).get("healthcheck_started_at")
return float(value) if value is not None else None
def reset_runtime(
self,
app_name: str,
*,
status: str = "stopped",
clear_output: bool = False,
) -> None:
entry = self.get_entry(app_name)
entry["process"] = None
entry["status"] = status
if "healthcheck_started_at" in entry:
entry["healthcheck_started_at"] = None
if clear_output:
entry["output"] = []
def running_apps(self, app_names: Iterable[str]) -> list[str]:
return [name for name in app_names if self.contains(name) and self.get_status(name) == "running"]
def active_ports(self) -> list[str]:
ports: list[str] = []
for app_name, info in self._table.items():
port = info.get("port")
if port:
ports.append(f"{app_name}:{port}")
return ports
PROCESS_RUNTIME_REGISTRY = ProcessRuntimeRegistry()