Wenkai Liang
Committed by GitHub

Merge pull request #7 from wjhgq/main

Add a new time-series model to implement the public opinion prediction function.
import numpy as np
import datetime
import matplotlib.pyplot as plt
import pandas as pd
from pmdarima import auto_arima
def datetime_to_number(date: str) -> int:
    """Convert a date string 'YYYY-MM-DD' to a relative day number.

    Parameters:
    date: str, a date formatted as 'YYYY-MM-DD'

    Returns:
    int, the number of whole days between 2024-01-01 and ``date``
    (0 for 2024-01-01 itself, negative for earlier dates)

    Raises:
    ValueError: if ``date`` does not match the '%Y-%m-%d' format
    """
    date_number = datetime.datetime.strptime(date, "%Y-%m-%d")
    # Fixed reference day; built directly instead of re-parsing a string on every call.
    base_number = datetime.datetime(2024, 1, 1)
    return (date_number - base_number).days
def predict_future_values(data, forecast_days=5):
    """
    Use auto_arima from pmdarima to fit a non-seasonal ARIMA model to a daily
    time series, then predict values for the days following the last observation.

    Parameters:
    data: dict, keys are date strings 'YYYY-MM-DD', values are integer counts
    forecast_days: int, number of days to predict into the future

    Returns:
    predictions: dict, keys are future date strings 'YYYY-MM-DD', values are
    predicted integers (>= 0). Empty when ``data`` is empty or
    ``forecast_days`` is not positive.

    Raises:
    ValueError: if a key of ``data`` does not match the '%Y-%m-%d' format
    """
    # Nothing to fit or nothing to forecast: return before touching the model.
    if not data or forecast_days <= 0:
        return {}

    # Sort data by date
    sorted_dates = sorted(data, key=lambda d: datetime.datetime.strptime(d, "%Y-%m-%d"))
    start_date = sorted_dates[0]
    end_date = sorted_dates[-1]

    # Create a full date range to ensure continuity in the time series;
    # days without an observation are treated as a count of 0.
    full_range = pd.date_range(start=start_date, end=end_date, freq='D')
    ts = pd.Series(0, index=full_range, dtype=float)
    for day, count in data.items():
        ts[pd.to_datetime(day)] = count

    # Mild smoothing (3-day moving average) to reduce noise before fitting.
    # min_periods=1 keeps the first days instead of turning them into NaN.
    ts_smoothed = ts.rolling(window=3, min_periods=1).mean()

    # Fit the time series with auto_arima to find the best parameters
    model = auto_arima(ts_smoothed,
                       start_p=1, start_q=1,
                       max_p=5, max_q=5,
                       seasonal=False,
                       trace=False, error_action='ignore',
                       suppress_warnings=True, stepwise=True)

    # Predict the future values
    forecast = model.predict(n_periods=forecast_days)

    # Construct future dates, starting the day after the last observation
    last_date = pd.to_datetime(end_date)
    future_dates = [last_date + datetime.timedelta(days=i) for i in range(1, forecast_days + 1)]

    # Convert forecast results to dict with non-negative integers
    predictions = {}
    for day, value in zip(future_dates, forecast):
        # A non-finite forecast cannot be rounded to an int; report it as 0.
        count = max(int(round(float(value))), 0) if np.isfinite(value) else 0
        predictions[day.strftime("%Y-%m-%d")] = count
    return predictions
if __name__ == '__main__':
    # Manual smoke test: forecast the days following a small, gappy sample
    # series and print the resulting {date: predicted count} mapping.
    data = {
        '2024-06-15': 1, '2024-06-18': 1, '2024-06-22': 1,
        '2024-06-23': 1, '2024-07-01': 3, '2024-07-02': 4,
        '2024-07-03': 4, '2024-07-04': 14
    }
    preds = predict_future_values(data)
    print(preds)
... ...
from utils.getPublicData import *
from utils.predict import *
# Article rows, fetched once when this module is imported. The functions in
# this file read index 1 of each row as a 'YYYY-MM-DD' date string and
# index 14 as the article's topic.
# NOTE(review): the full row layout comes from utils.getPublicData - confirm there.
articleList = getAllArticleData()
# Comment rows, fetched once when this module is imported. Index 9 of each
# row is read as the comment's topic.
commentList = getAllCommentsData()
from utils.predict import predict_future_values # Use the new function
import csv
import os
import datetime
def getTopicByArticle(articles=None):
    """Count how many articles belong to each topic.

    Parameters:
    articles: optional iterable of article rows; index 14 of each row is
    read as the topic. Defaults to the module-level ``articleList``.

    Returns:
    list of ``{'name': topic, 'value': count}`` dicts, in order of first
    appearance of each topic. Rows whose topic is None are skipped.
    """
    rows = articleList if articles is None else articles
    articleTopicDic = {}
    for row in rows:
        topic = row[14]
        if topic is not None:
            articleTopicDic[topic] = articleTopicDic.get(topic, 0) + 1
    return [{'name': name, 'value': count} for name, count in articleTopicDic.items()]
