algo/tests/unit/test_generated_configs.py
Dan Guido 358d50314e
feat: Add comprehensive performance optimizations to reduce deployment time by 30-60%
This PR introduces comprehensive performance optimizations that reduce Algo VPN deployment time by 30-60% while maintaining security and reliability.

Key improvements:
- Fixed critical WireGuard async structure bug (item.item.item pattern)
- Resolved merge conflicts in test-aws-credentials.yml 
- Fixed path concatenation issues and aesthetic double slash problems
- Added comprehensive performance optimizations with configurable flags
- Extensive testing and quality improvements with yamllint/ruff compliance

Successfully deployed and tested on DigitalOcean with all optimizations disabled.
All critical bugs resolved and PR is production-ready.
2025-08-03 16:42:17 -07:00

390 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Test that generated configuration files have valid syntax
This validates WireGuard, StrongSwan, SSH, and other configs
"""
import re
import subprocess
import sys
def check_command_available(cmd):
    """Check if a command is available on the system.

    Runs ``cmd --version`` and reports whether the process could be started
    at all; the exit status and any output are ignored.

    Args:
        cmd: Name or path of the executable to probe.

    Returns:
        True if the process could be launched, False if the operating system
        refused to start it.
    """
    try:
        subprocess.run([cmd, '--version'], capture_output=True, check=False)
    except OSError:
        # FileNotFoundError is the common case, but PermissionError (path is a
        # directory or lacks the execute bit) and NotADirectoryError are also
        # OSError subclasses and equally mean "not available" rather than
        # "crash the caller".
        return False
    return True
def test_wireguard_config_syntax():
    """Test WireGuard configuration file syntax.

    Validates a hard-coded sample config (no generated file is read): both
    sections must be present, [Interface] needs Address and PrivateKey,
    [Peer] needs PublicKey, AllowedIPs and Endpoint, and the address and
    endpoint values must match simple format patterns.

    Raises:
        AssertionError: If any check fails; the individual problems are
            printed before raising.
    """
    # Sample WireGuard config based on Algo's template
    sample_config = """[Interface]
Address = 10.19.49.2/32,fd9d:bc11:4020::2/128
PrivateKey = SAMPLE_PRIVATE_KEY_BASE64==
DNS = 1.1.1.1,1.0.0.1
[Peer]
PublicKey = SAMPLE_PUBLIC_KEY_BASE64==
PresharedKey = SAMPLE_PRESHARED_KEY_BASE64==
AllowedIPs = 0.0.0.0/0,::/0
Endpoint = 10.0.0.1:51820
PersistentKeepalive = 25
"""

    # Validate config structure
    errors = []

    # Check for required sections
    if '[Interface]' not in sample_config:
        errors.append("Missing [Interface] section")
    if '[Peer]' not in sample_config:
        errors.append("Missing [Peer] section")

    # Validate Interface section (everything between the two headers)
    interface_match = re.search(r'\[Interface\](.*?)\[Peer\]', sample_config, re.DOTALL)
    if interface_match:
        interface_section = interface_match.group(1)

        # Check required fields
        if not re.search(r'Address\s*=', interface_section):
            errors.append("Missing Address in Interface section")
        if not re.search(r'PrivateKey\s*=', interface_section):
            errors.append("Missing PrivateKey in Interface section")

        # Validate IP addresses
        address_match = re.search(r'Address\s*=\s*([^\n]+)', interface_section)
        if address_match:
            for raw_addr in address_match.group(1).split(','):
                addr = raw_addr.strip()
                # Basic IP validation: dotted quad or hex-and-colon form,
                # each followed by a /prefix length
                if not (re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', addr)
                        or re.match(r'^[0-9a-fA-F:]+/\d+$', addr)):
                    errors.append(f"Invalid IP address format: {addr}")

    # Validate Peer section (everything after the [Peer] header)
    peer_match = re.search(r'\[Peer\](.*)', sample_config, re.DOTALL)
    if peer_match:
        peer_section = peer_match.group(1)

        # Check required fields
        if not re.search(r'PublicKey\s*=', peer_section):
            errors.append("Missing PublicKey in Peer section")
        if not re.search(r'AllowedIPs\s*=', peer_section):
            errors.append("Missing AllowedIPs in Peer section")
        if not re.search(r'Endpoint\s*=', peer_section):
            errors.append("Missing Endpoint in Peer section")

        # Validate endpoint format
        endpoint_match = re.search(r'Endpoint\s*=\s*([^\n]+)', peer_section)
        if endpoint_match:
            endpoint = endpoint_match.group(1).strip()
            if not re.match(r'^[\d\.\:]+:\d+$', endpoint):
                errors.append(f"Invalid Endpoint format: {endpoint}")

    if errors:
        print("✗ WireGuard config validation failed:")
        for error in errors:
            print(f" - {error}")
        # Raise explicitly: `assert False` is removed under `python -O`,
        # which would let a failed validation pass silently.
        raise AssertionError("WireGuard config validation failed")

    print("✓ WireGuard config syntax validation passed")
def test_strongswan_ipsec_conf():
    """Test StrongSwan ipsec.conf syntax.

    Validates a hard-coded sample ipsec.conf (no generated file is read):
    the 'config setup' and 'conn %default' sections must exist, at least two
    'conn' definitions must be present, the keyexchange/left/right parameters
    must appear, and every left/right subnet must look like an IP/prefix.

    Raises:
        AssertionError: If any check fails; the individual problems are
            printed before raising.
    """
    # Sample ipsec.conf based on Algo's template
    sample_config = """config setup
