How to build an AI-powered content moderation pipeline for user comments

Comment sections and other user-submitted content are an attack surface. Spam bots, coordinated harassment, and phishing links disguised as helpful replies all show up quickly: if you ship a public-facing form or discussion feature, expect to encounter every one of them within days.

Rule-based filters (regex, keyword lists) reach roughly 60-70% precision at best and carry a constant maintenance overhead. An LLM-based classifier can handle nuanced toxic content, context-dependent spam, and subtle manipulation that keyword filters miss entirely.

This tutorial builds a complete moderation pipeline in Python: receive a comment, classify it with an LLM, cache repeated inputs, process batches efficiently, and route borderline cases to a human review queue. The same architecture works for form submissions, support tickets, forum posts, and any other user-generated text. For organizations managing content at scale, this pairs well with the broader security controls described in practical security guides.

Architecture overview

User comment
     │
     ▼
Cache lookup (Redis/dict) ──hit──▶ cached decision
     │ miss
     ▼
Batch accumulator (up to 20 items or 500ms)
     │
     ▼
LLM classifier (structured JSON output)
     │
     ├── safe ──────────────▶ publish immediately
     ├── spam/toxic ────────▶ auto-reject + log
     └── borderline (< 0.75)▶ human review queue
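
The three exits at the bottom of the diagram can be expressed as a small pure function. This is a minimal sketch rather than the pipeline's actual routing code: the `route` function and the `Action` names are hypothetical, and treating `phishing` the same way as spam/toxic is an assumption. Only the 0.75 threshold and the three destinations come from the diagram.

```python
from enum import Enum


class Action(str, Enum):  # hypothetical names for the three destinations
    PUBLISH = "publish"
    REJECT = "auto_reject"
    REVIEW = "human_review"


REVIEW_THRESHOLD = 0.75  # from the diagram: below this, a human decides


def route(category: str, confidence: float,
          threshold: float = REVIEW_THRESHOLD) -> Action:
    """Map a classifier verdict to one of the three destinations."""
    # An explicit "borderline" label or low confidence goes to a person.
    if category == "borderline" or confidence < threshold:
        return Action.REVIEW
    if category == "safe":
        return Action.PUBLISH
    # Assumption: phishing is auto-rejected like spam and toxic content.
    if category in ("spam", "toxic", "phishing"):
        return Action.REJECT
    # Unknown labels are never auto-published.
    return Action.REVIEW
```

Note that a low-confidence `safe` verdict also lands in the review queue under this reading of the diagram; if that produces too much review load, the threshold could be applied only to the reject path.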

Setup

pip install openai redis pydantic python-dotenv

For local development, run Redis in Docker:

docker run -d -p 6379:6379 redis:alpine
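
This Redis instance backs the cache-lookup step in the architecture overview. As a rough sketch of how that lookup might key comments, the block below hashes normalized text and stores decisions in a dict with a TTL, which can stand in for Redis in unit tests. The `cache_key` function, the `TTLCache` class, the `mod:` key prefix, and the normalization rules (lowercasing, collapsing whitespace) are all assumptions for illustration, not part of the tutorial's code.

```python
import hashlib
import time
from typing import Optional


def cache_key(text: str, context: Optional[str] = None) -> str:
    """Hash normalized text plus context so near-identical repeats share a key."""
    normalized = " ".join(text.lower().split())  # case-fold, collapse whitespace
    payload = f"{context or ''}\x00{normalized}"
    return "mod:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


class TTLCache:
    """Dict-backed stand-in for a Redis cache with expiring keys."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, dict]] = {}

    def get(self, key: str) -> Optional[dict]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # expired: drop it and report a miss
            return None
        return value

    def set(self, key: str, value: dict) -> None:
        self._store[key] = (time.monotonic() + self.ttl, value)
```

Including the context in the key is a design choice: the same sentence may be acceptable in one thread and off-topic spam in another, so a text-only key could return the wrong cached decision.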

Data models

# models.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time

class ContentCategory(str, Enum):
    SAFE = "safe"
    SPAM = "spam"
    TOXIC = "toxic"
    PHISHING = "phishing"
    BORDERLINE = "borderline"

@dataclass
class ModerationResult:
    category: ContentCategory
    confidence: float # 0.0 – 1.0
    reasoning: str # one sentence
    flags: list[str] # e.g. ["contains_url", "promotional_language"]
    needs_human_review: bool
    processing_time_ms: float
    cache_hit: bool = False

@dataclass
class PendingComment:
    comment_id: str
    text: str
    user_id: str
    submitted_at: float = field(default_factory=time.time)
    context: Optional[str] = None # e.g. article slug or thread title

LLM classifier with structured output

# classifier.py
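import json
import hashlib
import os
import time
import logging
from typing import Optional
from dotenv import load_dotenv
from openai import OpenAI
from models import ContentCategory, ModerationResult

load_dotenv()  # read variables from a local .env file, if one exists

logger = logging.getLogger(__name__)
# Assumption: credentials come from the environment rather than source code.
# LLM_API_KEY and LLM_BASE_URL are illustrative variable names.
llm_client = OpenAI(
    api_key=os.getenv("LLM_API_KEY", "your_api_key"),
    base_url=os.getenv("LLM_BASE_URL", "https://api.your-llm-provider.com/v1"),
)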

SYSTEM_PROMPT = """You are a content moderation classifier. Analyze submitted text and return a JSON object with exactly these fields:
- "category": one of "safe", "spam", "toxic", "phishing", "borderline"
- "confidence": float 0.0-1.0 (your certainty in the classification)
- "reasoning": one sentence explaining the decision
- "flags": array of strings identifying specific issues (empty array if safe)

Categories:
- safe: legitimate user content, on-topic discussion, genuine questions
- spam: promotional content, repeated phrases, unsolicited advertising, SEO link drops
- toxic: harassment, hate speech, threats, personal attacks, profanity targeting users
- phishing: credential harvesting, fake login prompts, deceptive links, scam patterns
- borderline: ambiguous content that requires human judgment

Return ONLY the JSON object. No prose."""

def classify_single(text: str, context: Optional[str] = None, model: str = "gpt-4o-mini") -> dict:
    """Call LLM for a single text. Returns raw parsed dict."""
    user_content = f"Text to classify:\n{text}"
    if context:
        user_content = f"Context: {context}\n\n{user_content}"
    
    response = llm_client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        temperature=0.0, # deterministic for classification
        max_tokens=200,
        response_format={"type": "json_object"}, # forces JSON output
    )
    raw = response.choices[0].message.content
    return json.loads(raw)

def classify_batch(texts: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
    """
    Classify multiple texts in a single API call.
    texts: [{"id": str, "text": str, "context": Optional[str]}]
    Returns the model's results, which are requested in input order.
    Returns [] if the response has an unexpected structure, so callers
    should check the length and ids before trusting the output.
    """
    if not texts:
        return []

    # Build a numbered batch prompt; each text is truncated to 800 characters
    items_block = "\n\n".join(
        f'Item {i+1} (id={item["id"]}):\n{item["text"][:800]}'
        + (f'\nContext: {item["context"]}' if item.get("context") else "")
        for i, item in enumerate(texts)
    )

    # JSON mode ("json_object") is meant to produce a top-level object,
    # so ask for the array under a "results" key instead of a bare array.
    batch_prompt = f"""Classify each of the following {len(texts)} items. Return a JSON object with a single key "results" whose value is an array with one element per item, in order. Each element must have: id, category, confidence, reasoning, flags.

{items_block}"""

    response = llm_client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT.replace(
                "Return ONLY the JSON object.",
                'Return ONLY a JSON object of the form {"results": [...]} with one array element per item.'
            )},
            {"role": "user", "content": batch_prompt},
        ],
        temperature=0.0,
        max_tokens=100 * len(texts),
        response_format={"type": "json_object"},
    )
    raw = response.choices[0].message.content
    parsed = json.loads(raw)

    # Accept the requested {"results": [...]} shape, common variants of it,
    # and a bare array in case the provider does not enforce JSON mode
    if isinstance(parsed, dict):
        for key in ("results", "items", "classifications"):
            if key in parsed and isinstance(parsed[key], list):
                return parsed[key]
    if isinstance(parsed, list):
        return parsed

    logger.warning("Unexpected batch response structure: %s", raw[:200])
    return []
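
Both classify functions hand back whatever JSON the model produced, and a batch response can come back reordered, short, or with labels outside the five categories. The following is a minimal sketch of a defensive layer that could sit between the classifier and the rest of the pipeline. `normalize_result` and `align_batch` are hypothetical helpers, and falling back to `borderline` with zero confidence for anything missing or malformed is an assumption chosen so that such items end up in human review rather than being published.

```python
from typing import Any

ALLOWED = {"safe", "spam", "toxic", "phishing", "borderline"}


def normalize_result(raw: Any) -> dict:
    """Coerce one raw model verdict into a predictable shape."""
    if not isinstance(raw, dict):
        raw = {}
    category = str(raw.get("category", "")).lower()
    if category not in ALLOWED:
        category = "borderline"  # unknown or missing label: let a human decide
    try:
        confidence = float(raw.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    confidence = min(1.0, max(0.0, confidence))  # clamp to [0, 1]
    flags = raw.get("flags")
    if not isinstance(flags, list):
        flags = []
    return {
        "id": raw.get("id"),
        "category": category,
        "confidence": confidence,
        "reasoning": str(raw.get("reasoning", "")),
        "flags": [str(f) for f in flags],
    }


def align_batch(items: list[dict], results: list[dict]) -> list[dict]:
    """Return one normalized verdict per input item, matched by id."""
    by_id = {str(r.get("id")): r for r in results if isinstance(r, dict)}
    aligned = []
    for item in items:
        raw = by_id.get(str(item["id"]))  # None if the model skipped this item
        verdict = normalize_result(raw)
        verdict["id"] = item["id"]
        aligned.append(verdict)
    return aligned
```

Matching by `id` instead of by position means a dropped or reordered element affects only the item it belongs to, not every item after it.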

Cost estimation

Before running at scale, estimate what you will be paying.

# cost.py
# Approximate token counts for moderation
SYSTEM_PROMPT_TOKENS = 180 # fixed per call
TOKENS_PER_COMMENT = 60 # average user comment
OUTPUT_TOKENS = 80 # JSON response

def estimate_cost(num_comments: int, batch_size: int = 20, 
                  input_price_per_1k: float = 0.00015, # gpt-4o-mini pricing
                  output_price_per_1k: float = 0.0006) -> dict:
    """
    Estimate API cost for moderating num_comments. 
    Batch calls amortize the system prompt cost.
    """
    num_batches = -(-num_comments // batch_size) # ceil division
    # Per batch: 1 system prompt + batch_size * comment tokens