__init__.py
6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 21:34
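# @Desc : Weibo store package entry point: store factory plus helpers that persist notes, comments, images and creators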
import re
from typing import List
from var import source_keyword_var
from .weibo_store_media import *
from .weibo_store_impl import *
class WeibostoreFactory:
    """Factory that maps the configured save option to a Weibo store implementation."""

    # Supported values of config.SAVE_DATA_OPTION and the store class each one selects.
    STORES = {
        "csv": WeiboCsvStoreImplement,
        "db": WeiboDbStoreImplement,
        "json": WeiboJsonStoreImplement,
        "sqlite": WeiboSqliteStoreImplement,
    }

    @staticmethod
    def create_store() -> AbstractStore:
        """
        Instantiate the store selected by ``config.SAVE_DATA_OPTION``.

        Returns:
            A new instance of the matching class from ``STORES``.
        Raises:
            ValueError: if the configured option is not a key of ``STORES``.
        """
        store_class = WeibostoreFactory.STORES.get(config.SAVE_DATA_OPTION)
        if not store_class:
            # The prefix names this class so the failure can be traced from logs.
            raise ValueError("[WeibostoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
        return store_class()
async def batch_update_weibo_notes(note_list: List[Dict]):
    """
    Persist every weibo note in a list, one after another.

    Args:
        note_list: note records to store; an empty or missing list is a no-op.
    Returns:
        None
    """
    # A falsy list collapses to an empty tuple, so the loop body never runs.
    for entry in note_list or ():
        await update_weibo_note(entry)
async def update_weibo_note(note_item: Dict):
    """
    Flatten one weibo note record and hand it to the configured store.

    Args:
        note_item: raw record; the post itself is read from its "mblog" key.
    Returns:
        None. Records that are empty or carry no "mblog" payload are skipped.
    """
    if not note_item:
        return

    mblog: Dict = note_item.get("mblog")
    if not mblog:
        # Nothing to persist: skip, like the empty-record guard above,
        # instead of failing with AttributeError on None.
        return

    # "user", "text" and "region_name" can be absent or explicitly null;
    # fall back to empty values so one sparse record does not abort the caller's batch.
    user_info: Dict = mblog.get("user") or {}
    note_id = mblog.get("id")
    created_at = mblog.get("created_at")
    # Strip HTML tags from the post body.
    clean_text = re.sub(r"<.*?>", "", mblog.get("text") or "")

    save_content_item = {
        # Note information
        "note_id": note_id,
        "content": clean_text,
        "create_time": utils.rfc2822_to_timestamp(created_at),
        "create_date_time": str(utils.rfc2822_to_china_datetime(created_at)),
        "liked_count": str(mblog.get("attitudes_count", 0)),
        "comments_count": str(mblog.get("comments_count", 0)),
        "shared_count": str(mblog.get("reposts_count", 0)),
        "last_modify_ts": utils.get_current_timestamp(),
        "note_url": f"https://m.weibo.cn/detail/{note_id}",
        "ip_location": (mblog.get("region_name") or "").replace("发布于 ", ""),
        # User information
        "user_id": str(user_info.get("id")),
        "nickname": user_info.get("screen_name", ""),
        "gender": user_info.get("gender", ""),
        "profile_url": user_info.get("profile_url", ""),
        "avatar": user_info.get("profile_image_url", ""),
        "source_keyword": source_keyword_var.get(),
    }
    utils.logger.info(f"[store.weibo.update_weibo_note] weibo note id:{note_id}, title:{clean_text[:24]} ...")
    await WeibostoreFactory.create_store().store_content(content_item=save_content_item)
async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]):
    """
    Persist every comment of a weibo note, one after another.

    Args:
        note_id: id of the note the comments belong to.
        comments: comment records to store; an empty or missing list is a no-op.
    Returns:
        None
    """
    # A falsy list collapses to an empty tuple, so the loop body never runs.
    for entry in comments or ():
        await update_weibo_note_comment(note_id, entry)
async def update_weibo_note_comment(note_id: str, comment_item: Dict):
    """
    Flatten one weibo comment record and hand it to the configured store.

    Args:
        note_id: weibo note id the comment belongs to.
        comment_item: raw weibo comment record.
    Returns:
        None. The call is a no-op when either argument is empty.
    """
    if not comment_item or not note_id:
        return

    comment_id = str(comment_item.get("id"))
    # "user", "text" and "source" can be absent or explicitly null;
    # fall back to empty values so one sparse comment does not abort the caller's batch.
    user_info: Dict = comment_item.get("user") or {}
    created_at = comment_item.get("created_at")
    # Strip HTML tags from the comment body.
    clean_text = re.sub(r"<.*?>", "", comment_item.get("text") or "")

    save_comment_item = {
        "comment_id": comment_id,
        "create_time": utils.rfc2822_to_timestamp(created_at),
        "create_date_time": str(utils.rfc2822_to_china_datetime(created_at)),
        "note_id": note_id,
        "content": clean_text,
        "sub_comment_count": str(comment_item.get("total_number", 0)),
        "comment_like_count": str(comment_item.get("like_count", 0)),
        "last_modify_ts": utils.get_current_timestamp(),
        "ip_location": (comment_item.get("source") or "").replace("来自", ""),
        "parent_comment_id": comment_item.get("rootid", ""),
        # User information
        "user_id": str(user_info.get("id")),
        "nickname": user_info.get("screen_name", ""),
        "gender": user_info.get("gender", ""),
        "profile_url": user_info.get("profile_url", ""),
        "avatar": user_info.get("profile_image_url", ""),
    }
    utils.logger.info(f"[store.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {clean_text[:24]} ...")
    await WeibostoreFactory.create_store().store_comment(comment_item=save_comment_item)
async def update_weibo_note_image(picid: str, pic_content, extension_file_name):
    """
    Hand one weibo note image to the image store.

    Args:
        picid: picture id, passed on under the "pic_id" key.
        pic_content: image payload, passed on unchanged.
        extension_file_name: file extension for the image, passed on unchanged.
    Returns:
        None
    """
    image_store = WeiboStoreImage()
    image_item = {
        "pic_id": picid,
        "pic_content": pic_content,
        "extension_file_name": extension_file_name,
    }
    await image_store.store_image(image_item)
async def save_creator(user_id: str, user_info: Dict):
    """
    Build a creator record from a weibo user dict and hand it to the configured store.

    Args:
        user_id: creator's user id, stored as given.
        user_info: raw weibo user record the profile fields are read from.
    Returns:
        None
    """
    # Only "f" maps to female; every other value, including a missing one, is stored as male.
    if user_info.get('gender') == "f":
        gender_label = '女'
    else:
        gender_label = '男'

    # Drop the "来自" prefix from the source field.
    location = user_info.get("source", "").replace("来自", "")

    local_db_item = {
        'user_id': user_id,
        'nickname': user_info.get('screen_name'),
        'gender': gender_label,
        'avatar': user_info.get('avatar_hd'),
        'desc': user_info.get('description'),
        'ip_location': location,
        'follows': user_info.get('follow_count', ''),
        'fans': user_info.get('followers_count', ''),
        'tag_list': '',
        "last_modify_ts": utils.get_current_timestamp(),
    }
    utils.logger.info(f"[store.weibo.save_creator] creator:{local_db_item}")

    creator_store = WeibostoreFactory.create_store()
    await creator_store.store_creator(local_db_item)