Smart WAF Rule Generator

Design Notes for a Smart WAF Rule Generator

I. Core Architecture Design

1. System Flow

[Raw logs] → Data cleaning → Feature extraction → Rule generation → [WAF rule file]
                                    │                   │
                                    ├─ ML model (optional) ──┤
                                    └─ Rule-matching engine ─┘
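The pipeline above can be sketched end to end. Every stage function here is a hypothetical placeholder standing in for the components built in the stages below:

```python
def clean(raw_line):
    # placeholder cleaning step: strip whitespace, drop non-ASCII
    return raw_line.encode("ascii", "ignore").decode().strip()

def extract_features(line):
    # placeholder feature step: length and special-character count
    return {"url_length": len(line),
            "special_chars": sum(c in "'\";%" for c in line)}

def generate_rules(features):
    # placeholder rule step: emit one rule when both thresholds trip
    if features["special_chars"] >= 3 and features["url_length"] > 100:
        return ['SecRule ARGS "@detectSQLi" "id:1001,deny,status:403"']
    return []

def pipeline(raw_logs):
    rules = []
    for line in raw_logs:
        rules.extend(generate_rules(extract_features(clean(line))))
    # deduplicate while preserving order
    return list(dict.fromkeys(rules))
```

Each placeholder gets replaced by the real implementation from the corresponding stage as the project progresses.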

2. Technology Choices

Module              Recommended                                 Alternative
Log parsing         Python + Loguru + regular expressions       ELK Stack (resource-heavy)
Feature extraction  Scikit-learn TF-IDF / custom rule tree      PySpark (for large data volumes)
Rule generation     Decision-tree model / association rules     Deep learning (LSTM)
Rule testing        ModSecurity + OWASP CRS                     Nginx + Lua WAF
Visualization       Flask + ECharts                             Django + D3.js

II. Step-by-Step Implementation Guide

Stage 1: Log collection and preprocessing

Goal: build a pipeline that can process real attack logs

  1. Obtain sample data

    # Use a public attack-log dataset
    # GitHub resource: https://github.com/elastic/examples/tree/master/Security%20Analytics
    import pandas as pd

    logs = pd.read_csv('http_attack_logs.csv')
    print(logs[['timestamp', 'uri', 'user_agent']].head())
  2. Log-cleaning template

    import re
    from loguru import logger

    def sanitize_log(log_entry):
        # Strip non-ASCII characters
        cleaned = log_entry.encode('ascii', 'ignore').decode()

        # Normalize SQL-injection signatures
        injection_patterns = [
            r'(union\s+select)',
            r'(sleep\(\d+\))',
            r'(\bexec\b.+select)'
        ]
        for pattern in injection_patterns:
            cleaned = re.sub(pattern, '[SQLi]', cleaned, flags=re.IGNORECASE)

        logger.info(f"Cleaned log: {cleaned}")
        return cleaned
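A quick check of the normalization step (a logging-free copy of the substitution logic above, for illustration):

```python
import re

# the same signature patterns used in sanitize_log
injection_patterns = [
    r'(union\s+select)',
    r'(sleep\(\d+\))',
    r'(\bexec\b.+select)',
]

def tag_sqli(text):
    # normalization step of sanitize_log, without the logging side effect
    cleaned = text.encode('ascii', 'ignore').decode()
    for pattern in injection_patterns:
        cleaned = re.sub(pattern, '[SQLi]', cleaned, flags=re.IGNORECASE)
    return cleaned

print(tag_sqli("GET /items?id=1 UNION SELECT password FROM users"))
# → GET /items?id=1 [SQLi] password FROM users
```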

Stage 2: Feature engineering

Key point: extract quantifiable features of attack behavior

  1. Basic feature extraction

    def extract_basic_features(log):
        features = {}

        # URL-length anomaly signal
        features['url_length'] = len(log['uri'])

        # Count of suspicious special characters
        features['special_chars'] = sum(1 for c in log['uri'] if c in ['\'', '"', ';', '%'])

        # Flag uncommon HTTP methods
        features['http_method'] = 1 if log['method'] not in ['GET', 'POST'] else 0

        return features
  2. NLP feature processing (optional)

    from sklearn.feature_extraction.text import TfidfVectorizer

    corpus = [log['uri'] for log in logs]
    vectorizer = TfidfVectorizer(ngram_range=(2, 3), max_features=100)
    X = vectorizer.fit_transform(corpus)
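Whichever feature set is used, scikit-learn expects rows with a fixed column order rather than dicts; a small helper sketch (the sample feature values are illustrative):

```python
def to_matrix(feature_dicts):
    # fix a column order so every row lines up for scikit-learn
    names = sorted(feature_dicts[0])
    return names, [[d[n] for n in names] for d in feature_dicts]

# illustrative dicts of the kind extract_basic_features returns
names, X = to_matrix([
    {'url_length': 11, 'special_chars': 0, 'http_method': 0},
    {'url_length': 142, 'special_chars': 6, 'http_method': 1},
])
```

The `names` list doubles as the `feature_names` argument expected by `export_text` in Stage 3.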

Stage 3: Rule-generation engine

Implement two modes: rule-template matching (quick start) + machine learning (advanced)

  1. Template-based generator

    rule_templates = {
        'SQLi': {
            'conditions': [
                {'feature': 'special_chars', 'op': '>=', 'value': 3},
                {'feature': 'url_length', 'op': '>', 'value': 100}
            ],
            'action': 'deny',
            'rule': 'SecRule ARGS "@detectSQLi" "id:1001,deny,status:403"'
        },
        'XSS': {
            'conditions': [...],
            'action': 'block'
        }
    }

    import operator

    OPS = {'>=': operator.ge, '>': operator.gt, '<': operator.lt, '==': operator.eq}

    def eval_condition(features, cond):
        return OPS[cond['op']](features[cond['feature']], cond['value'])

    def generate_rules(features):
        activated_rules = []
        for rule_name, config in rule_templates.items():
            if all(eval_condition(features, cond) for cond in config['conditions']):
                rule = config.get('rule')
                if rule:
                    activated_rules.append(rule)
        return activated_rules
  2. Machine-learning option (decision-tree example)

    from sklearn.tree import DecisionTreeClassifier, export_text
    from sklearn.model_selection import train_test_split

    # Assumes labeled data X_features, y_labels already exists
    X_train, X_test, y_train, y_test = train_test_split(X_features, y_labels)

    clf = DecisionTreeClassifier(max_depth=5)
    clf.fit(X_train, y_train)

    # Convert the decision tree into human-readable rules
    print(export_text(clf, feature_names=feature_names))
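export_text prints the learned thresholds, but turning one such split into deployable rule text is a separate step. A sketch, where the rule id, the @gt/@lt mapping, and the TX:<feature> variable (which earlier rules in your chain would have to populate) are all assumptions:

```python
def threshold_to_secrule(feature, op, value, rule_id):
    # Map a learned numeric split such as "special_chars > 2.5" onto a
    # ModSecurity rule skeleton. TX:<feature> is a transaction variable
    # the rest of the rule set is assumed to set before this rule runs.
    operator_str = '@gt' if op == '>' else '@lt'
    actions = f"id:{rule_id},phase:2,deny,status:403,msg:'ML split on {feature}'"
    return f'SecRule TX:{feature} "{operator_str} {value:g}" "{actions}"'

print(threshold_to_secrule('special_chars', '>', 2.5, 100001))
# → SecRule TX:special_chars "@gt 2.5" "id:100001,phase:2,deny,status:403,msg:'ML split on special_chars'"
```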

Stage 4: Rule testing and validation

Set up a test environment

# Quickly deploy ModSecurity + OWASP CRS with Docker
docker run -p 80:80 -v $(pwd)/rules:/etc/modsecurity.d/rules owasp/modsecurity-crs

Automated test script

import requests

def test_rule(rule_file, test_cases):
    # Write the new rules into the host-side rules/ directory that the
    # container mounts (ModSecurity must reload, e.g. via a container
    # restart, before the change takes effect)
    with open('rules/custom_rules.conf', 'w') as f:
        f.write(rule_file)

    # Send test requests; params handles URL-encoding the payloads
    results = {}
    for case in test_cases:
        resp = requests.get('http://localhost/', params={'input': case['payload']})
        results[case['type']] = 'Blocked' if resp.status_code == 403 else 'Allowed'

    return results

# Example test cases
test_cases = [
    {'type': 'SQLi', 'payload': "' OR 1=1--"},
    {'type': 'XSS', 'payload': "<script>alert(1)</script>"}
]
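Before pointing the script at the live container, the payloads can be smoke-tested against plain regex signatures. This is not ModSecurity's parser, just a quick local stand-in with made-up patterns:

```python
import re

# regex-only stand-ins for the WAF decision, for offline smoke tests
signatures = {
    'SQLi': re.compile(r"'\s*or\s*1=1", re.IGNORECASE),
    'XSS': re.compile(r"<script[^>]*>", re.IGNORECASE),
}

def dry_run(test_cases):
    # mirror test_rule's result shape without touching the network
    results = {}
    for case in test_cases:
        hit = any(sig.search(case['payload']) for sig in signatures.values())
        results[case['type']] = 'Blocked' if hit else 'Allowed'
    return results
```

Running `dry_run(test_cases)` should report both example payloads as Blocked; anything Allowed here will certainly pass the real WAF too.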

III. Development Roadmap (4-week plan)

Week  Milestone                    Deliverable
1     Data pipeline in place       Cleaning script that handles 100,000 log entries
2     Core rule engine complete    Rule generation for 5 basic attack patterns
3     ML module integrated         Decision-tree model with >85% accuracy
4     Visualization dashboard      Front end supporting real-time log analysis

IV. Pitfalls to Avoid

  1. Performance

    • Consider PyPy instead of CPython for higher throughput

    • Precompile regular expressions:

      precompiled_patterns = [re.compile(p) for p in injection_patterns]

  2. Rule-conflict resolution

    def resolve_rule_conflicts(rules):
        # Sort by priority
        priority_order = ['SQLi', 'XSS', 'PathTraversal']
        return sorted(rules, key=lambda x: priority_order.index(x['type']))

  3. False-positive control

    • Add a whitelist short-circuit (e.g. at the top of generate_rules):

      whitelist = ['/api/healthcheck', '/static/']
      if any(log['uri'].startswith(path) for path in whitelist):
          return []
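To put a number on the false-positive rate, replay labeled traffic through the generated rules and score the decisions. A minimal scorer, assuming each sample is an (is_attack, was_blocked) pair:

```python
def score_rules(samples):
    # samples: list of (is_attack, was_blocked) boolean pairs
    fp = sum(1 for atk, blk in samples if blk and not atk)   # benign but blocked
    fn = sum(1 for atk, blk in samples if atk and not blk)   # attack but allowed
    benign = sum(1 for atk, _ in samples if not atk)
    attacks = len(samples) - benign
    return {
        'false_positive_rate': fp / benign if benign else 0.0,
        'false_negative_rate': fn / attacks if attacks else 0.0,
    }
```

These two numbers are also what the Week 4 dashboard and the demo (Section VI) should surface.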

V. Advanced Directions

  1. Real-time stream processing

    # Apache Kafka + Faust
    import faust

    app = faust.App('waf-learner', broker='kafka://localhost')
    topic = app.topic('raw-logs')

    @app.agent(topic)
    async def process_logs(stream):
        async for log in stream:
            cleaned = sanitize_log(log)
            features = extract_features(cleaned)
            rules = generate_rules(features)
            await send_to_waf(rules)

  2. Reinforcement learning

    # Custom environment on the OpenAI Gym API (sketch; helpers such as
    # calculate_reward and the state bookkeeping are left to implement)
    import gym
    from gym import spaces

    class WAFEnv(gym.Env):
        def __init__(self):
            self.action_space = spaces.Discrete(3)  # allow, block, challenge
            self.observation_space = ...  # feature vector

        def step(self, action):
            # Apply the action and compute the reward
            reward = calculate_reward(action, actual_threat)
            return next_state, reward, done, info
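calculate_reward is left undefined in the sketch above; one illustrative shape, with entirely made-up weights that reward blocking real threats and penalize blocking benign traffic:

```python
def calculate_reward(action, actual_threat):
    # action: 0=allow, 1=block, 2=challenge; weights are illustrative only
    if actual_threat:
        # missing a real attack (allow) hurts most
        return {0: -10.0, 1: 1.0, 2: 0.5}[action]
    # blocking benign traffic is the false-positive cost
    return {0: 0.1, 1: -5.0, 2: -1.0}[action]
```

The asymmetry between the missed-attack penalty and the false-positive penalty is the main tuning knob: it encodes how much the operator fears breakage versus breaches.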

VI. Presenting the Project

  1. GitHub repository layout

    /Smart-WAF-Generator
    ├── /data_samples     # sample test logs
    ├── /docs             # project documentation
    ├── engine.py         # core rule engine
    ├── requirements.txt  # dependency list
    └── tests/            # unit tests

  2. Demo highlights

    • Rule-generation speed compared with a traditional WAF
    • Measured false-positive/false-negative rates
    • Visualization of attack-pattern clustering

VII. Resources

  1. Essential reading
  2. Debugging tools
    • WAF testing: sqlmap, XSSer
    • Traffic analysis: Wireshark + the ModSecurity audit log