
Your Switches Are Screaming. Nobody's Listening.

Building a real-time AI log analyzer from scratch with a Unix box, a few devices, and zero enterprise software. Here's the actual POC.

Every switch in your network is having a conversation. Right now. It’s spitting out syslog messages at a rate that would make a teenager’s group chat look quiet — interface state changes, spanning-tree recalculations, authentication failures, power supply warnings, fan speed alerts, NTP sync losses, memory allocation errors. Hundreds of messages per minute, per device.

And nobody’s reading them.

Oh sure, they’re going somewhere. Maybe a syslog server that stores them in flat files nobody opens. Maybe a SIEM that indexes them and charges you per gigabyte for the privilege. Maybe they’re hitting the default buffer on the switch itself, silently rolling over and vanishing into the void as soon as its few-kilobyte default (4,096 bytes on many Cisco IOS platforms) fills up.

But actually understanding them in real time? Correlating that the three switches on floors four, five, and six all went down within the same 90-second window — and that probably means a power issue, not three simultaneous hardware failures? Noticing that your storage array has been creeping up 2% per day and you’ve got about two weeks before it hits the wall?

That’s the stuff that falls through the cracks. Not because the data isn’t there, but because the volume buries the signal so deep that human eyes can’t find it in time.

I’ve been digging into how AI can actually process this firehose in real time, and I found that the answer isn’t one technology — it’s a layered system where each layer does exactly one job. The deeper I went, the more I realized you could build a working proof of concept with a Unix box, a few network devices, and about $0 in software licensing.

Here’s how.

The Two-Brain Problem

The naive approach to “AI-powered log analysis” goes something like this: pipe all your logs into ChatGPT and ask it what’s wrong. And I’ll be honest, for a home lab with three devices, that might actually work. But for anything resembling a real environment — say 200 firewalls and 5,000 switches — you hit a wall immediately.

A busy Cisco switch can generate 50-200 syslog messages per minute under normal conditions. A firewall doing deep packet inspection on a busy link? Easily 500-1,000 messages per minute. Let’s do the math on a modest enterprise:

The Firehose — Typical Enterprise Syslog Volume
════════════════════════════════════════════════

200 firewalls × 500 msg/min    = 100,000 msg/min
5,000 switches × 100 msg/min   = 500,000 msg/min
500 servers × 50 msg/min        = 25,000 msg/min
200 APs × 20 msg/min            = 4,000 msg/min
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Total:                           ~629,000 msg/min
                                 ~10,500 msg/sec
                                 ~900 million msg/day

At an average of 200 tokens per log message:
Token consumption:               ~181 billion tokens/day

Claude Sonnet at $3/million input tokens:
Daily cost:                      ~$543,000/day
Monthly cost:                    ~$16.3 million/month

Just to READ them. Not analyze. Not correlate. Just ingest.
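
The arithmetic is easy to recompute from the stated assumptions. The following is a minimal sketch: the fleet sizes, per-device rates, token average, and price are the figures quoted above, while the dictionary layout and the function name `estimate` are illustrative choices, not part of any tool.

```python
# Back-of-the-envelope syslog volume and LLM ingest cost.
# Every input is one of the article's stated assumptions, not a measurement.
FLEET = {
    "firewalls": (200, 500),      # (device count, messages per minute each)
    "switches": (5_000, 100),
    "servers": (500, 50),
    "access_points": (200, 20),
}
TOKENS_PER_MESSAGE = 200
USD_PER_MILLION_INPUT_TOKENS = 3.0


def estimate(fleet, tokens_per_message, usd_per_million):
    """Return message, token, and dollar volumes for a fleet of devices."""
    msgs_per_min = sum(count * rate for count, rate in fleet.values())
    msgs_per_day = msgs_per_min * 60 * 24
    tokens_per_day = msgs_per_day * tokens_per_message
    usd_per_day = tokens_per_day / 1_000_000 * usd_per_million
    return {
        "msgs_per_min": msgs_per_min,
        "msgs_per_sec": msgs_per_min / 60,
        "msgs_per_day": msgs_per_day,
        "tokens_per_day": tokens_per_day,
        "usd_per_day": usd_per_day,
        "usd_per_month": usd_per_day * 30,
    }


result = estimate(FLEET, TOKENS_PER_MESSAGE, USD_PER_MILLION_INPUT_TOKENS)
for name, value in result.items():
    print(f"{name:>15}: {value:,.0f}")
```

Swapping in your own device counts and a measured tokens-per-message average is the quickest way to see how sensitive the bill is to each assumption.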

So yeah. Throwing an LLM at raw syslog streams is like hiring a Supreme Court justice to read your junk mail. Technically possible, catastrophically expensive, and a waste of a very capable brain on a very stupid task.

The solution isn’t to avoid AI. It’s to use the right kind of AI at each layer. And this is where it gets interesting, because most of what happens in the first layer isn’t AI at all.

Tier 1: The Mechanical Bouncer

The first layer of any real-time log processing pipeline is pure, old-school, deterministic filtering. No neural networks. No model inference. Just regex, pattern matching, and conditional logic running at wire speed.

This is the part that ChatGPT glossed over when I asked about it, and honestly, it’s the most important part. Because this layer is responsible for reducing 10,000 messages per second down to maybe 50-200 per minute that are actually worth thinking about.

Here’s what Tier 1 actually does:

# Tier 1: Deterministic pre-filter
# This isn't AI. This is a bouncer at the door.

import re
from dataclasses import dataclass
from enum import Enum

class Severity(Enum):
    EMERGENCY = 0    # System unusable
    ALERT = 1        # Immediate action needed
    CRITICAL = 2     # Critical conditions
    ERROR = 3        # Error conditions
    WARNING = 4      # Warning conditions
    NOTICE = 5       # Normal but significant
    INFO = 6         # Informational
    DEBUG = 7        # Debug messages

# Syslog messages follow RFC 5424 (many network devices still emit the older BSD/RFC 3164 style)
# RFC 5424 layout: <priority>version timestamp hostname app-name procid msgid structured-data msg

NOISE_PATTERNS = [
    r"LINEPROTO-5-UPDOWN.*changed state to up",     # Interface flap recovery
    r"SYS-5-CONFIG_I",                                # Configured from console/vty (routine)
    r"SEC_LOGIN-5-LOGIN_SUCCESS",                     # Successful logins (boring)
    r"LINK-3-UPDOWN.*GigabitEthernet0/0/0.*down",   # Known maintenance window
    r"SNMP-3-AUTHFAIL.*public",                       # SNMP community misconfig (known)
    r"CDP-4-NATIVE_VLAN_MISMATCH",                    # Ongoing known issue #4471
    r"STP-2-LOOPGUARD_BLOCK",                         # Handled by STP, self-healing
]

ESCALATION_PATTERNS = [
    (r"PLATFORM-2-PF_PWRSPLY", "power_supply_failure"),
    (r"DUAL-5-NBRCHANGE.*down", "routing_neighbor_down"),
    (r"LINK-3-UPDOWN.*TenGigabit.*down", "uplink_down"),
    (r"SEC_LOGIN-4-LOGIN_FAILED.*repeated", "brute_force_attempt"),
    (r"SYS-2-MALLOCFAIL", "memory_exhaustion"),
    (r"PLATFORM-4-ELEMENT_WARNING.*Temperature", "thermal_warning"),
    (r"FAN-3-FAN_FAILED", "fan_failure"),
    (r"STACKMGR-4-STACK_LINK_CHANGE.*removed", "stack_member_lost"),
]

@dataclass
class FilteredEvent:
    timestamp: str
    source_ip: str
    hostname: str
    severity: Severity
    raw_message: str
    category: str
    escalate_to_ai: bool

def tier1_filter(raw_syslog: str) -> FilteredEvent | None:
    """
    Pure mechanical filtering. No AI. No inference.
    Returns None for messages we don't care about.
    Returns a FilteredEvent for anything that survives.
    """
    # Parse syslog priority to get severity
    severity = parse_severity(raw_syslog)

    # Drop DEBUG and INFO immediately — that's ~70% of all messages
    if severity.value >= Severity.INFO.value:
        return None

    # Drop known noise patterns — another ~20%
    for pattern in NOISE_PATTERNS:
        if re.search(pattern, raw_syslog):
            return None

    # Check for known escalation patterns
    for pattern, category in ESCALATION_PATTERNS:
        if re.search(pattern, raw_syslog):
            return FilteredEvent(
                timestamp=parse_timestamp(raw_syslog),
                source_ip=parse_source(raw_syslog),
                hostname=parse_hostname(raw_syslog),
                severity=severity,
                category=category,
                raw_message=raw_syslog,
                escalate_to_ai=True,
            )

    # Anything else that survived filtering but didn't match
    # an escalation pattern: pass through but don't escalate
    return FilteredEvent(
        timestamp=parse_timestamp(raw_syslog),
        source_ip=parse_source(raw_syslog),
        hostname=parse_hostname(raw_syslog),
        severity=severity,
        category="unclassified",
        raw_message=raw_syslog,
        escalate_to_ai=(severity.value <= Severity.ERROR.value),
    )

Is this AI? No. Not even close. It’s if statements with regex. Your grandma could understand the logic (well, probably not the regex). And yes, it still requires humans to maintain the pattern lists and keep them up to date.

But here’s the thing: this layer isn’t trying to be smart. Its only job is to be fast and to not let the obviously boring stuff through. And it does that job at tens of thousands of messages per second on a single CPU core even in plain Python, and far more in a compiled implementation. That speed is the whole point. You’re trading intelligence for throughput, and at this layer, that’s exactly the right trade.

The output of Tier 1 in a typical enterprise? Maybe 50-200 events per minute instead of 10,000 per second. That’s a 99.97% reduction in volume. And now you’ve got something an actual AI can chew on without melting your budget.
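
The reduction figure is simple to check. A small sketch, using the input and output rates quoted above (the function name `reduction_pct` is just an illustrative label):

```python
def reduction_pct(input_per_sec: float, output_per_min: float) -> float:
    """Percentage of messages dropped, given input per second and output per minute."""
    input_per_min = input_per_sec * 60
    return (1 - output_per_min / input_per_min) * 100


for surviving in (50, 200):
    pct = reduction_pct(10_000, surviving)
    print(f"{surviving} events/min surviving -> {pct:.2f}% dropped")
```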

“The art of being wise is the art of knowing what to overlook.” — William James

He was talking about philosophy, but he could’ve been talking about syslog.

Tier 2: The Lightweight Machine Learning Models

Here’s where it stops being traditional IT and starts being actual data science. And this is the part that fascinated me the most, because these aren’t LLMs. They’re not language models at all. They don’t understand English. They don’t generate text. They eat numbers and spit out probabilities.

The models that run at this tier are fundamentally different from ChatGPT or Claude. They’re specialized mathematical engines that do one thing exceptionally well: find patterns in numerical data that humans would miss.

Anomaly Detection: The Isolation Forest

Let me explain how an isolation forest works, because it’s beautifully simple once you see it.

Imagine you have a scatter plot of data points. Most of them are clustered together — that’s your “normal.” A few outliers are way off by themselves — those are your anomalies.

An isolation forest builds a bunch of random decision trees. At each node, it picks a random feature and a random split point. Normal data points — the ones in the dense cluster — take many splits to isolate because they have so many neighbors. Anomalous data points — the loners — get isolated quickly because they’re already far from everyone else.

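The number of splits needed to isolate a point, averaged across all the trees, becomes its “anomaly score.” Few splits = more anomalous. Many splits = more normal.

That’s it. No hand-tuned thresholds. No “alert when CPU > 85%.” The model learns what normal looks like from your actual data and flags anything that doesn’t fit. (The one knob you do set is the expected share of anomalies, the contamination parameter in the code below.)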
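
To make the "fewer splits means more anomalous" idea concrete, here is a toy one-dimensional sketch of the isolation step in pure Python. It is not how scikit-learn implements the algorithm (real forests split on several features and use subsampling). The sample rates and the helper names `isolation_depth` and `average_depth` are made up for illustration.

```python
import random


def isolation_depth(point, data, rng, max_depth=50):
    """Count the random splits needed until `point` is alone in its partition."""
    current = list(data)
    depth = 0
    while len(current) > 1 and depth < max_depth:
        lo, hi = min(current), max(current)
        if lo == hi:          # only duplicates left, cannot split further
            break
        split = rng.uniform(lo, hi)
        # Keep only the side of the split that still contains the point.
        if point < split:
            current = [x for x in current if x < split]
        else:
            current = [x for x in current if x >= split]
        depth += 1
    return depth


def average_depth(point, data, trees=200, seed=0):
    """Average isolation depth over many random 'trees'."""
    rng = random.Random(seed)
    return sum(isolation_depth(point, data, rng) for _ in range(trees)) / trees


# Hypothetical message rates: a tight cluster around 45/min plus one outlier.
rates = [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 900]

print("typical point (45):", average_depth(45, rates))
print("outlier (900):     ", average_depth(900, rates))
```

The outlier is usually cut off by the very first split, while a point inside the cluster needs several more, which is exactly the asymmetry the forest exploits.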

# Anomaly detection with Isolation Forest
# This is the ENTIRE model training. Not kidding.

from sklearn.ensemble import IsolationForest
import numpy as np

# Features extracted from your log stream over the past 30 days:
# [hour_of_day, day_of_week, msg_rate_per_min, error_rate,
#  unique_sources, severity_avg, interface_flap_count]

# Collect 30 days of "normal" operation as training data
training_data = np.array([
    # hour, dow, msg_rate, err_rate, sources, sev_avg, flaps
    [  2,    1,   45,       0.02,     12,      5.1,     0  ],  # Typical Tuesday 2am
    [  9,    1,   380,      0.05,     48,      4.8,     2  ],  # Tuesday morning rush
    [ 14,    3,   290,      0.03,     42,      5.0,     1  ],  # Thursday afternoon
    [  2,    5,   850,      0.01,     15,      5.5,     0  ],  # Saturday backup window (Monday = 0)
    # ... thousands more rows from your actual environment
])

# Train the model. That's literally one line.
model = IsolationForest(
    n_estimators=200,          # Number of trees
    contamination=0.01,        # Expect ~1% of data to be anomalous
    random_state=42,
    n_jobs=-1,                 # Use all CPU cores
)
model.fit(training_data)

# Now, in real time, score each 1-minute window:
def score_current_window(features: np.ndarray) -> float:
    """
    Returns the continuous anomaly score from decision_function:
    negative = anomalous, positive = normal.
    The more negative, the more anomalous.
    """
    score = model.decision_function(features.reshape(1, -1))
    return score[0]

# Example: It's Tuesday at 2am but message rate is 900/min
# The model learned that Tuesday 2am is usually ~45/min
# A negative score means the window is flagged as anomalous
current = np.array([2, 1, 900, 0.15, 38, 3.2, 7])
print(f"Anomaly score: {score_current_window(current):.2f}")
# Illustrative output: Anomaly score: -0.34  ← Something's wrong
# (the exact value depends on your training data)

Notice what just happened. Nobody configured a threshold. Nobody said “alert when message rate exceeds 500.” The model looked at 30 days of your data and figured out that Tuesday at 2 AM usually means 45 messages per minute. When it saw 900, it didn’t compare to a static number — it compared to what your specific network usually does at that specific time on that specific day of the week.

Saturday at 2 AM showing 850 messages? The model knows that’s the backup window. Normal. No alert. But Tuesday at 2 AM showing 900? That’s a 20x deviation from the learned baseline. Fire the alert.

This is why these models beat static thresholds: they understand context. And they do it without understanding a single word of English.

Time Series Forecasting: Seeing the Future

Remember the storage example? Disk usage climbing 2% per day, currently at 88%? A static threshold at 90% gives you one day of warning. A time series model gives you two weeks.

# Time series forecasting with Prophet
# Predict when your storage will hit 95%

from prophet import Prophet
import pandas as pd

# Historical storage utilization data (daily readings)
df = pd.DataFrame({
    'ds': pd.date_range('2026-01-01', periods=60, freq='D'),
    'y': [
        # 60 days of slowly climbing storage
        52, 52, 53, 53, 54, 54, 55, 55, 56, 56,
        57, 57, 58, 58, 59, 60, 60, 61, 61, 62,
        63, 63, 64, 64, 65, 66, 66, 67, 68, 68,
        69, 70, 70, 71, 72, 72, 73, 74, 74, 75,
        76, 76, 77, 78, 78, 79, 80, 80, 81, 82,
        83, 83, 84, 85, 85, 86, 87, 87, 88, 89,
    ]
})

# Train the model
model = Prophet(
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=False,
)
model.fit(df)

# Forecast 30 days ahead
future = model.make_future_dataframe(periods=30)
forecast = model.predict(future)

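# Find the first forecast day where utilization crosses 95%
crossing = forecast[forecast['yhat'] >= 95]
if crossing.empty:
    print("No 95% crossing within the forecast horizon")
else:
    critical = crossing.iloc[0]
    last_reading = df['ds'].max()  # measure from the data, not the wall clock
    print(f"Storage will hit 95% around: {critical['ds'].date()}")
    print(f"Days until critical: {(critical['ds'] - last_reading).days}")

# Output depends on the fitted trend. This sample rises roughly
# 0.6-0.7 points per day, so expect a crossing about ten days
# after the last reading.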

Prophet (from Meta) is free, runs on a single CPU, and doesn’t need a GPU. It handles weekly seasonality automatically — so if your storage always spikes on Fridays because of weekly reports and drops slightly on weekends when temp files get cleaned, the model accounts for that.

But the model itself doesn’t tell you anything. It outputs numbers. That’s it. A forecast curve of future values with an uncertainty band around it. Something else needs to take those numbers and decide they’re worth telling a human about.

Classification: Is This a Real Threat or Just Tuesday?

Here’s a scenario: your firewall logs show 47 failed SSH attempts from an IP address in the last hour. Is it a brute force attack, or is it Dave from accounting who forgot his password again?

A gradient-boosted classifier can learn the difference:

# XGBoost classifier: real threat vs. noise
import xgboost as xgb
import numpy as np

# Features for each "suspicious event" cluster:
# [attempts_per_hour, unique_usernames_tried, unique_source_ips,
#  time_of_day, is_weekend, source_is_internal, geo_distance_km,
#  has_successful_login_history, password_complexity_score]

# Labeled training data from historical incidents
X_train = np.array([
    # Dave forgot his password (noise)
    [12,  1, 1, 9,  0, 1,   0, 1, 7],
    [8,   1, 1, 14, 0, 1,   0, 1, 6],
    [15,  1, 1, 10, 0, 1,   0, 1, 8],
    # Actual brute force attacks (threat)
    [847, 23, 1, 3,  1, 0, 8400, 0, 2],
    [412, 15, 3, 4,  0, 0, 6200, 0, 3],
    [1200, 1, 1, 2,  1, 0, 9100, 0, 1],
    # Credential stuffing (threat)
    [89,  89, 1, 22, 0, 0, 4500, 0, 5],
    [156, 156, 2, 1, 1, 0, 7800, 0, 4],
])
y_train = np.array([0, 0, 0, 1, 1, 1, 1, 1])  # 0=noise, 1=threat

model = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=4,
    learning_rate=0.1,
)
model.fit(X_train, y_train)

# Real-time classification
def classify_event(features):
    proba = model.predict_proba(features.reshape(1, -1))[0]
    return {
        'is_threat': bool(proba[1] > 0.7),
        'confidence': float(proba[1]),
        'label': 'threat' if proba[1] > 0.7 else 'noise',
    }

# New event: 47 attempts, 1 username, internal IP, Monday 9am
event = np.array([47, 1, 1, 9, 0, 1, 0, 1, 7])
result = classify_event(event)
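# Illustrative output: {'is_threat': False, 'confidence': 0.12, 'label': 'noise'}
# ('confidence' is the predicted probability of the threat class;
#  exact values depend on the training data)
# → It's Dave again.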

Notice: three completely different model types (isolation forest, Prophet, XGBoost), each handling a different kind of question, all running on the same filtered event stream. They don’t interfere with each other. They don’t even know the other models exist. They’re each independently consuming events and producing scores.

Tier 3: The LLM — The Brains of the Operation

Now — finally — we get to the large language model. But its job is different from what you might expect. The LLM doesn’t read raw logs. It doesn’t process 10,000 messages per second. It gets maybe 10-50 events per minute that have already been pre-filtered (Tier 1) and scored by specialized models (Tier 2).

The LLM’s job is the human-facing part:

  1. Correlation: “Three switches went down on adjacent floors within 90 seconds. The ML models flagged each one independently. I’m putting them together and telling you this looks like a localized power or cabling event.”

  2. Natural language explanation: “Storage on nas-01 has been climbing 2% daily for 40 days. The time-series model predicts you’ll hit 95% by March 16th. At current growth rates, you’ll be completely full by March 22nd.”

  3. Recommended actions: “Based on the thermal warning pattern on core-sw-03 and the fact that you replaced a fan module on this exact model last quarter, I’d suggest checking the fan tray. Here’s the part number.”

  4. Historical context: “This is the third time Building B has had this exact failure pattern on a Monday morning. The previous two times it was the UPS doing a self-test under load.”
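
Before the LLM can correlate anything, related events have to arrive in the same prompt. One simple way to do that is to batch events that fall within a short time window. The sketch below is an assumption about how that batching might look, not a prescribed design: the `Event` fields, the 90-second default, and the host names are all illustrative.

```python
from dataclasses import dataclass


@dataclass
class Event:
    ts: float       # epoch seconds
    host: str
    category: str


def group_by_window(events, window_seconds=90.0):
    """Group events so each group spans at most `window_seconds` from its first event."""
    groups = []
    for ev in sorted(events, key=lambda e: e.ts):
        if groups and ev.ts - groups[-1][0].ts <= window_seconds:
            groups[-1].append(ev)
        else:
            groups.append([ev])
    return groups


events = [
    Event(1000.0, "sw-floor4", "link_down"),
    Event(1030.0, "sw-floor5", "link_down"),
    Event(1075.0, "sw-floor6", "link_down"),
    Event(5000.0, "fw-01", "auth_failure"),
]

for group in group_by_window(events):
    hosts = ", ".join(e.host for e in group)
    print(f"{len(group)} event(s) within one window: {hosts}")
```

Each group would then be sent to the model as one request, so the three floor switches show up side by side instead of as three unrelated alerts.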

# Tier 3: LLM integration for human-facing analysis
# This is where the expensive brain gets used — sparingly

import json
from anthropic import Anthropic

client = Anthropic()

def generate_analysis(events: list[dict], ml_scores: dict) -> str:
    """
    Takes pre-filtered events + ML model outputs
    and generates human-readable analysis.
    Called maybe 10-50 times per minute, not 10,000/sec.
    """
    context = {
        "events": events,
        "anomaly_scores": ml_scores.get("anomaly", {}),
        "forecasts": ml_scores.get("forecast", {}),
        "classifications": ml_scores.get("classification", {}),
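        # NOTE: get_network_topology() and get_recent_changes() are
        # placeholders for your own inventory and change-log lookups.
        "topology": get_network_topology(),  # Cached, updated hourly
        "recent_changes": get_recent_changes(hours=24),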
    }

    response = client.messages.create(
        model="claude-haiku-4-5-20251001",  # Fast + cheap for ops analysis
        max_tokens=1024,
        system="""You are a senior network engineer analyzing pre-processed
        infrastructure events. You receive events that have already been
        filtered for significance and scored by ML models for anomaly
        detection, forecasting, and threat classification.

        Your job:
        1. Correlate related events across devices and time
        2. Explain what's happening in plain English
        3. Assess severity and urgency
        4. Recommend specific actions
        5. Reference any relevant historical patterns

        Be direct. No filler. Lead with the most critical finding.""",
        messages=[{
            "role": "user",
            "content": f"Analyze these infrastructure events:\n{json.dumps(context, indent=2)}"
        }]
    )

    return response.content[0].text

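Cost at this tier: If you’re processing 50 events/minute through Claude Haiku at ~500 tokens per exchange, that’s 25,000 tokens a minute, or about 36 million tokens a day. At Haiku-class pricing of roughly $1 per million input tokens (check current rates; output tokens cost more), that’s on the order of $36/day, a tiny fraction of the raw-ingest bill estimated earlier. Because the mechanical bouncer and the ML models already discarded almost all of the volume.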

The Full Architecture: How It All Fits Together

Here’s the complete pipeline from device to dashboard:

[Figure: architecture of the AI-driven syslog analysis POC. Devices stream logs through collection, Tier 1 pre-filtering, Tier 2 ML models (anomaly detection and threat classification), Tier 3 LLM analysis, and finally alerts and dashboards.]

┌─────────────────────────────────────────────────────────────────┐
│                        NETWORK DEVICES                          │
│  Switches, Firewalls, Routers, APs, Servers, Storage            │
│  Generating syslog, SNMP traps, NetFlow, streaming telemetry    │
└─────────────────────┬───────────────────────────────────────────┘
                      │ UDP/514 (syslog), SNMP traps
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│                    COLLECTION LAYER                              │
│  rsyslog / syslog-ng / Fluentd                                  │
│  Receives, normalizes, timestamps, forwards                     │
│  Throughput: 100,000+ msg/sec on commodity hardware              │
└─────────────────────┬───────────────────────────────────────────┘
                      │ TCP / Kafka protocol
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│                    MESSAGE BROKER                                │
│  Apache Kafka (or Redis Streams for smaller scale)              │
│  Topics: raw_logs, filtered_events, ml_scores, alerts           │
│  Retention: 7 days raw, 90 days filtered                        │
└────┬─────────────────┬──────────────────┬───────────────────────┘
     │                 │                  │
     ▼                 ▼                  ▼
┌──────────┐   ┌──────────────┐   ┌──────────────┐
│  TIER 1  │   │   TIER 2     │   │   TIER 3     │
│ Mechani- │   │ ML Models    │   │  LLM Layer   │
│ cal      │   │              │   │              │
│ Filter   │──▶│ • Isolation  │──▶│ • Correlate  │
│          │   │   Forest     │   │ • Explain    │
│ • Regex  │   │ • Prophet    │   │ • Recommend  │
│ • Sever- │   │ • XGBoost    │   │ • Summarize  │
│   ity    │   │ • LSTM       │   │              │
│ • Rules  │   │              │   │ Claude Haiku │
│          │   │ scikit-learn │   │ or local LLM │
│ ~99.97%  │   │ ~80% of      │   │              │
│ dropped  │   │ remaining    │   │ ~50 events/  │
│          │   │ scored/routed│   │   minute     │
└──────────┘   └──────────────┘   └──────────────┘
                                         │
                                         ▼
                              ┌──────────────────┐
                              │   OUTPUT LAYER    │
                              │                   │
                              │ • Slack/PagerDuty │
                              │ • Dashboard       │
                              │ • Auto-remediation│
                              │ • Incident ticket │
                              └──────────────────┘

The beautiful thing about this architecture is that every component is independently scalable. Getting more syslog than your filter can handle? Spin up another filter instance. ML models becoming a bottleneck? Add another worker consuming from the Kafka topic. The LLM tier is the cheapest to scale because it’s processing the least data.

The POC: Building This With a Unix Box and a Few Devices

Alright, enough theory. Let’s build one. Here’s a concrete lab setup you could have running by the weekend.

Hardware

Lab Setup — Minimum Viable POC
═══════════════════════════════

1× Linux box (your "AI server")
   - Any modern x86_64 with 16GB+ RAM
   - Ubuntu 22.04 or Debian 12
   - Could literally be an old desktop
   - If you want local LLM: add a GPU (even a used RTX 3060 works)
   - If you're using Claude API: GPU not needed

3-5× Network devices that generate syslog
   - Managed switches (Cisco, Aruba, Juniper — anything with syslog)
   - A firewall (pfSense on a spare box works great)
   - A Linux server (generates auth.log, kern.log, etc.)
   - Even a Raspberry Pi running services will generate useful logs

Network
   - All devices configured to send syslog to the Linux box
   - SNMP enabled on managed devices for metric polling

Step 1: Set Up the Syslog Receiver

# On your Linux box — install and configure rsyslog
sudo apt update && sudo apt install -y rsyslog

# Enable UDP and TCP syslog reception
sudo tee /etc/rsyslog.d/10-remote.conf << 'EOF'
# Listen for syslog on UDP 514 and TCP 514
module(load="imudp")
input(type="imudp" port="514")

module(load="imtcp")
input(type="imtcp" port="514")

# Template: write logs as JSON for easy parsing
template(name="json-syslog" type="list") {
    constant(value="{")
    constant(value="\"timestamp\":\"") property(name="timereported" dateFormat="rfc3339")
    constant(value="\",\"host\":\"")   property(name="fromhost-ip")
    constant(value="\",\"hostname\":\"") property(name="hostname")
    constant(value="\",\"severity\":") property(name="syslogseverity")
    constant(value=",\"facility\":\"") property(name="syslogfacility-text")
    constant(value="\",\"tag\":\"")    property(name="syslogtag")
    constant(value="\",\"message\":\"") property(name="msg" format="json")
    constant(value="\"}\n")
}

# Write all remote logs to a JSON file AND forward to a named pipe
if $fromhost-ip != "127.0.0.1" then {
    action(type="omfile" file="/var/log/remote/all.json" template="json-syslog")
    action(type="ompipe" pipe="/var/log/remote/syslog.pipe" template="json-syslog")
}
EOF

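# Create the log directory and named pipe
sudo mkdir -p /var/log/remote
sudo mkfifo /var/log/remote/syslog.pipe
# On Ubuntu, rsyslog drops privileges to the "syslog" user, so it needs
# ownership of both (skip this on distros where rsyslog runs as root)
sudo chown -R syslog:adm /var/log/remote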

# Restart rsyslog
sudo systemctl restart rsyslog
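
Before touching any device config, it helps to confirm the receiver works by sending it a hand-built message. The sketch below builds a BSD-style syslog packet and sends it over UDP. The facility (local0 = 16), severity, host name, and tag are arbitrary test values, and both function names are illustrative. Run it from another machine, because the rsyslog rule above ignores traffic from 127.0.0.1.

```python
import socket
from datetime import datetime


def build_syslog_packet(facility, severity, hostname, tag, message, when=None):
    """Build a BSD-style syslog packet: <PRI>Mmm dd hh:mm:ss host tag: message."""
    when = when or datetime.now()
    pri = facility * 8 + severity                      # PRI encodes both fields
    stamp = f"{when:%b} {when.day:2d} {when:%H:%M:%S}"  # day is space-padded
    return f"<{pri}>{stamp} {hostname} {tag}: {message}".encode("utf-8")


def send_test_message(server, port=514):
    """Send one test message (local0.error) to the syslog receiver over UDP."""
    packet = build_syslog_packet(16, 3, "lab-test", "poc", "TEST-3-PING: receiver check")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(packet, (server, port))


# Usage (address taken from the device config in this article):
# send_test_message("10.0.1.100")
```

If the message shows up as a JSON line in /var/log/remote/all.json with severity 3, the collection layer is working.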

Step 2: Configure Your Devices to Send Syslog

! Cisco IOS — send syslog to your Linux box
configure terminal
logging host 10.0.1.100 transport udp port 514
logging trap informational
logging source-interface Vlan1
logging on
end

# pfSense — Status > System Logs > Settings
# Remote Logging: Enable
# Remote log servers: 10.0.1.100:514
# Log everything, or at minimum: firewall events, system events

Step 3: Set Up Kafka (or Redis Streams for Simplicity)

For a POC, Redis Streams is simpler than Kafka and handles lab-scale volumes easily:

# Install Redis
sudo apt install -y redis-server
sudo systemctl enable redis-server

# Verify it's running
redis-cli ping
# Output: PONG

# Streams auto-create on first write, so there is nothing to create here.
# Retention is enforced per write with XADD MAXLEN, which the bridge
# script in the next step sets on every message.

Step 4: The Syslog-to-Stream Bridge

#!/usr/bin/env python3
"""
syslog_bridge.py — Reads syslog from named pipe, applies Tier 1
filtering, pushes survivors to Redis Streams.
"""

import json
import re
import redis
import sys

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Tier 1 filter patterns (customize for your environment)
DROP_PATTERNS = [
    r"last message repeated \d+ times",
    r"CRON\[\d+\]",                    # Cron job logs
    r"systemd\[\d+\]: Started Session", # Session spam
    r"LINEPROTO.*changed state to up",  # Flap recovery
    r"SEC_LOGIN-5-LOGIN_SUCCESS",       # Normal logins
    r"sshd.*Accepted password",         # Successful SSH
]

ESCALATE_PATTERNS = [
    (r"LINK-3-UPDOWN.*down", "link_down"),
    (r"DUAL-5-NBRCHANGE.*down", "routing_neighbor_down"),
    (r"sshd.*Failed password", "auth_failure"),
    (r"Out of memory", "oom_kill"),
    (r"temperature.*critical", "thermal_critical"),
    (r"PLATFORM.*PWRSPLY.*fail", "power_failure"),
    (r"disk.*error|I/O error", "disk_error"),
    (r"kernel.*segfault", "segfault"),
]

def process_line(line: str):
    try:
        event = json.loads(line.strip())
    except json.JSONDecodeError:
        return

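    # rsyslog puts the program name (sshd[123]:, CRON[456]:, kernel:) in the
    # tag field, not the message, so match the patterns against both
    msg = f"{event.get('tag', '')} {event.get('message', '')}"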

    # Drop known noise
    for pattern in DROP_PATTERNS:
        if re.search(pattern, msg, re.IGNORECASE):
            r.incr('stats:dropped')
            return

    # Check for escalation patterns
    category = 'unclassified'
    escalate = False
    for pattern, cat in ESCALATE_PATTERNS:
        if re.search(pattern, msg, re.IGNORECASE):
            category = cat
            escalate = True
            break

    # Also escalate anything severity 0-3 (emergency through error)
    if event.get('severity', 7) <= 3:
        escalate = True

    event['category'] = category
    event['escalate'] = escalate

    # Push to Redis Stream
    stream = 'events:escalated' if escalate else 'events:filtered'
    r.xadd(stream, {'data': json.dumps(event)}, maxlen=50000)
    r.incr('stats:passed')

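# Read from named pipe. Reopen it whenever the writer side closes
# (e.g. rsyslog restarts), otherwise the loop would hit EOF and exit.
print("Syslog bridge running. Waiting for messages...")
while True:
    with open('/var/log/remote/syslog.pipe', 'r') as pipe:
        for line in pipe:
            process_line(line)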
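
The drop/escalate decision is easier to tune when it can be tested without Redis or a live pipe. Here is a sketch that pulls the same logic into a pure function. The pattern lists are a trimmed copy of the ones above, the name `classify` is hypothetical, and the sample log lines are invented for the test.

```python
import re

# Trimmed copies of the bridge's pattern lists, for illustration only.
DROP_PATTERNS = [
    r"CRON\[\d+\]",
    r"LINEPROTO.*changed state to up",
    r"sshd.*Accepted password",
]
ESCALATE_PATTERNS = [
    (r"LINK-3-UPDOWN.*down", "link_down"),
    (r"sshd.*Failed password", "auth_failure"),
    (r"Out of memory", "oom_kill"),
]


def classify(text: str, severity: int):
    """Return None for dropped noise, else a (category, escalate) tuple."""
    for pattern in DROP_PATTERNS:
        if re.search(pattern, text, re.IGNORECASE):
            return None
    category, escalate = "unclassified", False
    for pattern, cat in ESCALATE_PATTERNS:
        if re.search(pattern, text, re.IGNORECASE):
            category, escalate = cat, True
            break
    if severity <= 3:          # emergency through error always escalates
        escalate = True
    return category, escalate


samples = [
    ("sshd[812]: Failed password for root from 203.0.113.9", 6),
    ("CRON[4242]: (root) CMD (run-parts /etc/cron.hourly)", 6),
    ("%LINK-3-UPDOWN: Interface TenGigabitEthernet1/1/1, changed state to down", 3),
    ("something odd happened", 2),
]
for text, sev in samples:
    print(classify(text, sev), "<-", text)
```

Keeping a handful of real log lines from your own devices as test cases makes it safe to add or reorder patterns later.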

Step 5: The ML Scoring Engine

#!/usr/bin/env python3
"""
ml_scorer.py — Consumes filtered events from Redis,
runs them through ML models, publishes scored results.
"""

import json
import time
import redis
import numpy as np
from sklearn.ensemble import IsolationForest
from collections import defaultdict, deque

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# ──────────────────────────────────────────────
# Feature extraction: turn log events into numbers
# ──────────────────────────────────────────────

class FeatureEngine:
    """
    Maintains rolling windows of events and extracts
    numerical features for ML models.
    """
    def __init__(self, window_seconds=60):
        self.window = window_seconds
        self.events = deque()
        self.host_counts = defaultdict(int)
        self.category_counts = defaultdict(int)

    def add_event(self, event: dict):
        now = time.time()
        self.events.append((now, event))
        self.host_counts[event.get('host', 'unknown')] += 1
        self.category_counts[event.get('category', 'unknown')] += 1
        self._purge(now)

    def _purge(self, now: float):
        # Drop events older than the window and roll back BOTH counters,
        # otherwise the category counts would grow without bound.
        while self.events and self.events[0][0] < now - self.window:
            _, old = self.events.popleft()
            host = old.get('host', 'unknown')
            cat = old.get('category', 'unknown')
            self.host_counts[host] = max(0, self.host_counts[host] - 1)
            self.category_counts[cat] = max(0, self.category_counts[cat] - 1)

    def extract_features(self) -> np.ndarray:
        """
        Returns: [event_rate, unique_hosts, avg_severity,
                  error_ratio, auth_failure_count, link_down_count,
                  hour_of_day, day_of_week]
        """
        now = time.time()
        self._purge(now)  # expire stale events even if nothing new arrived
        lt = time.localtime(now)

        total = len(self.events)
        unique_hosts = len([h for h, c in self.host_counts.items() if c > 0])

        severities = [e.get('severity', 5) for _, e in self.events]
        avg_sev = np.mean(severities) if severities else 5.0

        errors = sum(1 for _, e in self.events if e.get('severity', 5) <= 3)
        error_ratio = errors / max(total, 1)

        auth_fails = self.category_counts.get('auth_failure', 0)
        link_downs = self.category_counts.get('link_down', 0)

        return np.array([
            total,                    # events in window
            unique_hosts,             # unique source hosts
            avg_sev,                  # average severity
            error_ratio,              # fraction that are errors
            auth_fails,               # auth failures in window
            link_downs,               # link down events in window
            lt.tm_hour,               # hour of day (0-23)
            lt.tm_wday,               # day of week (0-6)
        ])

# ──────────────────────────────────────────────
# Model setup
# ──────────────────────────────────────────────

# Phase 1: Collect baseline data (first 7 days)
# Phase 2: Train model and score in real time
# (Weekly retraining on accumulated normal data is left as a TODO in this POC)

engine = FeatureEngine(window_seconds=60)
baseline_data = []
model = None
BASELINE_SAMPLES = 10080  # 1 sample per minute × 7 days

print("ML scorer running. Collecting baseline...")

last_sample = 0
last_id = '0-0'

while True:
    # Read new events from Redis Stream
    results = r.xread(
        {'events:escalated': last_id},
        count=100,
        block=5000  # Wait up to 5 seconds
    )

    for stream, messages in (results or []):
        for msg_id, data in messages:
            last_id = msg_id
            event = json.loads(data['data'])
            engine.add_event(event)

    # Extract features every 60 seconds
    now = time.time()
    if now - last_sample >= 60:
        last_sample = now
        features = engine.extract_features()

        if model is None:
            # Still collecting baseline
            baseline_data.append(features)
            remaining = BASELINE_SAMPLES - len(baseline_data)
            if remaining <= 0:
                # Train the model
                X = np.array(baseline_data)
                model = IsolationForest(
                    n_estimators=200,
                    contamination=0.02,
                    random_state=42,
                )
                model.fit(X)
                print(f"Model trained on {len(baseline_data)} samples")
            elif remaining % 1000 == 0:
                print(f"Baseline: {len(baseline_data)}/{BASELINE_SAMPLES}")
        else:
            # Score the current window
            score = model.decision_function(features.reshape(1, -1))[0]
            prediction = model.predict(features.reshape(1, -1))[0]

            result = {
                'timestamp': time.time(),
                'anomaly_score': float(score),
                'is_anomaly': bool(prediction == -1),
                'features': {
                    'event_rate': float(features[0]),
                    'unique_hosts': int(features[1]),
                    'avg_severity': float(features[2]),
                    'error_ratio': float(features[3]),
                    'auth_failures': int(features[4]),
                    'link_downs': int(features[5]),
                },
            }

            # Publish scores
            r.xadd('ml:scores', {'data': json.dumps(result)}, maxlen=10000)

            if result['is_anomaly']:
                r.xadd('alerts:anomaly', {'data': json.dumps(result)}, maxlen=1000)
                print(f"⚠ ANOMALY detected: score={score:.3f} "
                      f"rate={features[0]:.0f} hosts={features[1]:.0f}")
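The header comment talks about weekly retraining, but the loop trains exactly once. Here’s a minimal sketch of one way to close that gap, assuming a hypothetical RollingBaseline helper (my name, not part of the scripts above). It keeps the most recent non-anomalous feature vectors and reports when a retrain is due. The training function is passed in, so the helper itself doesn’t depend on scikit-learn.

```python
import time
from collections import deque


class RollingBaseline:
    """Hypothetical helper: keeps recent 'normal' windows and schedules retrains."""

    def __init__(self, max_samples=10080, retrain_every=7 * 24 * 3600):
        self.samples = deque(maxlen=max_samples)  # oldest windows fall off the end
        self.retrain_every = retrain_every        # seconds between retrains
        self.last_trained = None                  # None until the first training

    def add(self, features, is_anomaly=False):
        # Keep flagged windows out so an incident does not become "normal".
        if not is_anomaly:
            self.samples.append(list(features))

    def due(self, now=None):
        now = time.time() if now is None else now
        if len(self.samples) < self.samples.maxlen:
            return False                          # baseline is not full yet
        return self.last_trained is None or now - self.last_trained >= self.retrain_every

    def retrain(self, train_fn, now=None):
        """train_fn takes a list of feature vectors and returns a fitted model."""
        model = train_fn(list(self.samples))
        self.last_trained = time.time() if now is None else now
        return model
```

In the scorer loop this would look roughly like baseline.add(features, result['is_anomaly']) after each scoring pass, followed by a check of baseline.due() and a call to baseline.retrain(...) with a function that builds and fits a fresh IsolationForest on the buffered samples.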

Step 6: The LLM Analysis Layer

#!/usr/bin/env python3
"""
llm_analyst.py — Consumes anomaly alerts and escalated events,
uses an LLM to generate human-readable analysis.
"""

import json
import time
import redis
from anthropic import Anthropic

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
client = Anthropic()

# Collect events for batch analysis (every 5 minutes or on anomaly)
event_buffer = []
last_analysis = time.time()
ANALYSIS_INTERVAL = 300  # 5 minutes

last_anomaly_id = '0-0'
last_event_id = '0-0'

def run_analysis(events: list, anomaly: dict = None):
    if not events and not anomaly:
        return

    context = {
        'events': events[-50:],  # Last 50 events max
        'anomaly_alert': anomaly,
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
    }

    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=1024,
        system="""You are a network operations analyst reviewing
        pre-filtered infrastructure events. These have already passed
        through severity filtering and ML anomaly detection.

        For each analysis:
        1. Identify the most significant events
        2. Look for correlations (multiple devices, same timeframe,
           same failure type)
        3. Suggest probable root cause
        4. Recommend specific actions

        Be concise. Network engineers don't want essays.
        Lead with what matters most.""",
        messages=[{
            "role": "user",
            "content": json.dumps(context, indent=2)
        }]
    )

    analysis = response.content[0].text

    # Store the analysis. redis-py rejects bool field values,
    # so has_anomaly is stored as 0/1.
    r.xadd('analysis:results', {
        'analysis': analysis,
        'event_count': len(events),
        'has_anomaly': int(bool(anomaly)),
        'timestamp': time.time(),
    }, maxlen=5000)

    print(f"\n{'='*60}")
    print(f"ANALYSIS ({len(events)} events)")
    print(f"{'='*60}")
    print(analysis)
    print(f"{'='*60}\n")

print("LLM analyst running. Waiting for events...")

while True:
    # Check for anomaly alerts (high priority)
    anomaly_results = r.xread(
        {'alerts:anomaly': last_anomaly_id},
        count=1,
        block=1000,
    )

    for stream, messages in (anomaly_results or []):
        for msg_id, data in messages:
            last_anomaly_id = msg_id
            anomaly = json.loads(data['data'])
            # Immediate analysis on anomaly
            run_analysis(event_buffer.copy(), anomaly)
            event_buffer.clear()
            last_analysis = time.time()

    # Collect escalated events
    event_results = r.xread(
        {'events:escalated': last_event_id},
        count=50,
        block=1000,
    )

    for stream, messages in (event_results or []):
        for msg_id, data in messages:
            last_event_id = msg_id
            event = json.loads(data['data'])
            event_buffer.append(event)

    # Periodic analysis even without anomalies
    if time.time() - last_analysis >= ANALYSIS_INTERVAL and event_buffer:
        run_analysis(event_buffer.copy())
        event_buffer.clear()
        last_analysis = time.time()
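One thing to watch in the script above: run_analysis only sends the last 50 events, so a burst can push the earliest (and often most telling) events out of the prompt. A sketch of a compaction step that could ride along in the context, assuming events carry 'host' and 'category' keys (those field names are my assumption; only 'severity' is visible in the scorer) and using a hypothetical summarize_events helper:

```python
from collections import Counter


def summarize_events(events, top_n=5):
    """Collapse a buffer of event dicts into counts the LLM can scan quickly.

    Assumes each event may carry 'host', 'category', and 'severity' keys;
    missing keys fall back to placeholders.
    """
    hosts = Counter(e.get('host', 'unknown') for e in events)
    categories = Counter(e.get('category', 'uncategorized') for e in events)
    worst = min((e.get('severity', 5) for e in events), default=None)
    return {
        'total_events': len(events),
        'top_hosts': hosts.most_common(top_n),
        'top_categories': categories.most_common(top_n),
        'worst_severity': worst,  # syslog convention: lower number = more severe
    }
```

Adding 'summary': summarize_events(events) to the context dict would give the model the shape of the whole buffer even when it only sees the tail of it.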

Step 7: Start It Up

# Terminal 1 — Syslog bridge (Tier 1 filter)
python3 syslog_bridge.py

# Terminal 2 — ML scorer (Tier 2 anomaly detection)
python3 ml_scorer.py

# Terminal 3 — LLM analyst (Tier 3 natural language)
python3 llm_analyst.py

# Terminal 4 — Watch the stats
watch -n 1 'redis-cli mget stats:dropped stats:passed'

# Terminal 5 — Read the analysis stream
redis-cli XREAD BLOCK 0 STREAMS analysis:results $

That’s the entire pipeline. Five Python files, one Redis instance, and rsyslog. On a single machine. No Kubernetes. No Kafka cluster. No enterprise license.

What Each Tier Actually Costs to Run

POC Cost Breakdown
═══════════════════

Hardware (one-time):
  Linux box (reuse old hardware):         $0
  Or: used Dell Optiplex on eBay:         $100-200
  Network devices (you probably have some): $0

Software (open source, plus one paid API):
  Ubuntu/Debian:                          $0
  rsyslog:                                $0
  Redis:                                  $0
  Python + scikit-learn + Prophet:        $0
  Anthropic API (Claude Haiku):           ~$0.50/day at lab scale

Monthly operating cost:
  Electricity for one box:                ~$10-15
  Claude Haiku API:                       ~$15/month (generous estimate)
  Total:                                  ~$25-30/month

Compare to:
  Splunk (500MB/day):                     $1,800/year minimum
  Datadog Log Management:                 $0.10/GB ingested + $1.70/M analyzed
  SolarWinds Log Analyzer:                $1,500+ for first node
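The API line in that breakdown is easy to sanity-check with arithmetic. A rough sketch, assuming one periodic analysis every 5 minutes (the ANALYSIS_INTERVAL in the analyst script) and no extra anomaly-triggered calls. The token counts and per-million-token prices below are placeholders I picked for illustration, not quoted rates, so substitute the current numbers for whatever model you call.

```python
def daily_llm_cost(calls_per_day, input_tokens, output_tokens,
                   input_price_per_mtok, output_price_per_mtok):
    """Estimated dollars per day for a fixed call pattern."""
    input_cost = calls_per_day * input_tokens / 1_000_000 * input_price_per_mtok
    output_cost = calls_per_day * output_tokens / 1_000_000 * output_price_per_mtok
    return input_cost + output_cost


calls = 24 * 60 * 60 // 300      # one periodic analysis every 300 seconds
cost = daily_llm_cost(
    calls_per_day=calls,
    input_tokens=4000,           # assumption: ~50 JSON events plus the system prompt
    output_tokens=1024,          # worst case: every reply uses all of max_tokens
    input_price_per_mtok=0.25,   # assumption: the input rate quoted in this article
    output_price_per_mtok=1.25,  # assumption: placeholder output rate
)
print(f"{calls} calls/day, about ${cost:.2f}/day, ${cost * 30:.2f}/month")
```

Under those assumptions the output tokens cost more than the input tokens, which suggests that tightening max_tokens (or the “be concise” instruction) is the cheapest lever if the bill ever matters.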

The Models Aren’t LLMs — And That’s the Point

I want to hammer this home because it’s the thing I found most surprising in my research. The ML models at Tier 2 are nothing like ChatGPT. You can’t talk to them. They don’t understand language. They’re mathematical functions that take arrays of numbers as input and produce numbers as output.

What an LLM does:
  Input:  "Is this log message concerning?"
  Output: "Based on the severity and context, this appears to be..."

What an isolation forest does:
  Input:  [45.0, 12, 5.1, 0.02, 0, 0, 2, 1]
  Output: -0.34

What Prophet does:
  Input:  A column of dates and a column of numbers
  Output: A column of predicted future numbers

What XGBoost does:
  Input:  [847, 23, 1, 3, 1, 0, 8400, 0, 2]
  Output: 0.97 (probability this is a threat)

These models are fast. Scoring one data point with an isolation forest or XGBoost typically takes somewhere between microseconds and a few milliseconds, depending on the library and the number of trees. A Prophet forecast is slower but still quick by human standards. None of them need GPUs (though GPUs can help for training on large datasets). They run on CPUs. They use megabytes of RAM, not gigabytes.

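And here’s the crucial insight: they don’t need hand-set, per-metric thresholds. The isolation forest doesn’t need you to tell it “alert at 85% CPU.” It figures out what’s normal from your data. You do still pick one sensitivity knob (the contamination=0.02 in the scorer above treats roughly the most unusual 2% of baseline windows as anomalous), but that is a single setting, not a threshold per metric per device. Different networks have different normals. A university network at 2 AM looks nothing like a hospital network at 2 AM. Static thresholds are a one-size-fits-none approach. ML models adapt to your specific environment.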

Does this eliminate the need for humans? No. Humans still design the feature engineering (what numbers to feed the model). Humans still label the training data for classification models. Humans still review and act on the alerts. But the models automate the pattern recognition that currently requires a senior engineer staring at screens, which is the most tedious, error-prone, and unsustainable part of the entire operations workflow.

Running Multiple Models on the Same Stream

One of the questions that came up in my research was whether you need to choose between anomaly detection, forecasting, and classification. The answer is: you don’t. They run in parallel.

# All three model types consuming the same event stream
# Each model type has its own consumer group in Redis

# Consumer group 1: Anomaly detection
# Reads events → extracts features → scores → publishes to ml:anomaly

# Consumer group 2: Time series forecasting
# Reads events → aggregates metrics hourly → forecasts → publishes to ml:forecast

# Consumer group 3: Classification
# Reads events → extracts event features → classifies → publishes to ml:classification

# The LLM layer reads from ALL THREE output streams
# and synthesizes a unified analysis

This is the “fan-out” pattern in streaming architectures. Kafka and Redis Streams both support it natively with consumer groups. Each model gets its own copy of every event, processes it independently, and publishes its results to its own output stream. The LLM layer then reads all the output streams and correlates everything.
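The scripts earlier in this piece use plain XREAD with a remembered ID, which also gives every reader the full stream. Consumer groups add acknowledgements and let you run several workers per model. A minimal sketch of the group-based version, assuming a redis-py client created with decode_responses=True is passed in; the helper names, group names, and consumer names are hypothetical.

```python
def ensure_group(client, stream, group):
    """Create the consumer group if it does not exist yet."""
    try:
        # id='0' starts the group at the beginning of the stream;
        # mkstream=True creates the stream if nothing has been written yet.
        client.xgroup_create(stream, group, id='0', mkstream=True)
    except Exception as exc:
        # Redis answers BUSYGROUP when the group already exists; that is fine.
        if 'BUSYGROUP' not in str(exc):
            raise


def consume_once(client, stream, group, consumer, handle,
                 count=100, block_ms=5000):
    """Read one batch for this group, process it, and acknowledge each entry."""
    # '>' asks for entries never delivered to any consumer in this group.
    batches = client.xreadgroup(group, consumer, {stream: '>'},
                                count=count, block=block_ms)
    processed = 0
    for _stream, messages in (batches or []):
        for msg_id, fields in messages:
            handle(fields)
            client.xack(stream, group, msg_id)  # done, for this group only
            processed += 1
    return processed
```

Each model process would call ensure_group(r, 'events:escalated', 'anomaly') (or 'forecast', or 'classify') once at startup and then loop on consume_once. Because acknowledgements are tracked per group, the forecasting model falling behind does not affect what the anomaly detector sees.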

Think of it like a hospital emergency room. The triage nurse (Tier 1) decides who gets seen. Then the patient might get blood work (anomaly detection), imaging (forecasting), and a specialist consultation (classification) — all happening in parallel. The attending physician (LLM) reads all the results and makes the final diagnosis.

The Correlation Problem: Why You Need the LLM

The ML models are great at spotting individual anomalies. But they can’t do what a senior network engineer does instinctively: connect the dots across different types of events.

Example: The adjacent floor switch failure.

The isolation forest on each switch independently flags “this switch went down.” Three anomaly alerts fire within 90 seconds. Each one is correct. But none of them knows about the other two.

The LLM reads all three alerts, notices they happened within 90 seconds, cross-references the network topology (which it only has if you supply it in the prompt or context), realizes these switches are on floors 4, 5, and 6 of the same building, and concludes: “Three switches on adjacent floors failed within 90 seconds. This is consistent with a localized infrastructure event, likely power, cooling, or physical layer. Recommend dispatching facilities to check Building A floors 4-6 for power or environmental issues.”

No ML model can do that alone. The isolation forest doesn’t know about building floors. Prophet doesn’t correlate simultaneous failures. XGBoost classifies individual events, not event clusters. The LLM is the only component that can reason across all the data with contextual knowledge of your environment.

This is where AI actually earns its keep. Not reading raw logs. Not replacing regex. Doing the cognitive work that currently requires a human who has deep institutional knowledge of the network topology, the building layout, the maintenance schedule, and the failure history.
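The LLM can only connect those dots if the topology is in front of it, and the analyst script as written sends events without any. A sketch of a pre-grouping step that attaches location to alerts before they reach the prompt. The topology dict, the host names, and the cluster_alerts helper are all hypothetical; in practice the lookup might come from an inventory export or CMDB.

```python
def cluster_alerts(alerts, topology, window_seconds=90):
    """Group alerts that share a building and start within window_seconds.

    alerts:   list of dicts with 'host' and 'timestamp' (epoch seconds)
    topology: hypothetical lookup, host -> {'building': ..., 'floor': ...}
    """
    clusters = []
    for alert in sorted(alerts, key=lambda a: a['timestamp']):
        site = topology.get(alert['host'], {})
        building, floor = site.get('building'), site.get('floor')
        for cluster in clusters:
            same_building = building is not None and cluster['building'] == building
            in_window = alert['timestamp'] - cluster['first_seen'] <= window_seconds
            if same_building and in_window:
                cluster['hosts'].append(alert['host'])
                if floor not in cluster['floors']:
                    cluster['floors'].append(floor)
                break
        else:
            # No matching cluster: this alert starts a new one.
            clusters.append({'building': building,
                             'first_seen': alert['timestamp'],
                             'hosts': [alert['host']],
                             'floors': [floor]})
    return clusters


topology = {
    'sw-a4': {'building': 'A', 'floor': 4},
    'sw-a5': {'building': 'A', 'floor': 5},
    'sw-a6': {'building': 'A', 'floor': 6},
    'sw-b1': {'building': 'B', 'floor': 1},
}
alerts = [
    {'host': 'sw-a4', 'timestamp': 1000},
    {'host': 'sw-a5', 'timestamp': 1030},
    {'host': 'sw-a6', 'timestamp': 1075},
    {'host': 'sw-b1', 'timestamp': 1040},
]
clusters = cluster_alerts(alerts, topology)
```

The three Building A switches land in one cluster and the Building B switch stays on its own. Passing clusters like these in the context hands the LLM the “same building, adjacent floors, 75 seconds apart” fact directly instead of hoping it infers it from hostnames.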

What I’d Do Differently Than ChatGPT Suggested

The ChatGPT conversation I started with gave a solid high-level overview, but it made the whole thing sound like you’d go out and buy Apache Kafka, Apache Flink, deploy Kubernetes clusters, and spin up GPU nodes. That’s the enterprise sales pitch version. Here’s what I’d actually tell someone who wants to understand this stuff:

  1. Start with tail -f and Python. Before you deploy anything, just watch your logs stream by. Build intuition for what “normal” looks like. You’ll be surprised how quickly you start noticing patterns.

  2. Redis Streams, not Kafka. For a lab or even a mid-size deployment, Kafka is overkill. Redis Streams gives you the consumer-group semantics this pipeline needs, in a tool that runs in tens of megabytes of RAM at lab scale and needs almost no configuration.

  3. scikit-learn before TensorFlow. You don’t need deep learning for most infrastructure monitoring. Isolation forests, random forests, and gradient boosting handle 90% of use cases and train in seconds on a CPU.

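  4. Claude Haiku, not GPT-4. For operational analysis (summarizing events, correlating alerts, generating recommendations) you don’t need the most powerful model. You need one that’s fast and cheap. Claude 3 Haiku at $0.25/million input tokens was roughly 60x cheaper than GPT-4; newer Haiku versions, including the one named in the script above, cost more per token but are still far cheaper than frontier models and plenty smart for this task.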

  5. Feature engineering is the whole game. The ML models are the easy part. Deciding what numbers to feed them is where the domain expertise lives. A message rate of 500/min means nothing without context. 500/min at 2 AM on a Tuesday for this specific device — that’s a feature with meaning.
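Point 5 can be sketched in a few lines. This is one possible way to turn “500/min at 2 AM on a Tuesday for this specific device” into a number, assuming a hypothetical ContextualRate helper that keeps a separate history for each (device, weekday, hour) slot and scores a new rate against that slot’s own past. The host name and rates are invented.

```python
import statistics
from collections import defaultdict


class ContextualRate:
    """Hypothetical helper: per-device, per-hour-of-week message-rate history."""

    def __init__(self):
        # key: (host, weekday 0-6 with Monday=0, hour 0-23) -> past rates (msgs/min)
        self.history = defaultdict(list)

    def observe(self, host, weekday, hour, rate):
        self.history[(host, weekday, hour)].append(rate)

    def zscore(self, host, weekday, hour, rate):
        """How many standard deviations this rate sits from the slot's own past."""
        past = self.history.get((host, weekday, hour), [])
        if len(past) < 2:
            return 0.0                      # not enough history to judge
        mean = statistics.mean(past)
        stdev = statistics.stdev(past)
        if stdev == 0:
            return 0.0 if rate == mean else float('inf')
        return (rate - mean) / stdev


ctx = ContextualRate()
for rate in (40, 50, 60):
    ctx.observe('core-sw1', 1, 2, rate)     # Tuesday (tm_wday=1), 2 AM
for rate in (480, 500, 520):
    ctx.observe('core-sw1', 1, 14, rate)    # Tuesday, 2 PM

night = ctx.zscore('core-sw1', 1, 2, 500)   # far outside the 2 AM history
day = ctx.zscore('core-sw1', 1, 14, 500)    # dead center of the 2 PM history
```

The raw rate is identical in both calls; only the context differs. Feeding the z-score to the model instead of (or alongside) the raw rate is what lets it treat the 2 AM case as strange and the 2 PM case as boring.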

“All models are wrong, but some are useful.” — George Box

He said that in 1976 about statistical models. It’s still the most honest thing anyone has said about AI.

Where This Idea Stands

I’ve got the architecture mapped out, the code sketched, and a lab that could run this by next weekend. The pieces are all free, open-source, and battle-tested individually. The novel part is wiring them together into a coherent pipeline where mechanical filtering, ML scoring, and LLM analysis each handle the layer they’re best suited for.

The questions I’m still noodling on:

  1. Baseline training period. Seven days feels right for an isolation forest to learn “normal,” but some environments have monthly patterns (month-end batch jobs, quarterly compliance scans). Do you need 30 days of baseline before the model is trustworthy?

  2. Model drift. Networks change. New devices get added, traffic patterns shift, new applications launch. How often does the model need retraining? Weekly? Monthly? Or do you use an online learning approach that continuously adapts?

  3. The Ollama angle. Could you replace the Claude API call with a local model running on Ollama? A quantized Llama 3 8B model can run on CPU and generates decent operational summaries. That eliminates the API cost entirely but probably reduces the quality of correlation and recommendation.

  4. The SNMP convergence. My network monitoring piece covers the metric collection side. This piece covers the log analysis side. In reality, these should be the same system. The ML models should be ingesting both structured metrics (SNMP counters) and unstructured logs (syslog messages) to build a complete picture.

  5. Alert routing intelligence. Right now the LLM just prints analysis. In a real system, it should know that a power failure alert goes to facilities, a security event goes to the SOC, and a storage prediction goes to the storage team. That’s a whole routing engine on top of what we’ve built.
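For question 5, the crudest possible starting point is a keyword table applied to the analysis text. A sketch, with team names and keywords that are entirely hypothetical. Substring matching like this is brittle, and a sturdier version would probably ask the LLM to emit a structured category field and route on that instead.

```python
ROUTES = [
    # (destination, keywords that suggest it) - hypothetical teams and keywords
    ('facilities', ('power', 'cooling', 'temperature', 'fan')),
    ('soc',        ('auth', 'intrusion', 'brute force', 'malware')),
    ('storage',    ('disk', 'volume', 'capacity', 'storage')),
]
DEFAULT_ROUTE = 'noc'


def route_analysis(analysis_text):
    """Return destinations whose keywords appear in the analysis, else the default."""
    text = analysis_text.lower()
    matches = [dest for dest, keywords in ROUTES
               if any(word in text for word in keywords)]
    return matches or [DEFAULT_ROUTE]
```

Calling route_analysis(analysis) at the end of run_analysis would give each stored result a destination list, which is enough to start experimenting with per-team output streams.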

The enterprise monitoring industry has been selling two products for decades: “we’ll collect your data” and “we’ll show you dashboards.” The first is a commodity. The second is a crutch for humans who can’t process the raw data. AI doesn’t just make dashboards prettier. It makes them optional. When the system can tell you what’s wrong, why it’s wrong, and what to do about it, in plain English, before you even sit down at your desk — that’s not an incremental improvement on SolarWinds. That’s a different product entirely.

And you can prototype it on a box under your desk for the cost of electricity.


FAQ

Do I need a GPU to run the ML models?

No. The ML models in this architecture — isolation forests, Prophet, XGBoost — all run on CPUs. scikit-learn is CPU-optimized and handles datasets up to millions of rows without breaking a sweat. You’d only need a GPU if you’re training deep learning models (LSTMs, transformers) on very large datasets, or if you want to run a local LLM instead of using an API. For the POC described here, a regular desktop CPU with 16GB of RAM is plenty.

How long does it take to train the anomaly detection model?

The isolation forest trains in seconds to minutes depending on dataset size. With 10,000 samples (about a week of 1-minute windows), training takes under 5 seconds on a modern CPU. Prophet is slightly slower — maybe 10-30 seconds for a 60-day dataset. XGBoost trains in seconds for small datasets. The longest part isn’t training; it’s collecting enough baseline data for the model to learn what “normal” looks like. Plan on 1-2 weeks of data collection before the anomaly detection is useful.

What happens when the model gets it wrong?

It will get things wrong, especially early on. False positives (flagging normal events as anomalies) are more common than false negatives. The practical approach: log every model prediction alongside the actual outcome, review the false positives weekly, and retrain. For the isolation forest, which is unsupervised, that means folding the windows you judged routine back into the baseline data; for a supervised classifier like XGBoost, it means correcting the labels. Over time, the model gets better because it’s learning your specific environment. The LLM layer also acts as a sanity check: it can sometimes recognize that a Tier 2 anomaly alert is actually routine when it has broader context.
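The weekly review can be as simple as a list of reviewed alerts and two small functions. A sketch, assuming a hypothetical feedback record with 'is_anomaly' (the model’s verdict), 'confirmed' (the reviewer’s verdict, or None if nobody has looked yet), and 'features' (the window that was scored):

```python
def alert_precision(feedback):
    """Fraction of reviewed alerts that turned out to be real problems."""
    reviewed = [f for f in feedback
                if f['is_anomaly'] and f['confirmed'] is not None]
    if not reviewed:
        return None                         # nothing reviewed yet
    true_positives = sum(1 for f in reviewed if f['confirmed'])
    return true_positives / len(reviewed)


def false_positive_windows(feedback):
    """Feature vectors of alerts a reviewer marked as routine."""
    return [f['features'] for f in feedback
            if f['is_anomaly'] and f['confirmed'] is False]
```

Tracking alert_precision week over week shows whether the model is actually improving, and the windows returned by false_positive_windows are the ones to add to the baseline before the next retrain.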

Can this replace a SIEM like Splunk or Elastic?

Not directly, and it’s not trying to. A SIEM provides long-term log storage, compliance reporting, forensic search, and often a whole security ecosystem (threat intel feeds, SOAR integration, compliance templates). This architecture handles real-time detection and analysis. The ideal setup runs both: the AI pipeline for real-time intelligence, and a SIEM or log store for historical analysis and compliance. That said, for a small environment that currently has no log analysis, this POC gives you 80% of the value at 0% of the Splunk license cost.

How is this different from what Datadog or New Relic already offer?

Datadog and New Relic do offer anomaly detection features, and they’re solid products. The difference is three-fold: cost (they charge per GB or per host, and it adds up fast), intelligence depth (their anomaly detection is typically per-metric, not cross-domain correlation), and the LLM layer (they show you dashboards, not natural-language analysis with recommended actions). If you’re already paying for Datadog and it’s working, this isn’t necessarily better. But if you’re priced out of Datadog, or if you want deeper AI-driven analysis than their built-in features offer, this architecture gives you a path.

What’s the minimum viable lab to test this?

One Linux machine (even a Raspberry Pi 4 with 8GB RAM), one managed switch that sends syslog, and an internet connection for the Claude API. Install rsyslog, Redis, and Python. Configure the switch to send syslog to the Pi. Run the scripts. You’ll have a working three-tier log analysis pipeline in an afternoon. It won’t be production-ready, but you’ll understand exactly how each layer works and why the architecture is structured this way.
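If you want to poke the pipeline before the switch is configured, you can hand-craft a syslog message and fire it at the collector. A sketch using only the standard library, assuming rsyslog is listening on UDP port 514 on the same machine (an assumption about your rsyslog config) and using made-up host, tag, and message values. The priority number follows the standard syslog rule: facility times 8 plus severity.

```python
import socket
import time


def build_syslog_line(facility, severity, hostname, tag, message, now=None):
    """Format a BSD-style (RFC 3164) syslog line. PRI = facility * 8 + severity."""
    lt = time.localtime(now)                # now=None means the current time
    stamp = (f"{time.strftime('%b', lt)} {lt.tm_mday:2d} "
             f"{time.strftime('%H:%M:%S', lt)}")
    return f"<{facility * 8 + severity}>{stamp} {hostname} {tag}: {message}"


def send_syslog(line, host='127.0.0.1', port=514):
    """Send one line over UDP, the transport most switches use for syslog."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode('utf-8'), (host, port))
```

For example, send_syslog(build_syslog_line(23, 3, 'lab-sw1', 'LINK', 'Interface Gi1/0/1 down')) sends a local7 error-level message, which should be severe enough to pass a Tier 1 filter that keeps severity 3 and below.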