mirror of
https://github.com/trailofbits/algo.git
synced 2025-09-03 02:23:39 +02:00
576 lines
24 KiB
Python
#!/usr/bin/env python3
"""
Test PKI certificate validation for Ansible-generated certificates
Hybrid approach: validates actual certificates when available, else tests templates/config
Based on issues #14755, #14718 - Apple device compatibility
Issues #75, #153 - Security enhancements (name constraints, EKU restrictions)
"""

import glob
import os
import re
import subprocess
import sys
from datetime import UTC

from cryptography import x509
from cryptography.x509.oid import ExtensionOID, NameOID


def find_generated_certificates():
    """Find Ansible-generated certificate files in configs directory"""
    # Look for configs directory structure created by Ansible
    config_patterns = [
        "configs/*/ipsec/.pki/cacert.pem",
        "../configs/*/ipsec/.pki/cacert.pem",  # From tests/unit directory
        "../../configs/*/ipsec/.pki/cacert.pem",  # Alternative path
    ]

    for pattern in config_patterns:
        ca_certs = glob.glob(pattern)
        if ca_certs:
            base_path = os.path.dirname(ca_certs[0])
            return {
                "ca_cert": ca_certs[0],
                "base_path": base_path,
                "server_certs": glob.glob(f"{base_path}/certs/*.crt"),
                "p12_files": glob.glob(f"{base_path.replace('/.pki', '')}/manual/*.p12"),
            }

    return None


def test_openssl_version_detection():
    """Test that we can detect OpenSSL version for compatibility checks"""
    result = subprocess.run(["openssl", "version"], capture_output=True, text=True)

    assert result.returncode == 0, "Failed to get OpenSSL version"

    # Parse version - e.g., "OpenSSL 3.0.2 15 Mar 2022"
    version_match = re.search(r"OpenSSL\s+(\d+)\.(\d+)\.(\d+)", result.stdout)
    assert version_match, f"Can't parse OpenSSL version: {result.stdout}"

    major = int(version_match.group(1))
    minor = int(version_match.group(2))

    print(f"✓ OpenSSL version detected: {major}.{minor}")
    return (major, minor)


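# Illustrative sketch, not part of the original test suite: the version parsing
# done in test_openssl_version_detection() could be factored into a pure
# function, so the regex can be checked without invoking the openssl binary.
# The helper name parse_openssl_version is hypothetical; it imports re locally
# only to stay self-contained.
def parse_openssl_version(version_output):
    """Return (major, minor, patch) parsed from `openssl version` output, or None."""
    import re

    match = re.search(r"OpenSSL\s+(\d+)\.(\d+)\.(\d+)", version_output)
    if match is None:
        # Output that does not contain an "OpenSSL x.y.z" banner is not parsed
        return None
    return tuple(int(part) for part in match.groups())

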
def validate_ca_certificate_real(cert_files):
    """Validate actual Ansible-generated CA certificate"""
    # Read the actual CA certificate generated by Ansible
    with open(cert_files["ca_cert"], "rb") as f:
        cert_data = f.read()

    certificate = x509.load_pem_x509_certificate(cert_data)

    # Check Basic Constraints
    basic_constraints = certificate.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value
    assert basic_constraints.ca is True, "CA certificate should have CA:TRUE"
    assert basic_constraints.path_length == 0, "CA should have pathlen:0 constraint"

    # Check Key Usage
    key_usage = certificate.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value
    assert key_usage.key_cert_sign is True, "CA should have keyCertSign usage"
    assert key_usage.crl_sign is True, "CA should have cRLSign usage"

    # Check Extended Key Usage (Issue #75)
    eku = certificate.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE).value
    assert x509.oid.ExtendedKeyUsageOID.SERVER_AUTH in eku, "CA should allow signing server certificates"
    assert x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in eku, "CA should allow signing client certificates"
    assert x509.ObjectIdentifier("1.3.6.1.5.5.7.3.17") in eku, "CA should have IPsec End Entity EKU"

    # Check Name Constraints (Issue #75) - defense against certificate misuse
    name_constraints = certificate.extensions.get_extension_for_oid(ExtensionOID.NAME_CONSTRAINTS).value
    assert name_constraints.permitted_subtrees is not None, "CA should have permitted name constraints"
    assert name_constraints.excluded_subtrees is not None, "CA should have excluded name constraints"

    # Verify public domains are excluded
    excluded_dns = [
        constraint.value for constraint in name_constraints.excluded_subtrees if isinstance(constraint, x509.DNSName)
    ]
    public_domains = [".com", ".org", ".net", ".gov", ".edu", ".mil", ".int"]
    for domain in public_domains:
        assert domain in excluded_dns, f"CA should exclude public domain {domain}"

    # Verify private IP ranges are excluded (Issue #75)
    excluded_ips = [
        constraint.value for constraint in name_constraints.excluded_subtrees if isinstance(constraint, x509.IPAddress)
    ]
    assert len(excluded_ips) > 0, "CA should exclude private IP ranges"

    # Verify email domains are also excluded (Issue #153)
    excluded_emails = [
        constraint.value for constraint in name_constraints.excluded_subtrees if isinstance(constraint, x509.RFC822Name)
    ]
    email_domains = [".com", ".org", ".net", ".gov", ".edu", ".mil", ".int"]
    for domain in email_domains:
        assert domain in excluded_emails, f"CA should exclude email domain {domain}"

    print(f"✓ Real CA certificate has proper security constraints: {cert_files['ca_cert']}")


def validate_ca_certificate_config():
    """Validate CA certificate configuration in Ansible files (CI mode)"""
    # Check that the Ansible task file has proper CA certificate configuration
    openssl_task_file = find_ansible_file("roles/strongswan/tasks/openssl.yml")
    if not openssl_task_file:
        print("⚠ Could not find openssl.yml task file")
        return

    with open(openssl_task_file) as f:
        content = f.read()

    # Verify key security configurations are present
    security_checks = [
        ("name_constraints_permitted", "Name constraints should be configured"),
        ("name_constraints_excluded", "Excluded name constraints should be configured"),
        ("extended_key_usage", "Extended Key Usage should be configured"),
        ("1.3.6.1.5.5.7.3.17", "IPsec End Entity OID should be present"),
        ("serverAuth", "Server authentication EKU should be present"),
        ("clientAuth", "Client authentication EKU should be present"),
        ("basic_constraints", "Basic constraints should be configured"),
        ("CA:TRUE", "CA certificate should be marked as CA"),
        ("pathlen:0", "Path length constraint should be set"),
    ]

    for check, message in security_checks:
        assert check in content, f"Missing security configuration: {message}"

    # Verify public domains are excluded
    public_domains = [".com", ".org", ".net", ".gov", ".edu", ".mil", ".int"]
    for domain in public_domains:
        # Handle both double quotes and single quotes in YAML
        assert f'"DNS:{domain}"' in content or f"'DNS:{domain}'" in content, (
            f"Public domain {domain} should be excluded"
        )

    # Verify private IP ranges are excluded
    private_ranges = ["10.0.0.0", "172.16.0.0", "192.168.0.0"]
    for ip_range in private_ranges:
        assert ip_range in content, f"Private IP range {ip_range} should be excluded"

    # Verify email domains are excluded (Issue #153)
    email_domains = [".com", ".org", ".net", ".gov", ".edu", ".mil", ".int"]
    for domain in email_domains:
        # Handle both double quotes and single quotes in YAML
        assert f'"email:{domain}"' in content or f"'email:{domain}'" in content, (
            f"Email domain {domain} should be excluded"
        )

    # Verify IPv6 constraints are present (Issue #153)
    assert "IP:::/0" in content, "IPv6 all addresses should be excluded"

    print("✓ CA certificate configuration has proper security constraints")


def test_ca_certificate():
    """Test CA certificate - uses real certs if available, else validates config (Issue #75, #153)"""
    cert_files = find_generated_certificates()
    if cert_files:
        validate_ca_certificate_real(cert_files)
    else:
        validate_ca_certificate_config()


def validate_server_certificates_real(cert_files):
    """Validate actual Ansible-generated server certificates"""
    # Filter to only actual server certificates (not client certs)
    # Server certificates contain IP addresses in the filename
    server_certs = [
        f
        for f in cert_files["server_certs"]
        if not f.endswith("/cacert.pem") and re.search(r"\d+\.\d+\.\d+\.\d+\.crt$", f)
    ]
    if not server_certs:
        print("⚠ No server certificates found")
        return

    for server_cert_path in server_certs:
        with open(server_cert_path, "rb") as f:
            cert_data = f.read()

        certificate = x509.load_pem_x509_certificate(cert_data)

        # Check it's not a CA certificate
        basic_constraints = certificate.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value
        assert basic_constraints.ca is False, "Server certificate should not be a CA"

        # Check Extended Key Usage (Issue #75)
        eku = certificate.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE).value
        assert x509.oid.ExtendedKeyUsageOID.SERVER_AUTH in eku, "Server cert must have serverAuth EKU"
        assert x509.ObjectIdentifier("1.3.6.1.5.5.7.3.17") in eku, "Server cert should have IPsec End Entity EKU"
        # Security check: Server certificates should NOT have clientAuth to prevent role confusion (Issue #153)
        assert x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in eku, (
            "Server cert should NOT have clientAuth EKU for role separation"
        )

        # Check SAN extension exists (required for Apple devices)
        try:
            san = certificate.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value
            assert len(san) > 0, "Server certificate must have SAN extension for Apple device compatibility"
        except x509.ExtensionNotFound:
            # Raise explicitly: a bare `assert False` is stripped when Python runs with -O
            raise AssertionError("Server certificate missing SAN extension - required for modern clients") from None

        print(f"✓ Real server certificate valid: {os.path.basename(server_cert_path)}")


def validate_server_certificates_config():
    """Validate server certificate configuration in Ansible files (CI mode)"""
    openssl_task_file = find_ansible_file("roles/strongswan/tasks/openssl.yml")
    if not openssl_task_file:
        print("⚠ Could not find openssl.yml task file")
        return

    with open(openssl_task_file) as f:
        content = f.read()

    # Look for server certificate CSR section
    server_csr_section = re.search(r"Create CSRs for server certificate.*?register: server_csr", content, re.DOTALL)
    if not server_csr_section:
        print("⚠ Could not find server certificate CSR section")
        return

    server_section = server_csr_section.group(0)

    # Check server certificate CSR configuration
    server_checks = [
        ("subject_alt_name", "Server certificates should have SAN extension"),
        ("serverAuth", "Server certificates should have serverAuth EKU"),
        ("1.3.6.1.5.5.7.3.17", "Server certificates should have IPsec End Entity EKU"),
        ("digitalSignature", "Server certificates should have digital signature usage"),
        ("keyEncipherment", "Server certificates should have key encipherment usage"),
    ]

    for check, message in server_checks:
        assert check in server_section, f"Missing server certificate configuration: {message}"

    # Security check: Server certificates should NOT have clientAuth (Issue #153)
    # Look for clientAuth in extended_key_usage section, not in comments
    eku_lines = [
        line
        for line in server_section.split("\n")
        if "extended_key_usage:" in line or (line.strip().startswith("- ") and "clientAuth" in line)
    ]
    has_client_auth = any("clientAuth" in line for line in eku_lines if line.strip().startswith("- "))
    assert not has_client_auth, "Server certificates should NOT have clientAuth EKU for role separation"

    # Verify SAN extension is configured for Apple compatibility
    assert "subjectAltName" in server_section, "Server certificates missing SAN configuration for Apple compatibility"

    print("✓ Server certificate configuration has proper EKU and SAN settings")


def test_server_certificates():
    """Test server certificates - uses real certs if available, else validates config"""
    cert_files = find_generated_certificates()
    if cert_files:
        validate_server_certificates_real(cert_files)
    else:
        validate_server_certificates_config()


def validate_client_certificates_real(cert_files):
    """Validate actual Ansible-generated client certificates"""
    # Find client certificates (not CA cert, not server cert with IP/DNS name)
    client_certs = []
    for cert_path in cert_files["server_certs"]:
        if "cacert.pem" in cert_path:
            continue

        with open(cert_path, "rb") as f:
            cert_data = f.read()
        certificate = x509.load_pem_x509_certificate(cert_data)

        # Check if this looks like a client cert vs server cert
        cn = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        # Server certs typically have IP addresses or domain names as CN.
        # Heuristic: an all-digit dotted CN, or any CN with exactly four dot-separated labels.
        if not (cn.replace(".", "").isdigit() or ("." in cn and len(cn.split(".")) == 4)):
            client_certs.append((cert_path, certificate))

    if not client_certs:
        print("⚠ No client certificates found")
        return

    for cert_path, certificate in client_certs:
        # Check it's not a CA certificate
        basic_constraints = certificate.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS).value
        assert basic_constraints.ca is False, "Client certificate should not be a CA"

        # Check Extended Key Usage restrictions (Issue #75)
        eku = certificate.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE).value
        assert x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in eku, "Client cert must have clientAuth EKU"
        assert x509.ObjectIdentifier("1.3.6.1.5.5.7.3.17") in eku, "Client cert should have IPsec End Entity EKU"

        # Security check: Client certificates should NOT have serverAuth (prevents impersonation) (Issue #153)
        assert x509.oid.ExtendedKeyUsageOID.SERVER_AUTH not in eku, (
            "Client cert must NOT have serverAuth EKU to prevent server impersonation"
        )

        # Check SAN extension for email
        try:
            san = certificate.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value
            email_sans = [name.value for name in san if isinstance(name, x509.RFC822Name)]
            assert len(email_sans) > 0, "Client certificate should have email SAN"
        except x509.ExtensionNotFound:
            print(f"⚠ Client certificate missing SAN extension: {os.path.basename(cert_path)}")

        print(f"✓ Real client certificate valid: {os.path.basename(cert_path)}")


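# Illustrative sketch, not part of the original test suite: the CN heuristic in
# validate_client_certificates_real() counts dot-separated labels, so it accepts
# any four-label name and skips domain names with a different label count.
# Assuming server certificates carry an IP address as CN, the standard-library
# ipaddress module gives a stricter check. The helper name cn_is_ip_address is
# hypothetical; it imports ipaddress locally only to stay self-contained.
def cn_is_ip_address(cn):
    """Return True if the common name parses as an IPv4 or IPv6 address."""
    import ipaddress

    try:
        ipaddress.ip_address(cn)
    except ValueError:
        # Not a valid IP literal, e.g. a user name or a hostname
        return False
    return True

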
def validate_client_certificates_config():
    """Validate client certificate configuration in Ansible files (CI mode)"""
    openssl_task_file = find_ansible_file("roles/strongswan/tasks/openssl.yml")
    if not openssl_task_file:
        print("⚠ Could not find openssl.yml task file")
        return

    with open(openssl_task_file) as f:
        content = f.read()

    # Look for client certificate CSR section
    client_csr_section = re.search(
        r"Create CSRs for client certificates.*?register: client_csr_jobs", content, re.DOTALL
    )
    if not client_csr_section:
        print("⚠ Could not find client certificate CSR section")
        return

    client_section = client_csr_section.group(0)

    # Check client certificate configuration
    client_checks = [
        ("clientAuth", "Client certificates should have clientAuth EKU"),
        ("1.3.6.1.5.5.7.3.17", "Client certificates should have IPsec End Entity EKU"),
        ("digitalSignature", "Client certificates should have digital signature usage"),
        ("keyEncipherment", "Client certificates should have key encipherment usage"),
        ("email:", "Client certificates should have email SAN"),
    ]

    for check, message in client_checks:
        assert check in client_section, f"Missing client certificate configuration: {message}"

    # Security check: Client certificates should NOT have serverAuth (Issue #153)
    # Look for serverAuth in extended_key_usage section, not in comments
    eku_lines = [
        line
        for line in client_section.split("\n")
        if "extended_key_usage:" in line or (line.strip().startswith("- ") and "serverAuth" in line)
    ]
    has_server_auth = any("serverAuth" in line for line in eku_lines if line.strip().startswith("- "))
    assert not has_server_auth, "Client certificates must NOT have serverAuth EKU to prevent server impersonation"

    # Verify client certificates use unique email domains (Issue #153)
    assert "openssl_constraint_random_id" in client_section, (
        "Client certificates should use unique email domain per deployment"
    )

    print("✓ Client certificate configuration has proper EKU restrictions (no serverAuth)")


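# Illustrative sketch, not part of the original test suite: the EKU scans in the
# server and client config validators both reduce to "does any YAML list item
# ('- ...') in this section mention the given usage name". The helper name
# yaml_list_items_mention is hypothetical, and like the original scan it is a
# plain string match rather than a real YAML parse.
def yaml_list_items_mention(section, needle):
    """Return True if any '- ' list-item line in section contains needle."""
    return any(
        needle in line
        for line in section.split("\n")
        if line.strip().startswith("- ")  # only list items, so comments are ignored
    )

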
def test_client_certificates():
    """Test client certificates - uses real certs if available, else validates config (Issue #75, #153)"""
    cert_files = find_generated_certificates()
    if cert_files:
        validate_client_certificates_real(cert_files)
    else:
        validate_client_certificates_config()


def validate_pkcs12_files_real(cert_files):
    """Validate actual Ansible-generated PKCS#12 files"""
    if not cert_files.get("p12_files"):
        print("⚠ No PKCS#12 files found")
        return

    # Only the major version is needed to decide on the -legacy flag
    major, _minor = test_openssl_version_detection()

    for p12_file in cert_files["p12_files"]:
        assert os.path.exists(p12_file), f"PKCS#12 file should exist: {p12_file}"

        # Test that PKCS#12 file can be read (validates format)
        legacy_flag = ["-legacy"] if major >= 3 else []

        result = subprocess.run(
            [
                "openssl",
                "pkcs12",
                "-info",
                "-in",
                p12_file,
                "-passin",
                "pass:",  # Try empty password first
                "-noout",
            ]
            + legacy_flag,
            capture_output=True,
            text=True,
        )

        # PKCS#12 files should be readable (even if password-protected)
        # We're just testing format validity, not trying to extract contents
        if result.returncode != 0:
            # The empty password was rejected, so the file is most likely password-protected.
            # Only a warning is printed; no other passwords are attempted.
            print(f"⚠ PKCS#12 file may require password: {os.path.basename(p12_file)}")

        print(f"✓ Real PKCS#12 file exists: {os.path.basename(p12_file)}")


def validate_pkcs12_files_config():
    """Validate PKCS#12 file configuration in Ansible files (CI mode)"""
    openssl_task_file = find_ansible_file("roles/strongswan/tasks/openssl.yml")
    if not openssl_task_file:
        print("⚠ Could not find openssl.yml task file")
        return

    with open(openssl_task_file) as f:
        content = f.read()

    # Check PKCS#12 generation configuration
    p12_checks = [
        ("openssl_pkcs12", "PKCS#12 generation should be configured"),
        ("encryption_level", "PKCS#12 encryption level should be configured"),
        ("compatibility2022", "PKCS#12 should use Apple-compatible encryption"),
        ("friendly_name", "PKCS#12 should have friendly names"),
        ("other_certificates", "PKCS#12 should include CA certificate for full chain"),
        ("passphrase", "PKCS#12 files should be password protected"),
        ('mode: "0600"', "PKCS#12 files should have secure permissions"),
    ]

    for check, message in p12_checks:
        assert check in content, f"Missing PKCS#12 configuration: {message}"

    print("✓ PKCS#12 configuration has proper Apple device compatibility settings")


def test_pkcs12_files():
    """Test PKCS#12 files - uses real files if available, else validates config (Issue #14755, #14718)"""
    cert_files = find_generated_certificates()
    if cert_files:
        validate_pkcs12_files_real(cert_files)
    else:
        validate_pkcs12_files_config()


def validate_certificate_chain_real(cert_files):
    """Validate actual Ansible-generated certificate chain"""
    from datetime import datetime

    # Load CA certificate
    with open(cert_files["ca_cert"], "rb") as f:
        ca_cert_data = f.read()
    ca_certificate = x509.load_pem_x509_certificate(ca_cert_data)

    # Test that all other certificates are signed by the CA
    other_certs = [f for f in cert_files["server_certs"] if f != cert_files["ca_cert"]]

    if not other_certs:
        print("⚠ No client/server certificates found to validate")
        return

    for cert_path in other_certs:
        with open(cert_path, "rb") as f:
            cert_data = f.read()
        certificate = x509.load_pem_x509_certificate(cert_data)

        # Verify the certificate names our CA as its issuer.
        # This compares names only; the signature itself is not verified here.
        assert certificate.issuer == ca_certificate.subject, f"Certificate {cert_path} not signed by CA"

        # Verify certificate is currently valid (not expired)
        now = datetime.now(UTC)
        assert certificate.not_valid_before_utc <= now, f"Certificate {cert_path} not yet valid"
        assert certificate.not_valid_after_utc >= now, f"Certificate {cert_path} has expired"

        print(f"✓ Real certificate chain valid: {os.path.basename(cert_path)}")

    print("✓ All real certificates properly signed by CA")


def validate_certificate_chain_config():
    """Validate certificate chain configuration in Ansible files (CI mode)"""
    openssl_task_file = find_ansible_file("roles/strongswan/tasks/openssl.yml")
    if not openssl_task_file:
        print("⚠ Could not find openssl.yml task file")
        return

    with open(openssl_task_file) as f:
        content = f.read()

    # Check certificate signing configuration
    chain_checks = [
        ("provider: ownca", "Certificates should be signed by own CA"),
        ("ownca_path", "CA certificate path should be specified"),
        ("ownca_privatekey_path", "CA private key path should be specified"),
        ("ownca_privatekey_passphrase", "CA private key should be password protected"),
        ("certificate_validity_days: 3650", "Certificate validity should be configurable (default 10 years)"),
        (
            'ownca_not_after: "+{{ certificate_validity_days }}d"',
            "Certificates should use configurable validity period",
        ),
        ('ownca_not_before: "-1d"', "Certificates should have backdated start time"),
        ("curve: secp384r1", "Should use strong elliptic curve cryptography"),
        ("type: ECC", "Should use elliptic curve keys for better security"),
    ]

    for check, message in chain_checks:
        assert check in content, f"Missing certificate chain configuration: {message}"

    print("✓ Certificate chain configuration properly set up for CA signing")


def test_certificate_chain():
    """Test certificate chain - uses real certs if available, else validates config"""
    cert_files = find_generated_certificates()
    if cert_files:
        validate_certificate_chain_real(cert_files)
    else:
        validate_certificate_chain_config()


def find_ansible_file(relative_path):
    """Find Ansible file from various possible locations"""
    # Try different base paths
    possible_bases = [
        ".",  # Current directory
        "..",  # Parent directory (from tests/unit)
        "../..",  # Grandparent (from tests/unit to project root)
        "../../..",  # Alternative deep path
    ]

    for base in possible_bases:
        full_path = os.path.join(base, relative_path)
        if os.path.exists(full_path):
            return full_path

    return None


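# Illustrative sketch, not part of the original test suite: find_ansible_file()
# probes a fixed list of relative bases, which depends on the working directory
# being at most three levels below the project root. One possible alternative,
# shown here under that assumption, walks every parent of a start directory.
# The helper name find_file_upwards and its start parameter are hypothetical.
def find_file_upwards(relative_path, start="."):
    """Return the first existing start-or-ancestor path joined with relative_path, or None."""
    from pathlib import Path

    start_dir = Path(start).resolve()
    # Check the start directory itself, then each ancestor up to the filesystem root
    for base in (start_dir, *start_dir.parents):
        candidate = base / relative_path
        if candidate.exists():
            return candidate
    return None

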
if __name__ == "__main__":
    tests = [
        test_openssl_version_detection,
        test_ca_certificate,
        test_server_certificates,
        test_client_certificates,
        test_pkcs12_files,
        test_certificate_chain,
    ]

    failed = 0
    for test in tests:
        try:
            test()
        except AssertionError as e:
            print(f"✗ {test.__name__} failed: {e}")
            failed += 1
        except Exception as e:
            print(f"✗ {test.__name__} error: {e}")
            failed += 1

    if failed > 0:
        print(f"\n{failed} tests failed")
        sys.exit(1)
    else:
        print(f"\nAll {len(tests)} tests passed!")