from bs4 import BeautifulSoup as soup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import urllib.parse
import json
import time
import re
import traceback
from utils.logger import logger

BASE_URL = "https://www.bbc.com"
TARGET_URL = urllib.parse.urljoin(BASE_URL, "/news")
# The maximum number of blog posts to crawl
MAX_BLOG_LIMIT = 8


# BBC online search crawler
class Crawler_BBCSearch:
    def __init__(self) -> None:
        from selenium.webdriver.chrome.options import Options

        chrome_options = Options()
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--ignore-certificate-errors")
        chrome_options.add_argument("--ignore-ssl-errors=yes")
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        self.driver = webdriver.Chrome(options=chrome_options)
        self.selected_blogs = {"titles": [], "urls": []}

    def __search_topic(self, topic: str):
        self.driver.get(BASE_URL)
        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))
        # open the side bar to expose the search bar
        # (these generated class names are brittle and may change between BBC deployments)
        self.driver.find_element(By.CLASS_NAME, "sc-8a068d35-3.kvafkS").click()
        time.sleep(1)
        # type the topic into the search bar
        search_bar = self.driver.find_element(By.CLASS_NAME, "sc-e1a87ea7-1.iARAvt")
        search_bar.send_keys(topic)
        # click the search button; the data-testid selector is more stable than class names
        search_submit_button = self.driver.find_element(
            By.CSS_SELECTOR, '[data-testid="search-input-search-button"]'
        )
        search_submit_button.click()
        # wait for the results page to load
        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))

    def __select_blog_in_search_page(self):
        raw_blogs = self.driver.find_element(By.ID, "main-content").get_attribute(
            "innerHTML"
        )
        # class values are generated dynamically by the backend, so use bs4 to locate
        # article cards by their data-testid attributes instead
        raw_blogs = soup(raw_blogs, features="html.parser")
        raw_blogs = raw_blogs.find_all(
            "div", attrs={"data-testid": "liverpool-card"}, recursive=True
        )
        if not raw_blogs:
            return
        for raw_blog in raw_blogs:
            title = (
                raw_blog.find(
                    "h2", attrs={"data-testid": "card-headline"}, recursive=True
                )
                .text.replace("\n", "")
                .lower()
            )
            # avoid crawling the same blog twice when it appears under different sources
            if title not in self.selected_blogs.get("titles"):
                # skip cards without an internal link
                try:
                    url = raw_blog.find(
                        "a", attrs={"data-testid": "internal-link"}, recursive=True
                    )["href"]
                except Exception:
                    continue
                # skip blogs that are not news
                if "news" in url and "/videos/" not in url:
                    self.selected_blogs["titles"].append(title)
                    # BBC's href attributes only contain the path
                    if not urllib.parse.urlparse(url).netloc:
                        url = urllib.parse.urljoin(BASE_URL, url)
                    self.selected_blogs["urls"].append(url)
        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
            # go to the next page and recurse until the limit is reached
            if self.driver.find_elements(
                By.CSS_SELECTOR, 'button[data-testid="pagination-next-button"]'
            ):
                next_page_btn = self.driver.find_element(
                    By.CSS_SELECTOR, 'button[data-testid="pagination-next-button"]'
                )
                try:
                    next_page_btn.click()
                    time.sleep(2)
                    self.__select_blog_in_search_page()
                except Exception:
                    logger.warning(f"Crawler_BBCSearch {traceback.format_exc()}")
        else:
            self.selected_blogs["titles"] = self.selected_blogs["titles"][
                :MAX_BLOG_LIMIT
            ]
            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]

    def __retrieve_blog(self) -> dict:
        blog_container = self.driver.find_element(By.ID, "main-content")
        raw_blog = soup(
            blog_container.get_attribute("innerHTML"), features="html.parser"
        )
        blog_title = raw_blog.find(
            "div", attrs={"data-component": "headline-block"}, recursive=True
        ).text
        blog_time = raw_blog.find("time", recursive=True).text
        blog_contributor_el = raw_blog.find(
            "div",
            attrs={"data-testid": "byline-new-contributors"},
            recursive=True,
        )
        blog_contributor = ""
        if blog_contributor_el is not None:
            blog_contributor = blog_contributor_el.text
        blog_content_blocks = raw_blog.find_all(
            "div", attrs={"data-component": "text-block"}, recursive=True
        )
        blog_meta = {"time": blog_time, "author": blog_contributor}
        blog_content = ""
        for block in blog_content_blocks:
            blog_content += block.text
        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __get_and_save_blog(self, url: str):
        self.driver.get(url)
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.ID, "main-content"))
        )
        time.sleep(1)
        blog = self.__retrieve_blog()
        blog_title = blog.get("title", "")
        logger.warning(f"Crawler_BBCSearch saving blog {blog_title}")
        # remove characters that are invalid in file names on Windows
        invalid_chars_pattern = r'[\\/:*?"<>|]'
        blog_title = re.sub(invalid_chars_pattern, "", blog_title)
        with open(f"./saved_articles/BBC_{blog_title}.json", "w") as file:
            json.dump(blog, file)
        time.sleep(2)
        return blog

    def search_and_save(self, topic: str):
        content_list = []
        logger.warning(f"Crawler_BBCSearch start search topic {topic}")
        self.__search_topic(topic)
        time.sleep(1)
        logger.warning("Crawler_BBCSearch start select blog in search page")
        self.__select_blog_in_search_page()
        url_list = self.selected_blogs.get("urls", [])
        logger.warning(f"Crawler_BBCSearch url_list {str(url_list)}")
        for idx, url in enumerate(url_list, start=1):
            logger.warning(f"Crawler_BBCSearch {idx}/{len(url_list)} url:{url}")
            content = self.__get_and_save_blog(url)
            content_list.append(content)
        return content_list

    def direct_save(self, url: str):
        self.__get_and_save_blog(url)

    def process(self, inputData):
        logger.warning(f"Crawler_BBCSearch / inputData {inputData}")
        keyword = inputData["keyword"]
        result = []
        try:
            result = self.search_and_save(keyword)
        except Exception:
            logger.warning(f"Crawler_BBCSearch {traceback.format_exc()}")
        finally:
            self.driver.quit()
        logger.warning(
            f"Crawler_BBCSearch process completed, keyword={keyword}, result len={len(result)}"
        )
        return result
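

# Example usage (a minimal sketch, not part of the crawler itself): run this module
# directly to crawl a topic end to end. The "climate change" keyword and the local
# ./saved_articles/ directory are illustrative assumptions; the crawler also requires
# a working Chrome/chromedriver install and the repo's utils.logger module.
if __name__ == "__main__":
    import os

    # __get_and_save_blog writes JSON files into this directory
    os.makedirs("./saved_articles", exist_ok=True)
    crawler = Crawler_BBCSearch()
    articles = crawler.process({"keyword": "climate change"})  # hypothetical keyword
    for article in articles:
        print(article["title"], article["meta"].get("time", ""))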