# CrawlerEngines/engines/crawler_newscn_search.py

from bs4 import BeautifulSoup as soup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import urllib.parse
import json
import time
import re
import os
from utils.logger import logger
BASE_URL_EN = "https://english.news.cn"
BASE_URL_CN = "https://so.news.cn/"
XINHUA_OVERSEAS_REGIONS = ["asiapacific", "europe", "africa", "northamerica", "german"]
# The maximum number of blog posts to crawl
MAX_BLOG_LIMIT = 8
# Online search on the Xinhua (news.cn) English site
class Crawler_NewsCN:
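    """Selenium-based crawler that searches Xinhua (news.cn) for a topic and
    scrapes the matching article pages.

    lang == "en" searches english.news.cn; lang == "cn" searches so.news.cn.
    The number of collected articles is capped at MAX_BLOG_LIMIT.
    """
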
    def __init__(self) -> None:
        from selenium.webdriver.chrome.options import Options

        chrome_options = Options()
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--ignore-certificate-errors")
        chrome_options.add_argument("--ignore-ssl-errors=yes")
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        self.driver = webdriver.Chrome(options=chrome_options)
        self.selected_blogs = {"titles": [], "urls": []}

    def __search_topic(self, topic: str, lang):
        if lang == "en":
            self.driver.get(BASE_URL_EN)
            WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))
            # Type the topic into the search bar
            search_bar = self.driver.find_element(By.CLASS_NAME, "search-input")
            search_bar.send_keys(topic)
            # Click the search button
            search_submit_button = self.driver.find_element(By.ID, "searchSubmit")
            search_submit_button.click()
            # Close the home window and switch to the newly opened results window
            self.driver.close()
            self.driver.switch_to.window(self.driver.window_handles[0])
            WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua research"))
        elif lang == "cn":
            self.driver.get(BASE_URL_CN)
            WebDriverWait(self.driver, 10).until(EC.title_contains("新华搜索"))
            # Type the topic into the search bar
            search_bar = self.driver.find_element(By.CLASS_NAME, "input")
            search_bar.send_keys(topic)
            # Click the search button
            search_submit_button = self.driver.find_element(
                By.CLASS_NAME, "search-button"
            )
            search_submit_button.click()
            self.driver.switch_to.window(self.driver.window_handles[0])
            WebDriverWait(self.driver, 10).until(EC.title_contains("新华搜索"))

    def __select_blog_in_search_page(self):
        raw_blogs = self.driver.find_element(By.CLASS_NAME, "content").get_attribute(
            "innerHTML"
        )
        raw_blogs = soup(raw_blogs, features="html.parser")
        raw_blogs = raw_blogs.findAll("div", attrs={"class": "item"}, recursive=True)
        if not raw_blogs:
            return
        for raw_blog in raw_blogs:
            title = (
                raw_blog.find("div", class_="title")
                .text.replace("\n", "")
                .replace(" ", "")
                .lower()
            )
            # Skip duplicate blogs that show up again under a different source
            if title not in self.selected_blogs.get("titles"):
                self.selected_blogs["titles"].append(title)
                self.selected_blogs["urls"].append(raw_blog.find("a")["href"])
        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
            # Not enough results yet: go to the next page, if any, and keep collecting
            if self.driver.find_elements(By.CLASS_NAME, "ant-pagination-next"):
                next_page_btn = self.driver.find_element(
                    By.CLASS_NAME, "ant-pagination-next"
                )
                if next_page_btn.get_attribute("aria-disabled") != "true":
                    try:
                        next_page_btn.click()
                        time.sleep(2)
                        self.__select_blog_in_search_page()
                    except Exception:
                        import traceback

                        logger.warning(traceback.format_exc())
        else:
            self.selected_blogs["titles"] = self.selected_blogs["titles"][
                :MAX_BLOG_LIMIT
            ]
            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]

    def __retrieve_overseas_blog(self) -> dict:
        blog_container = self.driver.find_element(By.CLASS_NAME, "main.clearfix")
        blog_title = blog_container.find_element(By.CLASS_NAME, "Btitle").text
        blog_meta = blog_container.find_element(By.CLASS_NAME, "wzzy").text
        blog_content = blog_container.find_element(By.ID, "detailContent").text
        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __retrieve_china_blog(self) -> dict:
        blog_container = self.driver.find_element(By.CLASS_NAME, "conBox")
        blog_title_meta_container = blog_container.find_element(By.CLASS_NAME, "conTop")
        blog_title = blog_title_meta_container.find_element(By.TAG_NAME, "h1").text
        blog_meta = blog_title_meta_container.find_element(
            By.CLASS_NAME, "infoBox.clearfix"
        ).text
        blog_content_container = blog_container.find_element(By.CLASS_NAME, "conLeft")
        blog_content = blog_content_container.find_element(By.ID, "detailContent").text
        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    # Used when lang == "cn"
    def __retrieve_cn_blog(self) -> dict:
        blog_title = (
            self.driver.find_element(By.CLASS_NAME, "head-line.clearfix")
            .find_element(By.CLASS_NAME, "title")
            .text
        )
        blog_meta = (
            self.driver.find_element(By.CLASS_NAME, "header-cont.clearfix")
            .find_element(By.CLASS_NAME, "source")
            .text
        )
        blog_content = self.driver.find_element(By.ID, "detailContent").text
        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __get_and_save_blog(self, url: str, lang):
        self.driver.get(url)
        blog = {}  # fallback in case no known page layout is detected
        if lang == "en":
            WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))
            # Special case (layout detected from the page rather than the URL region code):
            # https://german.news.cn/20241016/93ca92839e1b44dc8f6dca21f9c80902/c.html
            if self.driver.find_elements(By.CLASS_NAME, "conBox"):
                blog = self.__retrieve_china_blog()
            elif self.driver.find_elements(By.CLASS_NAME, "main.clearfix"):
                blog = self.__retrieve_overseas_blog()
        elif lang == "cn":
            WebDriverWait(self.driver, 10).until(EC.title_contains("新华"))
            blog = self.__retrieve_cn_blog()
        blog_title = blog.get("title", "")
        # Remove characters that are invalid in file names on Windows
        # invalid_chars_pattern = r'[\\/:*?"<>|]'
        # blog_title = re.sub(invalid_chars_pattern, "", blog_title)
        # file = open(os.path.join("", f"Xinhua_{blog_title}.json"), "w")
        # json.dump(blog, file)
        # file.close()
        return blog

    def search_and_save(self, topic: str, lang):
        content_list = []
        logger.warning(f"Crawler_NewsCN start search topic {topic} {lang}")
        self.__search_topic(topic, lang)
        time.sleep(1)
        logger.warning("Crawler_NewsCN start select blog in search page")
        self.__select_blog_in_search_page()
        url_list = self.selected_blogs.get("urls", [])
        logger.warning(f"Crawler_NewsCN url_list {str(url_list)}")
        for idx, url in enumerate(url_list, start=1):
            logger.warning(f"Crawler_NewsCN {idx}/{len(url_list)} url:{url}")
            content = self.__get_and_save_blog(url, lang)
            content_list.append(content)
        return content_list

    def direct_save(self, url: str, lang="en"):
        # The original call omitted the lang argument required by
        # __get_and_save_blog; "en" is assumed as the default here.
        return self.__get_and_save_blog(url, lang)

    def process(self, inputData):
        logger.warning(f"Crawler_NewsCN / inputData {inputData}")
        keyword = inputData["keyword"]
        lang = inputData["lang"]
        result = []
        try:
            result = self.search_and_save(keyword, lang)
        except Exception:
            import traceback

            logger.warning(f"Crawler_NewsCN {traceback.format_exc()}")
        finally:
            self.driver.quit()
        logger.warning(
            f"【Crawler_NewsCN process completed】, keyword={keyword}, result len={len(result)}"
        )
        return result
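
# Minimal usage sketch (not part of the original module). It assumes the module
# is run inside a project where utils.logger is importable and a Chrome /
# chromedriver setup usable by Selenium is available. The keyword and lang
# values are placeholders for illustration only.
if __name__ == "__main__":
    crawler = Crawler_NewsCN()
    articles = crawler.process({"keyword": "example topic", "lang": "en"})
    for article in articles:
        print(article.get("title", ""))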