SecGPT-14B保姆级教程Chainlit中添加用户权限控制与审计日志功能1. 引言为什么需要权限与审计想象一下你部署了一个功能强大的网络安全助手SecGPT-14B团队里的同事都能通过Chainlit前端向它提问。这很方便但很快你会发现一些问题实习生问了不该问的敏感漏洞细节怎么办谁在什么时间问了什么问题出了事怎么追溯模型被恶意输入攻击了怎么发现这就是我们今天要解决的问题。一个没有权限控制和审计日志的AI应用就像一栋没有门锁和监控的大楼谁都能进发生了什么也无从查起。对于SecGPT-14B这样处理安全信息的模型这尤其危险。本教程将手把手教你在已经用vLLM部署好的SecGPT-14B和Chainlit前端基础上如何像搭积木一样轻松添加两样关键“安保设施”用户权限控制给不同的人发不同的“门禁卡”控制他们能问什么。审计日志功能安装一个“全方位监控”记录谁、在什么时候、问了什么、得到了什么回答。学完这篇教程你将得到一个更安全、更可控、更专业的SecGPT-14B应用。我们直接从你已经部署好的环境开始不绕弯子用最清晰的步骤和代码让你快速实现这些功能。2. 环境与项目准备在开始动手之前我们先确认一下你的“施工场地”是否准备就绪。2.1 确认基础环境根据你提供的描述我们已经有一个正在运行的环境模型服务使用vLLM成功部署了SecGPT-14B模型。前端界面使用Chainlit作为Web交互界面。验证方式可以通过Chainlit页面正常提问并获取回答例如询问“什么是XSS攻击”。如果你的环境还没达到这一步请先参考相关文档完成SecGPT-14B的基础部署和Chainlit的对接。本教程假设你的chainlit应用主文件例如app.py已经能够正常调用模型。2.2 规划项目结构为了让代码清晰好维护我们稍微规划一下目录。在你的Chainlit项目根目录下可能会是这样的结构your_chainlit_project/ ├── app.py # 现有的Chainlit主应用文件 ├── auth.py # 【新增】存放权限认证相关逻辑 ├── logger.py # 【新增】存放审计日志相关逻辑 ├── requirements.txt # 项目依赖文件 └── .env # 【可选】存放敏感配置如密钥接下来我们首先来打造“门禁系统”——用户权限控制。3. 实现用户权限控制权限控制的核心是识别用户是谁然后判断他能不能执行某个操作。在Chainlit中我们可以利用其提供的认证回调机制来实现。3.1 创建认证与权限模块我们新建一个auth.py文件在这里实现用户管理和权限检查。# auth.py import os from typing import Optional, Dict, List import hashlib import secrets # 示例用户数据库。在实际生产中应使用数据库如SQLite, PostgreSQL # 这里密码存储使用了简单的加盐哈希生产环境应考虑更安全的方案如bcrypt。 USERS_DB { admin: { password_hash: hashlib.sha256((“your_admin_password_salt” “secure_password_123”).encode()).hexdigest(), salt: “your_admin_password_salt”, role: “admin”, “permissions”: [“ask_question”, “view_logs”, “manage_users”] # 管理员权限 }, “analyst”: { “password_hash”: hashlib.sha256((“analyst_salt” “analyst_pass”).encode()).hexdigest(), “salt”: “analyst_salt”, “role”: “security_analyst”, “permissions”: [“ask_question”] # 分析师只能提问 }, “intern”: { “password_hash”: hashlib.sha256((“intern_salt” “intern_pass”).encode()).hexdigest(), “salt”: “intern_salt”, “role”: “intern”, “permissions”: [“ask_question”] # 实习生默认也只有提问权限但可通过规则限制内容 } } # 权限规则定义不同角色/用户对敏感话题的访问限制 SENSITIVE_TOPICS [“0day”, “exploit”, “bypass”, “sqlmap tamper”, “webshell密码”] PERMISSION_RULES { “intern”: { “blocked_keywords”: SENSITIVE_TOPICS, # 实习生禁止询问包含这些关键词的问题 “allowed_domains”: [“basic_concept”, “xss”, “csrf”] # 只允许询问基础概念类问题 }, “security_analyst”: { “blocked_keywords”: [], “allowed_domains”: [“all”] # 分析师可以问所有类型问题 }, “admin”: { “blocked_keywords”: [], “allowed_domains”: [“all”] } } def verify_password(username: str, password: str) - bool: 验证用户名和密码 user USERS_DB.get(username) if not user: return False # 重新计算哈希进行验证 input_hash hashlib.sha256((user[“salt”] password).encode()).hexdigest() return secrets.compare_digest(input_hash, user[“password_hash”]) def get_user_role(username: str) - Optional[str]: 获取用户角色 user USERS_DB.get(username) return user.get(“role”) if user else None def check_permission(username: str, permission: str) - bool: 检查用户是否拥有特定权限如‘ask_question’ user USERS_DB.get(username) if not user: return False return permission in user.get(“permissions”, []) def validate_question_content(username: str, question: str) - Dict: 根据用户角色验证问题内容是否被允许。 返回字典{“allowed”: bool, “reason”: str} role get_user_role(username) if not role: return {“allowed”: False, “reason”: “用户角色未知”} rules PERMISSION_RULES.get(role, {}) # 检查关键词黑名单 for keyword in rules.get(“blocked_keywords”, []): if keyword.lower() in question.lower(): return {“allowed”: False, “reason”: f“问题包含受限关键词 ‘{keyword}‘”} # 这里可以添加更复杂的域检查逻辑例如使用一个分类模型对问题进行分类 # 本示例中我们仅做关键词匹配的简单演示 return {“allowed”: True, “reason”: “”}3.2 在Chainlit中集成认证现在我们需要修改主app.py让Chainlit在启动时使用我们的认证逻辑。# app.py import chainlit as cl from auth import verify_password, get_user_role, check_permission, validate_question_content import sys import os # 假设你已有调用vLLM模型的函数这里用placeholder代替 from your_model_client import call_secgpt_model # 1. 设置Chainlit认证回调函数 cl.password_auth_callback def auth_callback(username: str, password: str): Chainlit调用的认证函数 if verify_password(username, password): role get_user_role(username) # 返回一个cl.User对象可以附带identifier和metadata return cl.User( identifierusername, metadata{“role”: role, “username”: username} ) else: # 认证失败返回None return None # 2. 可选设置一个简单的权限检查装饰器或中间件 def require_permission(permission: str): 一个简单的装饰器用于检查用户权限示例Chainlit原生支持有限 def decorator(func): async def wrapper(*args, **kwargs): # 获取当前会话用户信息 user cl.user_session.get(“user”) if not user: await cl.Message(content“未认证用户请先登录。”).send() return username user.metadata.get(“username”) from auth import check_permission if not check_permission(username, permission): await cl.Message(content“权限不足无法执行此操作。”).send() return return await func(*args, **kwargs) return wrapper return decorator # 3. 主消息处理函数现在包含权限验证 cl.on_message async def main(message: cl.Message): 处理用户消息的核心函数。 现在增加了内容安全检查。 # 获取当前用户 user cl.user_session.get(“user”) if not user: await cl.Message(content“会话异常请重新登录。”).send() return username user.metadata.get(“username”, “anonymous”) user_role user.metadata.get(“role”, “unknown”) # 记录原始问题用于审计日志下一节实现 raw_question message.content # 步骤1检查是否有提问权限 from auth import check_permission if not check_permission(username, “ask_question”): await cl.Message( contentf“抱歉您的角色 ‘{user_role}’ 没有提问权限。请联系管理员。” ).send() return # 步骤2验证问题内容是否合规 validation_result validate_question_content(username, raw_question) if not validation_result[“allowed”]: await cl.Message( contentf“提问内容未通过安全检查。原因{validation_result[‘reason’]}” ).send() # 注意这里可以触发审计日志记录违规尝试 return # 步骤3权限和内容都通过显示处理中状态并调用模型 msg cl.Message(content“”) await msg.send() # 模拟或实际调用SecGPT-14B模型 # 这里替换成你实际调用vLLM endpoint的代码 try: # 假设这是你的模型调用函数 model_response await call_secgpt_model(raw_question) # 你也可以在这里对模型输出进行二次安全检查如果需要 await msg.stream_token(model_response) except Exception as e: await msg.stream_token(f“调用模型时出错{str(e)}”) await msg.update()完成以上步骤后重启你的Chainlit应用。访问前端页面时首先会看到一个登录框。只有使用auth.py中定义的用户名和密码如admin/secure_password_123才能登录。实习生如果尝试询问包含“0day”的问题将会被拦截。门禁系统装好了接下来我们安装“监控系统”。4. 实现审计日志功能审计日志需要记录关键操作并安全存储。我们选择将日志写入文件并考虑结构化如JSON以便后续分析。4.1 创建日志记录模块新建logger.py文件。# logger.py import json import logging from datetime import datetime from typing import Dict, Any, Optional import os from pathlib import Path # 配置日志目录 LOG_DIR Path(“audit_logs”) LOG_DIR.mkdir(exist_okTrue) def get_log_file(): 按天生成日志文件例如 audit_2025_02_17.log today datetime.now().strftime(“%Y_%m_%d”) return LOG_DIR / f“audit_{today}.log” def log_audit_event(event_type: str, user_info: Dict, details: Dict, success: bool True): 记录审计事件的核心函数。 参数: event_type: 事件类型如 ‘USER_LOGIN‘, ‘QUESTION_ASKED‘, ‘PERMISSION_DENIED‘ user_info: 用户信息字典至少包含 ‘username‘, ‘role‘ details: 事件详情字典内容随事件类型变化 success: 操作是否成功 log_entry { “timestamp”: datetime.now().isoformat(), “event_type”: event_type, “user”: user_info, “details”: details, “success”: success, # 可以添加其他上下文如IP地址如果Chainlit能获取到 # “ip_address”: ip_address } log_file get_log_file() try: # 以追加模式写入每行一个JSON对象 with open(log_file, ‘a’, encoding‘utf-8’) as f: f.write(json.dumps(log_entry, ensure_asciiFalse) “\n”) except IOError as e: # 如果文件写入失败降级到标准日志 logging.error(f“Failed to write audit log: {e}”) logging.info(f“Audit event (fallback): {log_entry}”) # 一些便捷的日志函数 def log_user_login(username: str, role: str): 记录用户登录事件 log_audit_event( event_type“USER_LOGIN”, user_info{“username”: username, “role”: role}, details{“action”: “user_authentication”}, successTrue ) def log_question_asked(username: str, role: str, question: str, response_preview: str, tokens_used: Optional[int] None): 记录用户提问事件 details { “question”: question, “response_preview”: response_preview[:200], # 只记录预览避免日志过大 “action”: “model_query” } if tokens_used: details[“tokens_used”] tokens_used log_audit_event( event_type“QUESTION_ASKED”, user_info{“username”: username, “role”: role}, detailsdetails, successTrue ) def log_permission_denied(username: str, role: str, reason: str, question: str “”): 记录权限拒绝事件 details { “reason”: reason, “question_blocked”: question, “action”: “permission_check” } log_audit_event( event_type“PERMISSION_DENIED”, user_info{“username”: username, “role”: role}, detailsdetails, successFalse )4.2 在Chainlit中集成审计日志现在我们回到app.py在关键节点调用审计日志函数。# app.py (在之前代码基础上添加) import chainlit as cl from auth import verify_password, get_user_role, check_permission, validate_question_content from logger import log_user_login, log_question_asked, log_permission_denied # 导入日志函数 # ... 其他导入 ... cl.password_auth_callback def auth_callback(username: str, password: str): if verify_password(username, password): role get_user_role(username) # 【审计点】记录成功的登录 log_user_login(username, role) return cl.User( identifierusername, metadata{“role”: role, “username”: username} ) else: # 【审计点】可以在这里记录失败的登录尝试需要稍微修改log_audit_event以支持无用户 # 例如log_failed_login_attempt(username) return None cl.on_message async def main(message: cl.Message): user cl.user_session.get(“user”) if not user: await cl.Message(content“会话异常请重新登录。”).send() return username user.metadata.get(“username”, “anonymous”) user_role user.metadata.get(“role”, “unknown”) raw_question message.content # 权限检查 if not check_permission(username, “ask_question”): deny_reason f“角色 ‘{user_role}’ 无提问权限” # 【审计点】记录权限拒绝 log_permission_denied(username, user_role, deny_reason, raw_question) await cl.Message(contentf“抱歉{deny_reason}。请联系管理员。”).send() return # 内容验证 validation_result validate_question_content(username, raw_question) if not validation_result[“allowed”]: deny_reason validation_result[“reason”] # 【审计点】记录内容违规拒绝 log_permission_denied(username, user_role, deny_reason, raw_question) await cl.Message(contentf“提问内容未通过安全检查。原因{deny_reason}”).send() return # 处理问题... msg cl.Message(content“”) await msg.send() try: model_response await call_secgpt_model(raw_question) # 【审计点】记录成功的提问和响应预览 # 注意实际token使用量可能需要从vLLM响应头中获取 response_preview model_response[:200] (“…” if len(model_response) 200 else “”) log_question_asked(username, user_role, raw_question, response_preview) # tokens_used可后续添加 await msg.stream_token(model_response) except Exception as e: # 【审计点】可以记录模型调用失败事件 await msg.stream_token(f“调用模型时出错{str(e)}”) await msg.update()现在你的应用已经具备了完整的审计能力。所有关键操作都会被记录到audit_logs/目录下按日期分割的日志文件中格式为JSON Lines便于后续使用日志分析工具如ELK Stack或简单脚本进行查询和分析。5. 功能验证与测试代码写完了我们得试试它是不是真的管用。5.1 重启并测试应用重启Chainlit服务chainlit run app.py打开浏览器访问Chainlit提供的地址通常是http://localhost:8000。登录测试使用analyst/analyst_pass登录应该成功。使用错误的密码登录应该失败前端提示。权限测试用intern账号登录尝试提问“什么是XSS攻击”应该成功。用intern账号登录尝试提问“有没有最新的0day漏洞”应该被拦截并看到提示信息。审计日志检查在项目根目录下查看audit_logs/文件夹。打开当天的日志文件如audit_2025_02_17.log你应该能看到类似下面的JSON记录{timestamp: 2025-02-17T10:15:30.123456, event_type: USER_LOGIN, user: {username: analyst, role: security_analyst}, details: {action: user_authentication}, success: true} {timestamp: 2025-02-17T10:16:45.654321, event_type: PERMISSION_DENIED, user: {username: intern, role: intern}, details: {reason: 问题包含受限关键词 0day, question_blocked: 有没有最新的0day漏洞, action: permission_check}, success: false} {timestamp: 2025-02-17T10:17:20.987654, event_type: QUESTION_ASKED, user: {username: analyst, role: security_analyst}, details: {question: 什么是XSS攻击, response_preview: XSS跨站脚本攻击是一种..., action: model_query}, success: true}5.2 查看日志文件日志文件是纯文本的JSON Lines格式你可以用任何文本编辑器查看也可以用命令行工具快速分析# 查看今天的日志 tail -f audit_logs/audit_2025_02_17.log # 使用jq工具如果已安装美化查看和过滤 cat audit_logs/audit_2025_02_17.log | jq ‘.’ # 查找所有失败事件 cat audit_logs/audit_2025_02_17.log | jq ‘select(.success false)’6. 总结与进阶思考恭喜你你已经成功为你的SecGPT-14B Chainlit应用加装了“门禁”权限控制和“监控”审计日志。让我们回顾一下关键步骤和收获权限控制的核心我们通过auth.py模块管理用户和角色利用Chainlit的password_auth_callback实现登录认证并在消息处理函数中根据角色和规则对问题内容进行双重校验功能权限内容合规。审计日志的实现我们通过logger.py模块提供了结构化的日志记录功能并在app.py中的登录、提问、拒绝等关键节点埋点将所有重要操作以JSON格式按天归档。安全与实用兼顾示例代码中使用了密码加盐哈希、权限规则与内容关键词过滤为你的AI安全助手构建了基础但有效的安全防线。6.1 下一步可以做什么现在的实现是一个坚实起点你可以根据实际需求进一步强化它更安全的用户存储将USERS_DB从代码移到真正的数据库如SQLite、PostgreSQL并使用bcrypt或argon2等专业库哈希密码。更精细的权限模型实现RBAC基于角色的访问控制或ABAC基于属性的访问控制例如控制用户只能访问特定分类的知识库。更智能的内容过滤除了关键词可以集成一个轻量级文本分类模型在问题提交给SecGPT-14B之前先判断其所属的安全领域和敏感级别。日志分析与告警编写脚本定期分析审计日志自动检测异常模式如某个用户短时间内大量提问失败、频繁尝试敏感关键词并发送邮件或Slack告警。前端体验优化在Chainlit UI上显示当前登录用户和角色或者为管理员增加一个查看实时日志的页面。通过本教程你不仅为SecGPT-14B添加了关键的安全功能也掌握了一套为AI应用添加基础安全层的通用方法。希望你能在此基础上打造出更强大、更安全的智能安全运营助手。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。
SecGPT-14B保姆级教程:Chainlit中添加用户权限控制与审计日志功能
发布时间:2026/6/1 17:52:53
SecGPT-14B保姆级教程Chainlit中添加用户权限控制与审计日志功能1. 引言为什么需要权限与审计想象一下你部署了一个功能强大的网络安全助手SecGPT-14B团队里的同事都能通过Chainlit前端向它提问。这很方便但很快你会发现一些问题实习生问了不该问的敏感漏洞细节怎么办谁在什么时间问了什么问题出了事怎么追溯模型被恶意输入攻击了怎么发现这就是我们今天要解决的问题。一个没有权限控制和审计日志的AI应用就像一栋没有门锁和监控的大楼谁都能进发生了什么也无从查起。对于SecGPT-14B这样处理安全信息的模型这尤其危险。本教程将手把手教你在已经用vLLM部署好的SecGPT-14B和Chainlit前端基础上如何像搭积木一样轻松添加两样关键“安保设施”用户权限控制给不同的人发不同的“门禁卡”控制他们能问什么。审计日志功能安装一个“全方位监控”记录谁、在什么时候、问了什么、得到了什么回答。学完这篇教程你将得到一个更安全、更可控、更专业的SecGPT-14B应用。我们直接从你已经部署好的环境开始不绕弯子用最清晰的步骤和代码让你快速实现这些功能。2. 环境与项目准备在开始动手之前我们先确认一下你的“施工场地”是否准备就绪。2.1 确认基础环境根据你提供的描述我们已经有一个正在运行的环境模型服务使用vLLM成功部署了SecGPT-14B模型。前端界面使用Chainlit作为Web交互界面。验证方式可以通过Chainlit页面正常提问并获取回答例如询问“什么是XSS攻击”。如果你的环境还没达到这一步请先参考相关文档完成SecGPT-14B的基础部署和Chainlit的对接。本教程假设你的chainlit应用主文件例如app.py已经能够正常调用模型。2.2 规划项目结构为了让代码清晰好维护我们稍微规划一下目录。在你的Chainlit项目根目录下可能会是这样的结构your_chainlit_project/ ├── app.py # 现有的Chainlit主应用文件 ├── auth.py # 【新增】存放权限认证相关逻辑 ├── logger.py # 【新增】存放审计日志相关逻辑 ├── requirements.txt # 项目依赖文件 └── .env # 【可选】存放敏感配置如密钥接下来我们首先来打造“门禁系统”——用户权限控制。3. 实现用户权限控制权限控制的核心是识别用户是谁然后判断他能不能执行某个操作。在Chainlit中我们可以利用其提供的认证回调机制来实现。3.1 创建认证与权限模块我们新建一个auth.py文件在这里实现用户管理和权限检查。# auth.py import os from typing import Optional, Dict, List import hashlib import secrets # 示例用户数据库。在实际生产中应使用数据库如SQLite, PostgreSQL # 这里密码存储使用了简单的加盐哈希生产环境应考虑更安全的方案如bcrypt。 USERS_DB { admin: { password_hash: hashlib.sha256((“your_admin_password_salt” “secure_password_123”).encode()).hexdigest(), salt: “your_admin_password_salt”, role: “admin”, “permissions”: [“ask_question”, “view_logs”, “manage_users”] # 管理员权限 }, “analyst”: { “password_hash”: hashlib.sha256((“analyst_salt” “analyst_pass”).encode()).hexdigest(), “salt”: “analyst_salt”, “role”: “security_analyst”, “permissions”: [“ask_question”] # 分析师只能提问 }, “intern”: { “password_hash”: hashlib.sha256((“intern_salt” “intern_pass”).encode()).hexdigest(), “salt”: “intern_salt”, “role”: “intern”, “permissions”: [“ask_question”] # 实习生默认也只有提问权限但可通过规则限制内容 } } # 权限规则定义不同角色/用户对敏感话题的访问限制 SENSITIVE_TOPICS [“0day”, “exploit”, “bypass”, “sqlmap tamper”, “webshell密码”] PERMISSION_RULES { “intern”: { “blocked_keywords”: SENSITIVE_TOPICS, # 实习生禁止询问包含这些关键词的问题 “allowed_domains”: [“basic_concept”, “xss”, “csrf”] # 只允许询问基础概念类问题 }, “security_analyst”: { “blocked_keywords”: [], “allowed_domains”: [“all”] # 分析师可以问所有类型问题 }, “admin”: { “blocked_keywords”: [], “allowed_domains”: [“all”] } } def verify_password(username: str, password: str) - bool: 验证用户名和密码 user USERS_DB.get(username) if not user: return False # 重新计算哈希进行验证 input_hash hashlib.sha256((user[“salt”] password).encode()).hexdigest() return secrets.compare_digest(input_hash, user[“password_hash”]) def get_user_role(username: str) - Optional[str]: 获取用户角色 user USERS_DB.get(username) return user.get(“role”) if user else None def check_permission(username: str, permission: str) - bool: 检查用户是否拥有特定权限如‘ask_question’ user USERS_DB.get(username) if not user: return False return permission in user.get(“permissions”, []) def validate_question_content(username: str, question: str) - Dict: 根据用户角色验证问题内容是否被允许。 返回字典{“allowed”: bool, “reason”: str} role get_user_role(username) if not role: return {“allowed”: False, “reason”: “用户角色未知”} rules PERMISSION_RULES.get(role, {}) # 检查关键词黑名单 for keyword in rules.get(“blocked_keywords”, []): if keyword.lower() in question.lower(): return {“allowed”: False, “reason”: f“问题包含受限关键词 ‘{keyword}‘”} # 这里可以添加更复杂的域检查逻辑例如使用一个分类模型对问题进行分类 # 本示例中我们仅做关键词匹配的简单演示 return {“allowed”: True, “reason”: “”}3.2 在Chainlit中集成认证现在我们需要修改主app.py让Chainlit在启动时使用我们的认证逻辑。# app.py import chainlit as cl from auth import verify_password, get_user_role, check_permission, validate_question_content import sys import os # 假设你已有调用vLLM模型的函数这里用placeholder代替 from your_model_client import call_secgpt_model # 1. 设置Chainlit认证回调函数 cl.password_auth_callback def auth_callback(username: str, password: str): Chainlit调用的认证函数 if verify_password(username, password): role get_user_role(username) # 返回一个cl.User对象可以附带identifier和metadata return cl.User( identifierusername, metadata{“role”: role, “username”: username} ) else: # 认证失败返回None return None # 2. 可选设置一个简单的权限检查装饰器或中间件 def require_permission(permission: str): 一个简单的装饰器用于检查用户权限示例Chainlit原生支持有限 def decorator(func): async def wrapper(*args, **kwargs): # 获取当前会话用户信息 user cl.user_session.get(“user”) if not user: await cl.Message(content“未认证用户请先登录。”).send() return username user.metadata.get(“username”) from auth import check_permission if not check_permission(username, permission): await cl.Message(content“权限不足无法执行此操作。”).send() return return await func(*args, **kwargs) return wrapper return decorator # 3. 主消息处理函数现在包含权限验证 cl.on_message async def main(message: cl.Message): 处理用户消息的核心函数。 现在增加了内容安全检查。 # 获取当前用户 user cl.user_session.get(“user”) if not user: await cl.Message(content“会话异常请重新登录。”).send() return username user.metadata.get(“username”, “anonymous”) user_role user.metadata.get(“role”, “unknown”) # 记录原始问题用于审计日志下一节实现 raw_question message.content # 步骤1检查是否有提问权限 from auth import check_permission if not check_permission(username, “ask_question”): await cl.Message( contentf“抱歉您的角色 ‘{user_role}’ 没有提问权限。请联系管理员。” ).send() return # 步骤2验证问题内容是否合规 validation_result validate_question_content(username, raw_question) if not validation_result[“allowed”]: await cl.Message( contentf“提问内容未通过安全检查。原因{validation_result[‘reason’]}” ).send() # 注意这里可以触发审计日志记录违规尝试 return # 步骤3权限和内容都通过显示处理中状态并调用模型 msg cl.Message(content“”) await msg.send() # 模拟或实际调用SecGPT-14B模型 # 这里替换成你实际调用vLLM endpoint的代码 try: # 假设这是你的模型调用函数 model_response await call_secgpt_model(raw_question) # 你也可以在这里对模型输出进行二次安全检查如果需要 await msg.stream_token(model_response) except Exception as e: await msg.stream_token(f“调用模型时出错{str(e)}”) await msg.update()完成以上步骤后重启你的Chainlit应用。访问前端页面时首先会看到一个登录框。只有使用auth.py中定义的用户名和密码如admin/secure_password_123才能登录。实习生如果尝试询问包含“0day”的问题将会被拦截。门禁系统装好了接下来我们安装“监控系统”。4. 实现审计日志功能审计日志需要记录关键操作并安全存储。我们选择将日志写入文件并考虑结构化如JSON以便后续分析。4.1 创建日志记录模块新建logger.py文件。# logger.py import json import logging from datetime import datetime from typing import Dict, Any, Optional import os from pathlib import Path # 配置日志目录 LOG_DIR Path(“audit_logs”) LOG_DIR.mkdir(exist_okTrue) def get_log_file(): 按天生成日志文件例如 audit_2025_02_17.log today datetime.now().strftime(“%Y_%m_%d”) return LOG_DIR / f“audit_{today}.log” def log_audit_event(event_type: str, user_info: Dict, details: Dict, success: bool True): 记录审计事件的核心函数。 参数: event_type: 事件类型如 ‘USER_LOGIN‘, ‘QUESTION_ASKED‘, ‘PERMISSION_DENIED‘ user_info: 用户信息字典至少包含 ‘username‘, ‘role‘ details: 事件详情字典内容随事件类型变化 success: 操作是否成功 log_entry { “timestamp”: datetime.now().isoformat(), “event_type”: event_type, “user”: user_info, “details”: details, “success”: success, # 可以添加其他上下文如IP地址如果Chainlit能获取到 # “ip_address”: ip_address } log_file get_log_file() try: # 以追加模式写入每行一个JSON对象 with open(log_file, ‘a’, encoding‘utf-8’) as f: f.write(json.dumps(log_entry, ensure_asciiFalse) “\n”) except IOError as e: # 如果文件写入失败降级到标准日志 logging.error(f“Failed to write audit log: {e}”) logging.info(f“Audit event (fallback): {log_entry}”) # 一些便捷的日志函数 def log_user_login(username: str, role: str): 记录用户登录事件 log_audit_event( event_type“USER_LOGIN”, user_info{“username”: username, “role”: role}, details{“action”: “user_authentication”}, successTrue ) def log_question_asked(username: str, role: str, question: str, response_preview: str, tokens_used: Optional[int] None): 记录用户提问事件 details { “question”: question, “response_preview”: response_preview[:200], # 只记录预览避免日志过大 “action”: “model_query” } if tokens_used: details[“tokens_used”] tokens_used log_audit_event( event_type“QUESTION_ASKED”, user_info{“username”: username, “role”: role}, detailsdetails, successTrue ) def log_permission_denied(username: str, role: str, reason: str, question: str “”): 记录权限拒绝事件 details { “reason”: reason, “question_blocked”: question, “action”: “permission_check” } log_audit_event( event_type“PERMISSION_DENIED”, user_info{“username”: username, “role”: role}, detailsdetails, successFalse )4.2 在Chainlit中集成审计日志现在我们回到app.py在关键节点调用审计日志函数。# app.py (在之前代码基础上添加) import chainlit as cl from auth import verify_password, get_user_role, check_permission, validate_question_content from logger import log_user_login, log_question_asked, log_permission_denied # 导入日志函数 # ... 其他导入 ... cl.password_auth_callback def auth_callback(username: str, password: str): if verify_password(username, password): role get_user_role(username) # 【审计点】记录成功的登录 log_user_login(username, role) return cl.User( identifierusername, metadata{“role”: role, “username”: username} ) else: # 【审计点】可以在这里记录失败的登录尝试需要稍微修改log_audit_event以支持无用户 # 例如log_failed_login_attempt(username) return None cl.on_message async def main(message: cl.Message): user cl.user_session.get(“user”) if not user: await cl.Message(content“会话异常请重新登录。”).send() return username user.metadata.get(“username”, “anonymous”) user_role user.metadata.get(“role”, “unknown”) raw_question message.content # 权限检查 if not check_permission(username, “ask_question”): deny_reason f“角色 ‘{user_role}’ 无提问权限” # 【审计点】记录权限拒绝 log_permission_denied(username, user_role, deny_reason, raw_question) await cl.Message(contentf“抱歉{deny_reason}。请联系管理员。”).send() return # 内容验证 validation_result validate_question_content(username, raw_question) if not validation_result[“allowed”]: deny_reason validation_result[“reason”] # 【审计点】记录内容违规拒绝 log_permission_denied(username, user_role, deny_reason, raw_question) await cl.Message(contentf“提问内容未通过安全检查。原因{deny_reason}”).send() return # 处理问题... msg cl.Message(content“”) await msg.send() try: model_response await call_secgpt_model(raw_question) # 【审计点】记录成功的提问和响应预览 # 注意实际token使用量可能需要从vLLM响应头中获取 response_preview model_response[:200] (“…” if len(model_response) 200 else “”) log_question_asked(username, user_role, raw_question, response_preview) # tokens_used可后续添加 await msg.stream_token(model_response) except Exception as e: # 【审计点】可以记录模型调用失败事件 await msg.stream_token(f“调用模型时出错{str(e)}”) await msg.update()现在你的应用已经具备了完整的审计能力。所有关键操作都会被记录到audit_logs/目录下按日期分割的日志文件中格式为JSON Lines便于后续使用日志分析工具如ELK Stack或简单脚本进行查询和分析。5. 功能验证与测试代码写完了我们得试试它是不是真的管用。5.1 重启并测试应用重启Chainlit服务chainlit run app.py打开浏览器访问Chainlit提供的地址通常是http://localhost:8000。登录测试使用analyst/analyst_pass登录应该成功。使用错误的密码登录应该失败前端提示。权限测试用intern账号登录尝试提问“什么是XSS攻击”应该成功。用intern账号登录尝试提问“有没有最新的0day漏洞”应该被拦截并看到提示信息。审计日志检查在项目根目录下查看audit_logs/文件夹。打开当天的日志文件如audit_2025_02_17.log你应该能看到类似下面的JSON记录{timestamp: 2025-02-17T10:15:30.123456, event_type: USER_LOGIN, user: {username: analyst, role: security_analyst}, details: {action: user_authentication}, success: true} {timestamp: 2025-02-17T10:16:45.654321, event_type: PERMISSION_DENIED, user: {username: intern, role: intern}, details: {reason: 问题包含受限关键词 0day, question_blocked: 有没有最新的0day漏洞, action: permission_check}, success: false} {timestamp: 2025-02-17T10:17:20.987654, event_type: QUESTION_ASKED, user: {username: analyst, role: security_analyst}, details: {question: 什么是XSS攻击, response_preview: XSS跨站脚本攻击是一种..., action: model_query}, success: true}5.2 查看日志文件日志文件是纯文本的JSON Lines格式你可以用任何文本编辑器查看也可以用命令行工具快速分析# 查看今天的日志 tail -f audit_logs/audit_2025_02_17.log # 使用jq工具如果已安装美化查看和过滤 cat audit_logs/audit_2025_02_17.log | jq ‘.’ # 查找所有失败事件 cat audit_logs/audit_2025_02_17.log | jq ‘select(.success false)’6. 总结与进阶思考恭喜你你已经成功为你的SecGPT-14B Chainlit应用加装了“门禁”权限控制和“监控”审计日志。让我们回顾一下关键步骤和收获权限控制的核心我们通过auth.py模块管理用户和角色利用Chainlit的password_auth_callback实现登录认证并在消息处理函数中根据角色和规则对问题内容进行双重校验功能权限内容合规。审计日志的实现我们通过logger.py模块提供了结构化的日志记录功能并在app.py中的登录、提问、拒绝等关键节点埋点将所有重要操作以JSON格式按天归档。安全与实用兼顾示例代码中使用了密码加盐哈希、权限规则与内容关键词过滤为你的AI安全助手构建了基础但有效的安全防线。6.1 下一步可以做什么现在的实现是一个坚实起点你可以根据实际需求进一步强化它更安全的用户存储将USERS_DB从代码移到真正的数据库如SQLite、PostgreSQL并使用bcrypt或argon2等专业库哈希密码。更精细的权限模型实现RBAC基于角色的访问控制或ABAC基于属性的访问控制例如控制用户只能访问特定分类的知识库。更智能的内容过滤除了关键词可以集成一个轻量级文本分类模型在问题提交给SecGPT-14B之前先判断其所属的安全领域和敏感级别。日志分析与告警编写脚本定期分析审计日志自动检测异常模式如某个用户短时间内大量提问失败、频繁尝试敏感关键词并发送邮件或Slack告警。前端体验优化在Chainlit UI上显示当前登录用户和角色或者为管理员增加一个查看实时日志的页面。通过本教程你不仅为SecGPT-14B添加了关键的安全功能也掌握了一套为AI应用添加基础安全层的通用方法。希望你能在此基础上打造出更强大、更安全的智能安全运营助手。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。