Smart WAF Rule Generator
Moonbeaut Smart WAF Rule Generator: Design Approach
I. Core Architecture Design
1. System Flowchart
[Raw logs] → Data cleaning → Feature extraction → Rule generation → [WAF rule file]
2. Technology Stack
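Module | Recommended | Alternative |
---|---|---|
Log parsing | Python + Loguru + regular expressions | ELK Stack (resource-heavy) |
Feature extraction | Scikit-learn TF-IDF / custom rule tree | PySpark (for large data volumes) |
Rule generation | Decision tree model / association-rule mining | Deep learning (LSTM) |
Rule testing | ModSecurity + OWASP CRS | Nginx + Lua WAF |
Visualization | Flask + ECharts | Django + D3.js |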
II. Step-by-Step Implementation Guide
Stage 1: Log Collection and Preprocessing
Goal: build a pipeline that can process real attack logs.
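Real attack traffic usually arrives as raw web-server access logs rather than a ready-made CSV. A minimal sketch, assuming the Apache/Nginx "combined" log format, that turns one line into the `timestamp`, `method`, `uri`, and `user_agent` fields used in the following stages. The regex and the name `parse_combined_line` are illustrative, not part of the original; lines with a malformed request string (such as `"-"`) simply return `None`.

```python
import re

# Hypothetical parser for the Apache/Nginx "combined" log format:
# host ident user [time] "METHOD URI PROTOCOL" status size "referer" "user-agent"
COMBINED_RE = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>[A-Z]+) (?P<uri>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\d+|-) '
    r'"(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
)

def parse_combined_line(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = COMBINED_RE.match(line)
    return match.groupdict() if match else None
```

Parsed records can then be collected into a DataFrame, e.g. `pd.DataFrame([r for r in map(parse_combined_line, f) if r])` for an open log file `f`.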
Obtaining sample data:
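```python
# Use a public attack-log dataset
# GitHub resource: https://github.com/elastic/examples/tree/master/Security%20Analytics
import pandas as pd

# 'http_attack_logs.csv' is a placeholder file name: export your dataset to CSV
# with at least timestamp, method, uri and user_agent columns
logs = pd.read_csv('http_attack_logs.csv')
print(logs[['timestamp', 'uri', 'user_agent']].head())
```
Log-cleaning template: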
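```python
import re
from loguru import logger

# Common SQL-injection signatures, normalized to a single [SQLi] token.
# Defined at module level so they can be reused (and precompiled) elsewhere.
injection_patterns = [
    r'(union\s+select)',
    r'(sleep\(\d+\))',
    r'(\bexec\b.+select)',
]

def sanitize_log(log_entry):
    # Drop non-ASCII characters
    cleaned = log_entry.encode('ascii', 'ignore').decode()
    # Replace each SQL-injection signature with the [SQLi] token
    for pattern in injection_patterns:
        cleaned = re.sub(pattern, '[SQLi]', cleaned, flags=re.IGNORECASE)
    logger.info(f"Cleaned log: {cleaned}")
    return cleaned
```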
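One gap in the template: payloads in access logs are usually URL-encoded, so `union%20select` or `union+select` slips past `union\s+select`. A sketch, assuming query-string encoding (where `+` means a space), that decodes up to a few rounds to undo double encoding such as `%2520` before applying the same three patterns. `decode_then_normalize` and `max_rounds` are hypothetical names.

```python
import re
from urllib.parse import unquote_plus

# Same three signatures as sanitize_log, compiled once with IGNORECASE
SQLI_PATTERNS = [re.compile(p, re.IGNORECASE) for p in (
    r'(union\s+select)',
    r'(sleep\(\d+\))',
    r'(\bexec\b.+select)',
)]

def decode_then_normalize(raw_uri, max_rounds=3):
    # Decode repeatedly (e.g. %2520 -> %20 -> ' ') until the value stops changing
    decoded = raw_uri
    for _ in range(max_rounds):
        next_value = unquote_plus(decoded)
        if next_value == decoded:
            break
        decoded = next_value
    # Normalize signatures only after decoding
    for pattern in SQLI_PATTERNS:
        decoded = pattern.sub('[SQLi]', decoded)
    return decoded
```

Capping the number of rounds keeps a deliberately deep encoding from turning into an unbounded loop.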
Stage 2: Feature Engineering
Key point: extract quantifiable features of attack behavior.
Basic feature extraction:
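```python
def extract_basic_features(log):
    features = {}
    # URL length (unusually long URLs are a common anomaly signal)
    features['url_length'] = len(log['uri'])
    # Count of special characters often used in injection payloads: ' " ; %
    features['special_chars'] = sum(1 for c in log['uri'] if c in '\'";%')
    # 1 for uncommon HTTP methods (anything other than GET/POST), else 0
    features['http_method'] = 1 if log['method'] not in ('GET', 'POST') else 0
    return features
```
NLP feature processing (optional):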
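```python
from sklearn.feature_extraction.text import TfidfVectorizer

# logs is the DataFrame loaded in Stage 1. Iterating a DataFrame directly yields
# column names, so take the 'uri' column explicitly.
corpus = logs['uri'].fillna('').tolist()
vectorizer = TfidfVectorizer(ngram_range=(2, 3), max_features=100)
X = vectorizer.fit_transform(corpus)
```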
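The default `TfidfVectorizer` tokenizer (`token_pattern=r"(?u)\b\w\w+\b"`) keeps only runs of two or more word characters, so quotes, angle brackets, and semicolons, the very characters that mark injection payloads, never reach the word n-grams above. A character-level analyzer keeps them. The sketch below swaps in `analyzer='char'` on three made-up URIs; the n-gram range is an assumption to tune, not a value from the original.

```python
from sklearn.feature_extraction.text import TfidfVectorizer

# Made-up URIs; in the pipeline these would come from logs['uri']
uris = [
    "/item.php?id=1' OR '1'='1",
    "/search?q=<script>alert(1)</script>",
    "/static/app.js",
]

# analyzer='char' builds n-grams over raw characters (after lowercasing),
# so quotes, '<', ';' and '%' survive as features.
# On real data, add max_features to bound the vocabulary size.
char_vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
X_char = char_vectorizer.fit_transform(uris)
```

Inspecting `char_vectorizer.vocabulary_` shows n-grams such as `'<sc'` and `"'="`, which the word-level vectorizer would discard.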
Stage 3: Rule Generation Engine
Two modes are implemented: rule-based matching (quick to start) and machine learning (advanced).
Rule-based generator:
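```python
import operator

# Comparison operators allowed in rule conditions
OPS = {'>': operator.gt, '>=': operator.ge, '<': operator.lt,
       '<=': operator.le, '==': operator.eq}

rule_templates = {
    'SQLi': {
        'conditions': [
            {'feature': 'special_chars', 'op': '>=', 'value': 3},
            {'feature': 'url_length', 'op': '>', 'value': 100}
        ],
        'action': 'deny',
        'rule': 'SecRule ARGS "@detectSQLi" "id:1001,deny,status:403"'
    },
    'XSS': {
        'conditions': [...],  # elided in the original; fill in before use
        'action': 'block'
    }
}

def eval_condition(features, cond):
    # True if features[cond['feature']] <op> cond['value']
    return OPS[cond['op']](features[cond['feature']], cond['value'])

def generate_rules(features):
    activated_rules = []
    for rule_name, config in rule_templates.items():
        if 'rule' not in config:  # placeholder templates such as XSS above
            continue
        # A template fires only if ALL of its conditions hold
        if all(eval_condition(features, cond) for cond in config['conditions']):
            # Keep the type so conflicts can be resolved by priority later
            activated_rules.append({'type': rule_name, 'rule': config['rule']})
    return activated_rules
```
Machine-learning approach (decision-tree example):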
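```python
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, export_text

# Assumes labeled data already exists: X_features (n_samples x n_features),
# y_labels (e.g. 0 = benign, 1 = attack) and feature_names (list of column names)
X_train, X_test, y_train, y_test = train_test_split(
    X_features, y_labels, test_size=0.25, random_state=42)
clf = DecisionTreeClassifier(max_depth=5, random_state=42)
clf.fit(X_train, y_train)
print(f"Test accuracy: {clf.score(X_test, y_test):.2%}")

# Convert the decision tree into human-readable rules
print(export_text(clf, feature_names=feature_names))
```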
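`export_text` produces text for humans. To feed a learned tree back into the rule-based generator, one option is to walk the fitted tree's internal arrays (`tree_.children_left`, `tree_.children_right`, `tree_.feature`, `tree_.threshold`, `tree_.value`) and emit one condition list per leaf that predicts the attack class, in the same `{'feature', 'op', 'value'}` shape as `rule_templates`. A sketch: `tree_to_conditions` is a hypothetical helper, and it assumes class label `1` means attack.

```python
import numpy as np
from sklearn.tree import DecisionTreeClassifier

def tree_to_conditions(clf, feature_names, attack_class=1):
    """Return one list of condition dicts per leaf that predicts attack_class.

    Hypothetical helper: walks the fitted tree_ arrays of a scikit-learn
    DecisionTreeClassifier; each returned path is an AND of threshold tests.
    """
    tree = clf.tree_
    paths = []

    def walk(node, conditions):
        left, right = tree.children_left[node], tree.children_right[node]
        if left == right:  # leaves have no children (both are -1)
            predicted = clf.classes_[np.argmax(tree.value[node][0])]
            if predicted == attack_class:
                paths.append(conditions)
            return
        name = feature_names[tree.feature[node]]
        threshold = float(tree.threshold[node])
        # scikit-learn sends samples with feature <= threshold to the left child
        walk(left, conditions + [{'feature': name, 'op': '<=', 'value': threshold}])
        walk(right, conditions + [{'feature': name, 'op': '>', 'value': threshold}])

    walk(0, [])
    return paths

# Toy usage: one feature; the made-up "attacks" have more special characters
X_toy = np.array([[0], [1], [5], [6]])
y_toy = np.array([0, 0, 1, 1])
toy_clf = DecisionTreeClassifier(max_depth=5, random_state=0).fit(X_toy, y_toy)
toy_paths = tree_to_conditions(toy_clf, ['special_chars'])
```

Each returned path can be dropped into a template's `'conditions'` list as-is. Thresholds are floats because scikit-learn places splits at midpoints between observed values (here 3.0, between 1 and 5).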
Stage 4: Rule Testing and Validation
Setting up the test environment:
```
# Deploy ModSecurity + OWASP CRS quickly with Docker
```
Automated test script:
```python
import requests
```
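Since the test script is cut off in the original, here is a hedged sketch of what it might measure: send known-attack and known-benign requests through the WAF and count how many come back with the blocking status (403, matching `status:403` in the SecRule above). `summarize`, `run_waf_tests`, and the `(path, is_attack)` case format are assumptions; scoring is kept separate from the HTTP calls so it can be checked without a running WAF.

```python
def blocked_share(statuses, blocked_status=403):
    # Fraction of responses that carry the blocking status code
    return sum(s == blocked_status for s in statuses) / len(statuses) if statuses else 0.0

def summarize(results, blocked_status=403):
    """results: list of (is_attack, status_code) pairs."""
    attack_statuses = [status for is_attack, status in results if is_attack]
    benign_statuses = [status for is_attack, status in results if not is_attack]
    return {
        'detection_rate': blocked_share(attack_statuses, blocked_status),
        'false_positive_rate': blocked_share(benign_statuses, blocked_status),
    }

def run_waf_tests(base_url, cases, blocked_status=403):
    """cases: list of (path, is_attack) tuples sent as GET requests."""
    import requests  # imported here so summarize() works without requests installed
    with requests.Session() as session:
        results = [(is_attack, session.get(base_url + path, timeout=5).status_code)
                   for path, is_attack in cases]
    return summarize(results, blocked_status)
```

Usage against a local test deployment (the address is hypothetical): `run_waf_tests('http://localhost:8080', [('/item?id=1%20union%20select%201', True), ('/static/app.js', False)])`.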
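III. Development Roadmap (4-Week Plan)
Week | Milestone | Deliverable |
---|---|---|
1 | Data pipeline set up | Cleaning script that can process 100,000 log entries |
2 | Core rule engine complete | Rule generation for 5 basic attack patterns |
3 | Machine-learning module integrated | Decision-tree model with >85% accuracy |
4 | Visualization dashboard | Front end supporting real-time log analysis |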
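IV. Pitfalls to Avoid
Performance optimization:
Run the pure-Python stages (such as log parsing and cleaning) under PyPy instead of CPython; stages dominated by NumPy/scikit-learn benefit little, since that work already runs in C.
Precompile the regular expressions once rather than on every call: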
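```python
# Assumes injection_patterns is defined at module level; keep the IGNORECASE
# flag that sanitize_log passes to re.sub
precompiled_patterns = [re.compile(p, re.IGNORECASE) for p in injection_patterns]
```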
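Going a step further, the three patterns can be joined into one alternation so each string is scanned once instead of once per pattern. A sketch (`COMBINED_SQLI` and `sanitize_fast` are hypothetical names); note that it is not always equivalent to the sequential loop in `sanitize_log`: when matches overlap, as in `exec ... union select`, the alternation can replace a longer span in one go.

```python
import re

injection_patterns = [r'(union\s+select)', r'(sleep\(\d+\))', r'(\bexec\b.+select)']

# One compiled alternation: each string is scanned once instead of once per pattern
COMBINED_SQLI = re.compile('|'.join(f'(?:{p})' for p in injection_patterns),
                           re.IGNORECASE)

def sanitize_fast(text):
    return COMBINED_SQLI.sub('[SQLi]', text)
```

For `"exec x union select 1"` the sequential loop yields `"exec x [SQLi] 1"`, while `sanitize_fast` yields `"[SQLi] 1"`; whether that difference matters depends on how the normalized text is used downstream.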
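Handling rule conflicts:
```python
def resolve_rule_conflicts(rules):
    # Order rules (dicts with a 'type' key) by priority;
    # unknown types go last instead of raising ValueError
    priority_order = ['SQLi', 'XSS', 'PathTraversal']
    rank = {rule_type: i for i, rule_type in enumerate(priority_order)}
    return sorted(rules, key=lambda rule: rank.get(rule['type'], len(rank)))
```
Controlling false positives: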
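- Introduce a whitelist:
```python
whitelist = ['/api/healthcheck', '/static/']
def generate_rules_with_whitelist(log, features):
    # Known-safe paths never produce blocking rules
    if any(log['uri'].startswith(path) for path in whitelist):
        return []
    return generate_rules(features)
```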
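A raw `startswith` check on the URI can be abused: `/static/../admin` and `/static/%2e%2e/admin` both start with `/static/`, and `/api/healthcheckevil` starts with `/api/healthcheck`. A sketch that URL-decodes and normalizes the path first and matches whole path segments; `is_whitelisted` and `WHITELIST_PREFIXES` are illustrative names.

```python
import posixpath
from urllib.parse import unquote, urlsplit

WHITELIST_PREFIXES = ('/api/healthcheck', '/static/')

def is_whitelisted(raw_uri):
    # Drop the query string, decode %-escapes, then resolve '.' and '..' segments
    path = posixpath.normpath(unquote(urlsplit(raw_uri).path) or '/')
    for prefix in WHITELIST_PREFIXES:
        prefix = prefix.rstrip('/')
        # Match whole segments: '/static' or '/static/...', never '/staticevil'
        if path == prefix or path.startswith(prefix + '/'):
            return True
    return False
```

Decoding before normalizing matters: `normpath` alone would leave `%2e%2e` untouched and let the traversal through.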
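V. Advanced Directions
Real-time stream processing:
```python
# Using Apache Kafka + Faust
import faust

app = faust.App('waf-learner', broker='kafka://localhost')
topic = app.topic('raw-logs')  # message values are decoded as JSON by default

@app.agent(topic)
async def process_logs(stream):
    async for log in stream:
        # Assumes each message is a dict with at least 'uri' and 'method' keys
        log['uri'] = sanitize_log(log['uri'])
        features = extract_basic_features(log)
        rules = generate_rules(features)
        await send_to_waf(rules)  # hypothetical helper that pushes rules to the WAF
```
Applying reinforcement learning: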
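```python
# Custom environment built on OpenAI Gym
import gym
from gym import spaces

class WAFEnv(gym.Env):
    def __init__(self):
        super().__init__()
        self.action_space = spaces.Discrete(3)  # 0 = allow, 1 = block, 2 = challenge
        self.observation_space = ...  # feature vector

    def step(self, action):
        # Apply the action and compute the reward. calculate_reward, actual_threat,
        # next_state, done and info are placeholders the environment must supply.
        reward = calculate_reward(action, actual_threat)
        # Classic Gym 4-tuple; Gymnasium (Gym's maintained successor) instead
        # returns (obs, reward, terminated, truncated, info)
        return next_state, reward, done, info
```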
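`calculate_reward` is left undefined above. One way to fill it in, shown purely as an assumption, is a lookup table keyed by (action, whether the request was actually a threat) that penalizes a missed attack more heavily than a false positive. The numbers are made up and would need tuning against real traffic.

```python
# Hypothetical action encoding, matching the comment on spaces.Discrete(3)
ALLOW, BLOCK, CHALLENGE = 0, 1, 2

# Made-up reward table: (action, request_was_a_threat) -> reward
REWARDS = {
    (ALLOW, False): 1.0,        # legitimate request served
    (ALLOW, True): -10.0,       # missed attack (false negative): the worst case
    (BLOCK, True): 5.0,         # attack stopped
    (BLOCK, False): -5.0,       # legitimate user blocked (false positive)
    (CHALLENGE, True): 3.0,     # attack slowed down, rewarded less than a block
    (CHALLENGE, False): -1.0,   # small friction for a legitimate user
}

def calculate_reward(action, actual_threat):
    return REWARDS[(action, bool(actual_threat))]
```

The asymmetry between `(ALLOW, True)` and `(BLOCK, False)` is the main design lever: it sets how much false-positive cost the agent will accept to avoid missing attacks.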
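VI. Presenting the Project
GitHub repository layout:
```text
/Smart-WAF-Generator
├── /data_samples      # sample test logs
├── /docs              # project documentation
├── engine.py          # core rule engine
├── requirements.txt   # dependency list
└── /tests             # unit tests
```
Highlights for the demo: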
- Compare rule-generation speed with traditional WAFs
- Present false-positive and false-negative test data
- Visualize attack-pattern clustering
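For the clustering chart, a minimal sketch assuming character n-gram TF-IDF vectors and k-means. The four URIs and `n_clusters=2` are toy choices; on real data the number of clusters would have to be chosen, for example by comparing several values.

```python
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer

# Toy URIs; in practice, the URIs of requests the WAF flagged
uris = [
    "/item?id=1 union select password from users",
    "/item?id=2 union select password from users",
    "/search?q=<script>alert(1)</script>",
    "/search?q=<script>alert(2)</script>",
]

# Character n-grams keep payload punctuation such as '<' and quotes
X_uri = TfidfVectorizer(analyzer='char', ngram_range=(2, 4)).fit_transform(uris)
labels = KMeans(n_clusters=2, n_init=10, random_state=0).fit_predict(X_uri)
```

To plot the clusters in ECharts, the sparse vectors can first be projected to two dimensions, for example with `sklearn.decomposition.TruncatedSVD(n_components=2)`.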
VII. Resource Quick Links
- Essential reading:
- Debugging tools:
  - WAF testing tools: sqlmap, XSSer
  - Traffic analysis: Wireshark + ModSecurity Audit Log