项目概述
CAI(Cybersecurity AI)是一个开源的网络安全AI框架,旨在构建能够自主执行完整网络安全攻击链的智能体系统。该项目通过模块化的AI智能体架构,实现了从侦察到权限提升的全流程自动化安全测试,在CTF挑战中表现优异,平均比人类专家快11倍。
功能特性
- 多智能体架构:包含红队、蓝队、DFIR、代码执行、内存分析等专业安全智能体
- 自主攻击链执行:支持完整的网络安全攻击链自动化,包括侦察、漏洞利用、权限提升
- 基准测试体系:集成CyberMetric、SecEval、CTIBench等多个安全评估基准
- 隐私保护评估:提供CyberPII-Bench基准,专门评估LLM在安全场景中的PII处理能力
- 并行执行模式:支持多智能体并行执行和协作模式
- 记忆管理系统:基于RAG的记忆存储和检索,支持跨安全演练的知识复用
- 工具集成丰富:集成Shodan、Nmap、Burp Suite等主流安全工具
安装指南
环境要求
- Python 3.8+
- Docker(可选,用于虚拟化环境)
- 主流操作系统(Linux/macOS/Windows)
安装步骤
- 克隆项目仓库:
git clone https://github.com/aliasrobotics/CAI.git
cd CAI
- 安装依赖包:
pip install setuptools wheel twine build
- 安装核心框架:
python3 -m build
pip install -e .
- 安装基准测试依赖:
pip install cvss
git submodule update --init --recursive
- 配置环境变量(在.env文件中):
OPENAI_API_KEY="your_api_key"
ANTHROPIC_API_KEY="your_anthropic_key"
OPENROUTER_API_KEY="your_openrouter_key"
OLLAMA_API_BASE="http://localhost:11434"
使用说明
基本使用
运行基准测试评估:
python benchmarks/eval.py --model ollama/qwen2.5:14b \--dataset_file benchmarks/cybermetric/CyberMetric-80-v1.json \--eval cybermetric --backend ollama
启动CLI交互界面:
python -m cai.cli
智能体使用示例
红队智能体执行渗透测试:
from cai.agents.red_teamer import redteam_agent# 执行侦察任务
result = redteam_agent.run("对目标网络进行端口扫描和服务识别")
蓝队智能体进行防御分析:
from cai.agents.blue_team import blueteam_agent# 执行安全监控
result = blueteam_agent.run("检查系统日志中的异常登录行为")
并行执行模式
配置并行智能体执行:
from cai.repl.commands.parallel import ParallelConfig# 配置红蓝队并行执行
parallel_configs = [ParallelConfig("redteam_agent"),ParallelConfig("blueteam_agent")
]
核心代码
红队智能体实现
"""Red Team Base Agent"""
import os
from dotenv import load_dotenv
from cai.sdk.agents import Agent, OpenAIChatCompletionsModel
from openai import AsyncOpenAI
from cai.util import load_prompt_template, create_system_prompt_renderer
from cai.tools.command_and_control.sshpass import run_ssh_command_with_credentials
from cai.tools.reconnaissance.generic_linux_command import generic_linux_command
from cai.tools.web.search_web import make_google_search
from cai.tools.reconnaissance.exec_code import execute_code
from cai.tools.reconnaissance.shodan import shodan_search, shodan_host_infoload_dotenv()# 加载系统提示词
bug_bounter_system_prompt = load_prompt_template("prompts/system_bug_bounter.md")# 定义工具列表
tools = [generic_linux_command,execute_code,shodan_search,shodan_host_info
]if os.getenv('GOOGLE_SEARCH_API_KEY') and os.getenv('GOOGLE_SEARCH_CX'):tools.append(make_google_search)# 创建红队智能体实例
bug_bounter_agent = Agent(name="Bug Bounter",instructions=create_system_prompt_renderer(bug_bounter_system_prompt),description="""Agent that specializes in bug bounty hunting and vulnerability discovery.Expert in web security, API testing, and responsible disclosure.""",tools=tools,model=OpenAIChatCompletionsModel(model=os.getenv('CAI_MODEL', "alias0"),openai_client=AsyncOpenAI(),)
)
代码执行智能体
"""A Coding Agent (CodeAgent)"""
import copy
import platform
import re
import signal
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple, Unionfrom cai.agents.meta.local_python_executor import LocalPythonInterpreter
from cai.sdk.agents import Agent, Result, OpenAIChatCompletionsModel
from dotenv import load_dotenv
from openai import AsyncOpenAIclass CodeAgentException(Exception):"""Base exception class for CodeAgent-related errors."""passclass CodeAgent(Agent):"""CodeAgent that uses executable Python code to consolidate LLM agents' actions."""def __init__(self, model_name: str = "alias0"):# 初始化Python解释器self.interpreter = LocalPythonInterpreter()# 初始化父类super().__init__(name="CodeAgent",description="Agent that uses executable Python code for security testing",instructions="""You are a CodeAgent specialized in cybersecurity tasks.Use Python code to analyze, exploit, and assess security vulnerabilities.""",model=OpenAIChatCompletionsModel(model=model_name,openai_client=AsyncOpenAI(),),tools=[self.execute_python_code])def execute_python_code(self, code: str) -> str:"""Execute Python code in a secure environment."""try:result = self.interpreter.execute(code)return str(result)except Exception as e:return f"Execution error: {str(e)}"
基准测试评估系统
"""Benchmark Evaluation Script"""
import argparse
import json
from typing import Dict, Listdef evaluate_model(model_name: str, dataset_file: str, eval_type: str, backend: str):"""评估语言模型在网络安全相关多选题基准上的表现Args:model_name: 模型名称(如 "gpt-4", "qwen2.5:14b")dataset_file: 数据集文件路径(JSON或TSV格式)eval_type: 评估类型("cybermetric", "seceval", "cti_bench")backend: 后端类型("openai", "openrouter", "ollama", "anthropic")"""# 加载数据集if dataset_file.endswith('.json'):with open(dataset_file, 'r') as f:dataset = json.load(f)else:dataset = load_tsv_dataset(dataset_file)# 初始化评估器evaluator = get_evaluator(eval_type)# 运行评估results = []for item in dataset:response = query_model(model_name, item['question'], backend)score = evaluator.evaluate(response, item['answer'])results.append(score)# 计算总体得分final_score = sum(results) / len(results)return final_scoredef main():parser = argparse.ArgumentParser(description='网络安全AI基准评估')parser.add_argument('-m', '--model', required=True, help='要评估的模型名称')parser.add_argument('-d', '--dataset_file', required=True, help='数据集文件路径')parser.add_argument('-e', '--eval', required=True, help='评估基准类型')parser.add_argument('-B', '--backend', required=True, help='后端类型')args = parser.parse_args()score = evaluate_model(args.model, args.dataset_file, args.eval, args.backend)print(f"模型 {args.model} 在 {args.eval} 基准上的得分: {score:.2%}")
记忆管理系统
"""Memory agent for CAI with RAG support"""
import os
from typing import Dict, List
from cai.sdk.agents import Agent
from cai.rag.vector_db import VectorDatabaseclass MemoryAgent(Agent):"""记忆管理智能体,支持长期经验存储和检索"""def __init__(self):# 初始化向量数据库self.vector_db = VectorDatabase()super().__init__(name="Memory Agent",description="Manages long-term memory experiences across security exercises",instructions="""You manage episodic and semantic memory for security exercises.Store experiences in vector database and retrieve using RAG pipeline.""",tools=[self.store_memory, self.retrieve_memory])def store_memory(self, experience: str, exercise_type: str) -> str:"""存储安全演练经验到记忆库"""embedding = self.vector_db.embed_text(experience)memory_id = self.vector_db.store(embedding, experience, exercise_type)return f"Memory stored with ID: {memory_id}"def retrieve_memory(self, query: str, exercise_type: str = None) -> List[Dict]:"""从记忆库检索相关经验"""query_embedding = self.vector_db.embed_text(query)results = self.vector_db.search(query_embedding, exercise_type)return results
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
公众号二维码