import asyncio
import uuid
from typing import Any, Dict, List, Optional, Set

from loguru import logger

from app.models.schemas import ScanResult, ScanStatus, ScanTask, Vulnerability
from app.services.project_service import project_service
from app.services.tool_service import tool_service


class AgenticOrchestrator:
    """
    Manages the pentest workflow by breaking it down into tasks.
    Inspired by Penligent's agentic task tree.

    State is held in memory only: ``active_scans`` maps a scan id to its
    ``ScanResult`` and ``task_queues`` maps a scan id to the ordered list of
    ``ScanTask`` objects that ``run_scan`` executes for it.
    """

    # (scan-type key, task display name) pairs. Tasks are always queued in
    # this order, regardless of the order of the caller's ``scan_types``.
    _TASK_NAMES = (
        ("recon", "Reconnaissance"),
        ("injection", "Injection Analysis (SQL/NoSQL)"),
        ("xss", "Cross-Site Scripting (XSS)"),
        ("auth", "Auth & Authorization"),
        ("ssrf", "SSRF & Internal Probing"),
        ("ssl", "SSL/TLS Configuration"),
        ("cloud", "Cloud & Bucket Exposure"),
    )

    def __init__(self):
        self.active_scans: Dict[str, ScanResult] = {}
        self.task_queues: Dict[str, List[ScanTask]] = {}
        # Strong references to in-flight scan tasks. The event loop only
        # keeps weak references to tasks, so without this a running scan
        # could be garbage-collected before it finishes.
        self._background_tasks: Set[asyncio.Task] = set()

    async def create_scan(
        self,
        target_url: str,
        scan_types: List[str],
        project_id: Optional[str] = None,
    ) -> str:
        """Register a new scan, build its task list and start it in the background.

        Args:
            target_url: Target stored on the ``ScanResult`` and later handed
                to the scanning tools.
            scan_types: Scan-type keys to enable. Recognised keys are
                ``recon``, ``injection``, ``xss``, ``auth``, ``ssrf``,
                ``ssl`` and ``cloud``; any other key is ignored.
            project_id: Optional project identifier stored on the result.

        Returns:
            The generated scan id (a UUID4 string).
        """
        scan_id = str(uuid.uuid4())
        self.active_scans[scan_id] = ScanResult(
            scan_id=scan_id,
            project_id=project_id,
            target=target_url,
            status=ScanStatus.PENDING,
        )

        # Build task tree
        self.task_queues[scan_id] = [
            ScanTask(id=str(uuid.uuid4()), name=name, status=ScanStatus.PENDING)
            for key, name in self._TASK_NAMES
            if key in scan_types
        ]

        # Start background execution, holding a reference until it is done.
        background = asyncio.create_task(self.run_scan(scan_id))
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)

        return scan_id

    async def run_scan(self, scan_id: str):
        """Run every queued task of ``scan_id`` in order, then save the result.

        Each task is marked RUNNING, executed, then marked COMPLETED with
        progress 100. Once all tasks are done the scan itself is marked
        COMPLETED and passed to ``project_service.save_scan``.

        Raises:
            KeyError: If ``scan_id`` was never registered via ``create_scan``.
            Exception: Any error raised while running is logged and re-raised.
        """
        try:
            result = self.active_scans[scan_id]
            result.status = ScanStatus.RUNNING
            tasks = self.task_queues[scan_id]

            logger.info(f"Starting agentic scan {scan_id} for {result.target}")

            for task in tasks:
                task.status = ScanStatus.RUNNING
                logger.info(f"Executing task: {task.name}")
                task.logs.append(f"Initiating {task.name}...")

                # Execute real tools based on task name
                if "Reconnaissance" in task.name:
                    output = await tool_service.run_nmap(result.target)
                    task.logs.append(output)
                    # Mocking finding a vulnerability for demonstration
                    result.vulnerabilities.append(Vulnerability(
                        id=str(uuid.uuid4()),
                        type="Information Disclosure",
                        severity="info",
                        description="Server header reveals technical stack: Nginx/1.18.0",
                        remediation="Disable server tokens in nginx.conf",
                    ))
                elif "Injection" in task.name:
                    output = await tool_service.run_sqlmap(result.target)
                    task.logs.append(output)
                else:
                    await asyncio.sleep(2)  # Simulate others

                task.status = ScanStatus.COMPLETED
                task.progress = 100

            result.status = ScanStatus.COMPLETED
            logger.info(f"Scan {scan_id} completed.")
            project_service.save_scan(result)
        except Exception:
            # This coroutine normally runs as an un-awaited background task,
            # so log here or the failure only surfaces when the task is
            # garbage-collected.
            # NOTE(review): the scan status is left unchanged on failure;
            # if ScanStatus defines a failure member, set it here - confirm.
            logger.exception(f"Scan {scan_id} failed")
            raise


orchestrator = AgenticOrchestrator()