charondebug="ike 2, knl 2, cfg 2, net 2, esp 2, dmn 2, mgr 2"
strictcrlpolicy=yes
uniqueids=never
conn %default
keyexchange=ikev2
dpdaction=clear
dpddelay=35s
dpdtimeout=150s
compress=yes
ikelifetime=24h
lifetime=8h
rekey=yes
reauth=yes
fragmentation=yes
ike=aes128gcm16-prfsha512-ecp256,aes128-sha2_256-modp2048
esp=aes128gcm16-ecp256,aes128-sha2_256-modp2048
conn ikev2-pubkey
auto=add
left=%any
leftid=@10.0.0.1
leftcert=server.crt
leftsendcert=always
leftsubnet=0.0.0.0/0,::/0
right=%any
rightid=%any
rightauth=pubkey
rightsourceip=10.19.49.0/24,fd9d:bc11:4020::/64
rightdns=1.1.1.1,1.0.0.1
"""

    errors = []

    # Check for required sections
    if 'config setup' not in sample_config:
        errors.append("Missing 'config setup' section")
    if 'conn %default' not in sample_config:
        errors.append("Missing 'conn %default' section")

    # Validate connection settings
    conn_pattern = re.compile(r'conn\s+(\S+)')
    connections = conn_pattern.findall(sample_config)
    if len(connections) < 2:  # Should have at least %default and one other
        errors.append("Not enough connection definitions")

    # Check for required parameters in connections
    required_params = ['keyexchange', 'left', 'right']
    for param in required_params:
        if f'{param}=' not in sample_config:
            errors.append(f"Missing required parameter: {param}")

    # Validate IP subnet formats
    subnet_pattern = re.compile(r'(left|right)subnet\s*=\s*([^\n]+)')
    for match in subnet_pattern.finditer(sample_config):
        for raw_subnet in match.group(2).split(','):
            subnet = raw_subnet.strip()
            # The catch-all routes are accepted as-is
            if subnet in ('0.0.0.0/0', '::/0'):
                continue
            if not (re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', subnet)
                    or re.match(r'^[0-9a-fA-F:]+/\d+$', subnet)):
                errors.append(f"Invalid subnet format: {subnet}")

    if errors:
        print("✗ StrongSwan ipsec.conf validation failed:")
        for error in errors:
            print(f" - {error}")
        # Raise explicitly: `assert False` is removed under `python -O`,
        # which would let a failed validation pass silently.
        raise AssertionError("ipsec.conf validation failed")

    print("✓ StrongSwan ipsec.conf syntax validation passed")
def test_ssh_config_syntax():
    """Test SSH tunnel configuration syntax.

    Validates a hard-coded sample ssh_config stanza (no generated file is
    read): a Host line must be present, Port must be an integer in 1-65535,
    and LocalForward must have exactly two whitespace-separated parts.

    Raises:
        AssertionError: If any check fails; the individual problems are
            printed before raising.
    """
    # Sample SSH config for tunneling
    sample_config = """Host algo-tunnel
