cache_manager.py
4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import json
import os
import time
from datetime import datetime, timedelta
import threading
import queue
class PredictionCache:
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(PredictionCache, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'initialized'):
self.cache_dir = 'cache/predictions'
self.cache_duration = timedelta(hours=24) # 缓存24小时
self.cache = {}
self.cache_queue = queue.Queue()
self.initialized = True
# 确保缓存目录存在
os.makedirs(self.cache_dir, exist_ok=True)
# 启动缓存清理线程
self.cleanup_thread = threading.Thread(target=self._cleanup_old_cache, daemon=True)
self.cleanup_thread.start()
# 加载现有缓存
self._load_cache()
def _load_cache(self):
"""加载磁盘上的缓存文件"""
try:
for filename in os.listdir(self.cache_dir):
if filename.endswith('.json'):
filepath = os.path.join(self.cache_dir, filename)
with open(filepath, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
# 检查缓存是否过期
if self._is_cache_valid(cache_data['timestamp']):
topic = filename[:-5] # 移除.json后缀
self.cache[topic] = cache_data
else:
# 删除过期缓存文件
os.remove(filepath)
except Exception as e:
print(f"加载缓存失败: {e}")
def _cleanup_old_cache(self):
"""定期清理过期缓存的后台线程"""
while True:
try:
# 检查并清理内存缓存
current_time = datetime.now()
expired_topics = []
for topic, cache_data in self.cache.items():
if not self._is_cache_valid(cache_data['timestamp']):
expired_topics.append(topic)
# 删除过期缓存
for topic in expired_topics:
del self.cache[topic]
cache_file = os.path.join(self.cache_dir, f"{topic}.json")
if os.path.exists(cache_file):
os.remove(cache_file)
# 休眠1小时后再次检查
time.sleep(3600)
except Exception as e:
print(f"清理缓存时出错: {e}")
time.sleep(3600) # 发生错误时也等待1小时
def _is_cache_valid(self, timestamp):
"""检查缓存是否有效"""
cache_time = datetime.fromtimestamp(timestamp)
return datetime.now() - cache_time < self.cache_duration
def get(self, topic):
"""获取话题的预测缓存"""
if topic in self.cache and self._is_cache_valid(self.cache[topic]['timestamp']):
return self.cache[topic]['prediction']
return None
def set(self, topic, prediction):
"""设置话题的预测缓存"""
cache_data = {
'prediction': prediction,
'timestamp': datetime.now().timestamp()
}
# 更新内存缓存
self.cache[topic] = cache_data
# 异步保存到磁盘
self.cache_queue.put((topic, cache_data))
threading.Thread(target=self._save_cache_to_disk, daemon=True).start()
def _save_cache_to_disk(self):
"""异步保存缓存到磁盘"""
try:
while not self.cache_queue.empty():
topic, cache_data = self.cache_queue.get()
cache_file = os.path.join(self.cache_dir, f"{topic}.json")
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存缓存到磁盘失败: {e}")
# 创建全局缓存实例
prediction_cache = PredictionCache()