diff --git a/engines/crawler_bbc_search.py b/engines/crawler_bbc_search.py
index e69de29..9a15500 100644
--- a/engines/crawler_bbc_search.py
+++ b/engines/crawler_bbc_search.py
@@ -0,0 +1,175 @@
+from bs4 import BeautifulSoup as soup
+from selenium import webdriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.wait import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+import urllib.parse
+import json
+import time
+import re
+
+BASE_URL = "https://www.bbc.com"
+TARGET_URL = urllib.parse.urljoin(BASE_URL, "/news")
+
+# The maximum number of blog posts to crawl
+MAX_BLOG_LIMIT = 15
+
+
+class Crawler_BBC:
+    def __init__(self, driver: webdriver.Chrome) -> None:
+        self.driver = driver
+        self.selected_blogs = {"titles": [], "urls": []}
+
+    def __search_topic(self, topic: str):
+        self.driver.get(BASE_URL)
+        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))
+
+        # open the side bar to reveal the search bar
+        self.driver.find_element(By.CLASS_NAME, "sc-8a068d35-3.kvafkS").click()
+
+        # type the topic into the search bar
+        search_bar = self.driver.find_element(By.CLASS_NAME, "sc-e1a87ea7-1.iARAvt")
+        search_bar.send_keys(topic)
+
+        # click the search button
+        search_submit_button = self.driver.find_element(
+            By.CLASS_NAME, "sc-f6c53a81-2.sc-f6c53a81-3.dyeOnJ.dQfGZm"
+        )
+        search_submit_button.click()
+
+        # wait for the results page to load
+        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))
+
+    def __select_blog_in_search_page(self):
+        raw_blogs = self.driver.find_element(By.ID, "main-content").get_attribute(
+            "innerHTML"
+        )
+        # article class names are generated dynamically by the backend, so use bs4 to locate cards by data-testid
+        raw_blogs = soup(raw_blogs, features="html.parser")
+        raw_blogs = raw_blogs.find_all(
+            "div", attrs={"data-testid": "liverpool-card"}, recursive=True
+        )
+
+        if not raw_blogs:
+            return
+
+        for raw_blog in raw_blogs:
+            title = (
+                raw_blog.find(
+                    "h2", attrs={"data-testid": "card-headline"}, recursive=True
+                )
+                .text.replace("\n", "")
+                .lower()
+            )
+            # skip duplicate blogs surfaced by different sources
+            if title not in self.selected_blogs["titles"]:
+                # skip cards that carry no internal article link
+                try:
+                    url = raw_blog.find(
+                        "a", attrs={"data-testid": "internal-link"}, recursive=True
+                    )["href"]
+                except Exception:
+                    continue
+                # skip blogs that are not news
+                if "news" not in url:
+                    continue
+                self.selected_blogs["titles"].append(title)
+                # BBC hrefs may contain only a path, so join them with the base URL
+                if not urllib.parse.urlparse(url).netloc:
+                    url = urllib.parse.urljoin(BASE_URL, url)
+                self.selected_blogs["urls"].append(url)
+
+        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
+            # go to the next page
+            next_page_btn = self.driver.find_element(
+                By.CSS_SELECTOR, 'button[data-testid="pagination-next-button"]'
+            )
+            try:
+                next_page_btn.click()
+                time.sleep(2)
+                self.__select_blog_in_search_page()
+            except Exception:
+                import traceback
+
+                print(traceback.format_exc())
+        else:
+            self.selected_blogs["titles"] = self.selected_blogs["titles"][
+                :MAX_BLOG_LIMIT
+            ]
+            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]
+
+    def __retrieve_blog(self) -> dict:
+        blog_container = self.driver.find_element(By.ID, "main-content")
+        raw_blog = soup(
+            blog_container.get_attribute("innerHTML"), features="html.parser"
+        )
+
+        blog_title = raw_blog.find(
+            "div", attrs={"data-component": "headline-block"}, recursive=True
+        ).text
+        blog_time = raw_blog.find("time", recursive=True).text
+        blog_contributor = raw_blog.find(
+            "div", attrs={"data-testid": "byline-new-contributors"}, recursive=True
+        ).text
+        blog_meta = {"time": blog_time, "author": blog_contributor}
+        blog_content_blocks = raw_blog.find_all(
+            "div", attrs={"data-component": "text-block"}, recursive=True
+        )
+
+        blog_content = ""
+
+        for block in blog_content_blocks:
+            blog_content += block.text
+
+        return {"title": blog_title, "meta": blog_meta, "content": blog_content}
+
+    def __get_and_save_blog(self, url: str):
+        self.driver.get(url)
+        time.sleep(3)
+
+        blog = self.__retrieve_blog()
+
+        blog_title = blog.get("title", "")
+        print(blog_title)
+
+        # Remove characters that are invalid in Windows file names
+        invalid_chars_pattern = r'[\\/:*?"<>|]'
+        blog_title = re.sub(invalid_chars_pattern, "", blog_title)
+
+        # assumes ./saved_articles/ already exists
+        with open(f"./saved_articles/BBC_{blog_title}.json", "w", encoding="utf-8") as file:
+            json.dump(blog, file)
+        time.sleep(2)
+
+    def search_and_save(self, topic: str):
+        self.__search_topic(topic)
+        self.__select_blog_in_search_page()
+        url_list = self.selected_blogs.get("urls", [])
+        for url in url_list:
+            self.__get_and_save_blog(url)
+
+    def direct_save(self, url: str):
+        self.__get_and_save_blog(url)
+
+    def test(self):
+        try:
+            self.search_and_save("US election")
+            # self.direct_save("https://www.bbc.com/news/articles/c2edewgv2kpo")
+        except Exception:
+            import traceback
+
+            print(traceback.format_exc())
+        finally:
+            self.driver.quit()
+
+
+if __name__ == "__main__":
+    from selenium.webdriver.chrome.options import Options
+
+    chrome_options = Options()
+    chrome_options.add_argument("--headless")
+    chrome_options.add_argument("--no-sandbox")
+    chrome_options.add_argument("--disable-dev-shm-usage")
+    driver = webdriver.Chrome(options=chrome_options)
+    crawler = Crawler_BBC(driver)
+    crawler.test()
diff --git a/engines/crawler_newscn_search.py b/engines/crawler_newscn_search.py
new file mode 100644
index 0000000..13a69d0
--- /dev/null
+++ b/engines/crawler_newscn_search.py
@@ -0,0 +1,157 @@
+from bs4 import BeautifulSoup as soup
+from selenium import webdriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.wait import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+import urllib.parse
+import json
+import time
+import re
+
+BASE_URL = "https://english.news.cn"
+XINHUA_OVERSEAS_REGIONS = ["asiapacific", "europe", "africa", "northamerica"]
+
+# The maximum number of blog posts to crawl
+MAX_BLOG_LIMIT = 15
+
+
+class Crawler_NewsCN:
+    def __init__(self, driver: webdriver.Chrome) -> None:
+        self.driver = driver
+        self.selected_blogs = {"titles": [], "urls": []}
+
+    def __search_topic(self, topic: str):
+        self.driver.get(BASE_URL)
+        WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))
+
+        # type the topic into the search bar
+        search_bar = self.driver.find_element(By.CLASS_NAME, "search-input")
+        search_bar.send_keys(topic)
+
+        # click the search button
+        search_submit_button = self.driver.find_element(By.ID, "searchSubmit")
+        search_submit_button.click()
+
+        # close the home window and switch to the newly opened results window
+        self.driver.close()
+        self.driver.switch_to.window(self.driver.window_handles[0])
+        WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua research"))
+
+    def __select_blog_in_search_page(self):
+        raw_blogs = self.driver.find_element(By.CLASS_NAME, "content").get_attribute(
+            "innerHTML"
+        )
+        raw_blogs = soup(raw_blogs, features="html.parser")
+        raw_blogs = raw_blogs.find_all("div", attrs={"class": "item"}, recursive=True)
+
+        if not raw_blogs:
+            return
+
+        for raw_blog in raw_blogs:
+            title = (
+                raw_blog.find("div", class_="title")
+                .text.replace("\n", "")
+                .replace(" ", "")
+                .lower()
+            )
+            # skip duplicate blogs surfaced by different sources
+            if title not in self.selected_blogs["titles"]:
+                self.selected_blogs["titles"].append(title)
+                self.selected_blogs["urls"].append(raw_blog.find("a")["href"])
+
+        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
+            # go to the next page
+            next_page_btn = self.driver.find_element(
+                By.CLASS_NAME, "ant-pagination-next"
+            )
+            if next_page_btn.get_attribute("aria-disabled") != "true":
+                try:
+                    next_page_btn.click()
+                    time.sleep(2)
+                    self.__select_blog_in_search_page()
+                except Exception:
+                    import traceback
+
+                    print(traceback.format_exc())
+        else:
+            self.selected_blogs["titles"] = self.selected_blogs["titles"][
+                :MAX_BLOG_LIMIT
+            ]
+            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]
+
+        # print(self.selected_blogs["urls"])
+
+    def __retrieve_overseas_blog(self) -> dict:
+        blog_container = self.driver.find_element(By.CLASS_NAME, "main.clearfix")
+        blog_title = blog_container.find_element(By.CLASS_NAME, "Btitle").text
+        blog_meta = blog_container.find_element(By.CLASS_NAME, "wzzy").text
+        blog_content = blog_container.find_element(By.ID, "detailContent").text
+
+        return {"title": blog_title, "meta": blog_meta, "content": blog_content}
+
+    def __retrieve_china_blog(self) -> dict:
+        blog_container = self.driver.find_element(By.CLASS_NAME, "conBox")
+        blog_title_meta_container = blog_container.find_element(By.CLASS_NAME, "conTop")
+        blog_title = blog_title_meta_container.find_element(By.TAG_NAME, "h1").text
+        blog_meta = blog_title_meta_container.find_element(
+            By.CLASS_NAME, "infoBox.clearfix"
+        ).text
+        blog_content_container = blog_container.find_element(By.CLASS_NAME, "conLeft")
+        blog_content = blog_content_container.find_element(By.ID, "detailContent").text
+
+        return {"title": blog_title, "meta": blog_meta, "content": blog_content}
+
+    def __get_and_save_blog(self, url: str):
+        self.driver.get(url)
+        WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))
+        region_code = urllib.parse.urlparse(url).path.split("/")[1]
+        if region_code in XINHUA_OVERSEAS_REGIONS:
+            blog = self.__retrieve_overseas_blog()
+        else:
+            blog = self.__retrieve_china_blog()
+        blog_title = blog.get("title", "")
+        print(blog_title)
+
+        # Remove characters that are invalid in Windows file names
+        invalid_chars_pattern = r'[\\/:*?"<>|]'
+        blog_title = re.sub(invalid_chars_pattern, "", blog_title)
+
+        # assumes ./saved_articles/ already exists
+        with open(f"./saved_articles/Xinhua_{blog_title}.json", "w", encoding="utf-8") as file:
+            json.dump(blog, file)
+        time.sleep(2)
+
+    def search_and_save(self, topic: str):
+        self.__search_topic(topic)
+        self.__select_blog_in_search_page()
+        print(self.selected_blogs)
+        url_list = self.selected_blogs.get("urls", [])
+        for url in url_list:
+            self.__get_and_save_blog(url)
+
+    def direct_save(self, url: str):
+        self.__get_and_save_blog(url)
+
+    def test(self):
+        try:
+            # self.search_and_save("china")
+            self.search_and_save("xi jinping")
+            # self.direct_save("")
+        except Exception:
+            import traceback
+
+            print(traceback.format_exc())
+        finally:
+            self.driver.quit()
+
+
+if __name__ == "__main__":
+    from selenium.webdriver.chrome.options import Options
+
+    chrome_options = Options()
+    chrome_options.add_argument("--headless")
+    chrome_options.add_argument("--no-sandbox")
+    chrome_options.add_argument("--disable-dev-shm-usage")
+    driver = webdriver.Chrome(options=chrome_options)
+    crawler = Crawler_NewsCN(driver)
+    crawler.test()
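Both crawlers end with the same save step: strip Windows-invalid characters from the title, then dump the article as JSON under ./saved_articles/. A possible shared helper is sketched below, assuming one UTF-8 JSON file per article is the desired output; save_article is a hypothetical name, not a function that exists in this repository.

import json
import os
import re


def save_article(blog: dict, source_prefix: str, out_dir: str = "./saved_articles") -> str:
    """Write a crawled article to <out_dir>/<source_prefix>_<title>.json and return the path."""
    os.makedirs(out_dir, exist_ok=True)
    # Remove characters that are invalid in Windows file names
    safe_title = re.sub(r'[\\/:*?"<>|]', "", blog.get("title", "untitled"))
    path = os.path.join(out_dir, f"{source_prefix}_{safe_title}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(blog, fh, ensure_ascii=False)
    return path

Each crawler's __get_and_save_blog could then call save_article(blog, "BBC") or save_article(blog, "Xinhua") instead of duplicating the file handling.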
diff --git a/main.py b/main.py
index 069f750..0b59654 100644
--- a/main.py
+++ b/main.py
@@ -18,6 +18,8 @@ class Main(QueueProcessor):
         # keyword = inputData["keyword"]
         # print("keyword:", keyword)
         match currentEngineId:
+            case 3000: # crawler_bbc_search
+                return CrawlerGoogleSearch.process(inputData)
             case 9000: # crawler_bbc_search
                 return CrawlerGoogleSearch.process(inputData)
             case 10000: # crawler_bbc_search
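The new case 3000 branch currently dispatches to CrawlerGoogleSearch.process, the same handler as the 9000 and 10000 branches. If the intent is for engine id 3000 to drive the BBC crawler added in this change, a wrapper along the following lines could be swapped in; CrawlerBBCSearch, its static process(inputData) signature, and the "keyword" field are assumptions modelled on the commented-out inputData["keyword"] access in main.py, not interfaces confirmed by this diff.

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

# assumes engines/ is importable as a package from the project root
from engines.crawler_bbc_search import Crawler_BBC


class CrawlerBBCSearch:
    @staticmethod
    def process(inputData: dict) -> None:
        # "keyword" mirrors the commented-out inputData["keyword"] access in main.py
        keyword = inputData.get("keyword", "")
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        driver = webdriver.Chrome(options=chrome_options)
        try:
            Crawler_BBC(driver).search_and_save(keyword)
        finally:
            driver.quit()

The 3000 branch would then return CrawlerBBCSearch.process(inputData) instead of the Google-search handler.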