Note: This guide is based on technical research from security automation best practices, Python security library documentation, and analysis of production SOC automation workflows. Code examples use current stable versions of libraries and have been verified for functionality. The scripts provided are educational templates—readers should adapt error handling, logging, and security controls to their specific production requirements before deployment.
Security teams face repetitive tasks that consume analyst time: log collection, IOC enrichment, vulnerability scanning, report generation, and routine investigations. According to Gartner’s 2024 Market Guide for Security Orchestration, Automation, and Response, organizations implementing security automation reduce mean time to respond (MTTR) by 60-80% and free analysts to focus on complex threats requiring human judgment.
This post provides template security automation scripts in Python and Bash, covering common SOC operations from log analysis to API-driven threat intelligence enrichment. Treat them as starting points to adapt and harden, not as drop-in production code.
Why Script Security Tasks?
Benefits of Automation:
- Consistency: Scripts execute identically every time (no human error)
- Speed: Automated tasks complete in seconds vs minutes/hours manually
- Scalability: Handle thousands of events without additional headcount
- Documentation: Scripts serve as executable documentation of procedures
- Auditability: Automated tasks create structured logs for compliance
When to Automate:
- Tasks performed more than 3 times per week
- Processes with clear, rule-based decision logic
- High-volume, low-complexity investigations
- Data collection and aggregation
- Report generation
When NOT to Automate:
- Tasks requiring nuanced judgment
- Rare, one-off investigations
- Processes still being refined (automate after stabilization)
- Activities requiring creative problem-solving
Python vs Bash: Choosing the Right Tool
Use Bash when:
- Orchestrating Linux CLI tools (grep, awk, curl, jq)
- Simple file operations and text processing
- System administration tasks
- Quick prototypes and one-liners
Use Python when:
- Complex data structures (JSON, XML parsing)
- API integration (REST APIs, authentication)
- Advanced logic and error handling
- Cross-platform compatibility needed
- Maintaining long-term automation code
Hybrid Approach:
Use Bash to glue together tools, Python for complex processing:
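#!/bin/bash
set -euo pipefail
# Use unpredictable temp files rather than fixed names in /tmp
LOG_TMP=$(mktemp)
REPORT_TMP=$(mktemp)
trap 'rm -f "$LOG_TMP" "$REPORT_TMP"' EXIT
# Collect logs with Bash
journalctl -u nginx --since "1 hour ago" > "$LOG_TMP"
# Process with Python
python3 analyze_nginx_logs.py "$LOG_TMP" > "$REPORT_TMP"
# Send results with Bash (--fail turns HTTP errors into a non-zero exit)
curl -sS --fail -X POST https://siem.example.com/api/events \
-H "Content-Type: application/json" \
--data-binary @"$REPORT_TMP"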
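The hybrid script calls analyze_nginx_logs.py without showing it. As a rough sketch of what its core could look like, the function below counts log lines per nginx severity tag and returns a JSON-ready summary. The function name `summarize`, the output keys, and the sample lines are all assumptions made for illustration; the original script's real logic is not given.

```python
import json
import re
from collections import Counter

# nginx error-log severity tags such as "[error]" or "[warn]"
LEVEL_RE = re.compile(r"\[(emerg|alert|crit|error|warn|notice|info|debug)\]")


def summarize(lines):
    """Count non-empty log lines per severity tag; untagged lines are grouped."""
    counts = Counter()
    total = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        total += 1
        match = LEVEL_RE.search(line)
        counts[match.group(1) if match else "untagged"] += 1
    return {"total_lines": total, "by_level": dict(counts)}


# Hypothetical journal lines, used only to exercise the function
sample = [
    "Dec 15 14:00:01 web1 nginx[812]: 2025/12/15 14:00:01 [error] 812#812: *1 connect() failed",
    "Dec 15 14:00:02 web1 nginx[812]: 2025/12/15 14:00:02 [warn] 812#812: upstream response is buffered",
    "Dec 15 14:00:03 web1 systemd[1]: Reloaded nginx.service",
]
print(json.dumps(summarize(sample), indent=2))
```

A command-line wrapper would open the file passed by the Bash script, feed its lines to `summarize`, and print the JSON to stdout.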
Bash Automation Scripts
1. Automated IOC Extraction from Logs
Extract indicators of compromise (IPs, domains, hashes) from logs:
#!/bin/bash
# extract_iocs.sh - Extract IOCs from security logs
set -euo pipefail # Exit on error, undefined vars, pipe failures
LOG_FILE="${1:-/var/log/security.log}"
OUTPUT_DIR="${2:-/tmp/ioc_extraction}"
# Create output directory
mkdir -p "$OUTPUT_DIR"
echo "[+] Extracting IOCs from $LOG_FILE"
echo "[+] Output directory: $OUTPUT_DIR"
# Fail early with a clear message if the log cannot be read
[ -r "$LOG_FILE" ] || { echo "[-] Cannot read $LOG_FILE" >&2; exit 1; }
# Note: grep exits non-zero when nothing matches, which would abort the
# script under "set -e" with pipefail, so each grep is followed by "|| true"
# Extract IPv4 addresses (the pattern does not validate octet ranges)
echo "[+] Extracting IPv4 addresses..."
{ grep -oE '\b([0-9]{1,3}\.){3}[0-9]{1,3}\b' "$LOG_FILE" || true; } | \
{ grep -vE '^(0|10|127|169\.254|172\.(1[6-9]|2[0-9]|3[01])|192\.168)\.' || true; } | \
sort -u > "$OUTPUT_DIR/ipv4_addresses.txt"
IP_COUNT=$(wc -l < "$OUTPUT_DIR/ipv4_addresses.txt")
echo " Found $IP_COUNT unique public IPv4 addresses"
# Extract domains (the pattern also matches dotted filenames such as report.txt)
echo "[+] Extracting domain names..."
{ grep -oE '\b[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+\b' "$LOG_FILE" || true; } | \
{ grep -v -E '(localhost|\.local|\.internal)' || true; } | \
sort -u > "$OUTPUT_DIR/domains.txt"
DOMAIN_COUNT=$(wc -l < "$OUTPUT_DIR/domains.txt")
echo " Found $DOMAIN_COUNT unique domains"
# Extract MD5 hashes
echo "[+] Extracting MD5 hashes..."
{ grep -oE '\b[a-fA-F0-9]{32}\b' "$LOG_FILE" || true; } | \
sort -u > "$OUTPUT_DIR/md5_hashes.txt"
MD5_COUNT=$(wc -l < "$OUTPUT_DIR/md5_hashes.txt")
echo " Found $MD5_COUNT unique MD5 hashes"
# Extract SHA256 hashes
echo "[+] Extracting SHA256 hashes..."
{ grep -oE '\b[a-fA-F0-9]{64}\b' "$LOG_FILE" || true; } | \
sort -u > "$OUTPUT_DIR/sha256_hashes.txt"
SHA256_COUNT=$(wc -l < "$OUTPUT_DIR/sha256_hashes.txt")
echo " Found $SHA256_COUNT unique SHA256 hashes"
# Generate summary report
cat > "$OUTPUT_DIR/summary.txt" <<EOF
IOC Extraction Summary
======================
Source: $LOG_FILE
Timestamp: $(date -u +"%Y-%m-%d %H:%M:%S UTC")
Results:
--------
IPv4 Addresses: $IP_COUNT
Domains: $DOMAIN_COUNT
MD5 Hashes: $MD5_COUNT
SHA256 Hashes: $SHA256_COUNT
Files Generated:
----------------
$OUTPUT_DIR/ipv4_addresses.txt
$OUTPUT_DIR/domains.txt
$OUTPUT_DIR/md5_hashes.txt
$OUTPUT_DIR/sha256_hashes.txt
EOF
echo ""
cat "$OUTPUT_DIR/summary.txt"
echo ""
echo "[+] Extraction complete. Results in: $OUTPUT_DIR"
Usage:
chmod +x extract_iocs.sh
./extract_iocs.sh /var/log/security.log /tmp/iocs
Expected Output:
[+] Extracting IOCs from /var/log/security.log
[+] Output directory: /tmp/iocs
[+] Extracting IPv4 addresses...
Found 47 unique public IPv4 addresses
[+] Extracting domain names...
Found 23 unique domains
[+] Extracting MD5 hashes...
Found 12 unique MD5 hashes
[+] Extracting SHA256 hashes...
Found 8 unique SHA256 hashes
IOC Extraction Summary
======================
Source: /var/log/security.log
Timestamp: 2025-12-15 14:23:45 UTC
Results:
--------
IPv4 Addresses: 47
Domains: 23
MD5 Hashes: 12
SHA256 Hashes: 8
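The IPv4 pattern in the Bash script accepts strings such as 999.1.1.1, and its exclusion list covers only the most common private ranges. One possible refinement, sketched here with Python's standard `ipaddress` module, validates each candidate and keeps only globally routable addresses. The function name `extract_public_ipv4` and the sample text are illustrative assumptions, not part of the original script.

```python
import ipaddress
import re

# Loose candidate pattern; real validation is delegated to ipaddress
CANDIDATE_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


def extract_public_ipv4(text):
    """Return sorted, de-duplicated IPv4 addresses that are globally routable."""
    found = set()
    for candidate in CANDIDATE_RE.findall(text):
        try:
            ip = ipaddress.IPv4Address(candidate)
        except ValueError:
            continue  # Not a valid address, e.g. an octet above 255
        if ip.is_global:  # Excludes private, loopback, link-local, reserved ranges
            found.add(str(ip))
    return sorted(found, key=ipaddress.IPv4Address)


sample_log = "src=8.8.8.8 dst=10.0.0.5 bad=999.1.1.1 gw=192.168.1.1 peer=1.1.1.1 dup=8.8.8.8"
print(extract_public_ipv4(sample_log))
```

Sorting with `ipaddress.IPv4Address` as the key orders addresses numerically rather than as strings.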
2. Bulk IP Reputation Check
Check IP addresses against threat intelligence:
#!/bin/bash
# check_ip_reputation.sh - Check IPs against AbuseIPDB
set -euo pipefail
API_KEY="${ABUSEIPDB_API_KEY:?Error: ABUSEIPDB_API_KEY environment variable not set}"
IP_FILE="${1:?Usage: $0 <ip_list_file>}"
OUTPUT_FILE="${2:-ip_reputation_results.json}"
echo "[+] Checking IP reputation for IPs in $IP_FILE"
echo "[+] Results will be saved to $OUTPUT_FILE"
# Initialize output file
echo "[" > "$OUTPUT_FILE"
FIRST_ENTRY=true
while IFS= read -r ip; do
# Skip empty lines and comments
[[ -z "$ip" || "$ip" =~ ^# ]] && continue
echo "[+] Checking $ip..."
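# Query AbuseIPDB API; skip this IP on network or HTTP errors instead of
# aborting the whole run or recording a misleading score of 0
if ! response=$(curl -sS --fail --max-time 15 -G https://api.abuseipdb.com/api/v2/check \
--data-urlencode "ipAddress=$ip" \
-d maxAgeInDays=90 \
-d verbose \
-H "Key: $API_KEY" \
-H "Accept: application/json"); then
echo " [-] Lookup failed for $ip, skipping" >&2
continue
fi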
# Extract key fields
abuse_score=$(echo "$response" | jq -r '.data.abuseConfidenceScore // 0')
total_reports=$(echo "$response" | jq -r '.data.totalReports // 0')
is_whitelisted=$(echo "$response" | jq -r '.data.isWhitelisted // false')
# Add to output JSON
if [ "$FIRST_ENTRY" = false ]; then
echo "," >> "$OUTPUT_FILE"
fi
FIRST_ENTRY=false
cat >> "$OUTPUT_FILE" <<EOF
{
"ip": "$ip",
"abuse_score": $abuse_score,
"total_reports": $total_reports,
"is_whitelisted": $is_whitelisted,
"checked_at": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
}
EOF
# Alert on high abuse scores
if [ "$abuse_score" -gt 75 ]; then
echo " ⚠️ HIGH RISK: Abuse score $abuse_score%"
elif [ "$abuse_score" -gt 25 ]; then
echo " ⚠️ MODERATE RISK: Abuse score $abuse_score%"
else
echo " ✓ Clean: Abuse score $abuse_score%"
fi
# Pace requests; this does not enforce the daily quota (AbuseIPDB free tier: 1000/day)
sleep 1
done < "$IP_FILE"
echo "]" >> "$OUTPUT_FILE"
echo ""
echo "[+] Reputation check complete. Results in $OUTPUT_FILE"
echo "[+] High-risk IPs:"
jq -r '.[] | select(.abuse_score > 75) | " \(.ip) - \(.abuse_score)% abuse score"' "$OUTPUT_FILE"
Usage:
export ABUSEIPDB_API_KEY="your-api-key-here"
./check_ip_reputation.sh /tmp/iocs/ipv4_addresses.txt results.json
Reference: AbuseIPDB API Documentation (https://docs.abuseipdb.com/) provides endpoint details and rate limits.
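Once the results file exists, the same thresholds the script uses for its console alerts (above 75 for high risk, above 25 for moderate) can be applied in Python for follow-up triage. The sketch below assumes records shaped like the entries the Bash script writes; the function names and the sample records (which use documentation-range IPs) are hypothetical.

```python
def classify(abuse_score):
    """Map an abuse confidence score to the risk labels used by the Bash script."""
    if abuse_score > 75:
        return "high"
    if abuse_score > 25:
        return "moderate"
    return "clean"


def triage(results):
    """Group IPs by risk label."""
    buckets = {"high": [], "moderate": [], "clean": []}
    for entry in results:
        buckets[classify(entry["abuse_score"])].append(entry["ip"])
    return buckets


# Hypothetical records in the shape written to ip_reputation_results.json
sample_results = [
    {"ip": "203.0.113.7", "abuse_score": 92, "total_reports": 140},
    {"ip": "198.51.100.4", "abuse_score": 30, "total_reports": 3},
    {"ip": "192.0.2.10", "abuse_score": 0, "total_reports": 0},
]
print(triage(sample_results))
```

In practice the list would come from `json.load` on the results file rather than an inline literal.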
3. System Hardening Audit
Automated security configuration audit:
#!/bin/bash
# security_audit.sh - Basic Linux security configuration audit
set -euo pipefail
REPORT_FILE="${1:-security_audit_report.txt}"
exec > >(tee "$REPORT_FILE") # Output to both stdout and file
echo "=========================================="
echo " Linux Security Configuration Audit"
echo " $(date -u)"
echo "=========================================="
echo ""
# Check 1: SSH configuration
echo "[+] SSH Configuration Audit"
if [ -f /etc/ssh/sshd_config ]; then
echo " Checking PermitRootLogin..."
if grep -Eiq '^[[:space:]]*PermitRootLogin[[:space:]]+no([[:space:]]|$)' /etc/ssh/sshd_config; then
echo " ✓ Root login disabled"
else
echo " ⚠️ WARNING: Root login may be enabled"
fi
echo " Checking PasswordAuthentication..."
if grep -Eiq '^[[:space:]]*PasswordAuthentication[[:space:]]+no([[:space:]]|$)' /etc/ssh/sshd_config; then
echo " ✓ Password authentication disabled (key-only)"
else
echo " ⚠️ WARNING: Password authentication may be enabled"
fi
else
echo " ⚠️ /etc/ssh/sshd_config not found"
fi
echo ""
# Check 2: Firewall status
echo "[+] Firewall Status"
if command -v ufw &> /dev/null; then
if sudo ufw status | grep -q "Status: active"; then
echo " ✓ UFW firewall is active"
else
echo " ⚠️ WARNING: UFW firewall is inactive"
fi
elif command -v firewall-cmd &> /dev/null; then
if sudo firewall-cmd --state 2>/dev/null | grep -q "running"; then
echo " ✓ Firewalld is running"
else
echo " ⚠️ WARNING: Firewalld is not running"
fi
else
echo " ⚠️ No recognized firewall found (ufw/firewalld)"
fi
echo ""
# Check 3: Automatic updates
echo "[+] Automatic Updates"
if [ -f /etc/apt/apt.conf.d/50unattended-upgrades ]; then
echo " ✓ Unattended upgrades configured (Debian/Ubuntu)"
elif [ -f /etc/dnf/automatic.conf ]; then
echo " ✓ Automatic updates configured (RHEL/Fedora)"
else
echo " ⚠️ WARNING: Automatic updates not configured"
fi
echo ""
# Check 4: Password policy
echo "[+] Password Policy"
if [ -f /etc/login.defs ]; then
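# awk avoids a pipefail abort when the directive is missing; the last match wins
pass_max_days=$(awk '/^PASS_MAX_DAYS/ {v=$2} END {print v}' /etc/login.defs)
pass_min_days=$(awk '/^PASS_MIN_DAYS/ {v=$2} END {print v}' /etc/login.defs)
# Treat an unset maximum as 99999, the conventional "never expires" value
pass_max_days="${pass_max_days:-99999}"
echo " Minimum password age: ${pass_min_days:-0} days"
if [ "$pass_max_days" -le 90 ]; then
echo " ✓ Password expiration: $pass_max_days days"
else
echo " ⚠️ WARNING: Password expiration too long: $pass_max_days days"
fi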
else
echo " ⚠️ /etc/login.defs not found"
fi
echo ""
# Check 5: World-writable files (security risk)
echo "[+] World-Writable Files (checking /etc, /bin, /sbin)"
# "|| true" keeps permission errors or empty results from aborting under set -e
world_writable_files=$(find /etc /bin /sbin -type f -perm -002 2>/dev/null || true)
world_writable=$(printf '%s' "$world_writable_files" | grep -c . || true)
if [ "$world_writable" -eq 0 ]; then
echo " ✓ No world-writable files found in critical directories"
else
echo " ⚠️ WARNING: Found $world_writable world-writable files"
printf '%s\n' "$world_writable_files" | head -5
fi
echo ""
# Check 6: Listening services
echo "[+] Listening Network Services"
listening_services=$(ss -tlnp 2>/dev/null | grep -c LISTEN || true)
echo " Total listening services: $listening_services"
echo " Services:"
ss -tlnp 2>/dev/null | awk '/LISTEN/ {print " " $4}' | head -10 || true
echo ""
# Check 7: Failed login attempts (entries dated today or yesterday, which
# covers the last 24 hours; assumes traditional "Dec  5" syslog timestamps)
echo "[+] Failed Login Attempts (since yesterday)"
auth_log=""
if [ -f /var/log/auth.log ]; then
auth_log=/var/log/auth.log
elif [ -f /var/log/secure ]; then
auth_log=/var/log/secure
fi
if [ -n "$auth_log" ]; then
date_pattern="^($(date +'%b %e')|$(date -d 'yesterday' +'%b %e'))"
recent_failures=$(grep "Failed password" "$auth_log" | grep -E "$date_pattern" || true)
failed_logins=$(printf '%s' "$recent_failures" | grep -c . || true)
echo " Failed SSH logins: $failed_logins"
if [ "$failed_logins" -gt 50 ]; then
echo " ⚠️ WARNING: High number of failed logins"
echo " Top source IPs:"
printf '%s\n' "$recent_failures" | \
grep -oE 'from [0-9.]+' | awk '{print $2}' | \
sort | uniq -c | sort -rn | head -5 | \
awk '{print " " $2 " (" $1 " attempts)"}' || true
fi
else
echo " ⚠️ Auth log not found"
fi
echo ""
echo "=========================================="
echo " Audit Complete"
echo " Report saved to: $REPORT_FILE"
echo "=========================================="
Usage:
sudo ./security_audit.sh audit_report.txt
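The SSH check above greps for a single exact line, which can miss directives written with different case or spacing, or overridden earlier in the file. Since sshd_config uses the first value it obtains for each keyword, a slightly more careful parse can be sketched in Python. This is a simplified illustration: the function name is hypothetical, and it deliberately ignores `Include` files, the `keyword=value` form, and anything inside `Match` blocks.

```python
def effective_sshd_options(config_text):
    """Return the first value seen for each global sshd_config keyword."""
    options = {}
    for raw in config_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        keyword = parts[0].lower()  # Keywords are case-insensitive
        if keyword == "match":
            break  # Conditional blocks are out of scope for this sketch
        value = parts[1].strip() if len(parts) > 1 else ""
        options.setdefault(keyword, value)  # First occurrence wins
    return options


sample_config = """
#PermitRootLogin yes
PermitRootLogin no
permitrootlogin yes
   PasswordAuthentication    no
Match User backup
    PasswordAuthentication yes
"""
opts = effective_sshd_options(sample_config)
print(opts["permitrootlogin"], opts["passwordauthentication"])
```

On a live host, running `sshd -T` as root prints the effective configuration and is usually the more reliable source.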
Python Automation Scripts
4. Automated Vulnerability Scanner Integration
Integrate with OpenVAS vulnerability scanner:
#!/usr/bin/env python3
"""
openvas_scanner.py - Automated OpenVAS vulnerability scanning
Requirements:
    pip install python-gvm
"""
import sys
from contextlib import ExitStack
from datetime import datetime, timezone

from gvm.connections import UnixSocketConnection
from gvm.protocols.gmp import Gmp
from gvm.transforms import EtreeCheckCommandTransform


class OpenVASScanner:
    """Automated vulnerability scanning with OpenVAS"""

    def __init__(self, socket_path='/run/gvmd/gvmd.sock'):
        self.socket_path = socket_path
        self.connection = None
        self.gmp = None
        self._stack = ExitStack()  # Owns the Gmp context so disconnect() can close it

    def connect(self, username, password):
        """Establish connection to OpenVAS"""
        try:
            self.connection = UnixSocketConnection(path=self.socket_path)
            # This transform raises on error responses, so the except
            # blocks in this class actually see failed commands
            transform = EtreeCheckCommandTransform()
            # Gmp is documented as a context manager; entering it connects
            # and selects the protocol version supported by the server
            self.gmp = self._stack.enter_context(
                Gmp(connection=self.connection, transform=transform)
            )
            # Authenticate
            self.gmp.authenticate(username, password)
            print(f"[+] Connected to OpenVAS as {username}")
            return True
        except Exception as e:
            print(f"[-] Connection failed: {e}")
            return False

    def create_target(self, name, hosts):
        """
        Create scan target
        Args:
            name: Target name
            hosts: Comma-separated host IPs/ranges
        Returns:
            Target ID
        """
        try:
            # Depending on the GMP version, a port_list_id or port_range
            # argument may also be required here
            response = self.gmp.create_target(
                name=name,
                hosts=[h.strip() for h in hosts.split(',')],
                comment=f"Created by automation: {datetime.now(timezone.utc).isoformat()}"
            )
            target_id = response.get('id')
            print(f"[+] Target created: {name} (ID: {target_id})")
            return target_id
        except Exception as e:
            print(f"[-] Failed to create target: {e}")
            return None

    def create_task(self, name, target_id, scanner_id, config_id):
        """
        Create scan task
        Args:
            name: Task name
            target_id: Target to scan
            scanner_id: Scanner UUID (default scanner)
            config_id: Scan config UUID (Full and fast, Base, etc.)
        Returns:
            Task ID
        """
        try:
            response = self.gmp.create_task(
                name=name,
                config_id=config_id,
                target_id=target_id,
                scanner_id=scanner_id
            )
            task_id = response.get('id')
            print(f"[+] Task created: {name} (ID: {task_id})")
            return task_id
        except Exception as e:
            print(f"[-] Failed to create task: {e}")
            return None

    def start_scan(self, task_id):
        """Start vulnerability scan"""
        try:
            response = self.gmp.start_task(task_id)
            # report_id is a child element of the response, not an attribute
            report_id = response.findtext('report_id')
            print(f"[+] Scan started (Task: {task_id}, Report: {report_id})")
            return report_id
        except Exception as e:
            print(f"[-] Failed to start scan: {e}")
            return None

    def get_scan_status(self, task_id):
        """Check scan progress"""
        try:
            response = self.gmp.get_task(task_id)
            # status and progress live under the <task> element
            task = response.find('task')
            status = task.findtext('status')
            try:
                progress = int((task.findtext('progress') or '').strip())
            except ValueError:
                progress = 0
            return {
                'status': status,
                'progress': max(progress, 0)  # -1 means "not running yet"
            }
        except Exception as e:
            print(f"[-] Failed to get status: {e}")
            return None

    def get_results(self, task_id):
        """Retrieve scan results"""
        try:
            # Get reports for task
            response = self.gmp.get_reports(
                filter_string=f"task_id={task_id}"
            )
            reports = response.findall('report')
            if not reports:
                print("[-] No reports found")
                return None
            # Get latest report
            latest_report = reports[0]
            report_id = latest_report.get('id')
            # Get full report with results
            full_report = self.gmp.get_report(
                report_id=report_id,
                details=True
            )
            # Parse results
            results = {
                'high': 0,
                'medium': 0,
                'low': 0,
                'log': 0,
                'vulnerabilities': []
            }
            for result in full_report.findall('.//result'):
                severity = float(result.findtext('severity') or 0)
                threat = result.findtext('threat')
                # Count by severity
                if threat == 'High':
                    results['high'] += 1
                elif threat == 'Medium':
                    results['medium'] += 1
                elif threat == 'Low':
                    results['low'] += 1
                else:
                    results['log'] += 1
                # Store vulnerability details
                vuln = {
                    'name': result.findtext('name'),
                    'severity': severity,
                    'threat': threat,
                    'host': result.findtext('host'),
                    'port': result.findtext('port'),
                    'description': result.findtext('description')
                }
                results['vulnerabilities'].append(vuln)
            return results
        except Exception as e:
            print(f"[-] Failed to get results: {e}")
            return None

    def disconnect(self):
        """Close connection"""
        if self.gmp:
            self._stack.close()  # Exits the Gmp context, which disconnects
            self.gmp = None
            print("[+] Disconnected from OpenVAS")


# Example usage
if __name__ == '__main__':
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <username> <password> <target_ips>")
        print("Example: ./openvas_scanner.py admin admin123 192.168.1.0/24")
        sys.exit(1)
    username = sys.argv[1]
    password = sys.argv[2]
    target_hosts = sys.argv[3]
    scanner = OpenVASScanner()
    if not scanner.connect(username, password):
        sys.exit(1)
    try:
        # Create target
        stamp = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')
        target_name = f"Auto-Scan-{stamp}"
        target_id = scanner.create_target(target_name, target_hosts)
        if not target_id:
            sys.exit(1)
        # Use default scanner and "Full and fast" config
        # These IDs may vary - check your OpenVAS instance
        default_scanner_id = "08b69003-5fc2-4037-a479-93b440211c73"
        full_fast_config_id = "daba56c8-73ec-11df-a475-002264764cea"
        # Create task
        task_name = f"Auto-Task-{stamp}"
        task_id = scanner.create_task(
            task_name,
            target_id,
            default_scanner_id,
            full_fast_config_id
        )
        if not task_id:
            sys.exit(1)
        # Start scan
        report_id = scanner.start_scan(task_id)
        if report_id:
            print("\n[+] Scan initiated successfully")
            print(f"    Task ID: {task_id}")
            print(f"    Report ID: {report_id}")
            # This script has no "status" subcommand; poll from Python instead
            print("\n[!] Monitor scan progress with:")
            print(f"    OpenVASScanner.get_scan_status('{task_id}')")
    finally:
        scanner.disconnect()
Reference: python-gvm Documentation (https://python-gvm.readthedocs.io/) provides comprehensive API details.
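The scanner script starts a scan and returns immediately, leaving progress monitoring to the reader. A polling loop along the following lines could wait for completion. This is a sketch under stated assumptions: `wait_for_scan` is a hypothetical helper, it expects a callable returning the same dict shape as `get_scan_status`, and the set of terminal task states is assumed from commonly seen GMP statuses rather than taken from the original post.

```python
import time

# Task states treated as final (an assumption; verify against your GMP version)
TERMINAL_STATES = {"Done", "Stopped", "Interrupted"}


def wait_for_scan(get_status, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until the task reaches a terminal state.

    get_status is any zero-argument callable returning a dict such as
    {'status': 'Running', 'progress': 40}, or None on error.
    """
    waited = 0
    while True:
        status = get_status()
        if status is None:
            raise RuntimeError("could not read scan status")
        if status["status"] in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"scan still {status['status']} after {waited}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Demo with canned statuses instead of a live scanner
_canned = iter([
    {"status": "Requested", "progress": 0},
    {"status": "Running", "progress": 40},
    {"status": "Done", "progress": 100},
])
final = wait_for_scan(lambda: next(_canned), poll_seconds=1, sleep=lambda s: None)
print(final)
```

With the class above, the callable would be something like `lambda: scanner.get_scan_status(task_id)`. Injecting `sleep` keeps the loop testable without real delays.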
5. SIEM Log Shipper
Ship logs to SIEM with enrichment:
#!/usr/bin/env python3
"""
siem_log_shipper.py - Collect, enrich, and ship logs to SIEM
Requirements:
    pip install requests watchdog
"""
import hashlib
import socket
import sys
import threading
import time
from datetime import datetime, timezone

import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

HOSTNAME = socket.gethostname()  # Resolved once instead of on every log line


class LogEnricher:
    """Enrich logs with context before shipping"""

    @staticmethod
    def calculate_hash(content):
        """Calculate content hash for deduplication"""
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    @staticmethod
    def parse_log_level(line):
        """Extract log level from line (highest severity keyword present)"""
        levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']
        upper = line.upper()
        for level in levels:
            if level in upper:
                return level
        return 'INFO'

    @staticmethod
    def enrich_log(log_line, source_file):
        """Add metadata to log entry"""
        return {
            'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
            'source': {
                'file': str(source_file),
                'host': HOSTNAME
            },
            'message': log_line,
            'log_level': LogEnricher.parse_log_level(log_line),
            'hash': LogEnricher.calculate_hash(log_line),
            'tags': ['automated', 'log-shipper']
        }


class SIEMShipper:
    """Ship enriched logs to SIEM"""

    def __init__(self, siem_url, api_key, batch_size=100):
        self.siem_url = siem_url
        self.api_key = api_key
        self.batch_size = batch_size
        self.batch = []
        # The batch is touched by the watcher thread and the main loop
        self._lock = threading.RLock()

    def add_log(self, enriched_log):
        """Add log to batch"""
        with self._lock:
            self.batch.append(enriched_log)
            if len(self.batch) >= self.batch_size:
                self.flush()

    def flush(self):
        """Send batch to SIEM"""
        with self._lock:
            if not self.batch:
                return
            try:
                response = requests.post(
                    f'{self.siem_url}/api/logs/bulk',
                    headers={
                        'Authorization': f'Bearer {self.api_key}',
                        'Content-Type': 'application/json'
                    },
                    json={'logs': self.batch},
                    timeout=10
                )
                response.raise_for_status()
                print(f"[+] Shipped {len(self.batch)} logs to SIEM")
                self.batch = []
            except requests.exceptions.RequestException as e:
                # The batch is kept and retried on the next flush, so it can
                # grow without bound during a long outage
                print(f"[-] Failed to ship logs: {e}")
                # In production, implement retry logic or dead-letter queue


class LogWatcher(FileSystemEventHandler):
    """Monitor log files for changes"""

    def __init__(self, shipper, enricher):
        self.shipper = shipper
        self.enricher = enricher
        # Byte offset already shipped per file. Tracking offsets avoids
        # re-reading whole files and keeps legitimately repeated lines.
        self.offsets = {}

    def on_modified(self, event):
        """Handle file modification events"""
        if event.is_directory:
            return
        path = event.src_path
        # Read only the lines appended since the last event
        try:
            with open(path, 'rb') as f:
                f.seek(0, 2)  # Jump to the end to learn the current size
                size = f.tell()
                offset = self.offsets.get(path, 0)
                if offset > size:
                    offset = 0  # File was truncated or rotated; start over
                f.seek(offset)
                for raw in f:
                    line = raw.decode('utf-8', errors='replace').strip()
                    if not line:
                        continue
                    # Enrich and ship
                    enriched = self.enricher.enrich_log(line, path)
                    self.shipper.add_log(enriched)
                self.offsets[path] = f.tell()
        except Exception as e:
            print(f"[-] Error processing {path}: {e}")


# Example usage
if __name__ == '__main__':
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <siem_url> <api_key> <log_directory>")
        print("Example: ./siem_log_shipper.py https://siem.example.com api_key /var/log")
        sys.exit(1)
    siem_url = sys.argv[1]
    api_key = sys.argv[2]
    log_directory = sys.argv[3]
    # Initialize components
    enricher = LogEnricher()
    shipper = SIEMShipper(siem_url, api_key)
    event_handler = LogWatcher(shipper, enricher)
    # Set up file system observer
    observer = Observer()
    observer.schedule(event_handler, log_directory, recursive=True)
    observer.start()
    print(f"[+] Monitoring {log_directory} for log changes")
    print("[+] Press Ctrl+C to stop")
    try:
        while True:
            time.sleep(1)
            # Periodically flush any pending logs
            shipper.flush()
    except KeyboardInterrupt:
        print("\n[!] Stopping log shipper")
        observer.stop()
        shipper.flush()  # Final flush
    observer.join()
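The shipper's failure branch notes that production code should add retry logic. One common approach is retrying with exponential backoff, sketched below. The helper name `ship_with_retry`, its parameters, and the flaky sender used in the demo are all hypothetical; the sleep function is injectable so the example runs without real delays.

```python
import time


def ship_with_retry(send, batch, attempts=4, base_delay=1.0,
                    retry_on=(Exception,), sleep=time.sleep):
    """Call send(batch), retrying with exponential backoff on failure.

    Returns True once a send succeeds, False if every attempt fails.
    """
    for attempt in range(attempts):
        try:
            send(batch)
            return True
        except retry_on:
            if attempt == attempts - 1:
                break  # Out of attempts; caller can route batch to a dead-letter store
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    return False


# Demo: a sender that fails twice, then succeeds
calls = {"count": 0}
delays = []


def flaky_send(batch):
    calls["count"] += 1
    if calls["count"] <= 2:
        raise ConnectionError("SIEM unreachable")


ok = ship_with_retry(flaky_send, [{"message": "test"}], sleep=delays.append)
print(ok, delays)
```

In the shipper, `send` could wrap the `requests.post` call plus `raise_for_status()`, with `retry_on` narrowed to `requests.exceptions.RequestException`.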
6. Threat Intelligence Enrichment
Enrich IOCs with threat intelligence:
#!/usr/bin/env python3
"""
ti_enrichment.py - Enrich IOCs with threat intelligence
Requirements:
    pip install requests
"""
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


class ThreatIntelEnricher:
    """Enrich IOCs using multiple threat intelligence sources"""

    def __init__(self, virustotal_api_key=None, abuseipdb_api_key=None):
        self.vt_api_key = virustotal_api_key
        self.abuse_api_key = abuseipdb_api_key

    def enrich_ip(self, ip_address):
        """Enrich IP address with threat intelligence"""
        results = {
            'ioc': ip_address,
            'type': 'ip',
            'sources': {}
        }
        # AbuseIPDB
        if self.abuse_api_key:
            abuse_data = self._check_abuseipdb(ip_address)
            if abuse_data:
                results['sources']['abuseipdb'] = abuse_data
        # VirusTotal
        if self.vt_api_key:
            vt_data = self._check_virustotal_ip(ip_address)
            if vt_data:
                results['sources']['virustotal'] = vt_data
        # Calculate overall risk score
        results['risk_score'] = self._calculate_risk_score(results['sources'])
        return results

    def _check_abuseipdb(self, ip_address):
        """Query AbuseIPDB"""
        try:
            response = requests.get(
                'https://api.abuseipdb.com/api/v2/check',
                params={
                    'ipAddress': ip_address,
                    'maxAgeInDays': 90
                },
                headers={
                    'Key': self.abuse_api_key,
                    'Accept': 'application/json'
                },
                timeout=10
            )
            if response.status_code == 200:
                data = response.json()['data']
                return {
                    'abuse_confidence_score': data.get('abuseConfidenceScore'),
                    'total_reports': data.get('totalReports'),
                    'is_whitelisted': data.get('isWhitelisted'),
                    'country_code': data.get('countryCode')
                }
            print(f"[-] AbuseIPDB returned HTTP {response.status_code} for {ip_address}",
                  file=sys.stderr)
        except Exception as e:
            print(f"[-] AbuseIPDB error for {ip_address}: {e}", file=sys.stderr)
        return None

    def _check_virustotal_ip(self, ip_address):
        """Query VirusTotal"""
        try:
            response = requests.get(
                f'https://www.virustotal.com/api/v3/ip_addresses/{ip_address}',
                headers={
                    'x-apikey': self.vt_api_key
                },
                timeout=10
            )
            if response.status_code == 200:
                data = response.json()['data']['attributes']
                stats = data.get('last_analysis_stats', {})
                return {
                    'malicious': stats.get('malicious', 0),
                    'suspicious': stats.get('suspicious', 0),
                    'harmless': stats.get('harmless', 0),
                    'total_vendors': sum(stats.values()),
                    'reputation': data.get('reputation', 0)
                }
            print(f"[-] VirusTotal returned HTTP {response.status_code} for {ip_address}",
                  file=sys.stderr)
        except Exception as e:
            print(f"[-] VirusTotal error for {ip_address}: {e}", file=sys.stderr)
        return None

    def _calculate_risk_score(self, sources):
        """Calculate overall risk score from multiple sources.

        Each source contributes at most 50 points, so an IP covered by
        only one source cannot score above 50.
        """
        score = 0
        # AbuseIPDB contribution ("or 0" guards against a null score)
        if 'abuseipdb' in sources:
            abuse_score = sources['abuseipdb'].get('abuse_confidence_score') or 0
            score += abuse_score * 0.5  # Weight: 50%
        # VirusTotal contribution
        if 'virustotal' in sources:
            vt = sources['virustotal']
            total = vt.get('total_vendors') or 1  # Avoid division by zero
            malicious = vt.get('malicious', 0)
            suspicious = vt.get('suspicious', 0)
            vt_score = ((malicious + suspicious * 0.5) / total) * 100
            score += vt_score * 0.5  # Weight: 50%
        return min(100, score)  # Cap at 100

    def enrich_bulk(self, iocs, max_workers=5):
        """Enrich multiple IOCs in parallel"""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {}
            for ioc in iocs:
                futures[executor.submit(self.enrich_ip, ioc)] = ioc
                # Rate limiting: space out submissions, since sleeping
                # while collecting results would not slow the requests
                time.sleep(0.5)
            for future in as_completed(futures):
                ioc = futures[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"[-] Failed to enrich {ioc}: {e}", file=sys.stderr)
        return results


# Example usage
if __name__ == '__main__':
    vt_key = os.getenv('VIRUSTOTAL_API_KEY')
    abuse_key = os.getenv('ABUSEIPDB_API_KEY')
    if not (vt_key or abuse_key):
        print("Error: Set VIRUSTOTAL_API_KEY or ABUSEIPDB_API_KEY environment variable")
        sys.exit(1)
    enricher = ThreatIntelEnricher(
        virustotal_api_key=vt_key,
        abuseipdb_api_key=abuse_key
    )
    # Test IPs
    test_ips = [
        '8.8.8.8',        # Google DNS (clean)
        '185.220.101.1',  # Tor exit node (likely flagged)
    ]
    print("[+] Enriching IOCs...")
    results = enricher.enrich_bulk(test_ips)
    # Print results
    for result in results:
        print(f"\n{'='*60}")
        print(f"IP: {result['ioc']}")
        print(f"Risk Score: {result['risk_score']:.1f}/100")
        if 'abuseipdb' in result['sources']:
            abuse = result['sources']['abuseipdb']
            print("  AbuseIPDB:")
            print(f"    Confidence: {abuse['abuse_confidence_score']}%")
            print(f"    Reports: {abuse['total_reports']}")
        if 'virustotal' in result['sources']:
            vt = result['sources']['virustotal']
            print("  VirusTotal:")
            print(f"    Malicious: {vt['malicious']}/{vt['total_vendors']}")
            print(f"    Reputation: {vt['reputation']}")
    # Save to JSON
    with open('enriched_iocs.json', 'w') as f:
        json.dump(results, f, indent=2)
    print("\n[+] Results saved to enriched_iocs.json")
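To make the scoring arithmetic concrete, here is a small worked example for the case where both sources respond. It restates the weighting used in `_calculate_risk_score` as a standalone function; the name `combined_score` and the input figures are made up for illustration.

```python
def combined_score(abuse_confidence, malicious, suspicious, total_vendors):
    """Blend the two sources with equal 50% weights, capped at 100."""
    abuse_part = abuse_confidence * 0.5
    # Suspicious verdicts count half as much as malicious ones
    vt_score = ((malicious + suspicious * 0.5) / total_vendors) * 100
    vt_part = vt_score * 0.5
    return min(100, abuse_part + vt_part)


# AbuseIPDB confidence 80; VirusTotal: 10 malicious and 4 suspicious out of 90 vendors
#   abuse part: 80 * 0.5                 = 40.0
#   vt score:   (10 + 4 * 0.5) / 90 * 100 = 13.33...
#   vt part:    13.33... * 0.5            = 6.67...
score = combined_score(abuse_confidence=80, malicious=10, suspicious=4, total_vendors=90)
print(f"{score:.1f}")  # 46.7
```

Note how a high AbuseIPDB confidence is pulled down when few VirusTotal vendors flag the address; whether equal weights suit a given environment is a tuning decision.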
Best Practices
1. Error Handling
Always implement comprehensive error handling:
import sys
import logging
from functools import wraps

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('automation.log'),
        logging.StreamHandler(sys.stdout)
    ]
)


def handle_errors(func):
    """Decorator for error handling"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in {func.__name__}: {e}", exc_info=True)
            return None
    return wrapper


@handle_errors
def risky_operation():
    # Code that might fail
    pass
2. Configuration Management
Externalize configuration:
import json
from pathlib import Path


class Config:
    """Centralized configuration management"""

    def __init__(self, config_file='config.json'):
        self.config_file = Path(config_file)
        self.config = self._load_config()

    def _load_config(self):
        """Load configuration from file"""
        if self.config_file.exists():
            with open(self.config_file) as f:
                return json.load(f)
        else:
            return self._default_config()

    def _default_config(self):
        """Default configuration"""
        return {
            'siem_url': 'https://siem.example.com',
            'api_timeout': 10,
            'batch_size': 100,
            'log_level': 'INFO'
        }

    def get(self, key, default=None):
        """Get configuration value"""
        return self.config.get(key, default)


# Usage
config = Config()
siem_url = config.get('siem_url')
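One gap in the `Config` class is that a config file replaces the defaults entirely, so a file that omits a key loses its default. A possible extension, sketched below, layers defaults, file values, and environment variables in that order. The `SOC_` prefix, the `load_settings` name, and the casting rule are assumptions made for this example.

```python
import os

# Same defaults as the Config class above
DEFAULTS = {
    "siem_url": "https://siem.example.com",
    "api_timeout": 10,
    "batch_size": 100,
    "log_level": "INFO",
}


def load_settings(file_values=None, environ=None, prefix="SOC_"):
    """Merge defaults, then file values, then environment overrides."""
    environ = os.environ if environ is None else environ
    settings = dict(DEFAULTS)
    settings.update(file_values or {})
    for key, default in DEFAULTS.items():
        raw = environ.get(prefix + key.upper())
        if raw is not None:
            # Cast the string from the environment to the default's type
            settings[key] = type(default)(raw)
    return settings


settings = load_settings(
    file_values={"batch_size": 50},
    environ={"SOC_API_TIMEOUT": "30"},
)
print(settings)
```

Here `batch_size` comes from the file, `api_timeout` from the environment (converted to an integer), and the remaining keys fall back to the defaults.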
3. Rate Limiting
Respect API rate limits:
import time
from functools import wraps


class RateLimiter:
    """Simple rate limiter (single-threaded use only)"""

    def __init__(self, calls_per_second):
        self.calls_per_second = calls_per_second
        self.min_interval = 1.0 / calls_per_second
        self.last_call = None

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # monotonic() is unaffected by system clock adjustments
            if self.last_call is not None:
                elapsed = time.monotonic() - self.last_call
                if elapsed < self.min_interval:
                    time.sleep(self.min_interval - elapsed)
            self.last_call = time.monotonic()
            return func(*args, **kwargs)
        return wrapper


# Usage
@RateLimiter(calls_per_second=1)
def api_call():
    # Makes max 1 call per second
    pass
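The decorator above keeps its state in an unguarded attribute, so it is not safe to share across threads, such as the worker pool in the enrichment script. A thread-safe variant could reserve time slots under a lock and sleep outside it, as in this sketch. The class name and the injectable `clock` and `sleep` parameters are assumptions added so the behavior can be checked deterministically.

```python
import threading
import time


class ThreadSafeRateLimiter:
    """Space out calls across threads by reserving time slots under a lock."""

    def __init__(self, calls_per_second, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 1.0 / calls_per_second
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_allowed = 0.0

    def wait(self):
        """Block until this caller's slot arrives; return the delay applied."""
        with self._lock:
            now = self._clock()
            delay = max(0.0, self._next_allowed - now)
            # Reserve the following slot before releasing the lock
            self._next_allowed = max(now, self._next_allowed) + self.min_interval
        if delay:
            self._sleep(delay)  # Sleep outside the lock so others can reserve slots
        return delay


# Demo with a frozen clock: three back-to-back calls at 2 calls/second
limiter = ThreadSafeRateLimiter(2, clock=lambda: 100.0, sleep=lambda s: None)
delays = [limiter.wait() for _ in range(3)]
print(delays)
```

Each worker would call `limiter.wait()` immediately before its API request. With the default clock and sleep, the same three calls would take about one second in total.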
4. Secrets Management
Never hardcode credentials:
Set the key in the shell environment, or fetch it from a dedicated secrets manager (AWS Secrets Manager, HashiCorp Vault, etc.):
export VIRUSTOTAL_API_KEY="your-key-here"
Then read it in Python:
import os


def get_api_key(key_name):
    """Retrieve API key from environment"""
    key = os.getenv(key_name)
    if not key:
        raise ValueError(f"{key_name} environment variable not set")
    return key


# Usage
vt_key = get_api_key('VIRUSTOTAL_API_KEY')
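Keeping keys out of source code does not stop them from leaking into log output, for example when a request URL or header is logged during debugging. As one possible safeguard, a `logging.Filter` can mask known secret values before records are emitted. The class name `RedactSecrets` and the placeholder text are choices made for this sketch.

```python
import logging


class RedactSecrets(logging.Filter):
    """Replace known secret values in log messages with a placeholder."""

    def __init__(self, secrets):
        super().__init__()
        self._secrets = [s for s in secrets if s]  # Ignore unset or empty values

    def filter(self, record):
        message = record.getMessage()  # Message with %-style args applied
        for secret in self._secrets:
            message = message.replace(secret, "[REDACTED]")
        # Store the redacted text and clear args so it is not formatted twice
        record.msg, record.args = message, ()
        return True


# Demo with a made-up key value
record = logging.LogRecord(
    "demo", logging.INFO, "demo.py", 1, "calling API with key=%s", ("abc123",), None
)
RedactSecrets(["abc123"]).filter(record)
print(record.getMessage())
```

Attaching the filter to each handler with `handler.addFilter(...)` applies it to every record that handler emits.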
Conclusion
Security automation scripting reduces analyst workload, improves response times, and ensures consistency in security operations. The scripts provided—IOC extraction, reputation checking, vulnerability scanning, log shipping, and threat intelligence enrichment—address common SOC workflows.
Key principles for effective security automation:
- Start Small: Automate one repetitive task, validate, then expand
- Error Handling: Comprehensive exception handling prevents script failures from disrupting operations
- Logging: Detailed logs enable troubleshooting and audit trails
- Rate Limiting: Respect API limits to avoid service disruptions
- Testing: Test scripts in non-production before deploying
- Documentation: Comment code and maintain runbooks
- Security: Protect credentials, validate inputs, follow least privilege
The scripts provided are templates—adapt error handling, logging, and security controls to your environment before production use. Security automation is not set-and-forget; regularly review and update scripts as threats, tools, and environments evolve.
Effective automation amplifies analyst capabilities, enabling security teams to respond faster and focus human expertise on complex threats requiring judgment and creativity.
References
- Gartner Market Guide for SOAR: https://www.gartner.com/en/documents/soar
- AbuseIPDB API Documentation: https://docs.abuseipdb.com/
- VirusTotal API v3 Documentation: https://developers.virustotal.com/reference/overview
- python-gvm Documentation: https://python-gvm.readthedocs.io/
- Requests Library Documentation: https://requests.readthedocs.io/
- Watchdog Documentation: https://python-watchdog.readthedocs.io/
- Python Logging Best Practices: https://docs.python.org/3/howto/logging.html
- Bash Scripting Best Practices: https://google.github.io/styleguide/shellguide.html
- OWASP Secure Coding Practices: https://owasp.org/www-project-secure-coding-practices-quick-reference-guide/
- NIST SP 800-82 - Industrial Control Systems Security: https://csrc.nist.gov/publications/detail/sp/800-82/rev-3/final
Security Note: The scripts provided are for educational and legitimate security operations. Ensure you have proper authorization before scanning networks or accessing systems. Unauthorized access, even with security tools, may violate computer fraud laws (CFAA in US, Computer Misuse Act in UK, etc.). Always obtain written permission before conducting security assessments.