forked from iCON/CrawlerEngines
branch: dev
parent: e842ac9ed7
commit: 576340ef20

@@ -0,0 +1,175 @@
from bs4 import BeautifulSoup as soup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import urllib.parse
import json
import time
import re

BASE_URL = "https://www.bbc.com"
TARGET_URL = urllib.parse.urljoin(BASE_URL, "/news")

# The maximum number of blog posts to crawl
MAX_BLOG_LIMIT = 15


class Crawler_BBC:
    def __init__(self, driver: webdriver.Chrome) -> None:
        self.driver = driver
        self.selected_blogs = {"titles": [], "urls": []}

    def __search_topic(self, topic: str):
        self.driver.get(BASE_URL)
        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))

        # Open the side bar to expose the search bar.
        # Note: these hashed class names come from BBC's frontend build and may change.
        self.driver.find_element(By.CLASS_NAME, "sc-8a068d35-3.kvafkS").click()

        # Type the topic into the search bar
        search_bar = self.driver.find_element(By.CLASS_NAME, "sc-e1a87ea7-1.iARAvt")
        search_bar.send_keys(topic)

        # Click the search button
        search_submit_button = self.driver.find_element(
            By.CLASS_NAME, "sc-f6c53a81-2.sc-f6c53a81-3.dyeOnJ.dQfGZm"
        )
        search_submit_button.click()

        # Wait for the results page to load
        WebDriverWait(self.driver, 10).until(EC.title_contains("BBC"))

    def __select_blog_in_search_page(self):
        raw_blogs = self.driver.find_element(By.ID, "main-content").get_attribute(
            "innerHTML"
        )
        # The backend generates dynamic class values per article, so use bs4 to
        # locate result cards via the stable data-testid attribute instead.
        raw_blogs = soup(raw_blogs, features="html.parser")
        raw_blogs = raw_blogs.findAll(
            "div", attrs={"data-testid": "liverpool-card"}, recursive=True
        )

        if not raw_blogs:
            return

        for raw_blog in raw_blogs:
            title = (
                raw_blog.find(
                    "h2", attrs={"data-testid": "card-headline"}, recursive=True
                )
                .text.replace("\n", "")
                .lower()
            )
            # Skip duplicate blogs that show up under different sources
            if title not in self.selected_blogs["titles"]:
                # Skip cards that do not carry an internal article link
                try:
                    url = raw_blog.find(
                        "a", attrs={"data-testid": "internal-link"}, recursive=True
                    )["href"]
                except Exception:
                    continue
                # Skip blogs that are not news
                if "news" not in url:
                    continue
                self.selected_blogs["titles"].append(title)
                # BBC's href links only contain the path
                if not urllib.parse.urlparse(url).netloc:
                    url = urllib.parse.urljoin(BASE_URL, url)
                self.selected_blogs["urls"].append(url)

        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
            # Go to the next page of results
            next_page_btn = self.driver.find_element(
                By.CSS_SELECTOR, 'button[data-testid="pagination-next-button"]'
            )
            try:
                next_page_btn.click()
                time.sleep(2)
                self.__select_blog_in_search_page()
            except Exception:
                import traceback

                print(traceback.format_exc())
        else:
            self.selected_blogs["titles"] = self.selected_blogs["titles"][
                :MAX_BLOG_LIMIT
            ]
            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]

    def __retrieve_blog(self) -> dict:
        blog_container = self.driver.find_element(By.ID, "main-content")
        raw_blog = soup(
            blog_container.get_attribute("innerHTML"), features="html.parser"
        )

        blog_title = raw_blog.find(
            "div", attrs={"data-component": "headline-block"}, recursive=True
        ).text
        blog_time = raw_blog.find("time", recursive=True).text
        blog_contributor = raw_blog.find(
            "div", attrs={"data-testid": "byline-new-contributors"}, recursive=True
        ).text
        blog_meta = {"time": blog_time, "author": blog_contributor}
        blog_content_blocks = raw_blog.find_all(
            "div", attrs={"data-component": "text-block"}, recursive=True
        )

        blog_content = ""

        for block in blog_content_blocks:
            blog_content += block.text

        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __get_and_save_blog(self, url: str):
        self.driver.get(url)
        time.sleep(3)

        blog = self.__retrieve_blog()

        blog_title = blog.get("title", "")
        print(blog_title)

        # Remove characters that are invalid in file names on Windows
        invalid_chars_pattern = r'[\\/:*?"<>|]'
        blog_title = re.sub(invalid_chars_pattern, "", blog_title)

        # Assumes the ./saved_articles/ directory already exists
        with open(f"./saved_articles/BBC_{blog_title}.json", "w", encoding="utf-8") as file:
            json.dump(blog, file)
        time.sleep(2)

    def search_and_save(self, topic: str):
        self.__search_topic(topic)
        self.__select_blog_in_search_page()
        url_list = self.selected_blogs.get("urls", [])
        for url in url_list:
            self.__get_and_save_blog(url)

    def direct_save(self, url: str):
        self.__get_and_save_blog(url)

    def test(self):
        try:
            self.search_and_save("US election")
            # self.direct_save("https://www.bbc.com/news/articles/c2edewgv2kpo")
        except Exception:
            import traceback

            print(traceback.format_exc())
        finally:
            self.driver.quit()


if __name__ == "__main__":
    from selenium.webdriver.chrome.options import Options

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=chrome_options)
    crawler = Crawler_BBC(driver)
    crawler.test()
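
Note on output: the crawler writes each article to ./saved_articles/BBC_<title>.json and assumes that directory already exists. Below is a minimal sketch of reading those files back for downstream processing; the helper name and the standalone-script framing are ours, not part of the crawler.

# Minimal sketch (not part of this commit): load the JSON articles the crawler
# saves to ./saved_articles/. Assumes each file holds a dict with "title",
# "meta" and "content" keys, as produced by __get_and_save_blog above.
import json
from pathlib import Path


def load_saved_articles(directory: str = "./saved_articles") -> list[dict]:
    articles = []
    for path in sorted(Path(directory).glob("BBC_*.json")):
        with open(path, "r", encoding="utf-8") as fh:
            articles.append(json.load(fh))
    return articles


if __name__ == "__main__":
    for article in load_saved_articles():
        print(article["title"], "-", article["meta"].get("time", ""))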

@@ -0,0 +1,157 @@
from bs4 import BeautifulSoup as soup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import urllib.parse
import json
import time
import re

BASE_URL = "https://english.news.cn"
XINHUA_OVERSEAS_REGIONS = ["asiapacific", "europe", "africa", "northamerica"]

# The maximum number of blog posts to crawl
MAX_BLOG_LIMIT = 15


class Crawler_NewsCN:
    def __init__(self, driver: webdriver.Chrome) -> None:
        self.driver = driver
        self.selected_blogs = {"titles": [], "urls": []}

    def __search_topic(self, topic: str):
        self.driver.get(BASE_URL)
        WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))

        # Type the topic into the search bar
        search_bar = self.driver.find_element(By.CLASS_NAME, "search-input")
        search_bar.send_keys(topic)

        # Click the search button
        search_submit_button = self.driver.find_element(By.ID, "searchSubmit")
        search_submit_button.click()

        # The results open in a new window: close the home window and switch to it
        self.driver.close()
        self.driver.switch_to.window(self.driver.window_handles[0])
        WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua research"))

    def __select_blog_in_search_page(self):
        raw_blogs = self.driver.find_element(By.CLASS_NAME, "content").get_attribute(
            "innerHTML"
        )
        raw_blogs = soup(raw_blogs, features="html.parser")
        raw_blogs = raw_blogs.findAll("div", attrs={"class": "item"}, recursive=True)

        if not raw_blogs:
            return

        for raw_blog in raw_blogs:
            title = (
                raw_blog.find("div", class_="title")
                .text.replace("\n", "")
                .replace(" ", "")
                .lower()
            )
            # Skip duplicate blogs that show up under different sources
            if title not in self.selected_blogs["titles"]:
                self.selected_blogs["titles"].append(title)
                self.selected_blogs["urls"].append(raw_blog.find("a")["href"])

        if len(self.selected_blogs["urls"]) < MAX_BLOG_LIMIT:
            # Go to the next page of results
            next_page_btn = self.driver.find_element(
                By.CLASS_NAME, "ant-pagination-next"
            )
            if next_page_btn.get_attribute("aria-disabled") != "true":
                try:
                    next_page_btn.click()
                    time.sleep(2)
                    self.__select_blog_in_search_page()
                except Exception:
                    import traceback

                    print(traceback.format_exc())
        else:
            self.selected_blogs["titles"] = self.selected_blogs["titles"][
                :MAX_BLOG_LIMIT
            ]
            self.selected_blogs["urls"] = self.selected_blogs["urls"][:MAX_BLOG_LIMIT]

        # print(self.selected_blogs["urls"])

    def __retrieve_overseas_blog(self) -> dict:
        # Articles under the overseas regions use the "main clearfix" layout
        blog_container = self.driver.find_element(By.CLASS_NAME, "main.clearfix")
        blog_title = blog_container.find_element(By.CLASS_NAME, "Btitle").text
        blog_meta = blog_container.find_element(By.CLASS_NAME, "wzzy").text
        blog_content = blog_container.find_element(By.ID, "detailContent").text

        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __retrieve_china_blog(self) -> dict:
        # Domestic articles use the "conBox" layout
        blog_container = self.driver.find_element(By.CLASS_NAME, "conBox")
        blog_title_meta_container = blog_container.find_element(By.CLASS_NAME, "conTop")
        blog_title = blog_title_meta_container.find_element(By.TAG_NAME, "h1").text
        blog_meta = blog_title_meta_container.find_element(
            By.CLASS_NAME, "infoBox.clearfix"
        ).text
        blog_content_container = blog_container.find_element(By.CLASS_NAME, "conLeft")
        blog_content = blog_content_container.find_element(By.ID, "detailContent").text

        return {"title": blog_title, "meta": blog_meta, "content": blog_content}

    def __get_and_save_blog(self, url: str):
        self.driver.get(url)
        WebDriverWait(self.driver, 10).until(EC.title_contains("Xinhua"))
        # The first path segment identifies the region the article was published under
        region_code = urllib.parse.urlparse(url).path.split("/")[1]
        if region_code in XINHUA_OVERSEAS_REGIONS:
            blog = self.__retrieve_overseas_blog()
        else:
            blog = self.__retrieve_china_blog()
        blog_title = blog.get("title", "")
        print(blog_title)

        # Remove characters that are invalid in file names on Windows
        invalid_chars_pattern = r'[\\/:*?"<>|]'
        blog_title = re.sub(invalid_chars_pattern, "", blog_title)

        # Assumes the ./saved_articles/ directory already exists
        with open(f"./saved_articles/Xinhua_{blog_title}.json", "w", encoding="utf-8") as file:
            json.dump(blog, file)
        time.sleep(2)

    def search_and_save(self, topic: str):
        self.__search_topic(topic)
        self.__select_blog_in_search_page()
        print(self.selected_blogs)
        url_list = self.selected_blogs.get("urls", [])
        for url in url_list:
            self.__get_and_save_blog(url)

    def direct_save(self, url: str):
        self.__get_and_save_blog(url)

    def test(self):
        try:
            # self.search_and_save("china")
            self.search_and_save("xi jinping")
            # self.direct_save("<a url>")
        except Exception:
            import traceback

            print(traceback.format_exc())
        finally:
            self.driver.quit()


if __name__ == "__main__":
    from selenium.webdriver.chrome.options import Options

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=chrome_options)
    crawler = Crawler_NewsCN(driver)
    crawler.test()
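
The Xinhua crawler picks an article parser from the first path segment of the URL: regions listed in XINHUA_OVERSEAS_REGIONS use the Btitle/wzzy layout, everything else falls back to the domestic conBox layout. Here is a standalone sketch of that routing rule; the example URLs are purely illustrative, not real articles.

# Standalone sketch of the region-routing rule used by __get_and_save_blog.
import urllib.parse

XINHUA_OVERSEAS_REGIONS = ["asiapacific", "europe", "africa", "northamerica"]


def is_overseas_article(url: str) -> bool:
    # The first non-empty path segment names the region the article was filed under.
    segments = [s for s in urllib.parse.urlparse(url).path.split("/") if s]
    return bool(segments) and segments[0] in XINHUA_OVERSEAS_REGIONS


# Hypothetical URLs for illustration only:
print(is_overseas_article("https://english.news.cn/europe/20240101/example.htm"))   # True  -> overseas layout
print(is_overseas_article("https://english.news.cn/20240101/example/c.html"))       # False -> domestic layout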

main.py

@@ -18,6 +18,8 @@ class Main(QueueProcessor):
        # keyword = inputData["keyword"]
        # print("keyword:", keyword)
        match currentEngineId:
            case 3000:  # crawler_bbc_search
                return CrawlerGoogleSearch.process(inputData)
            case 9000:  # crawler_bbc_search
                return CrawlerGoogleSearch.process(inputData)
            case 10000:  # crawler_bbc_search
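
The dispatcher above invokes each engine through a class-level process(inputData) call, and the commented-out lines suggest the search topic arrives as inputData["keyword"]. Below is a hypothetical adapter showing how one of the new crawlers could sit behind the same interface; the module name, class name, return shape, and engine wiring are assumptions for illustration, not part of this commit.

# Hypothetical adapter sketch (not part of this commit).
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

from crawler_bbc import Crawler_BBC  # assumed module name for the first file above


class CrawlerBBCSearch:
    @staticmethod
    def process(inputData: dict) -> dict:
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        driver = webdriver.Chrome(options=chrome_options)
        crawler = Crawler_BBC(driver)
        try:
            # main.py's commented-out code suggests the topic is passed as "keyword"
            crawler.search_and_save(inputData.get("keyword", ""))
            return {"status": "ok", "saved": len(crawler.selected_blogs["urls"])}
        finally:
            driver.quit()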