SecGPT-14B实战教程用LangChainSecGPT-14B构建企业专属网络安全知识代理1. 引言当网络安全遇上大模型想象一下这个场景凌晨三点你的手机突然收到告警一个可疑的IP正在尝试暴力破解公司服务器的登录密码。你睡眼惺忪地打开电脑需要快速判断这是普通扫描还是定向攻击然后给出处置建议。这时候如果有一个24小时在线的“网络安全专家”能立刻帮你分析是不是能让你多睡两小时这就是我们今天要聊的SecGPT-14B。它不是一个普通的聊天机器人而是一个专门为网络安全领域训练的大语言模型。简单来说它就像一个懂网络安全的“学霸”能回答安全技术问题、分析攻击日志、甚至帮你写安全脚本。但直接问问题还不够酷。在这篇教程里我要带你做一件更有意思的事用LangChain这个“胶水”框架把SecGPT-14B变成你公司的专属网络安全知识代理。它能记住你们公司的网络架构、安全策略还能调用外部工具帮你查漏洞、分析威胁。我会手把手带你从零开始搭建一个能真正用起来的系统。即使你之前没玩过大模型跟着步骤走也能搞定。2. 环境准备5分钟快速部署SecGPT-14B2.1 为什么选择这个镜像在开始之前我们先看看为什么推荐用CSDN星图镜像广场提供的SecGPT-14B镜像。这能帮你省掉很多麻烦开箱即用模型已经预装好了你不用自己下载几十GB的文件。双卡优化镜像针对双4090显卡24G显存做了优化跑起来更流畅。两种使用方式既有网页聊天界面也有标准的API接口想怎么用都行。服务稳定用Supervisor管理服务挂了会自动重启。2.2 一键访问与初体验部署简单到只需要一步打开浏览器访问这个链接https://gpu-hwg3q2zvdb-7860.web.gpu.csdn.net/页面打开后你会看到一个干净的聊天界面。先别急着写代码我们来试试它的本事。在输入框里问几个网络安全相关的问题“用大白话解释一下什么是XSS攻击”“给我写一个简单的Python脚本检测目录遍历漏洞。”“分析这段Apache日志看看有没有可疑行为。”随便贴一段日志试试你会看到SecGPT-14B的回答确实很专业不是那种泛泛而谈的通用回答。它真的懂安全。页面上还有几个参数可以调temperature温度值越高回答越随机、有创意值越低回答越确定、保守。安全分析建议用0.3左右。top_p控制回答的多样性一般用0.9。max_tokens回答的最大长度根据问题复杂度调整。2.3 API调用测试网页界面适合手动测试但我们要做自动化系统得用API。好在镜像已经提供了OpenAI兼容的API。打开你的终端运行下面这个命令看看API能不能通curl http://127.0.0.1:8000/v1/models如果返回类似下面的信息说明API服务正常{ object: list, data: [ { id: SecGPT-14B, object: model, created: 1677610602, owned_by: clouditera } ] }再试试真正的对话请求curl http://127.0.0.1:8000/v1/chat/completions \ -H Content-Type: application/json \ -d { model: SecGPT-14B, messages: [ {role: user, content: 一句话解释什么是XSS攻击} ], temperature: 0.3, max_tokens: 256 }你会得到一个JSON格式的回答。到这里SecGPT-14B的基础环境就准备好了。3. LangChain入门你的AI应用“脚手架”3.1 LangChain是什么为什么需要它你可能想问我直接调用SecGPT-14B的API不就行了吗为什么还要用LangChain我打个比方SecGPT-14B就像一台性能强大的发动机但LangChain是整辆车的底盘、传动系统和控制系统。直接调用API你每次都得重新造轮子用LangChain很多复杂功能它已经帮你做好了。具体来说LangChain能帮你管理对话历史让AI记住之前的聊天内容实现多轮对话。连接外部工具让AI不仅能聊天还能查数据库、调用API、执行命令。处理长文本把超长的文档拆分成小块让AI能理解。构建复杂流程把多个AI调用、工具调用组合成一个工作流。3.2 安装与基础配置首先创建一个新的Python环境建议用Python 3.9然后安装LangChain# 创建虚拟环境可选但推荐 python -m venv secgpt-env source secgpt-env/bin/activate # Linux/Mac # 或 secgpt-env\Scripts\activate # Windows # 安装LangChain和相关依赖 pip install langchain langchain-community langchain-openai pip install python-dotenv # 用于管理环境变量创建一个配置文件.env存放你的API信息# .env 文件 SECGPT_API_BASEhttp://127.0.0.1:8000/v1 SECGPT_MODELSecGPT-14B SECGPT_API_KEYyour-api-key-here # 如果SecGPT需要的话注意我们用的SecGPT-14B镜像默认不需要API Key但留个配置项更规范。3.3 第一个LangChain应用安全问答助手现在我们来写第一个简单的LangChain应用连接SecGPT-14B# secgpt_basic.py import os from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain.schema import HumanMessage, SystemMessage # 加载环境变量 load_dotenv() # 创建SecGPT-14B的LangChain客户端 # 注意我们通过api_base参数指向本地的SecGPT服务 secgpt_llm ChatOpenAI( modelos.getenv(SECGPT_MODEL, SecGPT-14B), openai_api_baseos.getenv(SECGPT_API_BASE, http://127.0.0.1:8000/v1), openai_api_keyos.getenv(SECGPT_API_KEY, not-needed), temperature0.3, max_tokens1024, ) # 定义一个系统提示词告诉AI它的角色 system_prompt 你是一个专业的网络安全专家擅长 1. 分析安全漏洞和攻击手法 2. 提供实用的防护建议 3. 解释复杂的安全概念 4. 编写安全脚本和配置 请用专业但易懂的方式回答用户的问题。如果遇到不确定的情况请明确说明。 def ask_security_question(question): 向SecGPT-14B提问安全相关问题 messages [ SystemMessage(contentsystem_prompt), HumanMessage(contentquestion) ] try: response secgpt_llm.invoke(messages) return response.content except Exception as e: return f请求失败: {str(e)} # 测试一下 if __name__ __main__: # 测试几个安全问题 test_questions [ 如何检测一个网站是否存在SQL注入漏洞, 给我一个简单的Python脚本用于检测开放端口, 解释一下CSRF攻击的原理和防护方法 ] for i, question in enumerate(test_questions, 1): print(f\n{*50}) print(f问题 {i}: {question}) print(f{*50}) answer ask_security_question(question) print(f回答:\n{answer}) print(f{*50})运行这个脚本你会看到SecGPT-14B的专业回答。这比直接调用API代码更简洁而且LangChain帮我们处理了消息格式、错误处理等细节。4. 构建企业专属网络安全知识代理4.1 什么是知识代理它能做什么知识代理Knowledge Agent不是简单的问答机器人。它结合了三个能力长期记忆能记住你们公司的网络架构、安全策略、历史事件。工具使用能调用外部工具比如漏洞扫描器、威胁情报API。推理决策能分析情况决定下一步该做什么。举个例子当员工报告“收到可疑钓鱼邮件”时一个基础问答机器人只能给出通用建议。但知识代理可以查数据库看这个发件人是不是黑名单里的调用沙箱API分析邮件附件是否恶意根据公司策略建议是否要全网预警生成处置报告自动发给安全团队4.2 第一步给代理添加“记忆”记忆是代理的核心。我们先用最简单的方式——对话历史记忆# secgpt_with_memory.py from langchain.memory import ConversationBufferMemory from langchain.chains import ConversationChain # 创建记忆存储 memory ConversationBufferMemory( memory_keyhistory, return_messagesTrue ) # 创建对话链 conversation ConversationChain( llmsecgpt_llm, memorymemory, verboseTrue # 显示详细过程调试用 ) # 测试多轮对话 print(第一轮询问XSS攻击) response1 conversation.predict(input什么是XSS攻击) print(f回答: {response1}\n) print(第二轮基于上文询问防护) response2 conversation.predict(input针对存储型XSS有什么具体的防护建议) print(f回答: {response2}\n) print(第三轮询问我们刚才讨论的内容) response3 conversation.predict(input我们刚才讨论了哪几种XSS攻击) print(f回答: {response3}) # 查看记忆内容 print(\n当前对话历史) print(memory.load_memory_variables({}))运行这个脚本你会发现AI能记住之前的对话内容。但这还不够我们还需要让AI记住公司的专属知识。4.3 第二步加载企业专属知识库假设你们公司有这些文档网络拓扑图说明安全运维手册历史安全事件报告员工安全守则我们可以用LangChain的文档加载器和向量数据库让AI学习这些知识# knowledge_base.py import os from langchain_community.document_loaders import TextLoader, DirectoryLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings # 注意SecGPT-14B没有提供嵌入模型我们用开源的嵌入模型 # 这里用HuggingFace的嵌入模型你也可以用OpenAI的text-embedding-ada-002 from langchain_community.embeddings import HuggingFaceEmbeddings class CompanySecurityKnowledgeBase: def __init__(self, knowledge_dir./company_knowledge): 初始化企业安全知识库 self.knowledge_dir knowledge_dir self.vectorstore None # 使用开源的嵌入模型 self.embeddings HuggingFaceEmbeddings( model_namesentence-transformers/all-MiniLM-L6-v2 ) def load_documents(self): 加载公司安全文档 if not os.path.exists(self.knowledge_dir): print(f知识库目录不存在: {self.knowledge_dir}) return [] # 加载所有文本文件 loader DirectoryLoader( self.knowledge_dir, glob**/*.txt, loader_clsTextLoader ) documents loader.load() print(f加载了 {len(documents)} 个文档) return documents def create_knowledge_base(self): 创建向量知识库 # 1. 加载文档 documents self.load_documents() if not documents: print(没有找到文档创建空知识库) return None # 2. 分割文档 text_splitter RecursiveCharacterTextSplitter( chunk_size500, # 每个块500字符 chunk_overlap50, # 块之间重叠50字符 length_functionlen, ) splits text_splitter.split_documents(documents) print(f文档分割为 {len(splits)} 个块) # 3. 创建向量存储 self.vectorstore Chroma.from_documents( documentssplits, embeddingself.embeddings, persist_directory./chroma_db ) print(知识库创建完成) return self.vectorstore def search_similar(self, query, k3): 搜索相关知识 if self.vectorstore is None: print(请先创建知识库) return [] results self.vectorstore.similarity_search(query, kk) return results # 使用示例 if __name__ __main__: # 先创建一些示例文档 os.makedirs(./company_knowledge, exist_okTrue) # 示例公司网络拓扑说明 with open(./company_knowledge/network_topology.txt, w) as f: f.write(公司网络分为三个区域 1. DMZ区对外服务包括Web服务器192.168.1.10-20、邮件服务器192.168.1.30 2. 内网区员工办公网络IP段 10.0.0.0/24 3. 数据区数据库服务器IP段 172.16.0.0/24 安全策略 - DMZ到内网只允许特定端口 - 所有服务器必须安装EDR终端防护 - 数据库访问需要双因素认证) # 示例安全运维手册 with open(./company_knowledge/security_manual.txt, w) as f: f.write(安全事件响应流程 1. 发现事件员工报告或监控告警 2. 初步分析确认事件类型和影响范围 3. 应急处置隔离受影响系统 4. 根因分析调查攻击路径 5. 恢复整改修复漏洞恢复服务 6. 总结报告记录经验教训 漏洞修复SLA - 高危漏洞24小时内修复 - 中危漏洞7天内修复 - 低危漏洞30天内修复) # 创建知识库 kb CompanySecurityKnowledgeBase() vectorstore kb.create_knowledge_base() # 测试搜索 query 我们公司的数据库服务器在哪里 results kb.search_similar(query) print(f\n搜索查询: {query}) print(相关结果:) for i, doc in enumerate(results, 1): print(f{i}. {doc.page_content[:100]}...)这个知识库现在能理解你们公司的专属信息了。当员工问“数据库服务器怎么访问”时AI能基于公司文档给出准确回答。4.4 第三步让代理使用工具知识库让AI更懂你们公司工具让AI能“动手做事”。我们给代理添加几个实用的安全工具# security_tools.py import subprocess import requests import json from typing import Dict, Any from langchain.tools import tool from langchain.agents import AgentExecutor, create_openai_tools_agent from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder class SecurityTools: 网络安全工具集 tool def scan_ports(self, target: str, ports: str 1-1000) - str: 扫描目标主机的开放端口 Args: target: 目标IP或域名 ports: 端口范围如 1-1000 或 80,443,8080 Returns: 开放端口列表 try: # 使用nmap扫描需要系统安装nmap cmd fnmap -p {ports} {target} result subprocess.run( cmd, shellTrue, capture_outputTrue, textTrue, timeout30 ) if result.returncode 0: return result.stdout else: return f扫描失败: {result.stderr} except Exception as e: return f工具执行错误: {str(e)} tool def check_vulnerability(self, cve_id: str) - str: 查询CVE漏洞信息 Args: cve_id: CVE编号如 CVE-2021-44228 Returns: 漏洞详情和修复建议 try: # 使用公开的CVE数据库API url fhttps://cve.circl.lu/api/cve/{cve_id} response requests.get(url, timeout10) if response.status_code 200: data response.json() summary data.get(summary, 无描述) cvss data.get(cvss, 未知) return fCVE: {cve_id} 严重程度: CVSS {cvss} 描述: {summary} 建议: 请及时更新受影响软件到最新版本 else: return f未找到CVE信息: {cve_id} except Exception as e: return f查询失败: {str(e)} tool def analyze_log(self, log_text: str) - str: 分析安全日志中的可疑行为 Args: log_text: 日志文本 Returns: 分析结果和可疑项 # 这里我们可以让SecGPT-14B来分析日志 # 先简单实现实际中可以更复杂 suspicious_patterns [ failed password, invalid user, brute force, sql injection, xss, unauthorized, access denied ] findings [] lines log_text.lower().split(\n) for i, line in enumerate(lines, 1): for pattern in suspicious_patterns: if pattern in line: findings.append(f第{i}行: 发现{pattern} - {line[:100]}...) break if findings: return 发现可疑行为:\n \n.join(findings[:10]) # 最多显示10条 else: return 未发现明显可疑行为 tool def get_security_advice(self, topic: str) - str: 获取特定主题的安全建议 Args: topic: 安全主题如 密码安全、钓鱼防护 Returns: 安全建议列表 # 这个工具会调用SecGPT-14B来生成建议 prompt f你是一个网络安全专家请提供关于{topic}的5条实用安全建议。 建议要具体、可操作适合企业环境。 格式 1. 第一条建议 2. 第二条建议 ... # 这里我们直接返回一个示例实际中应该调用SecGPT advice_map { 密码安全: [ 1. 使用至少12位复杂密码包含大小写字母、数字和特殊字符, 2. 不同系统使用不同密码, 3. 启用双因素认证, 4. 定期更换密码但不要过于频繁, 5. 使用密码管理器管理密码 ], 钓鱼防护: [ 1. 仔细检查发件人邮箱注意拼写错误, 2. 不要点击来历不明的链接, 3. 警惕紧急或威胁性语言, 4. 验证请求的真实性特别是涉及转账的, 5. 定期进行钓鱼演练培训 ] } if topic in advice_map: return \n.join(advice_map[topic]) else: return f关于{topic}的建议\n1. 保持系统和软件更新\n2. 定期进行安全培训\n3. 实施最小权限原则\n4. 启用日志监控\n5. 制定应急预案 # 创建工具实例 security_tools SecurityTools() tools [ security_tools.scan_ports, security_tools.check_vulnerability, security_tools.analyze_log, security_tools.get_security_advice ] # 测试工具 if __name__ __main__: # 测试端口扫描工具 print(测试端口扫描:) result security_tools.scan_ports(127.0.0.1, 80,443,8080) print(result[:500] ...\n) # 测试CVE查询 print(测试CVE查询:) result security_tools.check_vulnerability(CVE-2021-44228) print(result \n) # 测试日志分析 print(测试日志分析:) sample_log 2024-01-01 12:00:00 Login successful for user admin 2024-01-01 12:00:05 Failed password for user root 2024-01-01 12:00:10 Invalid user test from 192.168.1.100 result security_tools.analyze_log(sample_log) print(result)这些工具让AI不仅能说还能做。比如当AI分析日志发现暴力破解尝试时它可以建议“是否要调用端口扫描工具检查攻击源”4.5 第四步组装完整的安全代理现在我们把记忆、知识库、工具组合起来创建一个完整的安全代理# security_agent.py from langchain.agents import AgentExecutor, create_openai_tools_agent from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain.memory import ConversationBufferMemory from langchain.tools.retriever import create_retriever_tool class SecurityAgent: def __init__(self, llm, vectorstoreNone): 初始化安全代理 self.llm llm self.vectorstore vectorstore self.memory ConversationBufferMemory( memory_keychat_history, return_messagesTrue ) # 获取基础工具 self.tools self._setup_tools() # 创建代理 self.agent self._create_agent() def _setup_tools(self): 设置代理可用的工具 tools [] # 添加安全工具 security_tools_obj SecurityTools() tools.extend([ security_tools_obj.scan_ports, security_tools_obj.check_vulnerability, security_tools_obj.analyze_log, security_tools_obj.get_security_advice ]) # 如果知识库存在添加检索工具 if self.vectorstore: retriever self.vectorstore.as_retriever( search_kwargs{k: 3} ) retriever_tool create_retriever_tool( retriever, search_company_knowledge, 搜索公司内部的安全文档、策略和网络拓扑信息。当问题涉及公司内部信息时使用此工具。 ) tools.append(retriever_tool) return tools def _create_agent(self): 创建代理执行器 # 系统提示词定义代理的角色和能力 system_prompt 你是一个企业网络安全助手具有以下能力 1. 回答网络安全技术问题 2. 分析安全日志和事件 3. 提供漏洞信息和修复建议 4. 扫描网络端口 5. 查询公司内部安全知识库 6. 提供具体可操作的安全建议 请遵循以下原则 - 优先使用工具获取准确信息 - 涉及公司内部信息时先搜索知识库 - 回答要专业但易懂 - 如果不确定请明确说明 - 对于复杂问题分步骤解答 当前对话历史 {chat_history} 请根据用户问题选择合适工具或直接回答。 prompt ChatPromptTemplate.from_messages([ (system, system_prompt), MessagesPlaceholder(variable_namechat_history), (human, {input}), MessagesPlaceholder(variable_nameagent_scratchpad) ]) # 创建代理 agent create_openai_tools_agent( llmself.llm, toolsself.tools, promptprompt ) # 创建执行器 agent_executor AgentExecutor( agentagent, toolsself.tools, memoryself.memory, verboseTrue, # 显示思考过程生产环境可以关闭 handle_parsing_errorsTrue, max_iterations5 # 最多尝试5次 ) return agent_executor def ask(self, question: str) - str: 向代理提问 try: response self.agent.invoke({input: question}) return response[output] except Exception as e: return f代理执行出错: {str(e)} # 使用示例 if __name__ __main__: # 初始化SecGPT-14B from langchain_openai import ChatOpenAI secgpt_llm ChatOpenAI( modelSecGPT-14B, openai_api_basehttp://127.0.0.1:8000/v1, openai_api_keynot-needed, temperature0.3, max_tokens1024, ) # 如果有知识库可以加载 # from knowledge_base import CompanySecurityKnowledgeBase # kb CompanySecurityKnowledgeBase() # vectorstore kb.create_knowledge_base() # agent SecurityAgent(secgpt_llm, vectorstore) # 没有知识库的简化版本 agent SecurityAgent(secgpt_llm) # 测试代理 questions [ 扫描一下本地主机的80和443端口, 查询CVE-2021-44228漏洞信息, 分析这段日志Failed password for root from 192.168.1.100, 给我一些密码安全的建议, 我们公司的数据库服务器怎么防护 # 如果有知识库会搜索公司文档 ] for question in questions: print(f\n{*60}) print(f问题: {question}) print(f{*60}) answer agent.ask(question) print(f回答:\n{answer}) print(f{*60})运行这个代理你会看到AI的思考过程它先判断该用什么工具然后调用工具最后整理结果回答你。这就是一个真正的智能代理了。5. 实战案例构建自动化安全巡检系统5.1 场景描述每日安全巡检安全工程师每天要做很多重复性工作检查日志、扫描漏洞、查看告警。我们可以用SecGPT-14B代理自动化这些任务。假设每天需要检查关键服务器的开放端口变化分析防火墙日志中的异常连接检查已知漏洞的修复状态生成巡检报告5.2 实现自动化巡检脚本# security_checkup.py import schedule import time from datetime import datetime import json from security_agent import SecurityAgent class AutomatedSecurityCheckup: def __init__(self, agent): self.agent agent self.checkup_history [] def check_ports(self, servers): 检查服务器端口 print(f[{datetime.now()}] 开始端口检查...) results [] for server in servers: question f扫描服务器 {server} 的常见端口 (21,22,23,80,443,3389,8080) answer self.agent.ask(question) results.append({ server: server, time: datetime.now().isoformat(), result: answer[:500] # 只保存前500字符 }) time.sleep(2) # 避免请求过快 return results def analyze_firewall_logs(self, log_file): 分析防火墙日志 print(f[{datetime.now()}] 开始分析防火墙日志...) try: with open(log_file, r) as f: logs f.read() # 只分析最近1000行避免太长 recent_logs \n.join(logs.split(\n)[-1000:]) question f分析以下防火墙日志找出可疑连接\n{recent_logs} answer self.agent.ask(question) return { time: datetime.now().isoformat(), log_file: log_file, analysis: answer } except Exception as e: return {error: str(e)} def check_vulnerabilities(self, cve_list): 检查漏洞信息 print(f[{datetime.now()}] 开始漏洞检查...) results [] for cve in cve_list: question f查询漏洞 {cve} 的详细信息 answer self.agent.ask(question) results.append({ cve: cve, time: datetime.now().isoformat(), info: answer }) time.sleep(1) return results def generate_report(self, port_results, log_results, vuln_results): 生成巡检报告 print(f[{datetime.now()}] 生成巡检报告...) question f基于以下巡检结果生成一份专业的安全巡检报告 1. 端口扫描结果{json.dumps(port_results, ensure_asciiFalse)[:1000]} 2. 防火墙日志分析{json.dumps(log_results, ensure_asciiFalse)[:1000]} 3. 漏洞检查结果{json.dumps(vuln_results, ensure_asciiFalse)[:1000]} 报告需要包括 - 执行摘要 - 发现的问题 - 风险等级评估 - 修复建议 - 后续行动计划 请用中文生成报告。 report self.agent.ask(question) return report def daily_checkup(self): 每日巡检任务 print(f\n{*60}) print(f[{datetime.now()}] 开始每日安全巡检) print(f{*60}) # 1. 检查端口 servers [192.168.1.1, 192.168.1.10, 192.168.1.20] port_results self.check_ports(servers) # 2. 分析日志假设日志文件存在 log_results self.analyze_firewall_logs(/var/log/firewall.log) # 3. 检查漏洞 cve_list [CVE-2021-44228, CVE-2021-45046, CVE-2022-22965] vuln_results self.check_vulnerabilities(cve_list) # 4. 生成报告 report self.generate_report(port_results, log_results, vuln_results) # 5. 保存结果 checkup_record { timestamp: datetime.now().isoformat(), port_scan: port_results, log_analysis: log_results, vulnerability_check: vuln_results, report: report } self.checkup_history.append(checkup_record) # 保存到文件 with open(fsecurity_checkup_{datetime.now().strftime(%Y%m%d)}.json, w) as f: json.dump(checkup_record, f, ensure_asciiFalse, indent2) print(f\n巡检报告已生成:) print(report[:1000] ... if len(report) 1000 else report) print(f\n详细结果已保存到文件) print(f{*60}) return checkup_record def run_scheduled(self, hour9, minute0): 定时运行巡检 print(f安全巡检系统已启动将在每天 {hour:02d}:{minute:02d} 运行) # 安排每日任务 schedule.every().day.at(f{hour:02d}:{minute:02d}).do(self.daily_checkup) # 立即运行一次测试用 self.daily_checkup() # 保持运行 while True: schedule.run_pending() time.sleep(60) # 使用示例 if __name__ __main__: # 初始化代理需要先启动SecGPT-14B服务 from langchain_openai import ChatOpenAI secgpt_llm ChatOpenAI( modelSecGPT-14B, openai_api_basehttp://127.0.0.1:8000/v1, openai_api_keynot-needed, temperature0.3, max_tokens2048, ) # 创建代理 from security_agent import SecurityAgent agent SecurityAgent(secgpt_llm) # 创建巡检系统 checkup_system AutomatedSecurityCheckup(agent) # 运行一次巡检测试 print(运行测试巡检...) result checkup_system.daily_checkup() # 如果要定时运行取消下面这行的注释 # checkup_system.run_scheduled(hour9, minute0) # 每天9点运行这个系统能自动完成日常安全巡检生成专业报告大大减轻安全工程师的重复性工作。5.3 扩展功能实时威胁监控我们还可以扩展这个系统实现实时威胁监控# threat_monitor.py import threading import queue import time from datetime import datetime class RealTimeThreatMonitor: def __init__(self, agent, alert_callbackNone): self.agent agent self.alert_callback alert_callback self.log_queue queue.Queue() self.running False def monitor_log_file(self, log_file, check_interval5): 监控日志文件变化 last_size 0 while self.running: try: current_size os.path.getsize(log_file) if current_size last_size: # 读取新增的日志 with open(log_file, r) as f: f.seek(last_size) new_logs f.read() last_size current_size if new_logs.strip(): self.log_queue.put(new_logs) time.sleep(check_interval) except Exception as e: print(f监控日志文件出错: {e}) time.sleep(check_interval) def analyze_logs(self): 分析日志队列中的内容 while self.running: try: # 获取日志最多等待10秒 log_batch self.log_queue.get(timeout10) # 分析日志 question f实时分析以下安全日志立即识别任何威胁或异常行为。 如果发现高危威胁请明确标注【高危警报】。 日志内容 {log_batch} 请提供 1. 威胁等级高危/中危/低危/正常 2. 具体发现 3. 建议的应急响应措施 analysis self.agent.ask(question) # 检查是否包含高危警报 if 【高危警报】 in analysis or 高危威胁 in analysis: print(f\n⚠️ 发现高危威胁) print(f时间: {datetime.now()}) print(f分析结果:\n{analysis}) # 触发警报回调 if self.alert_callback: self.alert_callback(analysis) # 记录分析结果 with open(threat_monitor.log, a) as f: f.write(f\n[{datetime.now()}] 日志分析:\n{analysis}\n) except queue.Empty: continue # 队列为空继续等待 except Exception as e: print(f分析日志出错: {e}) def start(self, log_files): 启动监控 self.running True # 启动日志监控线程 monitor_threads [] for log_file in log_files: thread threading.Thread( targetself.monitor_log_file, args(log_file,) ) thread.daemon True thread.start() monitor_threads.append(thread) # 启动分析线程 analysis_thread threading.Thread(targetself.analyze_logs) analysis_thread.daemon True analysis_thread.start() print(f威胁监控已启动监控文件: {log_files}) # 保持主线程运行 try: while self.running: time.sleep(1) except KeyboardInterrupt: self.stop() def stop(self): 停止监控 self.running False print(威胁监控已停止) def alert_to_slack(self, message): 发送警报到Slack示例 # 这里可以实现发送到Slack、钉钉、企业微信等 print(f发送警报: {message[:200]}...) # 使用示例 if __name__ __main__: # 初始化代理 from langchain_openai import ChatOpenAI from security_agent import SecurityAgent secgpt_llm ChatOpenAI( modelSecGPT-14B, openai_api_basehttp://127.0.0.1:8000/v1, openai_api_keynot-needed, temperature0.3, max_tokens1024, ) agent SecurityAgent(secgpt_llm) # 创建监控器 monitor RealTimeThreatMonitor( agent, alert_callbacklambda msg: print(f警报回调: {msg[:100]}...) ) # 监控的日志文件示例 log_files [ /var/log/auth.log, # 认证日志 /var/log/syslog, # 系统日志 /var/log/nginx/access.log # Web访问日志 ] # 启动监控在实际环境中使用 # monitor.start(log_files) print(威胁监控系统就绪示例代码实际使用时取消注释监控启动)这个实时监控系统能7x24小时分析日志发现威胁立即告警让安全团队能快速响应。6. 总结与展望6.1 我们构建了什么通过这篇教程我们完成了一个从零到一的企业级网络安全知识代理系统部署了SecGPT-14B一个专业的网络安全大模型能理解安全领域的专业问题。集成了LangChain用这个框架管理对话记忆、连接知识库、调用工具。创建了知识代理结合了长期记忆、工具使用和推理决策能力。实现了实用功能包括安全问答、日志分析、漏洞查询、端口扫描。构建了自动化系统每日安全巡检和实时威胁监控。这个系统不再是简单的问答机器人而是一个能真正帮安全工程师干活的智能助手。6.2 实际应用价值在实际工作中这个系统能降低工作负担自动化重复性巡检任务让工程师专注复杂问题。快速响应7x24小时监控发现威胁立即告警。知识传承把老师傅的经验沉淀到知识库新人也能快速上手。减少人为错误标准化分析流程避免遗漏重要信息。提升效率一个代理相当于多个初级安全工程师的工作量。6.3 下一步可以做什么如果你已经跟着教程搭建了基础系统这里有一些进阶方向集成更多工具漏洞扫描器Nessus、OpenVASSIEM系统Splunk、ELK威胁情报平台工单系统Jira、ServiceNow优化知识库接入公司Confluence、Wiki文档定期自动更新知识库添加图片、PDF等非文本内容增强代理能力多代理协作不同代理负责不同领域自主决策让代理在特定条件下自动执行操作学习优化根据反馈不断改进回答质量完善监控告警集成短信、电话告警分级告警策略自动生成处置工单6.4 开始你的实践最好的学习方式就是动手做。我建议你从简单开始先部署SecGPT-14B体验基础问答功能。逐步添加先加一个工具比如日志分析用起来再加下一个。结合实际从你们公司最痛的点开始比如每日巡检或者告警分析。持续迭代根据使用反馈不断优化提示词、工具和流程。记住技术是为人服务的。这个系统的目标是帮助安全工程师更好地工作而不是取代他们。合理的自动化能让人从重复劳动中解放出来去做更有创造性的工作。网络安全的世界每天都在变化新的威胁不断出现。有了AI助手的帮助我们能更快地发现威胁、更准地分析攻击、更好地保护系统。这只是一个开始未来的可能性还有很多。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。