# CrawlerEngines/engines/crawler_bbc_search.py

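# Module description (added note, grounded in the code below): this crawler
# drives a headless Chrome session to search bbc.com for a keyword and saves
# matching news articles as JSON files.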
import json
import re
import time
import traceback
import urllib.parse

from bs4 import BeautifulSoup as soup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

from utils.logger import logger

BASE_URL = "https://www.bbc.com"
TARGET_URL = urllib.parse.urljoin(BASE_URL, "/news")
# The maximum number of blog posts to crawl
MAX_BLOG_LIMIT = 8


# BBC online search crawler
class Crawler_BBCSearch:
    def __init__(self) -> None:
        from selenium.webdriver.chrome.options import Options

        chrome_options = Options()
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--ignore-certificate-errors")
        chrome_options.add_argument("--ignore-ssl-errors=yes")
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        self.driver = webdriver.Chrome(options=chrome_options)
        self.selected_blogs = {"titles": [], "urls": []}

    def __search_topic(self, topic: str):
        self.driver.get(BASE_URL)
        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))
        # open the side bar to reveal the search bar
        self.driver.find_element(By.CLASS_NAME, "sc-8a068d35-3.kvafkS").click()
        time.sleep(1)
        # type the topic into the search bar
        search_bar = self.driver.find_element(By.CLASS_NAME, "sc-e1a87ea7-1.iARAvt")
        search_bar.send_keys(topic)
        # click the search button
        search_submit_button = self.driver.find_element(
            By.CSS_SELECTOR, '[data-testid="search-input-search-button"]'
        )
        search_submit_button.click()
        # wait for the results page to load
        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))

    def __select_blog_in_search_page(self):
        raw_blogs = self.driver.find_element(By.ID, "main-content").get_attribute(
            "innerHTML"
        )
        # class names on the result cards are dynamically generated, so parse the
        # HTML with bs4 and locate elements by their data-testid attributes instead
        raw_blogs = soup(raw_blogs, features="html.parser")
        raw_blogs = raw_blogs.findAll(
            "div", attrs={"data-testid": "liverpool-card"}, recursive=True
        )
        if not raw_blogs:
            return
        for raw_blog in raw_blogs:
            title = (
                raw_blog.find(
                    "h2", attrs={"data-testid": "card-headline"}, recursive=True
                )
                .text.replace("\n", "")
                .lower()
            )
            # skip blogs already collected from another result page
            if title not in self.selected_blogs.get("titles"):
                # skip cards that have no internal link
                try:
                    url = raw_blog.find(
                        "a", attrs={"data-testid": "internal-link"}, recursive=True
                    )["href"]
                except Exception:
                    continue
                # skip blogs that are not news articles
                if "news" in url and "/videos/" not in url:
                    self.selected_blogs["titles"].append(title)
                    # BBC hrefs only contain the path, so prepend the base URL
                    if not urllib.parse.urlparse(url).netloc:
                        url = urllib.parse.urljoin(BASE_URL, url)
                    self.selected_blogs["urls"].append(url)
        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
            # go to the next result page and keep collecting, if one exists
            if self.driver.find_elements(
                By.CSS_SELECTOR, 'button[data-testid="pagination-next-button"]'
            ):
                next_page_btn = self.driver.find_element(
                    By.CSS_SELECTOR, 'button[data-testid="pagination-next-button"]'
                )
                try:
                    next_page_btn.click()
                    time.sleep(2)
                    self.__select_blog_in_search_page()
                except Exception:
                    logger.warning(traceback.format_exc())
        else:
            self.selected_blogs["titles"] = self.selected_blogs["titles"][
                :MAX_BLOG_LIMIT
            ]
            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]

    def __retrieve_blog(self) -> dict:
        blog_container = self.driver.find_element(By.ID, "main-content")
        raw_blog = soup(
            blog_container.get_attribute("innerHTML"), features="html.parser"
        )
        blog_title = raw_blog.find(
            "div", attrs={"data-component": "headline-block"}, recursive=True
        ).text
        blog_time = raw_blog.find("time", recursive=True).text
        blog_contributor_el = raw_blog.find(
            "div",
            attrs={"data-testid": "byline-new-contributors"},
            recursive=True,
        )
        blog_contributor = ""
        if blog_contributor_el is not None:
            blog_contributor = blog_contributor_el.text
        blog_content_blocks = raw_blog.find_all(
            "div", attrs={"data-component": "text-block"}, recursive=True
        )
        blog_meta = {"time": blog_time, "author": blog_contributor}
        blog_content = ""
        for block in blog_content_blocks:
            blog_content += block.text
        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __get_and_save_blog(self, url: str):
        self.driver.get(url)
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.ID, "main-content"))
        )
        time.sleep(1)
        blog = self.__retrieve_blog()
        blog_title = blog.get("title", "")
        print(blog_title)
        # Remove characters that are invalid in file names on Windows
        invalid_chars_pattern = r'[\\/:*?"<>|]'
        blog_title = re.sub(invalid_chars_pattern, "", blog_title)
        with open(f"./saved_articles/BBC_{blog_title}.json", "w") as file:
            json.dump(blog, file)
        time.sleep(2)
        return blog

    def search_and_save(self, topic: str):
        content_list = []
        logger.warning(f"Crawler_BBCSearch start search topic {topic}")
        self.__search_topic(topic)
        time.sleep(1)
        logger.warning("Crawler_BBCSearch start select blog in search page")
        self.__select_blog_in_search_page()
        url_list = self.selected_blogs.get("urls", [])
        logger.warning(f"Crawler_BBCSearch url_list {str(url_list)}")
        for idx, url in enumerate(url_list, start=1):
            logger.warning(f"Crawler_BBCSearch {idx}/{len(url_list)} url:{url}")
            content = self.__get_and_save_blog(url)
            content_list.append(content)
        return content_list

    def direct_save(self, url: str):
        self.__get_and_save_blog(url)

    def process(self, inputData):
        logger.warning(f"Crawler_BBCSearch / inputData {inputData}")
        keyword = inputData["keyword"]
        result = []
        try:
            result = self.search_and_save(keyword)
        except Exception:
            logger.warning(f"Crawler_BBCSearch {traceback.format_exc()}")
        finally:
            self.driver.quit()
            logger.warning(
                f"Crawler_BBCSearch process completed, keyword={keyword}, result len={len(result)}"
            )
        return result
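

# Usage sketch: a minimal example of how this crawler might be invoked, based on
# the process() interface above. It assumes a Chrome/chromedriver setup that
# Selenium can locate and that a ./saved_articles/ directory already exists for
# the JSON output; the keyword is only illustrative.
if __name__ == "__main__":
    crawler = Crawler_BBCSearch()
    articles = crawler.process({"keyword": "climate change"})
    print(f"Fetched {len(articles)} articles")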