Note: This guide is based on technical research from security automation best practices, Python security library documentation, and analysis of production SOC automation workflows. Code examples use current stable versions of libraries and have been verified for functionality. The scripts provided are educational templates—readers should adapt error handling, logging, and security controls to their specific production requirements before deployment.

Security teams face repetitive tasks that consume analyst time: log collection, IOC enrichment, vulnerability scanning, report generation, and routine investigations. According to Gartner’s 2024 Market Guide for Security Orchestration, Automation, and Response, organizations implementing security automation reduce mean time to respond (MTTR) by 60-80% and free analysts to focus on complex threats requiring human judgment.

This post provides ready-to-adapt security automation scripts in Python and Bash, covering common SOC operations from log analysis to API-driven threat intelligence enrichment.

Why Script Security Tasks?

Benefits of Automation:

  • Consistency: Scripts execute identically every time (no human error)
  • Speed: Automated tasks complete in seconds vs minutes/hours manually
  • Scalability: Handle thousands of events without additional headcount
  • Documentation: Scripts serve as executable documentation of procedures
  • Auditability: Automated tasks create structured logs for compliance

When to Automate:

  • Tasks performed more than 3 times per week
  • Processes with clear, rule-based decision logic
  • High-volume, low-complexity investigations
  • Data collection and aggregation
  • Report generation

When NOT to Automate:

  • Tasks requiring nuanced judgment
  • Rare, one-off investigations
  • Processes still being refined (automate after stabilization)
  • Activities requiring creative problem-solving

Python vs Bash: Choosing the Right Tool

Use Bash when:

  • Orchestrating Linux CLI tools (grep, awk, curl, jq)
  • Simple file operations and text processing
  • System administration tasks
  • Quick prototypes and one-liners

Use Python when:

  • Complex data structures (JSON, XML parsing)
  • API integration (REST APIs, authentication)
  • Advanced logic and error handling
  • Cross-platform compatibility needed
  • Maintaining long-term automation code

Hybrid Approach:

Use Bash to glue together tools, Python for complex processing:

#!/bin/bash
# Collect logs with Bash
journalctl -u nginx --since "1 hour ago" > /tmp/nginx_logs.txt

# Process with Python
python3 analyze_nginx_logs.py /tmp/nginx_logs.txt > /tmp/analysis_report.json

# Send results with Bash
curl -X POST https://siem.example.com/api/events \
  -H "Content-Type: application/json" \
  -d @/tmp/analysis_report.json
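
The analyze_nginx_logs.py stage is left to the reader; a minimal sketch might look like the following. It assumes access entries in nginx's default combined format reach the journal, and uses re.search rather than re.match because journalctl prefixes each line with syslog metadata:

#!/usr/bin/env python3
# analyze_nginx_logs.py - minimal sketch of the Python stage above.
# Assumes access entries in nginx's default "combined" format; adjust
# the regex for custom log formats.

import json
import re
import sys
from collections import Counter

# journalctl prefixes lines with syslog metadata, so search (not match)
LINE_RE = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) [^"]*" (?P<status>\d{3}) \S+'
)

def main(log_path):
    statuses = Counter()
    clients = Counter()
    suspicious = []  # 4xx/5xx responses are often worth a second look

    with open(log_path, encoding='utf-8', errors='replace') as f:
        for line in f:
            m = LINE_RE.search(line)
            if not m:
                continue
            statuses[m['status']] += 1
            clients[m['ip']] += 1
            if m['status'].startswith(('4', '5')):
                suspicious.append(
                    {'ip': m['ip'], 'path': m['path'], 'status': m['status']})

    json.dump({
        'status_counts': dict(statuses),
        'top_clients': clients.most_common(10),
        'suspicious_requests': suspicious[:100],
    }, sys.stdout, indent=2)

if __name__ == '__main__':
    main(sys.argv[1])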

Bash Automation Scripts

1. Automated IOC Extraction from Logs

Extract indicators of compromise (IPs, domains, hashes) from logs:

#!/bin/bash
# extract_iocs.sh - Extract IOCs from security logs

set -euo pipefail  # Exit on error, undefined vars, pipe failures

LOG_FILE="${1:-/var/log/security.log}"
OUTPUT_DIR="${2:-/tmp/ioc_extraction}"

# Create output directory
mkdir -p "$OUTPUT_DIR"

echo "[+] Extracting IOCs from $LOG_FILE"
echo "[+] Output directory: $OUTPUT_DIR"

# Extract IPv4 addresses
echo "[+] Extracting IPv4 addresses..."
grep -oE '\b([0-9]{1,3}\.){3}[0-9]{1,3}\b' "$LOG_FILE" | \
  grep -vE '^(0|10|127|169\.254|172\.(1[6-9]|2[0-9]|3[01])|192\.168)\.' | \
  sort -u > "$OUTPUT_DIR/ipv4_addresses.txt" || true  # no matches is not an error under pipefail

IP_COUNT=$(wc -l < "$OUTPUT_DIR/ipv4_addresses.txt")
echo "  Found $IP_COUNT unique public IPv4 addresses"

# Extract domains
echo "[+] Extracting domain names..."
grep -oE '\b[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+\b' "$LOG_FILE" | \
  grep -vE '(localhost|\.local|\.internal)' | \
  sort -u > "$OUTPUT_DIR/domains.txt" || true

DOMAIN_COUNT=$(wc -l < "$OUTPUT_DIR/domains.txt")
echo "  Found $DOMAIN_COUNT unique domains"

# Extract MD5 hashes
echo "[+] Extracting MD5 hashes..."
grep -oE '\b[a-fA-F0-9]{32}\b' "$LOG_FILE" | \
  sort -u > "$OUTPUT_DIR/md5_hashes.txt" || true

MD5_COUNT=$(wc -l < "$OUTPUT_DIR/md5_hashes.txt")
echo "  Found $MD5_COUNT unique MD5 hashes"

# Extract SHA256 hashes
echo "[+] Extracting SHA256 hashes..."
grep -oE '\b[a-fA-F0-9]{64}\b' "$LOG_FILE" | \
  sort -u > "$OUTPUT_DIR/sha256_hashes.txt" || true

SHA256_COUNT=$(wc -l < "$OUTPUT_DIR/sha256_hashes.txt")
echo "  Found $SHA256_COUNT unique SHA256 hashes"

# Generate summary report
cat > "$OUTPUT_DIR/summary.txt" <<EOF
IOC Extraction Summary
======================
Source: $LOG_FILE
Timestamp: $(date -u +"%Y-%m-%d %H:%M:%S UTC")

Results:
--------
IPv4 Addresses: $IP_COUNT
Domains: $DOMAIN_COUNT
MD5 Hashes: $MD5_COUNT
SHA256 Hashes: $SHA256_COUNT

Files Generated:
----------------
$OUTPUT_DIR/ipv4_addresses.txt
$OUTPUT_DIR/domains.txt
$OUTPUT_DIR/md5_hashes.txt
$OUTPUT_DIR/sha256_hashes.txt
EOF

echo ""
cat "$OUTPUT_DIR/summary.txt"
echo ""
echo "[+] Extraction complete. Results in: $OUTPUT_DIR"

Usage:

chmod +x extract_iocs.sh
./extract_iocs.sh /var/log/security.log /tmp/iocs

Expected Output:

[+] Extracting IOCs from /var/log/security.log
[+] Output directory: /tmp/iocs
[+] Extracting IPv4 addresses...
  Found 47 unique public IPv4 addresses
[+] Extracting domain names...
  Found 23 unique domains
[+] Extracting MD5 hashes...
  Found 12 unique MD5 hashes
[+] Extracting SHA256 hashes...
  Found 8 unique SHA256 hashes

IOC Extraction Summary
======================
Source: /var/log/security.log
Timestamp: 2025-12-15 14:23:45 UTC

Results:
--------
IPv4 Addresses: 47
Domains: 23
MD5 Hashes: 12
SHA256 Hashes: 8
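
One caveat: the IPv4 grep pattern will also match strings such as 999.168.300.1 that are not valid addresses. A short post-processing pass with Python's standard ipaddress module (a sketch; validate_ips.py is a hypothetical helper name) drops invalid and non-routable entries:

#!/usr/bin/env python3
# validate_ips.py - sketch (hypothetical helper): keep only valid, globally
# routable IPv4 addresses, using just the standard library.
# Usage: python3 validate_ips.py < ipv4_addresses.txt > ipv4_validated.txt

import ipaddress
import sys

for line in sys.stdin:
    candidate = line.strip()
    try:
        ip = ipaddress.ip_address(candidate)
    except ValueError:
        continue  # regex over-match, e.g. 999.168.300.1
    if ip.is_global:  # drops private, loopback, link-local, reserved space
        print(ip)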

2. Bulk IP Reputation Check

Check IP addresses against threat intelligence:

#!/bin/bash
# check_ip_reputation.sh - Check IPs against AbuseIPDB

set -euo pipefail

API_KEY="${ABUSEIPDB_API_KEY:?Error: ABUSEIPDB_API_KEY environment variable not set}"
IP_FILE="${1:?Usage: $0 <ip_list_file>}"
OUTPUT_FILE="${2:-ip_reputation_results.json}"

echo "[+] Checking IP reputation for IPs in $IP_FILE"
echo "[+] Results will be saved to $OUTPUT_FILE"

# Initialize output file
echo "[" > "$OUTPUT_FILE"
FIRST_ENTRY=true

while IFS= read -r ip; do
  # Skip empty lines and comments
  [[ -z "$ip" || "$ip" =~ ^# ]] && continue

  echo "[+] Checking $ip..."

  # Query AbuseIPDB API
  response=$(curl -s -G https://api.abuseipdb.com/api/v2/check \
    --data-urlencode "ipAddress=$ip" \
    -d maxAgeInDays=90 \
    -d verbose \
    -H "Key: $API_KEY" \
    -H "Accept: application/json")

  # Extract key fields
  abuse_score=$(echo "$response" | jq -r '.data.abuseConfidenceScore // 0')
  total_reports=$(echo "$response" | jq -r '.data.totalReports // 0')
  is_whitelisted=$(echo "$response" | jq -r '.data.isWhitelisted // false')

  # Add to output JSON
  if [ "$FIRST_ENTRY" = false ]; then
    echo "," >> "$OUTPUT_FILE"
  fi
  FIRST_ENTRY=false

  cat >> "$OUTPUT_FILE" <<EOF
  {
    "ip": "$ip",
    "abuse_score": $abuse_score,
    "total_reports": $total_reports,
    "is_whitelisted": $is_whitelisted,
    "checked_at": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
  }
EOF

  # Alert on high abuse scores
  if [ "$abuse_score" -gt 75 ]; then
    echo "  ⚠️  HIGH RISK: Abuse score $abuse_score%"
  elif [ "$abuse_score" -gt 25 ]; then
    echo "  ⚠️  MODERATE RISK: Abuse score $abuse_score%"
  else
    echo "  ✓ Clean: Abuse score $abuse_score%"
  fi

  # Rate limiting (AbuseIPDB free tier: 1000/day)
  sleep 1
done < "$IP_FILE"

echo "]" >> "$OUTPUT_FILE"

echo ""
echo "[+] Reputation check complete. Results in $OUTPUT_FILE"
echo "[+] High-risk IPs:"
jq -r '.[] | select(.abuse_score > 75) | "  \(.ip) - \(.abuse_score)% abuse score"' "$OUTPUT_FILE"

Usage:

export ABUSEIPDB_API_KEY="your-api-key-here"
./check_ip_reputation.sh /tmp/iocs/ipv4_addresses.txt results.json

Reference: AbuseIPDB API Documentation (https://docs.abuseipdb.com/) provides endpoint details and rate limits.
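
The loop above relies on a fixed one-second sleep; if you exceed your quota anyway, AbuseIPDB responds with HTTP 429. A Python sketch of a backoff wrapper is shown below (the Retry-After header is an assumption based on common API behavior; confirm it against the AbuseIPDB docs):

import time
import requests

def check_ip_with_backoff(ip, api_key, max_retries=3):
    """Sketch: query AbuseIPDB, backing off when rate-limited (HTTP 429)."""
    url = 'https://api.abuseipdb.com/api/v2/check'
    headers = {'Key': api_key, 'Accept': 'application/json'}
    params = {'ipAddress': ip, 'maxAgeInDays': 90}

    for _ in range(max_retries):
        resp = requests.get(url, headers=headers, params=params, timeout=10)
        if resp.status_code == 429:
            # Assumed header; verify against the AbuseIPDB documentation
            wait = int(resp.headers.get('Retry-After', 60))
            time.sleep(wait)
            continue
        resp.raise_for_status()
        return resp.json()['data']
    raise RuntimeError(f'Still rate-limited after {max_retries} attempts for {ip}')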

3. System Hardening Audit

Automated security configuration audit:

#!/bin/bash
# security_audit.sh - Basic Linux security configuration audit

set -euo pipefail

REPORT_FILE="${1:-security_audit_report.txt}"

exec > >(tee "$REPORT_FILE")  # Output to both stdout and file

echo "=========================================="
echo "  Linux Security Configuration Audit"
echo "  $(date -u)"
echo "=========================================="
echo ""

# Check 1: SSH configuration
echo "[+] SSH Configuration Audit"
if [ -f /etc/ssh/sshd_config ]; then
  echo "  Checking PermitRootLogin..."
  if grep -q "^PermitRootLogin no" /etc/ssh/sshd_config; then
    echo "    ✓ Root login disabled"
  else
    echo "    ⚠️  WARNING: Root login may be enabled"
  fi

  echo "  Checking PasswordAuthentication..."
  if grep -q "^PasswordAuthentication no" /etc/ssh/sshd_config; then
    echo "    ✓ Password authentication disabled (key-only)"
  else
    echo "    ⚠️  WARNING: Password authentication may be enabled"
  fi
else
  echo "  ⚠️  /etc/ssh/sshd_config not found"
fi
echo ""

# Check 2: Firewall status
echo "[+] Firewall Status"
if command -v ufw &> /dev/null; then
  if sudo ufw status | grep -q "Status: active"; then
    echo "  ✓ UFW firewall is active"
  else
    echo "  ⚠️  WARNING: UFW firewall is inactive"
  fi
elif command -v firewall-cmd &> /dev/null; then
  if sudo firewall-cmd --state 2>/dev/null | grep -q "running"; then
    echo "  ✓ Firewalld is running"
  else
    echo "  ⚠️  WARNING: Firewalld is not running"
  fi
else
  echo "  ⚠️  No recognized firewall found (ufw/firewalld)"
fi
echo ""

# Check 3: Automatic updates
echo "[+] Automatic Updates"
if [ -f /etc/apt/apt.conf.d/50unattended-upgrades ]; then
  echo "  ✓ Unattended upgrades configured (Debian/Ubuntu)"
elif [ -f /etc/dnf/automatic.conf ]; then
  echo "  ✓ Automatic updates configured (RHEL/Fedora)"
else
  echo "  ⚠️  WARNING: Automatic updates not configured"
fi
echo ""

# Check 4: Password policy
echo "[+] Password Policy"
if [ -f /etc/login.defs ]; then
  # awk exits 0 even with no match, so this is safe under 'set -euo pipefail'
  pass_max_days=$(awk '/^PASS_MAX_DAYS/ {print $2}' /etc/login.defs)

  if [ -z "$pass_max_days" ]; then
    echo "  ⚠️  WARNING: PASS_MAX_DAYS not set"
  elif [ "$pass_max_days" -le 90 ]; then
    echo "  ✓ Password expiration: $pass_max_days days"
  else
    echo "  ⚠️  WARNING: Password expiration too long: $pass_max_days days"
  fi
else
  echo "  ⚠️  /etc/login.defs not found"
fi
echo ""

# Check 5: World-writable files (security risk)
echo "[+] World-Writable Files (checking /etc, /bin, /sbin)"
world_writable=$(find /etc /bin /sbin -type f -perm -002 2>/dev/null | wc -l) || true  # find may hit unreadable paths
if [ "$world_writable" -eq 0 ]; then
  echo "  ✓ No world-writable files found in critical directories"
else
  echo "  ⚠️  WARNING: Found $world_writable world-writable files"
  find /etc /bin /sbin -type f -perm -002 2>/dev/null | head -5 || true
fi
echo ""

# Check 6: Listening services
echo "[+] Listening Network Services"
listening_services=$(ss -tlnp 2>/dev/null | grep -c LISTEN) || true  # grep -c exits 1 when the count is 0
echo "  Total listening services: $listening_services"
echo "  Services:"
ss -tlnp 2>/dev/null | awk '/LISTEN/ {print "    " $4}' | head -10
echo ""

# Check 7: Failed login attempts (last 24 hours)
echo "[+] Failed Login Attempts (last 24 hours)"
if [ -f /var/log/auth.log ]; then
  AUTH_LOG="/var/log/auth.log"   # Debian/Ubuntu
elif [ -f /var/log/secure ]; then
  AUTH_LOG="/var/log/secure"     # RHEL/Fedora
else
  AUTH_LOG=""
fi

if [ -n "$AUTH_LOG" ]; then
  # Traditional syslog timestamps lack a year; matching today's and
  # yesterday's dates approximates a 24-hour window
  date_pattern="$(date +'%b %e')|$(date -d '24 hours ago' +'%b %e')"
  failed_logins=$(grep "Failed password" "$AUTH_LOG" | grep -cE "$date_pattern") || true
  echo "  Failed SSH logins: $failed_logins"

  if [ "$failed_logins" -gt 50 ]; then
    echo "  ⚠️  WARNING: High number of failed logins"
    echo "  Top source IPs:"
    grep "Failed password" "$AUTH_LOG" | grep -E "$date_pattern" | \
      grep -oE 'from [0-9.]+' | awk '{print $2}' | \
      sort | uniq -c | sort -rn | head -5 | \
      awk '{print "    " $2 " (" $1 " attempts)"}' || true
  fi
else
  echo "  ⚠️  Auth log not found"
fi
echo ""

echo "=========================================="
echo "  Audit Complete"
echo "  Report saved to: $REPORT_FILE"
echo "=========================================="

Usage:

sudo ./security_audit.sh audit_report.txt
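
The report is plain text, which makes trending results across runs awkward. A small parser keyed on the ✓/⚠️ markers the script emits (a sketch; parse_audit_report.py is a hypothetical name) converts it to JSON:

#!/usr/bin/env python3
# parse_audit_report.py - sketch: convert the text report from
# security_audit.sh into JSON findings, keyed on the pass/warn
# markers the script prints.
# Usage: python3 parse_audit_report.py audit_report.txt

import json
import sys

findings = []
with open(sys.argv[1], encoding='utf-8') as f:
    for raw in f:
        text = raw.strip()
        if text.startswith('✓'):
            findings.append({'status': 'pass', 'detail': text.lstrip('✓ ')})
        elif text.startswith('⚠️'):
            findings.append({'status': 'warn', 'detail': text.lstrip('⚠️ ')})

json.dump({
    'passed': sum(1 for x in findings if x['status'] == 'pass'),
    'warnings': sum(1 for x in findings if x['status'] == 'warn'),
    'findings': findings,
}, sys.stdout, indent=2, ensure_ascii=False)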

Python Automation Scripts

4. Automated Vulnerability Scanner Integration

Integrate with OpenVAS vulnerability scanner:

#!/usr/bin/env python3
"""
openvas_scanner.py - Automated OpenVAS vulnerability scanning

Requirements:
  pip install python-gvm
"""

import sys
from datetime import datetime
from gvm.connections import UnixSocketConnection
from gvm.protocols.gmp import Gmp
from gvm.transforms import EtreeTransform

class OpenVASScanner:
    """Automated vulnerability scanning with OpenVAS"""

    def __init__(self, socket_path='/run/gvmd/gvmd.sock'):
        self.socket_path = socket_path
        self.connection = None
        self.gmp = None

    def connect(self, username, password):
        """Establish connection to OpenVAS"""
        try:
            self.connection = UnixSocketConnection(path=self.socket_path)
            transform = EtreeTransform()
            self.gmp = Gmp(connection=self.connection, transform=transform)

            # Authenticate
            self.gmp.authenticate(username, password)
            print(f"[+] Connected to OpenVAS as {username}")
            return True

        except Exception as e:
            print(f"[-] Connection failed: {e}")
            return False

    def create_target(self, name, hosts):
        """
        Create scan target

        Args:
            name: Target name
            hosts: Comma-separated host IPs/ranges

        Returns:
            Target ID
        """
        try:
            response = self.gmp.create_target(
                name=name,
                hosts=[hosts],
                comment=f"Created by automation: {datetime.utcnow().isoformat()}"
            )

            target_id = response.get('id')
            print(f"[+] Target created: {name} (ID: {target_id})")
            return target_id

        except Exception as e:
            print(f"[-] Failed to create target: {e}")
            return None

    def create_task(self, name, target_id, scanner_id, config_id):
        """
        Create scan task

        Args:
            name: Task name
            target_id: Target to scan
            scanner_id: Scanner UUID (default scanner)
            config_id: Scan config UUID (Full and fast, Base, etc.)

        Returns:
            Task ID
        """
        try:
            response = self.gmp.create_task(
                name=name,
                config_id=config_id,
                target_id=target_id,
                scanner_id=scanner_id
            )

            task_id = response.get('id')
            print(f"[+] Task created: {name} (ID: {task_id})")
            return task_id

        except Exception as e:
            print(f"[-] Failed to create task: {e}")
            return None

    def start_scan(self, task_id):
        """Start vulnerability scan"""
        try:
            response = self.gmp.start_task(task_id)
            # The report id is returned as a child element, not an attribute
            report_id = response.findtext('report_id')

            print(f"[+] Scan started (Task: {task_id}, Report: {report_id})")
            return report_id

        except Exception as e:
            print(f"[-] Failed to start scan: {e}")
            return None

    def get_scan_status(self, task_id):
        """Check scan progress"""
        try:
            response = self.gmp.get_task(task_id)

            # Status and progress are nested under the <task> element
            status = response.findtext('.//status')
            progress = response.findtext('.//progress')

            return {
                'status': status,
                'progress': int(progress) if progress and progress != '-1' else 0
            }

        except Exception as e:
            print(f"[-] Failed to get status: {e}")
            return None

    def get_results(self, task_id):
        """Retrieve scan results"""
        try:
            # Get reports for task
            response = self.gmp.get_reports(
                filter_string=f"task_id={task_id}"
            )

            reports = response.findall('report')
            if not reports:
                print("[-] No reports found")
                return None

            # Get latest report
            latest_report = reports[0]
            report_id = latest_report.get('id')

            # Get full report with results
            full_report = self.gmp.get_report(
                report_id=report_id,
                details=True
            )

            # Parse results
            results = {
                'high': 0,
                'medium': 0,
                'low': 0,
                'log': 0,
                'vulnerabilities': []
            }

            for result in full_report.findall('.//result'):
                # findtext avoids AttributeError when an element is missing
                severity = float(result.findtext('severity') or 0)
                threat = result.findtext('threat') or 'Log'

                # Count by severity
                if threat == 'High':
                    results['high'] += 1
                elif threat == 'Medium':
                    results['medium'] += 1
                elif threat == 'Low':
                    results['low'] += 1
                else:
                    results['log'] += 1

                # Store vulnerability details
                vuln = {
                    'name': result.findtext('name'),
                    'severity': severity,
                    'threat': threat,
                    'host': result.findtext('host'),
                    'port': result.findtext('port'),
                    'description': result.findtext('description')
                }
                results['vulnerabilities'].append(vuln)

            return results

        except Exception as e:
            print(f"[-] Failed to get results: {e}")
            return None

    def disconnect(self):
        """Close connection"""
        if self.connection:
            self.connection.disconnect()
            print("[+] Disconnected from OpenVAS")

# Example usage
if __name__ == '__main__':
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <username> <password> <target_ips>")
        print("Example: ./openvas_scanner.py admin admin123 192.168.1.0/24")
        sys.exit(1)

    username = sys.argv[1]
    password = sys.argv[2]
    target_hosts = sys.argv[3]

    scanner = OpenVASScanner()

    if not scanner.connect(username, password):
        sys.exit(1)

    try:
        # Create target
        target_name = f"Auto-Scan-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
        target_id = scanner.create_target(target_name, target_hosts)

        if not target_id:
            sys.exit(1)

        # Use default scanner and "Full and fast" config
        # These IDs may vary - check your OpenVAS instance
        default_scanner_id = "08b69003-5fc2-4037-a479-93b440211c73"
        full_fast_config_id = "daba56c8-73ec-11df-a475-002264764cea"

        # Create task
        task_name = f"Auto-Task-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
        task_id = scanner.create_task(
            task_name,
            target_id,
            default_scanner_id,
            full_fast_config_id
        )

        if not task_id:
            sys.exit(1)

        # Start scan
        report_id = scanner.start_scan(task_id)

        if report_id:
            print(f"\n[+] Scan initiated successfully")
            print(f"    Task ID: {task_id}")
            print(f"    Report ID: {report_id}")
            print(f"\n[!] Monitor scan progress:")
            print(f"    ./openvas_scanner.py status {task_id}")

    finally:
        scanner.disconnect()

Reference: python-gvm Documentation (https://python-gvm.readthedocs.io/) provides comprehensive API details.
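
The __main__ block starts a scan and exits; to block until completion, a polling loop over get_scan_status works. A sketch (the status strings follow GMP task conventions and should be verified against your GVM version):

import time

def wait_for_scan(scanner, task_id, poll_seconds=60):
    """Sketch: poll an OpenVASScanner until its task finishes."""
    while True:
        status = scanner.get_scan_status(task_id)
        if status is None:
            raise RuntimeError(f"Could not read status for task {task_id}")
        print(f"[+] Task {task_id}: {status['status']} ({status['progress']}%)")
        # 'Done'/'Stopped'/'Interrupted' follow GMP task-status conventions
        if status['status'] in ('Done', 'Stopped', 'Interrupted'):
            return status
        time.sleep(poll_seconds)

# Usage after start_scan():
#   wait_for_scan(scanner, task_id)
#   results = scanner.get_results(task_id)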

5. SIEM Log Shipper

Ship logs to SIEM with enrichment:

#!/usr/bin/env python3
"""
siem_log_shipper.py - Collect, enrich, and ship logs to SIEM

Requirements:
  pip install requests watchdog
"""

import json
import time
import socket
import hashlib
import requests
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class LogEnricher:
    """Enrich logs with context before shipping"""

    @staticmethod
    def calculate_hash(content):
        """Calculate content hash for deduplication"""
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    @staticmethod
    def parse_log_level(line):
        """Extract log level from line"""
        levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']
        for level in levels:
            if level in line.upper():
                return level
        return 'INFO'

    @staticmethod
    def enrich_log(log_line, source_file):
        """Add metadata to log entry"""
        return {
            'timestamp': datetime.utcnow().isoformat() + 'Z',
            'source': {
                'file': str(source_file),
                'host': socket.gethostname()  # portable; /etc/hostname may be absent
            },
            'message': log_line,
            'log_level': LogEnricher.parse_log_level(log_line),
            'hash': LogEnricher.calculate_hash(log_line),
            'tags': ['automated', 'log-shipper']
        }

class SIEMShipper:
    """Ship enriched logs to SIEM"""

    def __init__(self, siem_url, api_key, batch_size=100):
        self.siem_url = siem_url
        self.api_key = api_key
        self.batch_size = batch_size
        self.batch = []

    def add_log(self, enriched_log):
        """Add log to batch"""
        self.batch.append(enriched_log)

        if len(self.batch) >= self.batch_size:
            self.flush()

    def flush(self):
        """Send batch to SIEM"""
        if not self.batch:
            return

        try:
            response = requests.post(
                f'{self.siem_url}/api/logs/bulk',
                headers={
                    'Authorization': f'Bearer {self.api_key}',
                    'Content-Type': 'application/json'
                },
                json={'logs': self.batch},
                timeout=10
            )

            response.raise_for_status()

            print(f"[+] Shipped {len(self.batch)} logs to SIEM")
            self.batch = []

        except requests.exceptions.RequestException as e:
            print(f"[-] Failed to ship logs: {e}")
            # In production, implement retry logic or dead-letter queue

class LogWatcher(FileSystemEventHandler):
    """Monitor log files for changes"""

    def __init__(self, shipper, enricher):
        self.shipper = shipper
        self.enricher = enricher
        self.processed_lines = set()

    def on_modified(self, event):
        """Handle file modification events"""
        if event.is_directory:
            return

        # Re-read the file; the hash set below skips lines already shipped
        try:
            with open(event.src_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    # Deduplicate
                    line_hash = self.enricher.calculate_hash(line)
                    if line_hash in self.processed_lines:
                        continue

                    self.processed_lines.add(line_hash)

                    # Enrich and ship
                    enriched = self.enricher.enrich_log(line, event.src_path)
                    self.shipper.add_log(enriched)

        except Exception as e:
            print(f"[-] Error processing {event.src_path}: {e}")

# Example usage
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <siem_url> <api_key> <log_directory>")
        print("Example: ./siem_log_shipper.py https://siem.example.com api_key /var/log")
        sys.exit(1)

    siem_url = sys.argv[1]
    api_key = sys.argv[2]
    log_directory = sys.argv[3]

    # Initialize components
    enricher = LogEnricher()
    shipper = SIEMShipper(siem_url, api_key)
    event_handler = LogWatcher(shipper, enricher)

    # Set up file system observer
    observer = Observer()
    observer.schedule(event_handler, log_directory, recursive=True)
    observer.start()

    print(f"[+] Monitoring {log_directory} for log changes")
    print("[+] Press Ctrl+C to stop")

    try:
        while True:
            time.sleep(1)
            # Periodically flush any pending logs
            shipper.flush()

    except KeyboardInterrupt:
        print("\n[!] Stopping log shipper")
        observer.stop()
        shipper.flush()  # Final flush

    observer.join()
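
One known limitation: on_modified re-reads each file from the start and keeps every line hash in memory indefinitely. A sketch of an offset-tracking variant, reusing the SIEMShipper and LogEnricher classes above, reads only bytes appended since the last event:

import os
from watchdog.events import FileSystemEventHandler

class OffsetLogWatcher(FileSystemEventHandler):
    """Sketch: remember a byte offset per file so each event reads only
    newly appended lines. Reuses SIEMShipper and LogEnricher from above."""

    def __init__(self, shipper, enricher):
        self.shipper = shipper
        self.enricher = enricher
        self.offsets = {}  # path -> last byte offset consumed

    def on_modified(self, event):
        if event.is_directory:
            return
        try:
            # Reset the offset if the file shrank (rotation/truncation)
            if self.offsets.get(event.src_path, 0) > os.path.getsize(event.src_path):
                self.offsets[event.src_path] = 0

            # Binary mode so tell() stays usable during iteration
            with open(event.src_path, 'rb') as f:
                f.seek(self.offsets.get(event.src_path, 0))
                for raw in f:
                    line = raw.decode('utf-8', errors='replace').strip()
                    if line:
                        enriched = self.enricher.enrich_log(line, event.src_path)
                        self.shipper.add_log(enriched)
                self.offsets[event.src_path] = f.tell()

        except OSError as e:
            print(f"[-] Error processing {event.src_path}: {e}")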

6. Threat Intelligence Enrichment

Enrich IOCs with threat intelligence:

#!/usr/bin/env python3
"""
ti_enrichment.py - Enrich IOCs with threat intelligence

Requirements:
  pip install requests
"""

import sys
import json
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

class ThreatIntelEnricher:
    """Enrich IOCs using multiple threat intelligence sources"""

    def __init__(self, virustotal_api_key=None, abuseipdb_api_key=None):
        self.vt_api_key = virustotal_api_key
        self.abuse_api_key = abuseipdb_api_key

    def enrich_ip(self, ip_address):
        """Enrich IP address with threat intelligence"""
        results = {
            'ioc': ip_address,
            'type': 'ip',
            'sources': {}
        }

        # AbuseIPDB
        if self.abuse_api_key:
            abuse_data = self._check_abuseipdb(ip_address)
            if abuse_data:
                results['sources']['abuseipdb'] = abuse_data

        # VirusTotal
        if self.vt_api_key:
            vt_data = self._check_virustotal_ip(ip_address)
            if vt_data:
                results['sources']['virustotal'] = vt_data

        # Calculate overall risk score
        results['risk_score'] = self._calculate_risk_score(results['sources'])

        return results

    def _check_abuseipdb(self, ip_address):
        """Query AbuseIPDB"""
        try:
            response = requests.get(
                'https://api.abuseipdb.com/api/v2/check',
                params={
                    'ipAddress': ip_address,
                    'maxAgeInDays': 90
                },
                headers={
                    'Key': self.abuse_api_key,
                    'Accept': 'application/json'
                },
                timeout=10
            )

            if response.status_code == 200:
                data = response.json()['data']
                return {
                    'abuse_confidence_score': data.get('abuseConfidenceScore'),
                    'total_reports': data.get('totalReports'),
                    'is_whitelisted': data.get('isWhitelisted'),
                    'country_code': data.get('countryCode')
                }

        except Exception as e:
            print(f"[-] AbuseIPDB error for {ip_address}: {e}", file=sys.stderr)

        return None

    def _check_virustotal_ip(self, ip_address):
        """Query VirusTotal"""
        try:
            response = requests.get(
                f'https://www.virustotal.com/api/v3/ip_addresses/{ip_address}',
                headers={
                    'x-apikey': self.vt_api_key
                },
                timeout=10
            )

            if response.status_code == 200:
                data = response.json()['data']['attributes']
                stats = data.get('last_analysis_stats', {})

                return {
                    'malicious': stats.get('malicious', 0),
                    'suspicious': stats.get('suspicious', 0),
                    'harmless': stats.get('harmless', 0),
                    'total_vendors': sum(stats.values()),
                    'reputation': data.get('reputation', 0)
                }

        except Exception as e:
            print(f"[-] VirusTotal error for {ip_address}: {e}", file=sys.stderr)

        return None

    def _calculate_risk_score(self, sources):
        """Calculate overall risk score from multiple sources"""
        score = 0

        # AbuseIPDB contribution
        if 'abuseipdb' in sources:
            abuse_score = sources['abuseipdb'].get('abuse_confidence_score', 0)
            score += abuse_score * 0.5  # Weight: 50%

        # VirusTotal contribution
        if 'virustotal' in sources:
            vt = sources['virustotal']
            total = vt.get('total_vendors') or 1  # guard against division by zero
            malicious = vt.get('malicious', 0)
            suspicious = vt.get('suspicious', 0)

            vt_score = ((malicious + suspicious * 0.5) / total) * 100
            score += vt_score * 0.5  # Weight: 50%

        return min(100, score)  # Cap at 100

    def enrich_bulk(self, iocs, max_workers=5):
        """Enrich multiple IOCs in parallel"""
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(self.enrich_ip, ioc): ioc for ioc in iocs}

            for future in as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)

                    # Note: this only paces result collection; worker threads
                    # still issue requests concurrently (see RateLimiter below)
                    time.sleep(0.5)

                except Exception as e:
                    ioc = futures[future]
                    print(f"[-] Failed to enrich {ioc}: {e}", file=sys.stderr)

        return results

# Example usage
if __name__ == '__main__':
    import os

    vt_key = os.getenv('VIRUSTOTAL_API_KEY')
    abuse_key = os.getenv('ABUSEIPDB_API_KEY')

    if not (vt_key or abuse_key):
        print("Error: Set VIRUSTOTAL_API_KEY or ABUSEIPDB_API_KEY environment variable")
        sys.exit(1)

    enricher = ThreatIntelEnricher(
        virustotal_api_key=vt_key,
        abuseipdb_api_key=abuse_key
    )

    # Test IPs
    test_ips = [
        '8.8.8.8',        # Google DNS (clean)
        '185.220.101.1',  # Tor exit node (likely flagged)
    ]

    print("[+] Enriching IOCs...")
    results = enricher.enrich_bulk(test_ips)

    # Print results
    for result in results:
        print(f"\n{'='*60}")
        print(f"IP: {result['ioc']}")
        print(f"Risk Score: {result['risk_score']:.1f}/100")

        if 'abuseipdb' in result['sources']:
            abuse = result['sources']['abuseipdb']
            print(f"  AbuseIPDB:")
            print(f"    Confidence: {abuse['abuse_confidence_score']}%")
            print(f"    Reports: {abuse['total_reports']}")

        if 'virustotal' in result['sources']:
            vt = result['sources']['virustotal']
            print(f"  VirusTotal:")
            print(f"    Malicious: {vt['malicious']}/{vt['total_vendors']}")
            print(f"    Reputation: {vt['reputation']}")

    # Save to JSON
    with open('enriched_iocs.json', 'w') as f:
        json.dump(results, f, indent=2)

    print(f"\n[+] Results saved to enriched_iocs.json")

Best Practices

1. Error Handling

Always implement comprehensive error handling:

import sys
import logging
from functools import wraps

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('automation.log'),
        logging.StreamHandler(sys.stdout)
    ]
)

def handle_errors(func):
    """Decorator for error handling"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in {func.__name__}: {e}", exc_info=True)
            return None
    return wrapper

@handle_errors
def risky_operation():
    # Code that might fail
    pass

2. Configuration Management

Externalize configuration:

import json
from pathlib import Path

class Config:
    """Centralized configuration management"""

    def __init__(self, config_file='config.json'):
        self.config_file = Path(config_file)
        self.config = self._load_config()

    def _load_config(self):
        """Load configuration from file"""
        if self.config_file.exists():
            with open(self.config_file) as f:
                return json.load(f)
        else:
            return self._default_config()

    def _default_config(self):
        """Default configuration"""
        return {
            'siem_url': 'https://siem.example.com',
            'api_timeout': 10,
            'batch_size': 100,
            'log_level': 'INFO'
        }

    def get(self, key, default=None):
        """Get configuration value"""
        return self.config.get(key, default)

# Usage
config = Config()
siem_url = config.get('siem_url')

3. Rate Limiting

Respect API rate limits:

import time
from functools import wraps

class RateLimiter:
    """Simple rate limiter"""

    def __init__(self, calls_per_second):
        self.calls_per_second = calls_per_second
        self.min_interval = 1.0 / calls_per_second
        self.last_call = 0

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - self.last_call
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)

            self.last_call = time.time()
            return func(*args, **kwargs)

        return wrapper

# Usage
@RateLimiter(calls_per_second=1)
def api_call():
    # Makes max 1 call per second
    pass
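
Note that this limiter is not thread-safe, which matters if you combine it with the ThreadPoolExecutor pattern from script 6. A locked variant (a sketch) serializes the timing check:

import threading
import time
from functools import wraps

class ThreadSafeRateLimiter:
    """Sketch: same interface as RateLimiter above, but a lock makes the
    interval check safe to share across worker threads."""

    def __init__(self, calls_per_second):
        self.min_interval = 1.0 / calls_per_second
        self.last_call = 0.0
        self.lock = threading.Lock()

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self.lock:  # one thread at a time updates the schedule
                elapsed = time.time() - self.last_call
                if elapsed < self.min_interval:
                    time.sleep(self.min_interval - elapsed)
                self.last_call = time.time()
            return func(*args, **kwargs)
        return wrapper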

4. Secrets Management

Never hardcode credentials:

import os

# Prefer environment variables set outside the script, e.g.:
#   export VIRUSTOTAL_API_KEY="your-key-here"
# Or use a dedicated secrets manager:
#   AWS Secrets Manager, HashiCorp Vault, etc.

def get_api_key(key_name):
    """Retrieve API key from environment"""
    key = os.getenv(key_name)
    if not key:
        raise ValueError(f"{key_name} environment variable not set")
    return key

# Usage
vt_key = get_api_key('VIRUSTOTAL_API_KEY')

Conclusion

Security automation scripting reduces analyst workload, improves response times, and ensures consistency in security operations. The scripts provided—IOC extraction, reputation checking, vulnerability scanning, log shipping, and threat intelligence enrichment—address common SOC workflows.

Key principles for effective security automation:

  1. Start Small: Automate one repetitive task, validate, then expand
  2. Error Handling: Comprehensive exception handling prevents script failures from disrupting operations
  3. Logging: Detailed logs enable troubleshooting and audit trails
  4. Rate Limiting: Respect API limits to avoid service disruptions
  5. Testing: Test scripts in non-production before deploying
  6. Documentation: Comment code and maintain runbooks
  7. Security: Protect credentials, validate inputs, follow least privilege

The scripts provided are templates—adapt error handling, logging, and security controls to your environment before production use. Security automation is not set-and-forget; regularly review and update scripts as threats, tools, and environments evolve.

Effective automation amplifies analyst capabilities, enabling security teams to respond faster and focus human expertise on complex threats requiring judgment and creativity.

References

  1. Gartner Market Guide for SOAR: https://www.gartner.com/en/documents/soar
  2. AbuseIPDB API Documentation: https://docs.abuseipdb.com/
  3. VirusTotal API v3 Documentation: https://developers.virustotal.com/reference/overview
  4. python-gvm Documentation: https://python-gvm.readthedocs.io/
  5. Requests Library Documentation: https://requests.readthedocs.io/
  6. Watchdog Documentation: https://python-watchdog.readthedocs.io/
  7. Python Logging Best Practices: https://docs.python.org/3/howto/logging.html
  8. Bash Scripting Best Practices: https://google.github.io/styleguide/shellguide.html
  9. OWASP Secure Coding Practices: https://owasp.org/www-project-secure-coding-practices-quick-reference-guide/
  10. NIST SP 800-82 - Industrial Control Systems Security: https://csrc.nist.gov/publications/detail/sp/800-82/rev-3/final

Security Note: The scripts provided are for educational and legitimate security operations. Ensure you have proper authorization before scanning networks or accessing systems. Unauthorized access, even with security tools, may violate computer fraud laws (CFAA in US, Computer Misuse Act in UK, etc.). Always obtain written permission before conducting security assessments.