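"""Selenium-based crawler for Xinhua (news.cn) online search.

Searches english.news.cn (lang="en") or so.news.cn (lang="cn") for a topic,
collects up to MAX_BLOG_LIMIT article URLs from the result pages, and scrapes
each article into a {"title", "meta", "content"} dict.
"""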
from bs4 import BeautifulSoup as soup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import urllib.parse
import json
import time
import re
import os
import traceback

from utils.logger import logger

BASE_URL_EN = "https://english.news.cn"
BASE_URL_CN = "https://so.news.cn/"
XINHUA_OVERSEAS_REGIONS = ["asiapacific", "europe", "africa", "northamerica", "german"]

# The maximum number of blog posts to crawl
MAX_BLOG_LIMIT = 8


# Online search on the Xinhua news site (English and Chinese editions)
class Crawler_NewsCN:
    def __init__(self) -> None:
        from selenium.webdriver.chrome.options import Options

        # Headless Chrome configured to look less like an automated browser
        # and to tolerate certificate problems on the target site.
        chrome_options = Options()
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--ignore-certificate-errors")
        chrome_options.add_argument("--ignore-ssl-errors=yes")
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        self.driver = webdriver.Chrome(options=chrome_options)
        # Titles are kept alongside URLs so duplicate articles can be skipped.
        self.selected_blogs = {"titles": [], "urls": []}

    def __search_topic(self, topic: str, lang):
        """Open the search site for the given language and submit the topic."""
        if lang == "en":
            self.driver.get(BASE_URL_EN)
            WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))

            # type the topic into the search bar
            search_bar = self.driver.find_element(By.CLASS_NAME, "search-input")
            search_bar.send_keys(topic)

            # click the search button
            search_submit_button = self.driver.find_element(By.ID, "searchSubmit")
            search_submit_button.click()

            # the results open in a new window: close the home window and switch over
            self.driver.close()
            self.driver.switch_to.window(self.driver.window_handles[0])
            WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua research"))
        elif lang == "cn":
            self.driver.get(BASE_URL_CN)
            WebDriverWait(self.driver, 10).until(EC.title_contains("新华搜索"))

            # type the topic into the search bar
            search_bar = self.driver.find_element(By.CLASS_NAME, "input")
            search_bar.send_keys(topic)

            # click the search button
            search_submit_button = self.driver.find_element(
                By.CLASS_NAME, "search-button"
            )
            search_submit_button.click()
            self.driver.switch_to.window(self.driver.window_handles[0])
            WebDriverWait(self.driver, 10).until(EC.title_contains("新华搜索"))

    def __select_blog_in_search_page(self):
        """Collect result titles and URLs, paging until MAX_BLOG_LIMIT is reached."""
        raw_blogs = self.driver.find_element(By.CLASS_NAME, "content").get_attribute(
            "innerHTML"
        )
        raw_blogs = soup(raw_blogs, features="html.parser")
        raw_blogs = raw_blogs.findAll("div", attrs={"class": "item"}, recursive=True)

        if not raw_blogs:
            return

        for raw_blog in raw_blogs:
            title = (
                raw_blog.find("div", class_="title")
                .text.replace("\n", "")
                .replace(" ", "")
                .lower()
            )
            # avoid crawling the same blog twice when it appears under different sources
            if title not in self.selected_blogs.get("titles"):
                self.selected_blogs["titles"].append(title)
                self.selected_blogs["urls"].append(raw_blog.find("a")["href"])

        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
            # go to the next result page, if pagination is present and not disabled
            if self.driver.find_elements(By.CLASS_NAME, "ant-pagination-next"):
                next_page_btn = self.driver.find_element(
                    By.CLASS_NAME, "ant-pagination-next"
                )
                if next_page_btn.get_attribute("aria-disabled") != "true":
                    try:
                        next_page_btn.click()
                        time.sleep(2)
                        self.__select_blog_in_search_page()
                    except Exception:
                        print(traceback.format_exc())
        else:
            self.selected_blogs["titles"] = self.selected_blogs["titles"][
                :MAX_BLOG_LIMIT
            ]
            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]

    def __retrieve_overseas_blog(self) -> dict:
        """Scrape an article rendered with the "main clearfix" layout used by overseas regional sites."""
        blog_container = self.driver.find_element(By.CLASS_NAME, "main.clearfix")
        blog_title = blog_container.find_element(By.CLASS_NAME, "Btitle").text
        blog_meta = blog_container.find_element(By.CLASS_NAME, "wzzy").text
        blog_content = blog_container.find_element(By.ID, "detailContent").text

        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __retrieve_china_blog(self) -> dict:
        """Scrape an article rendered with the "conBox" layout."""
        blog_container = self.driver.find_element(By.CLASS_NAME, "conBox")
        blog_title_meta_container = blog_container.find_element(By.CLASS_NAME, "conTop")
        blog_title = blog_title_meta_container.find_element(By.TAG_NAME, "h1").text
        blog_meta = blog_title_meta_container.find_element(
            By.CLASS_NAME, "infoBox.clearfix"
        ).text
        blog_content_container = blog_container.find_element(By.CLASS_NAME, "conLeft")
        blog_content = blog_content_container.find_element(By.ID, "detailContent").text

        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    # Used when lang == "cn" (articles reached via the so.news.cn search)
    def __retrieve_cn_blog(self) -> dict:
        blog_title = (
            self.driver.find_element(By.CLASS_NAME, "head-line.clearfix")
            .find_element(By.CLASS_NAME, "title")
            .text
        )

        blog_meta = (
            self.driver.find_element(By.CLASS_NAME, "header-cont.clearfix")
            .find_element(By.CLASS_NAME, "source")
            .text
        )

        blog_content = self.driver.find_element(By.ID, "detailContent").text

        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __get_and_save_blog(self, url: str, lang):
        """Open an article URL and scrape it with the retriever that matches its layout."""
        self.driver.get(url)
        blog = {}  # fallback in case the page layout is not recognized

        if lang == "en":
            WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))

            # Special case: https://german.news.cn/20241016/93ca92839e1b44dc8f6dca21f9c80902/c.html
            # Branching on the region code in the URL path, e.g.
            #   region_code = urllib.parse.urlparse(url).path.split("/")[1]
            #   if region_code in XINHUA_OVERSEAS_REGIONS: ...
            # does not cover URLs like the one above, so detect the layout by
            # which container class is actually present on the page.
            if self.driver.find_elements(By.CLASS_NAME, "conBox"):
                blog = self.__retrieve_china_blog()
            elif self.driver.find_elements(By.CLASS_NAME, "main.clearfix"):
                blog = self.__retrieve_overseas_blog()
        elif lang == "cn":
            WebDriverWait(self.driver, 10).until(EC.title_contains("新华"))
            blog = self.__retrieve_cn_blog()

        # Saving to disk is currently disabled; strip characters that are invalid in
        # Windows file names before re-enabling it:
        #   blog_title = re.sub(r'[\\/:*?"<>|]', "", blog.get("title", ""))
        #   with open(os.path.join("", f"Xinhua_{blog_title}.json"), "w") as file:
        #       json.dump(blog, file)
        return blog

    def search_and_save(self, topic: str, lang):
        """Search for a topic, crawl every selected result, and return the articles."""
        content_list = []
        logger.warning(f"Crawler_NewsCN start search topic {topic} {lang}")
        self.__search_topic(topic, lang)
        time.sleep(1)
        logger.warning("Crawler_NewsCN start select blog in search page")
        self.__select_blog_in_search_page()
        url_list = self.selected_blogs.get("urls", [])
        logger.warning(f"Crawler_NewsCN url_list {str(url_list)}")

        for idx, url in enumerate(url_list, start=1):
            logger.warning(f"Crawler_NewsCN {idx}/{len(url_list)} url:{url}")
            content = self.__get_and_save_blog(url, lang)
            content_list.append(content)
        return content_list

    def direct_save(self, url: str, lang: str = "en"):
        # lang was missing from the original call, which made this method raise a
        # TypeError; "en" is an assumed default, pass "cn" for Chinese articles.
        return self.__get_and_save_blog(url, lang)

    def process(self, inputData):
        """Entry point: crawl articles for inputData["keyword"] in inputData["lang"]."""
        logger.warning(f"Crawler_NewsCN / inputData {inputData}")
        keyword = inputData["keyword"]
        lang = inputData["lang"]

        result = []
        try:
            result = self.search_and_save(keyword, lang)
        except Exception:
            logger.warning(f"Crawler_NewsCN {traceback.format_exc()}")
        finally:
            self.driver.quit()

        logger.warning(
            f"【Crawler_NewsCN process completed】, keyword={keyword}, result len={len(result)}"
        )
        return result
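

# Minimal usage sketch, not part of the original module: it assumes utils.logger is
# importable and chromedriver is available, and the keyword/lang values below are
# only illustrative.
if __name__ == "__main__":
    crawler = Crawler_NewsCN()
    # process() expects a dict with "keyword" and "lang" ("en" or "cn"), runs the
    # search, and returns a list of {"title", "meta", "content"} dicts.
    articles = crawler.process({"keyword": "artificial intelligence", "lang": "en"})
    for article in articles:
        print(article["title"])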