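import asyncio
from app.services.project_service import project_service
from app.services.scraper import intel_scraper
from app.models.schemas import IntelSource, VulnerabilityIntel
import datetime
from loguru import logger
from typing import List, Dict


class IntelService:
    """Manage vulnerability-intelligence sources and the findings scraped from them.

    Sources are persisted through ``project_service``; the findings returned
    by ``intel_scraper`` are held in memory in ``knowledge_base``.
    """

    def __init__(self):
        """Seed or sync the persisted source list, then build the in-memory state.

        Constructing the service writes to persistence: every default source
        whose URL is not already stored is saved.
        """
        # Extended default sources as requested by user
        default_sources = [
            IntelSource(name="NVD CVE Feed", url="https://nvd.nist.gov/vuln/data-feeds"),
            IntelSource(name="GitHub Advisory", url="https://github.com/advisories"),
            IntelSource(name="Ubuntu Security", url="https://ubuntu.com/security/cve.json"),
            IntelSource(name="Debian Security", url="https://security-tracker.debian.org/tracker/data/json"),
            IntelSource(name="Microsoft Security Updates", url="https://api.msrc.microsoft.com/cvrf/v2.0/updates"),
            IntelSource(name="Amazon Linux Security", url="https://alas.aws.amazon.com/alas2.rss"),
            IntelSource(name="Oracle Security Alerts", url="https://www.oracle.com/security-alerts/"),
            IntelSource(name="Cisco Security Advisories", url="https://tools.cisco.com/security/center/publicationListing.x"),
            IntelSource(name="Snyk Vulnerability DB", url="https://snyk.io/vuln/"),
            # NOTE(review): unfamiliar domain - confirm who operates it and that
            # this is the intended feed before shipping it as a default, since
            # whatever it returns is merged into the knowledge base.
            IntelSource(name="Vultotal (Community)", url="https://vultotal.com/api/v1/cve"),
            IntelSource(name="VMware Security Advisories", url="https://www.vmware.com/security/advisories.xml"),
            IntelSource(name="Apache Security Reports", url="https://archive.apache.org/dist/httpd/CHANGES_2.4"),
            IntelSource(name="Python Advisory Database", url="https://github.com/pypa/advisory-database"),
            IntelSource(name="Rust Security Advisory", url="https://rustsec.org/advisories/"),
            IntelSource(name="WPScan (WordPress)", url="https://wpscan.com/api/v3/"),
            IntelSource(name="IBM PSIRT", url="https://www.ibm.com/blogs/psirt/"),
            IntelSource(name="HackerOne Hacktivity", url="https://hackerone.com/hacktivity.rss"),
            IntelSource(name="Full Disclosure Mailing List", url="https://seclists.org/fulldisclosure/"),
            IntelSource(name="JPCERT (Japón)", url="https://www.jpcert.or.jp/english/at/"),
            IntelSource(name="CERT-FR (Francia)", url="https://www.cert.ssi.gouv.fr/feed/"),
            IntelSource(name="INCIBE (España)", url="https://www.incibe.es/rss/avisos-seguridad"),
            IntelSource(name="Tenable Research", url="https://www.tenable.com/plugins/search?q=cve"),
        ]

        # Get existing sources from persistence
        persisted_sources = project_service.intel_sources

        if not persisted_sources:
            # First time initialization
            for s in default_sources:
                project_service.save_intel_source(s)
        else:
            # Migration/Sync: add missing default sources. Matching is by URL
            # only, so a default that was removed from persistence is saved
            # again here on the next construction.
            existing_urls = {s.url for s in persisted_sources}
            for ds in default_sources:
                if ds.url not in existing_urls:
                    project_service.save_intel_source(ds)

        # Re-fetch from project_service to ensure we have the synced list
        self.intel = VulnerabilityIntel(sources=project_service.intel_sources)
        self.knowledge_base: List[Dict] = []

    async def start_periodic_refresh(self, interval_hours: int = 6):
        """Refresh the knowledge base forever, sleeping between runs.

        Args:
            interval_hours: Pause between two refreshes, in hours. Must be
                greater than zero.

        Raises:
            ValueError: If ``interval_hours`` is zero or negative.
        """
        # asyncio.sleep() returns immediately for a non-positive delay, which
        # would turn this loop into continuous scraping of every active source.
        if interval_hours <= 0:
            raise ValueError("interval_hours must be greater than zero")

        logger.info(f"Starting periodic intel refresh every {interval_hours} hours")
        while True:
            await self.refresh_intel()
            await asyncio.sleep(interval_hours * 3600)

    async def refresh_intel(self):
        """Scrape every active source and replace the knowledge base with the result.

        A source that raises is logged and skipped. When every active source
        fails, the previous knowledge base and ``last_update`` are kept, so an
        outage does not empty the data or make it look freshly updated.
        """
        logger.info("Refreshing vulnerability intelligence knowledge base...")

        all_findings = []
        attempted = 0
        failed = 0
        for source in self.intel.sources:
            if not source.active:
                continue
            attempted += 1
            try:
                findings = await intel_scraper.scrape_source(source)
                all_findings.extend(findings)
            except Exception as e:
                failed += 1
                logger.error(f"Failed to scrape source {source.name}: {str(e)}")

        if attempted and failed == attempted:
            # Nothing was fetched: overwriting would replace usable (if stale)
            # intelligence with an empty list and stamp it as current.
            logger.warning(
                f"All {attempted} active intel sources failed; keeping previous "
                f"knowledge base ({len(self.knowledge_base)} intelligence points)"
            )
            return

        # NOTE(review): when only some sources fail, their earlier findings are
        # still dropped here; the shape of a finding is not visible in this
        # file, so no per-source merge is attempted.
        self.knowledge_base = all_findings
        # Naive local time, as returned by datetime.now() without a tz argument.
        self.intel.last_update = datetime.datetime.now()
        logger.success(f"Knowledge base updated. Total intelligence points: {len(self.knowledge_base)}")

    def add_source(self, source: IntelSource):
        """Persist a new intelligence source and bump ``last_update``.

        Args:
            source: The source to store through ``project_service``.
        """
        # NOTE(review): the URL of a stored source is later handed to
        # intel_scraper.scrape_source(), so a caller-supplied URL becomes a
        # server-side fetch target. Nothing here restricts scheme or host;
        # confirm that IntelSource or the scraper does (https only, no
        # loopback/private/link-local addresses).
        # NOTE(review): self.intel.sources was filled in __init__; whether it
        # sees this new source depends on project_service.intel_sources
        # returning the same list object - verify.
        project_service.save_intel_source(source)
        self.intel.last_update = datetime.datetime.now()

    def get_sources(self) -> List[IntelSource]:
        """Return the sources currently held by ``project_service``."""
        return project_service.intel_sources


# Module-level singleton: importing this module runs IntelService.__init__,
# including its writes to persistence.
intel_service = IntelService()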