log_stream.py
4.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""运行时日志流与进程输出转发。"""
from __future__ import annotations
import time
from datetime import datetime
from pathlib import Path
from typing import Callable
from loguru import logger
def write_log_to_file(log_dir: Path, app_name: str, line: str) -> None:
"""Append a line to the app log file."""
try:
log_file_path = log_dir / f"{app_name}.log"
with open(log_file_path, "a", encoding="utf-8") as file:
file.write(line + "\n")
file.flush()
except Exception as exc:
logger.error(f"Error writing log for {app_name}: {exc}")
def read_log_from_file(log_dir: Path, app_name: str, tail_lines: int | None = None) -> list[str]:
"""Read log output from a file."""
try:
log_file_path = log_dir / f"{app_name}.log"
if not log_file_path.exists():
return []
with open(log_file_path, "r", encoding="utf-8") as file:
lines = file.readlines()
normalized = [line.rstrip("\n\r") for line in lines if line.strip()]
return normalized[-tail_lines:] if tail_lines else normalized
except Exception as exc:
logger.exception(f"Error reading log for {app_name}: {exc}")
return []
def emit_console_output(
emit_output: Callable[[str, dict], None],
app_name: str,
formatted_line: str,
) -> None:
emit_output(
"console_output",
{
"app": app_name,
"line": formatted_line,
},
)
def append_and_emit_log(
*,
log_dir: Path,
emit_output: Callable[[str, dict], None],
app_name: str,
line: str,
) -> None:
timestamp = datetime.now().strftime("%H:%M:%S")
formatted_line = f"[{timestamp}] {line}"
write_log_to_file(log_dir, app_name, formatted_line)
emit_console_output(emit_output, app_name, formatted_line)
def flush_remaining_output(
*,
process,
log_dir: Path,
emit_output: Callable[[str, dict], None],
app_name: str,
) -> None:
remaining_output = process.stdout.read()
if not remaining_output:
return
lines = remaining_output.decode("utf-8", errors="replace").split("\n")
for line in lines:
cleaned = line.strip()
if cleaned:
append_and_emit_log(
log_dir=log_dir,
emit_output=emit_output,
app_name=app_name,
line=cleaned,
)
def read_process_output(
*,
process,
log_dir: Path,
emit_output: Callable[[str, dict], None],
app_name: str,
poll_interval: float = 0.1,
) -> None:
"""Stream subprocess output into the log file."""
import select
import sys
while True:
try:
if process.poll() is not None:
flush_remaining_output(
process=process,
log_dir=log_dir,
emit_output=emit_output,
app_name=app_name,
)
break
if sys.platform == "win32":
output = process.stdout.readline()
if output:
line = output.decode("utf-8", errors="replace").strip()
if line:
append_and_emit_log(
log_dir=log_dir,
emit_output=emit_output,
app_name=app_name,
line=line,
)
else:
time.sleep(poll_interval)
else:
ready, _, _ = select.select([process.stdout], [], [], poll_interval)
if ready:
output = process.stdout.readline()
if output:
line = output.decode("utf-8", errors="replace").strip()
if line:
append_and_emit_log(
log_dir=log_dir,
emit_output=emit_output,
app_name=app_name,
line=line,
)
except Exception as exc:
error_msg = f"Error reading output for {app_name}: {exc}"
logger.exception(error_msg)
write_log_to_file(
log_dir,
app_name,
f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}",
)
break