How to build an AI-powered content moderation pipeline for user comments
Comment sections and user-submitted content are an attack surface. Spam bots, coordinated harassment, and phishing links disguised as helpful replies: if you ship a public-facing form or discussion feature, you will encounter all of these within days.
Rule-based filters (regex, keyword lists) reach roughly 60-70% precision at best and need constant maintenance. An LLM-based classifier can handle nuanced toxic content, context-dependent spam, and subtle manipulation that keyword filters miss entirely.
This tutorial builds a complete moderation pipeline in Python: receive a comment, classify it with an LLM, cache repeated inputs, process batches efficiently, and route borderline cases to a human review queue. The same architecture works for form submissions, support tickets, forum posts, and any other user-generated text. For organizations managing content at scale, this pairs well with the broader security controls described in practical security guides.
Architecture overview
User comment
     │
     ▼
Cache lookup (Redis/dict) ──hit──▶ cached decision
     │ miss
     ▼
Batch accumulator (up to 20 items or 500 ms)
     │
     ▼
LLM classifier (structured JSON output)
     │
     ├── safe ───────────────▶ publish immediately
     ├── spam/toxic ─────────▶ auto-reject + log
     └── borderline (< 0.75) ▶ human review queue
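The three branches at the bottom of the diagram can be sketched as a small routing function. This is a minimal illustration, not the pipeline's actual code: the names `Route`, `route_decision`, and `REVIEW_THRESHOLD` are hypothetical, the 0.75 cutoff is taken from the diagram, and treating phishing like spam/toxic (auto-reject) is an assumption, since the diagram does not list it.

```python
from enum import Enum

REVIEW_THRESHOLD = 0.75  # from the diagram: confidence below this goes to humans


class Route(str, Enum):
    """Hypothetical names for the three outcomes in the diagram."""
    PUBLISH = "publish"
    REJECT = "reject"
    HUMAN_REVIEW = "human_review"


def route_decision(category: str, confidence: float,
                   threshold: float = REVIEW_THRESHOLD) -> Route:
    """Map a classifier verdict to one of the three branches."""
    # Explicitly borderline or low-confidence verdicts always go to a person.
    if category == "borderline" or confidence < threshold:
        return Route.HUMAN_REVIEW
    if category == "safe":
        return Route.PUBLISH
    # Assumption: phishing is auto-rejected alongside spam and toxic content.
    if category in {"spam", "toxic", "phishing"}:
        return Route.REJECT
    # Unknown labels are sent to review rather than guessed at.
    return Route.HUMAN_REVIEW
```

Checking the confidence threshold before the category means a low-confidence "safe" verdict is reviewed instead of published, which errs on the side of caution.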
Setup
pip install openai redis pydantic python-dotenv
For local development, run Redis:
docker run -d -p 6379:6379 redis:alpine
Data models
# models.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time


class ContentCategory(str, Enum):
    SAFE = "safe"
    SPAM = "spam"
    TOXIC = "toxic"
    PHISHING = "phishing"
    BORDERLINE = "borderline"


@dataclass
class ModerationResult:
    category: ContentCategory
    confidence: float            # 0.0 – 1.0
    reasoning: str               # one sentence
    flags: list[str]             # e.g. ["contains_url", "promotional_language"]
    needs_human_review: bool
    processing_time_ms: float
    cache_hit: bool = False


@dataclass
class PendingComment:
    comment_id: str
    text: str
    user_id: str
    submitted_at: float = field(default_factory=time.time)
    context: Optional[str] = None  # e.g. article slug or thread title
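Model output is untrusted input, so it helps to validate it before it becomes a `ModerationResult`. The sketch below shows one possible way to do that. The helper `to_result` is hypothetical, the 0.75 review threshold is borrowed from the architecture diagram, and the two model classes are repeated only so the snippet runs on its own.

```python
from dataclasses import dataclass
from enum import Enum


class ContentCategory(str, Enum):  # mirrors models.py
    SAFE = "safe"
    SPAM = "spam"
    TOXIC = "toxic"
    PHISHING = "phishing"
    BORDERLINE = "borderline"


@dataclass
class ModerationResult:  # mirrors models.py
    category: ContentCategory
    confidence: float
    reasoning: str
    flags: list[str]
    needs_human_review: bool
    processing_time_ms: float
    cache_hit: bool = False


def to_result(raw: dict, elapsed_ms: float, threshold: float = 0.75) -> ModerationResult:
    """Hypothetical helper: validate a raw classifier dict into a ModerationResult."""
    try:
        category = ContentCategory(raw.get("category"))
    except ValueError:
        category = ContentCategory.BORDERLINE  # unknown label: let a human decide
    try:
        confidence = float(raw.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0  # unparseable confidence is treated as no confidence
    confidence = min(1.0, max(0.0, confidence))  # clamp into the 0.0-1.0 range
    flags = raw.get("flags")
    if not isinstance(flags, list):
        flags = []
    return ModerationResult(
        category=category,
        confidence=confidence,
        reasoning=str(raw.get("reasoning", "")),
        flags=[str(f) for f in flags],
        needs_human_review=(
            category is ContentCategory.BORDERLINE or confidence < threshold
        ),
        processing_time_ms=elapsed_ms,
    )
```

Falling back to `BORDERLINE` with zero confidence means a malformed response ends up in front of a reviewer instead of being published or rejected automatically.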
LLM classifier with structured output
# classifier.py
import json
import hashlib
import time
import logging
from typing import Optional

from openai import OpenAI

from models import ContentCategory, ModerationResult

logger = logging.getLogger(__name__)

llm_client = OpenAI(
    api_key="your_api_key",  # placeholder; load a real key from the environment
    base_url="https://api.your-llm-provider.com/v1",
)

SYSTEM_PROMPT = """You are a content moderation classifier. Analyze submitted text and return a JSON object with exactly these fields:
- "category": one of "safe", "spam", "toxic", "phishing", "borderline"
- "confidence": float 0.0-1.0 (your certainty in the classification)
- "reasoning": one sentence explaining the decision
- "flags": array of strings identifying specific issues (empty array if safe)
Categories:
- safe: legitimate user content, on-topic discussion, genuine questions
- spam: promotional content, repeated phrases, unsolicited advertising, SEO link drops
- toxic: harassment, hate speech, threats, personal attacks, profanity targeting users
- phishing: credential harvesting, fake login prompts, deceptive links, scam patterns
- borderline: ambiguous content that requires human judgment
Return ONLY the JSON object. No prose."""


def classify_single(text: str, context: Optional[str] = None, model: str = "gpt-4o-mini") -> dict:
    """Call LLM for a single text. Returns raw parsed dict."""
    user_content = f"Text to classify:\n{text}"
    if context:
        user_content = f"Context: {context}\n\n{user_content}"

    response = llm_client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        temperature=0.0,  # deterministic for classification
        max_tokens=200,
        response_format={"type": "json_object"},  # forces JSON output
    )
    raw = response.choices[0].message.content
    return json.loads(raw)


def classify_batch(texts: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
    """
    Classify multiple texts in a single API call.
    texts: [{"id": str, "text": str, "context": Optional[str]}]
    Results are requested in input order; each one carries its item id.
    """
    if not texts:
        return []

    # Build a numbered batch prompt (each text is truncated to 800 characters)
    items_block = "\n\n".join(
        f'Item {i+1} (id={item["id"]}):\n{item["text"][:800]}'
        + (f'\nContext: {item["context"]}' if item.get("context") else "")
        for i, item in enumerate(texts)
    )
    # JSON mode yields a top-level object, so ask for the array under "results"
    batch_prompt = f"""Classify each of the following {len(texts)} items. Return a JSON object with a single key "results" whose value is an array with one element per item, in order. Each element must have: id, category, confidence, reasoning, flags.

{items_block}"""

    response = llm_client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT.replace(
                "Return ONLY the JSON object.",
                'Return ONLY a JSON object of the form {"results": [...]}, one array element per item.',
            )},
            {"role": "user", "content": batch_prompt},
        ],
        temperature=0.0,
        max_tokens=100 * len(texts),
        response_format={"type": "json_object"},
    )
    raw = response.choices[0].message.content
    parsed = json.loads(raw)

    # Expected shape is {"results": [...]}; tolerate other keys or a bare array
    if isinstance(parsed, dict):
        for key in ("results", "items", "classifications"):
            if key in parsed and isinstance(parsed[key], list):
                return parsed[key]
    if isinstance(parsed, list):
        return parsed

    logger.warning("Unexpected batch response structure: %s", raw[:200])
    return []
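A batch response is not guaranteed to contain every item, or to keep the input order, so callers may want to match results back to inputs by `id`. The following is a minimal sketch of that step. The helper name `align_results` and the fallback flag `missing_from_batch_response` are hypothetical, and sending missing items to human review is an assumption about the desired failure mode.

```python
def align_results(texts: list[dict], results: list[dict]) -> list[dict]:
    """Hypothetical helper: one result per input, in input order, matched by id."""
    by_id = {}
    for r in results:
        if isinstance(r, dict) and "id" in r:
            by_id.setdefault(str(r["id"]), r)  # keep the first result seen per id

    aligned = []
    for item in texts:
        result = by_id.get(str(item["id"]))
        if result is None:
            # Missing from the model's output: fall back to a borderline verdict
            # so the comment lands in the human review queue.
            result = {
                "id": item["id"],
                "category": "borderline",
                "confidence": 0.0,
                "reasoning": "No classification returned for this item.",
                "flags": ["missing_from_batch_response"],
            }
        aligned.append(result)
    return aligned
```

Calling `align_results(texts, classify_batch(texts))` would also cover the case where `classify_batch` returns an empty list after an unexpected response: every comment in the batch then falls back to review.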
Cost estimation
Before running at scale, estimate what you will be paying.
# cost.py
# Approximate token counts for moderation
SYSTEM_PROMPT_TOKENS = 180  # fixed per call
TOKENS_PER_COMMENT = 60     # average user comment
OUTPUT_TOKENS = 80          # JSON response


def estimate_cost(num_comments: int, batch_size: int = 20,
                  input_price_per_1k: float = 0.00015,  # gpt-4o-mini pricing
                  output_price_per_1k: float = 0.0006) -> dict:
    """
    Estimate API cost for moderating num_comments.
    Batch calls amortize the system prompt cost.
    """
    num_batches = -(-num_comments // batch_size)  # ceil division
    # Per batch: 1 system prompt + batch_size * comment tokens