"""FastAPI application wiring.

Declares the HTTP routes for scans, projects, intel sources, AI agents and
users, serves the static frontend, and starts the periodic intel refresh
when the application starts up.
"""

import asyncio
import os
from typing import List

import uvicorn
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles

# Project imports are kept in their original relative order so that any
# import-time side effects happen in the same sequence as before.
from app.services.report_generator import report_generator
from app.services.project_service import project_service
from app.services.validation import validation_service
from app.services.intel_service import intel_service
from app.models.schemas import (
    ScanRequest,
    ScanResult,
    IntelSource,
    VulnerabilityStatus,
    Project,
    AIAgent,
    User,
    LoginRequest,
)
from app.services.agent_service import agent_service
from app.services.user_service import user_service
from app.core.orchestrator import orchestrator
from app.config.settings import settings

app = FastAPI(title=settings.APP_NAME)


@app.on_event("startup")
async def startup_event() -> None:
    """Schedule the periodic intel refresh as a background task."""
    # The event loop only keeps weak references to tasks, so a task whose
    # result is discarded can be garbage-collected before it finishes.
    # Keeping it on ``app.state`` pins it for the lifetime of the app.
    app.state.intel_refresh_task = asyncio.create_task(
        intel_service.start_periodic_refresh(interval_hours=12)
    )


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc: RequestValidationError) -> JSONResponse:
    """Turn request validation failures into a 422 with a friendly message.

    The raw validation errors are still included under ``errors``.
    """
    return JSONResponse(
        status_code=422,
        content={
            "detail": "Invalid input data. Please check your target URL format.",
            "errors": exc.errors(),
        },
    )


# CORS for frontend
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests; restrict allow_origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount frontend
app.mount("/static", StaticFiles(directory="frontend"), name="static")


def _find_scan(scan_id: str):
    """Return the scan for *scan_id*, preferring the orchestrator's live copy.

    Scans no longer held by the orchestrator are looked up in the scans
    saved by ``project_service``.

    Raises:
        HTTPException: 404 when neither store knows the id.
    """
    if scan_id in orchestrator.active_scans:
        return orchestrator.active_scans[scan_id]
    # Check persistent storage if not active
    scan = project_service.scans.get(scan_id)
    if not scan:
        raise HTTPException(status_code=404, detail="Scan not found")
    return scan


@app.get("/")
async def read_index():
    """Serve the frontend entry page."""
    return FileResponse("frontend/index.html")


@app.post("/api/v1/scans", response_model=dict)
async def start_scan(request: ScanRequest):
    """Create a scan through the orchestrator and return its id."""
    scan_id = await orchestrator.create_scan(
        str(request.target_url), request.scan_types, request.project_id
    )
    return {"scan_id": scan_id, "status": "accepted"}


@app.get("/api/v1/scans/{scan_id}", response_model=ScanResult)
async def get_scan_results(scan_id: str):
    """Return one scan, whether it is still active or only saved.

    Uses the same lookup as the PDF report route, so every scan returned by
    the scan listing can also be fetched by id.

    Raises:
        HTTPException: 404 when the scan id is unknown.
    """
    return _find_scan(scan_id)


@app.post("/api/v1/scans/{scan_id}/vulnerabilities/{vuln_id}/verify")
async def verify_vulnerability(scan_id: str, vuln_id: str):
    """Re-validate one vulnerability of an active scan and store its new status.

    Only scans currently held by the orchestrator are considered, because the
    status is updated on the in-memory scan object.

    Raises:
        HTTPException: 404 when the scan is not active or the vulnerability
            id is not part of it.
    """
    if scan_id not in orchestrator.active_scans:
        raise HTTPException(status_code=404, detail="Scan not found")

    scan = orchestrator.active_scans[scan_id]
    vuln = next((v for v in scan.vulnerabilities if v.id == vuln_id), None)
    if vuln is None:
        raise HTTPException(status_code=404, detail="Vulnerability not found")

    new_status = await validation_service.verify_vulnerability(vuln)
    vuln.status = new_status
    return {"status": "verified", "new_status": new_status}


