yelingdenghe
Committed by GitHub

Fixing dy data scraping (#400)

* Fixing dy data scraping

* chore: revert base_config.py to default values and modify some comments

---------

Co-authored-by: yling <yling@test.com>
... ... @@ -104,7 +104,7 @@ class DouyinAweme(Base):
ip_location = Column(Text)
add_ts = Column(BigInteger)
last_modify_ts = Column(BigInteger)
aweme_id = Column(BigInteger, index=True)
aweme_id = Column(String(64), index=True)
aweme_type = Column(Text)
title = Column(Text)
desc = Column(Text)
... ... @@ -133,8 +133,8 @@ class DouyinAwemeComment(Base):
ip_location = Column(Text)
add_ts = Column(BigInteger)
last_modify_ts = Column(BigInteger)
comment_id = Column(BigInteger, index=True)
aweme_id = Column(BigInteger, index=True)
comment_id = Column(String(64), index=True)
aweme_id = Column(String(64), index=True)
content = Column(Text)
create_time = Column(BigInteger)
sub_comment_count = Column(Text)
... ... @@ -431,4 +431,4 @@ class ZhihuCreator(Base):
column_count = Column(Integer, default=0)
get_voteup_count = Column(Integer, default=0)
add_ts = Column(BigInteger)
last_modify_ts = Column(BigInteger)
\ No newline at end of file
last_modify_ts = Column(BigInteger)
... ...
... ... @@ -151,24 +151,33 @@ class DouYinClient(AbstractApiClient):
:return:
"""
query_params = {
'search_channel': search_channel.value,
'enable_history': '1',
'device_platform': 'webapp',
'aid': '6383',
'channel': 'channel_pc_web',
'search_channel': 'aweme_general',
'sort_type': '0',
'publish_time': '0',
'keyword': keyword,
'search_source': 'tab_search',
'search_source': 'normal_search',
'query_correct_type': '1',
'is_filter_search': '0',
'from_group_id': '7378810571505847586',
# 'from_group_id': '', # 删掉或留空,不要硬编码过期的 ID
'offset': offset,
'count': '15',
'count': '10',
'need_filter_settings': '1',
'list_type': 'multi',
# 'list_type': 'multi', 注释掉
'search_id': search_id,
'pc_client_type': '1',
'version_code': '190600',
'version_name': '19.6.0',
'cookie_enabled': 'true',
'platform': 'PC',
'downlink': '10',
}
if sort_type.value != SearchSortType.GENERAL.value or publish_time.value != PublishTimeType.UNLIMITED.value:
query_params["filter_selected"] = json.dumps({"sort_type": str(sort_type.value), "publish_time": str(publish_time.value)})
query_params["is_filter_search"] = 1
query_params["search_source"] = "tab_search"
referer_url = f"https://www.douyin.com/search/{keyword}?aid=f594bbd9-a0e2-4651-9319-ebe3cb6298c1&type=general"
referer_url = f"https://www.douyin.com/search/{keyword}?aid=6383&type=general"
headers = copy.copy(self.headers)
headers["Referer"] = urllib.parse.quote(referer_url, safe=':/')
return await self.get("/aweme/v1/web/general/search/single/", query_params, headers=headers)
... ...
... ... @@ -129,8 +129,18 @@ class DouYinCrawler(AbstractCrawler):
publish_time=PublishTimeType(config.PUBLISH_TIME_TYPE),
search_id=dy_search_id,
)
# === 新增调试代码 START ===
# 打印返回的所有 Key,看看有没有 'data' 或者 'aweme_list'
utils.logger.info(f"[DEBUG] 接口返回的字段 keys: {list(posts_res.keys())}")
# 如果返回里直接有 aweme_list,说明结构变了
if "aweme_list" in posts_res and "data" not in posts_res:
utils.logger.info("[DEBUG] 检测到 aweme_list 在根节点,正在修正数据结构...")
posts_res["data"] = [{"aweme_info": item} for item in posts_res["aweme_list"]]
# === 新增调试代码 END ===
if posts_res.get("data") is None or posts_res.get("data") == []:
utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page} is empty,{posts_res.get('data')}`")
utils.logger.info(f"[DouYinCrawler.search] 结果为空。Status: {posts_res.get('status_code')}, Msg: {posts_res.get('status_msg')}")
break
except DataFetchError:
utils.logger.error(f"[DouYinCrawler.search] search douyin keyword: {keyword} failed")
... ...
... ... @@ -33,6 +33,23 @@ from tools import utils, words
from var import crawler_type_var
def _sanitize_strings(data: Dict) -> Dict:
"""
Remove PostgreSQL-incompatible control characters (e.g., NULL) from all string fields.
Args:
data: original dict
Returns:
A new dict with sanitized string values
"""
cleaned = {}
for key, value in data.items():
if isinstance(value, str):
cleaned[key] = value.replace('\x00', '')
else:
cleaned[key] = value
return cleaned
class BiliCsvStoreImplement(AbstractStore):
def __init__(self):
self.file_writer = AsyncFileWriter(
... ... @@ -122,6 +139,8 @@ class BiliDbStoreImplement(AbstractStore):
# 确保 video_id 为整数类型,匹配数据库 BigInteger 字段
if video_id is not None:
video_id = int(video_id) if not isinstance(video_id, int) else video_id
content_item["video_id"] = video_id
content_item = _sanitize_strings(content_item)
async with get_session() as session:
result = await session.execute(select(BilibiliVideo).where(BilibiliVideo.video_id == video_id))
video_detail = result.scalar_one_or_none()
... ... @@ -145,6 +164,8 @@ class BiliDbStoreImplement(AbstractStore):
# 确保 comment_id 为整数类型,匹配数据库 BigInteger 字段
if comment_id is not None:
comment_id = int(comment_id) if not isinstance(comment_id, int) else comment_id
comment_item["comment_id"] = comment_id
comment_item = _sanitize_strings(comment_item)
async with get_session() as session:
result = await session.execute(select(BilibiliVideoComment).where(BilibiliVideoComment.comment_id == comment_id))
comment_detail = result.scalar_one_or_none()
... ... @@ -168,6 +189,8 @@ class BiliDbStoreImplement(AbstractStore):
# 确保 creator_id 为整数类型,匹配数据库 BigInteger 字段
if creator_id is not None:
creator_id = int(creator_id) if not isinstance(creator_id, int) else creator_id
creator["user_id"] = creator_id
creator = _sanitize_strings(creator)
async with get_session() as session:
result = await session.execute(select(BilibiliUpInfo).where(BilibiliUpInfo.user_id == creator_id))
creator_detail = result.scalar_one_or_none()
... ... @@ -192,8 +215,11 @@ class BiliDbStoreImplement(AbstractStore):
# 确保 up_id 和 fan_id 为整数类型,匹配数据库 BigInteger 字段
if up_id is not None:
up_id = int(up_id) if not isinstance(up_id, int) else up_id
contact_item["up_id"] = up_id
if fan_id is not None:
fan_id = int(fan_id) if not isinstance(fan_id, int) else fan_id
contact_item["fan_id"] = fan_id
contact_item = _sanitize_strings(contact_item)
async with get_session() as session:
result = await session.execute(
select(BilibiliContactInfo).where(BilibiliContactInfo.up_id == up_id, BilibiliContactInfo.fan_id == fan_id)
... ... @@ -216,6 +242,7 @@ class BiliDbStoreImplement(AbstractStore):
dynamic_item: dynamic item dict
"""
dynamic_id = dynamic_item.get("dynamic_id")
dynamic_item = _sanitize_strings(dynamic_item)
async with get_session() as session:
result = await session.execute(select(BilibiliUpDynamic).where(BilibiliUpDynamic.dynamic_id == dynamic_id))
dynamic_detail = result.scalar_one_or_none()
... ...
... ... @@ -206,7 +206,7 @@ async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict):
"nickname": user_info.get("nickname"),
"avatar": avatar_info.get("url_list", [""])[0],
"sub_comment_count": str(comment_item.get("reply_comment_total", 0)),
"like_count": (comment_item.get("digg_count") if comment_item.get("digg_count") else 0),
"like_count": str(comment_item.get("digg_count") or 0),
"last_modify_ts": utils.get_current_timestamp(),
"parent_comment_id": parent_comment_id,
"pictures": ",".join(_extract_comment_image_list(comment_item)),
... ...
... ... @@ -121,7 +121,7 @@ class DouyinAweme(Base):
ip_location: Mapped[str | None] = mapped_column(Text, nullable=True)
add_ts: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
last_modify_ts: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
aweme_id: Mapped[int | None] = mapped_column(BigInteger, index=True, nullable=True)
aweme_id: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True)
aweme_type: Mapped[str | None] = mapped_column(Text, nullable=True)
title: Mapped[str | None] = mapped_column(Text, nullable=True)
desc: Mapped[str | None] = mapped_column(Text, nullable=True)
... ... @@ -152,8 +152,8 @@ class DouyinAwemeComment(Base):
ip_location: Mapped[str | None] = mapped_column(Text, nullable=True)
add_ts: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
last_modify_ts: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
comment_id: Mapped[int | None] = mapped_column(BigInteger, index=True, nullable=True)
aweme_id: Mapped[int | None] = mapped_column(BigInteger, index=True, nullable=True)
comment_id: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True)
aweme_id: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True)
content: Mapped[str | None] = mapped_column(Text, nullable=True)
create_time: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
sub_comment_count: Mapped[str | None] = mapped_column(Text, nullable=True)
... ...