Log Parsing Scripts
Ready-to-use scripts for the most common log analysis tasks. Copy, paste, run. All scripts are dependency-light — the Bash one-liners use only standard Unix tools (awk, grep, sort, uniq); the Python scripts use only the standard library.
Fast answers from the command line using only awk, sort, uniq, and grep. Works on any Combined Log Format access log.
```bash
## Set your log path once
LOG=/var/log/nginx/access.log   # or /var/log/apache2/access.log

## ── TOP IPs BY REQUEST COUNT ──────────────────────────
awk '{print $1}' "$LOG" | sort | uniq -c | sort -rn | head -20

## ── TOP REQUESTED URLS ────────────────────────────────
awk '{print $7}' "$LOG" | sort | uniq -c | sort -rn | head -20

## ── HTTP STATUS CODE SUMMARY ──────────────────────────
awk '{print $9}' "$LOG" | sort | uniq -c | sort -rn

## ── TOP USER AGENTS ───────────────────────────────────
awk -F'"' '{print $6}' "$LOG" | sort | uniq -c | sort -rn | head -15

## ── TOTAL BANDWIDTH SERVED (bytes) ────────────────────
awk '{sum += $10} END {print sum " bytes = " sum/1024/1024 " MB"}' "$LOG"

## ── ALL 4xx / 5xx ERRORS WITH URL ─────────────────────
awk '$9 ~ /^[45]/ {print $9, $7}' "$LOG" | sort | uniq -c | sort -rn | head -30

## ── REQUESTS IN LAST 60 MINUTES ───────────────────────
## Caveat: this is a string comparison on %d/%b/%Y timestamps, which only
## sorts correctly within a single day — see the Python sketch just after
## this block for a robust count. Requires GNU date (-d).
awk -v d="$(date -d '60 minutes ago' +'%d/%b/%Y:%H:%M:%S')" '$4 > "["d' "$LOG" | wc -l

## ── TOP REFERRERS (excluding self) ────────────────────
awk -F'"' '{print $4}' "$LOG" | grep -v "^-$\|yourdomain.com" | sort | uniq -c | sort -rn | head -15

## ── SLOW REQUESTS (Nginx with $request_time logged as the last field) ──
awk '$NF > 2.0 {print $NF, $7}' "$LOG" | sort -rn | head -20
```
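Because the awk time-window trick above compares timestamps as strings, it breaks across day and month boundaries. A minimal stdlib sketch of a robust count (the script name `recent.py` is illustrative; it assumes log timestamps are in the server's local time and an English locale for `%b`):

```python
#!/usr/bin/env python3
"""Count requests in the last N minutes (default 60) from a combined log.
Usage: python3 recent.py /var/log/nginx/access.log [minutes]"""
import sys
from datetime import datetime, timedelta

if len(sys.argv) < 2:
    print("Usage: python3 recent.py /var/log/nginx/access.log [minutes]")
    sys.exit(1)

path = sys.argv[1]
minutes = int(sys.argv[2]) if len(sys.argv) > 2 else 60
cutoff = datetime.now() - timedelta(minutes=minutes)

count = 0
with open(path, encoding="utf-8", errors="replace") as f:
    for line in f:
        try:
            # Field 4 looks like [17/Mar/2024:10:00:00 (timezone sits in field 5)
            ts = datetime.strptime(line.split()[3].lstrip("["), "%d/%b/%Y:%H:%M:%S")
        except (IndexError, ValueError):
            continue  # skip malformed lines
        if ts >= cutoff:
            count += 1
print(count)
```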
Produces a structured summary report: top IPs, top URLs, status distribution, bandwidth, and top user agents. Works on Combined Log Format logs from both Apache and Nginx.
#!/usr/bin/env python3 """Combined log parser — Apache / Nginx. No dependencies required.""" import re, sys from collections import Counter from pathlib import Path LOG_RE = re.compile( r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] "' r'(?P<method>\S+) (?P<uri>\S+) [^"]*" (?P<status>\d{3}) (?P<bytes>\S+)' r'(?: "(?P<referer>[^"]*)" "(?P<ua>[^"]*)")?' ) def parse_log(path: str) -> None: ips, uris, statuses, uas, errors = Counter(), Counter(), Counter(), Counter(), [] total_bytes = skipped = 0 for line in Path(path).open(encoding="utf-8", errors="replace"): m = LOG_RE.match(line) if not m: skipped += 1 continue ips[m["ip"]] += 1 uris[m["uri"].split("?")[0]] += 1 # strip query string statuses[m["status"]] += 1 if m["ua"]: uas[m["ua"]] += 1 if m["bytes"] != "-": total_bytes += int(m["bytes"]) if m["status"][0] in "45": errors.append(f"{m['status']} {m['uri']}") total = sum(ips.values()) print(f"\n{'='*55}\n LOG ANALYSIS: {path}\n{'='*55}") print(f"Total requests : {total:,} ({skipped} unparseable lines skipped)") print(f"Total bandwidth: {total_bytes/1_048_576:.1f} MB") for label, data, n in [ ("TOP 15 IPs", ips, 15), ("TOP 20 URIs", uris, 20), ("STATUS CODES", statuses, 20), ("TOP 10 USER AGENTS", uas, 10), ]: print(f"\n── {label} {'─'*(45-len(label))}") for val, cnt in data.most_common(n): pct = cnt / total * 100 if total else 0 print(f"{cnt:8,} ({pct:5.1f}%) {val}") if errors: err_c = Counter(errors) print(f"\n── TOP ERRORS (4xx/5xx) {'─'*30}") for e, c in err_c.most_common(15): print(f"{c:8,} {e}") if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python3 parse_log.py /var/log/nginx/access.log"); sys.exit(1) parse_log(sys.argv[1])
Parses the MySQL slow query log, fingerprints queries (normalizes literals), and reports the worst offenders by total time, average time, and rows examined.
#!/usr/bin/env python3 """MySQL slow query log analyzer. No dependencies required.""" import re, sys from collections import defaultdict from pathlib import Path def fingerprint(sql): sql = re.sub(rr"'[^']*'", "'?'", sql) sql = re.sub(rr"\b\d+\b", "?", sql) sql = re.sub(rr"\s+", " ", sql).strip() return sql[:120] def parse_slow_log(path): stats = defaultdict(lambda: {"count":0,"total_t":0,"max_t":0,"rows_ex":0}) qt = lt = re_val = 0 buf = [] for line in Path(path).open(encoding="utf-8", errors="replace"): if line.startswith("# Query_time"): m = re.search(rr"Query_time: ([\d.]+).*?Lock_time: ([\d.]+).*?Rows_examined: (\d+)", line) if m: qt, lt, re_val = float(m[1]), float(m[2]), int(m[3]) elif line.startswith("SET timestamp") or line.startswith("use "): pass elif line.startswith("#") or line.startswith("/"): if buf and qt: fp = fingerprint(" ".join(buf)) s = stats[fp] s["count"] += 1; s["total_t"] += qt s["max_t"] = max(s["max_t"], qt); s["rows_ex"] += re_val buf, qt = [], 0 else: sql = line.strip() if sql: buf.append(sql) print(f"\n{'='*60}\n SLOW QUERY REPORT: {path}\n{'='*60}") sorted_q = sorted(stats.items(), key=lambda x: x[1]["total_t"], reverse=True) print(f"\n{'Rank':<4} {'Count':>6} {'Total(s)':>10} {'Avg(s)':>8} {'Max(s)':>8} {'RowsEx':>10} Query") print("-"*120) for i, (fp, s) in enumerate(sorted_q[:20], 1): avg = s["total_t"] / s["count"] print(f"{i:<4} {s['count']:>6} {s['total_t']:>10.2f} {avg:>8.3f} {s['max_t']:>8.3f} {s['rows_ex']:>10,} {fp}") if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python3 slow_query.py /var/log/mysql/mysql-slow.log"); sys.exit(1) parse_slow_log(sys.argv[1])
Scans auth.log, vsftpd.log, and web access logs for brute force patterns and prints IPs exceeding configurable thresholds.
```bash
#!/usr/bin/env bash
# Brute force detector — SSH, FTP, and web (401/403 floods)
# Prints IPs over threshold and optionally blocks via ufw

SSH_LOG=/var/log/auth.log
FTP_LOG=/var/log/vsftpd.log
WEB_LOG=/var/log/nginx/access.log
SSH_THRESHOLD=20   # failed attempts before flagging
FTP_THRESHOLD=10
WEB_THRESHOLD=50   # 401/403 responses to one IP
BLOCK=false        # set true to auto-block with ufw

echo "===== BRUTE FORCE REPORT $(date) ====="

## SSH failures
echo -e "\n── SSH Failed Logins (threshold: ${SSH_THRESHOLD})"
grep "Failed password\|Invalid user" "$SSH_LOG" 2>/dev/null |
  awk '{for(i=1;i<=NF;i++) if($i=="from") print $(i+1)}' |
  sort | uniq -c | sort -rn |
  while read cnt ip; do
    [[ $cnt -ge $SSH_THRESHOLD ]] || continue
    printf "%6d %s\n" "$cnt" "$ip"
    [[ $BLOCK == true ]] && ufw deny from "$ip" to any &>/dev/null
  done

## FTP failures
echo -e "\n── FTP Auth Failures (threshold: ${FTP_THRESHOLD})"
grep "FAIL LOGIN\|530\|Invalid user\|authentication failed" "$FTP_LOG" 2>/dev/null |
  grep -oE "[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+" |
  sort | uniq -c | sort -rn |
  while read cnt ip; do
    [[ $cnt -ge $FTP_THRESHOLD ]] && printf "%6d %s\n" "$cnt" "$ip"
  done

## Web 401/403 floods
echo -e "\n── Web Auth/Forbidden Floods (threshold: ${WEB_THRESHOLD})"
awk '$9==401 || $9==403 {print $1}' "$WEB_LOG" 2>/dev/null |
  sort | uniq -c | sort -rn |
  while read cnt ip; do
    [[ $cnt -ge $WEB_THRESHOLD ]] && printf "%6d %s\n" "$cnt" "$ip"
  done

echo -e "\nDone. Set BLOCK=true to auto-block with ufw."
```
Summarizes sent, deferred, bounced, and rejected messages, along with top senders and top rejection reasons.
```bash
#!/usr/bin/env bash
# Postfix mail log summary
MAIL_LOG=${1:-/var/log/mail.log}
[[ -f $MAIL_LOG ]] || { echo "Log not found: $MAIL_LOG"; exit 1; }

echo "===== POSTFIX LOG SUMMARY: $MAIL_LOG ====="
echo "Period: $(head -1 "$MAIL_LOG" | cut -d' ' -f1-3) → $(tail -1 "$MAIL_LOG" | cut -d' ' -f1-3)"

echo -e "\n── Delivery Status Counts"
echo "Sent:       $(grep -c 'status=sent' "$MAIL_LOG")"
echo "Deferred:   $(grep -c 'status=deferred' "$MAIL_LOG")"
echo "Bounced:    $(grep -c 'status=bounced' "$MAIL_LOG")"
echo "Rejected:   $(grep -c 'NOQUEUE: reject' "$MAIL_LOG")"
echo "SASL fails: $(grep -c 'warning: .*SASL.*authentication failed' "$MAIL_LOG")"

echo -e "\n── Top 10 Sender Domains"
# from=<...> appears on qmgr queue lines, not on the status=sent delivery
# lines (those carry to=<...>), so match it directly across the whole log
grep -oP 'from=<[^>]*@\K[^>]+' "$MAIL_LOG" | sort | uniq -c | sort -rn | head -10

echo -e "\n── Top 10 Rejection Reasons"
grep "NOQUEUE: reject" "$MAIL_LOG" | grep -oP '(?<=reject: ).*?(?=;)' |
  sort | uniq -c | sort -rn | head -10

echo -e "\n── Top 10 Deferred Reasons"
grep "status=deferred" "$MAIL_LOG" | grep -oP '(?<=\().*?(?=\))' |
  sort | uniq -c | sort -rn | head -10
```
Tails a log file in real time, matches configurable regex patterns, and prints timestamped alerts. Extend it with email or webhook notifications — a minimal webhook sketch follows the script.
#!/usr/bin/env python3 """Real-time log monitor with pattern-based alerting. No dependencies.""" import re, sys, time, signal from pathlib import Path from datetime import datetime # Alert patterns: (label, regex, severity) PATTERNS = [ ("SQL Injection", re.compile(rr"(?:UNION|SELECT|INSERT|DROP|;--|'--)", re.I), "CRITICAL"), ("XSS Attempt", re.compile(rr"<script|javascript:|onerror=", re.I), "HIGH"), ("Path Traversal", re.compile(rr"\.\./|\.\.\\|%2e%2e", re.I), "HIGH"), ("Scanner UA", re.compile(rr"nikto|sqlmap|nessus|masscan|zgrab", re.I), "MEDIUM"), ("PHP Shell", re.compile(rr"cmd=|exec\(|shell_exec|base64_decode", re.I), "CRITICAL"), ("500 Error", re.compile(rr'" 5\d\d '), "HIGH"), ("429 Rate Limit", re.compile(rr'" 429 '), "MEDIUM"), ("SSH Brute Force", re.compile(rr"Failed password|Invalid user"), "HIGH"), ] COLORS = {"CRITICAL":"\033[91m", "HIGH":"\033[93m", "MEDIUM":"\033[96m", "LOW":"\033[0m"} RESET = "\033[0m" def tail_file(path: str): p = Path(path) if not p.exists(): print(f"File not found: {path}"); sys.exit(1) print(f"\033[1mMonitoring {path} — press Ctrl+C to stop\033[0m\n") signal.signal(signal.SIGINT, lambda *_: (print("\nStopped."), sys.exit(0))) with p.open() as f: f.seek(0, 2) # seek to end while True: line = f.readline() if not line: time.sleep(0.1); continue for label, rx, sev in PATTERNS: if rx.search(line): ts = datetime.now().strftime("%H:%M:%S") color = COLORS.get(sev, "") print(f"{color}[{ts}] [{sev:<8}] {label}{RESET}") print(f" {line.rstrip()[:160]}\n") if __name__ == "__main__": tail_file(sys.argv[1] if len(sys.argv) > 1 else "/var/log/nginx/access.log")
Exports Windows Security events to CSV for analysis in Excel, Splunk, or any SIEM. Configurable event ID list and time window.
```powershell
# Windows Security Event Exporter
# Exports key security events to CSV — run as Administrator
param(
    [int]$HoursBack = 24,
    [string]$OutputPath = "C:\Temp\security_events_$(Get-Date -f yyyyMMdd_HHmm).csv"
)

# NB: 7045 (new service installed) normally lands in the System log;
# the Security-log equivalent is 4697 (requires auditing to be enabled)
$EventIDs = @(4624, 4625, 4634, 4648, 4672, 4688, 4698, 4720, 4728, 4740, 4776, 4946, 7045, 1102)

Write-Host "Collecting events from last $HoursBack hours..." -ForegroundColor Cyan

$results = foreach ($id in $EventIDs) {
    try {
        Get-WinEvent -FilterHashtable @{
            LogName   = 'Security'
            Id        = $id
            StartTime = (Get-Date).AddHours(-$HoursBack)
        } -ErrorAction Stop | ForEach-Object {
            $xml = [xml]$_.ToXml()
            $data = @{}
            # Some events (e.g. 1102) carry no EventData, so filter nulls
            $xml.Event.EventData.Data | Where-Object { $_ } |
                ForEach-Object { $data[$_.Name] = $_.'#text' }
            [PSCustomObject]@{
                TimeCreated = $_.TimeCreated
                EventID     = $_.Id
                Level       = $_.LevelDisplayName
                Computer    = $_.MachineName
                TargetUser  = $data['TargetUserName']
                SubjectUser = $data['SubjectUserName']
                LogonType   = $data['LogonType']
                IpAddress   = $data['IpAddress']
                ProcessName = $data['NewProcessName']
                ServiceName = $data['ServiceName']
                Status      = $data['Status']
                SubStatus   = $data['SubStatus']
                Message     = $_.Message -replace "`n", " "
            }
        }
    }
    catch [System.Exception] {
        # No events found for this ID in the time window — skip silently
    }
}

$results | Sort-Object TimeCreated -Descending | Export-Csv $OutputPath -NoTypeInformation
Write-Host "Exported $($results.Count) events to $OutputPath" -ForegroundColor Green
```
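Once the CSV exists, the usual first question is who is failing to log on. A minimal stdlib sketch that counts 4625 (failed logon) events by source IP from the exported file — the path below is a placeholder matching the exporter's default naming pattern, and the column names come from the `[PSCustomObject]` above:

```python
import csv
from collections import Counter

PATH = r"C:\Temp\security_events_20240101_0000.csv"  # placeholder; use your real file

fails = Counter()
with open(PATH, newline="", encoding="utf-8-sig") as f:  # utf-8-sig tolerates a BOM
    for row in csv.DictReader(f):
        # Skip local/console logons, which report "-" or an empty IpAddress
        if row["EventID"] == "4625" and row["IpAddress"] not in ("", "-"):
            fails[row["IpAddress"]] += 1

for ip, n in fails.most_common(15):
    print(f"{n:6}  {ip}")
```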