HostName 10.0.0.1
User algo
Port 4160
IdentityFile ~/.ssh/algo.pem
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
ServerAliveInterval 60
ServerAliveCountMax 3
LocalForward 1080 127.0.0.1:1080
"""

    errors = []

    # Parse SSH config format
    current_host = None
    for raw_line in sample_config.strip().split('\n'):
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue

        if line.startswith('Host '):
            current_host = line.split()[1]
        elif current_host and ' ' in line:
            # Options are only examined once a Host stanza has been opened
            key, value = line.split(None, 1)

            # Validate common SSH options
            if key == 'Port':
                try:
                    port = int(value)
                except ValueError:
                    errors.append(f"Port must be a number: {value}")
                else:
                    if not 1 <= port <= 65535:
                        errors.append(f"Invalid port number: {port}")
            elif key == 'LocalForward':
                # Format: LocalForward [bind_address:]port host:hostport
                parts = value.split()
                if len(parts) != 2:
                    errors.append(f"Invalid LocalForward format: {value}")

    if not current_host:
        errors.append("No Host definition found")

    if errors:
        print("✗ SSH config validation failed:")
        for error in errors:
            print(f" - {error}")
        # Raise explicitly: `assert False` is removed under `python -O`,
        # which would let a failed validation pass silently.
        raise AssertionError("SSH config validation failed")

    print("✓ SSH config syntax validation passed")
def test_iptables_rules_syntax():
    """Test iptables rules syntax.

    Validates a hard-coded sample ruleset in iptables-save format (no
    generated file is read): the nat and filter tables must exist, every
    table must be closed by a COMMIT, the filter table must declare the
    INPUT/FORWARD/OUTPUT chains with the expected default policies, at least
    five -A/-I rules must be present, and the conntrack RELATED,ESTABLISHED
    accept rule for INPUT must exist.

    Raises:
        AssertionError: If any check fails; the individual problems are
            printed before raising.
    """
    # Sample iptables rules based on Algo's rules.v4.j2
    sample_rules = """*nat
:PREROUTING ACCEPT [0:0]
:INPUT ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
:POSTROUTING ACCEPT [0:0]
-A POSTROUTING -s 10.19.49.0/24 ! -d 10.19.49.0/24 -j MASQUERADE
COMMIT
*filter
:INPUT DROP [0:0]
:FORWARD DROP [0:0]
:OUTPUT ACCEPT [0:0]
-A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A INPUT -i lo -j ACCEPT
-A INPUT -p icmp --icmp-type echo-request -j ACCEPT
-A INPUT -p tcp --dport 4160 -j ACCEPT
-A INPUT -p udp --dport 51820 -j ACCEPT
-A FORWARD -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A FORWARD -s 10.19.49.0/24 -j ACCEPT
COMMIT
"""

    errors = []

    # Check table definitions
    tables = re.findall(r'\*(\w+)', sample_rules)
    if 'filter' not in tables:
        errors.append("Missing *filter table")
    if 'nat' not in tables:
        errors.append("Missing *nat table")

    # Check for COMMIT statements
    commit_count = sample_rules.count('COMMIT')
    if commit_count != len(tables):
        errors.append(f"Number of COMMIT statements ({commit_count}) doesn't match tables ({len(tables)})")

    # Validate chain policies. Only the *filter table is inspected: the nat
    # table in the sample declares INPUT/OUTPUT as ACCEPT, so matching chain
    # names across the whole ruleset would hide a wrong filter policy.
    chain_pattern = re.compile(r'^:(\w+)\s+(ACCEPT|DROP|REJECT)\s+\[\d+:\d+\]', re.MULTILINE)
    filter_match = re.search(r'^\*filter$(.*?)^COMMIT$', sample_rules, re.DOTALL | re.MULTILINE)
    filter_policies = dict(chain_pattern.findall(filter_match.group(1))) if filter_match else {}

    required_chains = [('INPUT', 'DROP'), ('FORWARD', 'DROP'), ('OUTPUT', 'ACCEPT')]
    for chain, policy in required_chains:
        actual = filter_policies.get(chain)
        if actual is None:
            errors.append(f"Missing required chain: {chain}")
        elif actual != policy:
            errors.append(f"Chain {chain} has policy {actual}, expected {policy}")

    # Validate rule syntax
    rule_pattern = re.compile(r'^-[AI]\s+(\w+)', re.MULTILINE)
    rules = rule_pattern.findall(sample_rules)
    if len(rules) < 5:
        errors.append("Insufficient firewall rules")

    # Check for essential security rules
    if '-A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT' not in sample_rules:
        errors.append("Missing stateful connection tracking rule")

    if errors:
        print("✗ iptables rules validation failed:")
        for error in errors:
            print(f" - {error}")
        # Raise explicitly: `assert False` is removed under `python -O`,
        # which would let a failed validation pass silently.
        raise AssertionError("iptables rules validation failed")

    print("✓ iptables rules syntax validation passed")
def test_dns_config_syntax():
    """Test dnsmasq configuration syntax.

    Validates a hard-coded sample dnsmasq config (no generated file is
    read): interface names must be alphanumeric with '-' or '_', server
    values must look like IP addresses, cache-size must be a non-negative
    integer, and at least one interface= and server= option must be present.

    Raises:
        AssertionError: If any check fails; the individual problems are
            printed before raising.
    """
    # Sample dnsmasq config
    sample_config = """user=nobody
