system_state.py
1.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Runtime registry for system startup and shutdown state."""
from __future__ import annotations
from typing import Dict
class SystemStateRegistry:
    """In-memory holder for the system's startup and shutdown flags.

    Three boolean flags are kept in a private dict under the keys
    ``"started"``, ``"starting"`` and ``"shutdown_requested"``; every flag
    is ``False`` on a freshly created registry.
    """

    def __init__(self) -> None:
        # dict.fromkeys keeps the key order of the tuple, so snapshots list
        # the flags as started, starting, shutdown_requested.
        self._state = dict.fromkeys(
            ("started", "starting", "shutdown_requested"), False
        )

    def snapshot(self) -> Dict[str, bool]:
        """Return a shallow copy of the flags; mutating it leaves the registry untouched."""
        return self._state.copy()

    def set_state(self, *, started: bool | None = None, starting: bool | None = None) -> None:
        """Update the ``started`` and/or ``starting`` flags.

        Args:
            started: New value for ``"started"``; ``None`` leaves it as is.
            starting: New value for ``"starting"``; ``None`` leaves it as is.

        A truthy ``started`` additionally resets ``"shutdown_requested"``
        to ``False``.
        """
        flags = self._state
        for key, value in (("started", started), ("starting", starting)):
            if value is not None:
                flags[key] = bool(value)
        if started:
            flags["shutdown_requested"] = False

    def prepare_start(self) -> tuple[bool, str]:
        """Try to enter the "starting" phase.

        Returns:
            ``(False, message)`` when ``"starting"`` is already set, leaving
            the flags unchanged; otherwise ``(True, message)`` after setting
            ``"starting"`` and resetting ``"shutdown_requested"``.
        """
        already_starting = self.snapshot()["starting"]
        if already_starting:
            return False, "系统正在启动,请稍候"

        self.set_state(starting=True)
        self._state.update(shutdown_requested=False)
        return True, "系统启动中"

    def request_shutdown(self) -> bool:
        """Set ``"shutdown_requested"``.

        Returns:
            ``True`` if this call set the flag, ``False`` if it was already set.
        """
        newly_requested = not self._state["shutdown_requested"]
        if newly_requested:
            self._state["shutdown_requested"] = True
        return newly_requested

    def clear_shutdown_request(self) -> None:
        """Reset ``"shutdown_requested"`` to ``False``."""
        self._state.update(shutdown_requested=False)
# Module-level registry instance shared by the helper functions defined below.
SYSTEM_STATE_REGISTRY = SystemStateRegistry()
def get_system_state() -> Dict[str, bool]:
    """Return a copy of the flags held by the module-level registry."""
    registry = SYSTEM_STATE_REGISTRY
    return registry.snapshot()
def set_system_state(*, started: bool | None = None, starting: bool | None = None) -> None:
    """Forward ``started`` / ``starting`` to the module-level registry's ``set_state``.

    Args:
        started: Passed through unchanged as the ``started`` keyword.
        starting: Passed through unchanged as the ``starting`` keyword.
    """
    updates = {"started": started, "starting": starting}
    SYSTEM_STATE_REGISTRY.set_state(**updates)
def prepare_system_start() -> tuple[bool, str]:
    """Call ``prepare_start`` on the module-level registry and return its result."""
    outcome = SYSTEM_STATE_REGISTRY.prepare_start()
    return outcome
def mark_shutdown_requested() -> bool:
    """Call ``request_shutdown`` on the module-level registry and return its result."""
    was_recorded = SYSTEM_STATE_REGISTRY.request_shutdown()
    return was_recorded
def clear_shutdown_request() -> None:
    """Call ``clear_shutdown_request`` on the module-level registry."""
    registry = SYSTEM_STATE_REGISTRY
    registry.clear_shutdown_request()