def getTopicByComments(comments=None):
    """Count how many comments belong to each topic.

    Parameters:
    comments: optional iterable of comment rows; index 9 of each row is
    read as the topic. Defaults to the module-level ``commentList``.

    Returns:
    list of ``{'name': topic, 'value': count}`` dicts, in order of first
    appearance of each topic. Rows whose topic is None are skipped.
    """
    rows = commentList if comments is None else comments
    commentsTopicDic = {}
    for row in rows:
        topic = row[9]
        if topic is not None:
            commentsTopicDic[topic] = commentsTopicDic.get(topic, 0) + 1
    return [{'name': name, 'value': count} for name, count in commentsTopicDic.items()]
def mergeTopics(article_topics, comment_topics):
    """Merge two topic-count lists into one ranking.

    Parameters:
    article_topics: iterable of ``{'name': ..., 'value': ...}`` dicts
    comment_topics: iterable of ``{'name': ..., 'value': ...}`` dicts

    Returns:
    list of ``[name, str(total)]`` pairs sorted by total, largest first.
    Topics with equal totals keep the order in which they were first seen.
    """
    totals = {}
    # Walk both inputs in turn instead of concatenating them, so the two
    # arguments do not have to be the same sequence type.
    for source in (article_topics, comment_topics):
        for topic in source:
            name = topic['name']
            if name in totals:
                totals[name] += topic['value']
            else:
                totals[name] = topic['value']
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    # Totals are returned as strings; callers receive [name, "count"] pairs.
    return [[name, str(total)] for name, total in ranked]
def getAllTopicData():
    """Return the combined topic ranking for articles and comments.

    Returns:
    whatever ``mergeTopics`` produces for the per-article and per-comment
    topic counts: a list of ``[name, str(count)]`` pairs, largest count first.
    """
    return mergeTopics(getTopicByArticle(), getTopicByComments())
import pandas as pd
def getTopicCreatedAtandpredictData(topic):
    """Count a topic's articles per date and append a 5-day forecast.

    Parameters:
    topic: value compared against index 14 of every row in ``articleList``

    Returns:
    (dates, counts): two parallel lists ordered by date. ``dates`` holds
    'YYYY-MM-DD' strings (index 1 of the matching rows, followed by the
    forecast days); ``counts`` holds the observed counts followed by the
    values returned by ``predict_future_values``.
    """
    def _as_date(day):
        return datetime.datetime.strptime(day, "%Y-%m-%d")

    createdAt = {}
    for row in articleList:
        if row[14] == topic:
            # NOTE(review): the membership test was hidden by a diff hunk
            # boundary in the reviewed source; it is reconstructed from the
            # visible "+= 1 / else / = 1" branches - confirm.
            if row[1] in createdAt:
                createdAt[row[1]] += 1
            else:
                createdAt[row[1]] = 1
    createdAt = {day: createdAt[day] for day in sorted(createdAt, key=_as_date)}

    # Use the improved time series prediction approach
    predictions = predict_future_values(createdAt, forecast_days=5)

    # Merge historical data and predictions, then restore date order
    combined_data = {**createdAt, **predictions}
    combined_data = {day: combined_data[day] for day in sorted(combined_data, key=_as_date)}
    print(list(combined_data.keys()), list(combined_data.values()))
    return list(combined_data.keys()), list(combined_data.values())


def writeTopicsToCSV(topics, file_name):
    """Write topic counts to a CSV file, largest count first.

    Parameters:
    topics: iterable of dicts with 'name' and 'value' keys
    file_name: path of the CSV file; any existing file is overwritten

    The file is opened in 'w' mode and therefore always starts empty, so the
    header row is written on every call. Writing it only when the file did
    not previously exist left every rewritten file without a header.
    """
    # Sort by value in descending order
    sorted_topics = sorted(topics, key=lambda x: x['value'], reverse=True)
    with open(file_name, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['name', 'value']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(sorted_topics)


if __name__ == '__main__':
    # Manual check: print the merged topic ranking.
    print(getAllTopicData())
... ...