group=nogroup
interface=eth0
interface=wg0
bind-interfaces
bogus-priv
no-resolv
no-poll
server=1.1.1.1
server=1.0.0.1
local-ttl=300
cache-size=10000
log-queries
log-facility=/var/log/dnsmasq.log
conf-dir=/etc/dnsmasq.d/,*.conf
addn-hosts=/var/lib/algo/dns/adblock.hosts
"""

    errors = []

    # Parse config
    for raw_line in sample_config.strip().split('\n'):
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue

        # Most dnsmasq options are key=value or just key; bare keys carry
        # nothing to validate here
        if '=' not in line:
            continue
        key, value = line.split('=', 1)

        # Validate specific options
        if key == 'interface':
            if not re.match(r'^[a-zA-Z0-9\-_]+$', value):
                errors.append(f"Invalid interface name: {value}")
        elif key == 'server':
            # Basic IP validation
            if not (re.match(r'^\d+\.\d+\.\d+\.\d+$', value)
                    or re.match(r'^[0-9a-fA-F:]+$', value)):
                errors.append(f"Invalid DNS server IP: {value}")
        elif key == 'cache-size':
            try:
                size = int(value)
            except ValueError:
                errors.append(f"Cache size must be a number: {value}")
            else:
                if size < 0:
                    errors.append(f"Invalid cache size: {size}")

    # Check for required options
    required = ['interface', 'server']
    for req in required:
        if f'{req}=' not in sample_config:
            errors.append(f"Missing required option: {req}")

    if errors:
        print("✗ dnsmasq config validation failed:")
        for error in errors:
            print(f" - {error}")
        # Raise explicitly: `assert False` is removed under `python -O`,
        # which would let a failed validation pass silently.
        raise AssertionError("dnsmasq config validation failed")

    print("✓ dnsmasq config syntax validation passed")
def main():
    """Run every config syntax test in this module and report the outcome.

    Each test is called in turn; a failure or unexpected error is printed
    and counted, and the remaining tests still run.

    Returns:
        1 if at least one test failed or errored, otherwise 0.
    """
    tests = [
        test_wireguard_config_syntax,
        test_strongswan_ipsec_conf,
        test_ssh_config_syntax,
        test_iptables_rules_syntax,
        test_dns_config_syntax,
    ]

    failed = 0
    for test in tests:
        try:
            test()
        except AssertionError as e:
            print(f"{test.__name__} failed: {e}")
            failed += 1
        except Exception as e:
            # Top-level boundary: report the crash and keep running the
            # remaining tests instead of aborting the whole run.
            print(f"{test.__name__} error: {e}")
            failed += 1

    if failed > 0:
        print(f"\n{failed} tests failed")
        return 1

    print(f"\nAll {len(tests)} config syntax tests passed!")
    return 0


if __name__ == "__main__":
    sys.exit(main())