@app.get("/api/v1/scans/{scan_id}/report/pdf")
async def download_report_pdf(scan_id: str):
    """Stream a PDF report for an active or saved scan as an attachment.

    Raises:
        HTTPException: 404 when the scan id is unknown.
    """
    scan = _find_scan(scan_id)
    pdf_buffer = report_generator.generate_scan_pdf(scan)
    return StreamingResponse(
        pdf_buffer,
        media_type="application/pdf",
        headers={"Content-Disposition": f"attachment; filename=vulnera_report_{scan_id}.pdf"},
    )


@app.get("/api/v1/projects", response_model=List[Project])
async def list_projects():
    """Return the projects known to ``project_service``."""
    return project_service.get_projects()


@app.post("/api/v1/projects", response_model=Project)
async def create_project(project: Project):
    """Save the submitted project and echo it back."""
    project_service.save_project(project)
    return project


@app.get("/api/v1/scans", response_model=List[ScanResult])
async def list_scans():
    """Return saved and active scans, newest first."""
    # Return all historically saved scans plus any currently active in orchestrator
    all_scans = list(project_service.scans.values())
    # Add active scans that might not be saved yet
    all_scans.extend(
        scan
        for sid, scan in orchestrator.active_scans.items()
        if sid not in project_service.scans
    )
    return sorted(all_scans, key=lambda x: x.created_at, reverse=True)


@app.get("/api/v1/intel/sources", response_model=List[IntelSource])
async def get_intel_sources():
    """Return the configured intel sources."""
    return intel_service.get_sources()


@app.post("/api/v1/intel/sources")
async def add_intel_source(source: IntelSource):
    """Register a new intel source."""
    intel_service.add_source(source)
    return {"status": "added"}


@app.post("/api/v1/intel/refresh")
async def refresh_intel(background_tasks: BackgroundTasks):
    """Queue an intel refresh to run after the response is sent."""
    background_tasks.add_task(intel_service.refresh_intel)
    return {"status": "refresh_initiated"}


@app.get("/api/v1/intel/knowledge")
async def get_knowledge():
    """Return the intel service's knowledge base as-is."""
    return intel_service.knowledge_base


@app.get("/api/v1/agents", response_model=List[AIAgent])
async def list_agents():
    """Return the registered AI agents."""
    return agent_service.get_agents()


@app.post("/api/v1/agents")
async def add_agent(agent: AIAgent):
    """Register an AI agent and return its id."""
    agent_service.add_agent(agent)
    return {"status": "added", "agent_id": agent.id}


@app.delete("/api/v1/agents/{agent_id}")
async def delete_agent(agent_id: str):
    """Ask ``agent_service`` to delete the agent; always reports "deleted"."""
    agent_service.delete_agent(agent_id)
    return {"status": "deleted"}


@app.get("/health")
async def health_check():
    """Liveness probe; always reports healthy."""
    return {"status": "healthy"}


# User Management Routes
@app.post("/api/v1/auth/login")
async def login(request: LoginRequest):
    """Check credentials and return the user's id, username and role.

    Raises:
        HTTPException: 401 when ``user_service.authenticate`` returns nothing.
    """
    user = user_service.authenticate(request.username, request.password)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    return {
        "status": "success",
        "user": {"id": user.id, "username": user.username, "role": user.role},
    }


@app.get("/api/v1/users", response_model=List[User])
async def list_users():
    """Return all users."""
    return user_service.get_all_users()


@app.post("/api/v1/users")
async def create_user(user: User):
    """Create a user and return whatever ``user_service.add_user`` produced.

    Raises:
        HTTPException: 400 when the service returns a falsy result.
    """
    new_user = user_service.add_user(user)
    if not new_user:
        raise HTTPException(
            status_code=400,
            detail="Could not create user (max 10 or duplicate username)",
        )
    return new_user


@app.delete("/api/v1/users/{user_id}")
async def delete_user(user_id: str):
    """Delete a user.

    Raises:
        HTTPException: 400 when ``user_service.delete_user`` returns a falsy
            result; every such failure is reported as "Cannot delete last user".
    """
    if user_service.delete_user(user_id):
        return {"status": "deleted"}
    raise HTTPException(status_code=400, detail="Cannot delete last user")


if __name__ == "__main__":
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)