import re
from typing import Dict, List

import httpx
from bs4 import BeautifulSoup  # retained for future HTML parsing; not needed for CVE regex
from loguru import logger

from app.models.schemas import IntelSource


class IntelScraper:
    """
    Engine responsible for scraping vulnerability intelligence from public sources.

    Holds a single shared ``httpx.AsyncClient``; call :meth:`close` on shutdown
    to release its connection pool.
    """

    def __init__(self) -> None:
        # Shared async HTTP client: 10s timeout, follows redirects.
        self.client = httpx.AsyncClient(timeout=10.0, follow_redirects=True)
        # Regex to find CVE identifiers (e.g. CVE-2023-1234); the year is 4
        # digits, the sequence number 4-7 digits per the CVE numbering scheme.
        self.cve_pattern = re.compile(r"CVE-\d{4}-\d{4,7}")

    async def scrape_source(self, source: IntelSource) -> List[Dict[str, str]]:
        """
        Fetch a source and extract potential vulnerability data.

        Args:
            source: The intel source to scrape; ``source.url`` is fetched and
                scanned for CVE identifiers.

        Returns:
            One dict per unique CVE found, of the form
            ``{"cve": <CVE id>, "source": <source name>}``. Empty list on any
            fetch/parse failure (best-effort: errors are logged, not raised).
        """
        logger.info(f"Scraping intelligence from: {source.name} ({source.url})")
        try:
            resp = await self.client.get(str(source.url))
            if resp.status_code != 200:
                logger.warning(f"Failed to fetch {source.url}: Status {resp.status_code}")
                return []

            content = resp.text

            # Run the CVE regex over the raw response body. (The previous
            # version also built a BeautifulSoup tree here but never used it —
            # dead work removed.)
            cves = self.cve_pattern.findall(content)
            # dict.fromkeys dedupes while preserving first-seen order, unlike
            # list(set(...)) whose ordering is nondeterministic.
            unique_cves = list(dict.fromkeys(cves))

            logger.info(f"Found {len(unique_cves)} CVEs in {source.name}")

            # Simplified: just returning the CVE codes for now
            return [{"cve": cve, "source": source.name} for cve in unique_cves]

        except Exception:
            # Boundary catch: scraping is best-effort per source. Use
            # logger.exception so the traceback is preserved in the logs.
            logger.exception(f"Error scraping {source.url}")
            return []

    async def close(self) -> None:
        """Release the underlying HTTP client's connection pool."""
        await self.client.aclose()


# Module-level singleton used by the rest of the app.
intel_scraper = IntelScraper()