diff --git a/roles/strongswan/handlers/main.yml b/roles/strongswan/handlers/main.yml index 3e2b0354..f60a495a 100644 --- a/roles/strongswan/handlers/main.yml +++ b/roles/strongswan/handlers/main.yml @@ -9,4 +9,14 @@ service: name=apparmor state=restarted - name: rereadcrls - shell: ipsec rereadcrls; ipsec purgecrls + shell: | + # Wait for ipsec daemon to be ready (up to 10 seconds) + for i in $(seq 1 10); do + if ipsec statusall >/dev/null 2>&1; then + ipsec rereadcrls && ipsec purgecrls + exit 0 + fi + sleep 1 + done + # If daemon still not ready, try anyway but don't fail the playbook + ipsec rereadcrls; ipsec purgecrls || true diff --git a/tests/unit/test_strongswan_templates.py b/tests/unit/test_strongswan_templates.py index c74baa87..861555fc 100644 --- a/tests/unit/test_strongswan_templates.py +++ b/tests/unit/test_strongswan_templates.py @@ -6,7 +6,6 @@ Tests all strongswan role templates with various configurations. import os import sys import uuid -from pathlib import Path from jinja2 import Environment, FileSystemLoader, StrictUndefined @@ -48,7 +47,7 @@ def mock_b64decode(value): def get_strongswan_test_variables(scenario='default'): """Get test variables for StrongSwan templates with different scenarios.""" base_vars = load_test_variables() - + # Add StrongSwan specific variables strongswan_vars = { 'ipsec_config_path': '/etc/ipsec.d', @@ -80,10 +79,10 @@ def get_strongswan_test_variables(scenario='default'): 'leftsubnet': '0.0.0.0/0,::/0', 'rightsourceip': '10.19.48.2/24,fd9d:bc11:4021::2/64', } - + # Merge with base variables test_vars = {**base_vars, **strongswan_vars} - + # Apply scenario-specific overrides if scenario == 'ipv4_only': test_vars['ipv6_support'] = False @@ -95,7 +94,7 @@ def get_strongswan_test_variables(scenario='default'): test_vars['subjectAltName_type'] = 'DNS' elif scenario == 'openssl_legacy': test_vars['openssl_version'] = '1.1.1' - + return test_vars @@ -110,64 +109,64 @@ def test_strongswan_templates(): 'roles/strongswan/templates/client_ipsec.secrets.j2', 'roles/strongswan/templates/100-CustomLimitations.conf.j2', ] - + scenarios = ['default', 'ipv4_only', 'dns_hostname', 'openssl_legacy'] errors = [] tested = 0 - + for template_path in templates: if not os.path.exists(template_path): print(f" ⚠️ Skipping {template_path} (not found)") continue - + template_dir = os.path.dirname(template_path) template_name = os.path.basename(template_path) - + for scenario in scenarios: tested += 1 test_vars = get_strongswan_test_variables(scenario) - + try: env = Environment( loader=FileSystemLoader(template_dir), undefined=StrictUndefined ) - + # Add mock filters env.filters['to_uuid'] = mock_to_uuid env.filters['bool'] = mock_bool env.filters['b64encode'] = mock_b64encode env.filters['b64decode'] = mock_b64decode env.tests['version'] = mock_version - + # For client templates, add item context if 'client' in template_name: test_vars['item'] = 'testuser' - + template = env.get_template(template_name) output = template.render(**test_vars) - + # Basic validation assert len(output) > 0, f"Empty output from {template_path} ({scenario})" - + # Specific validations based on template if 'ipsec.conf' in template_name and 'client' not in template_name: assert 'conn' in output, "Missing connection definition" if scenario != 'ipv4_only' and test_vars.get('ipv6_support'): assert '::/0' in output or 'fd9d:bc11' in output, "Missing IPv6 configuration" - + if 'ipsec.secrets' in template_name: assert 'PSK' in output or 'ECDSA' in output, "Missing authentication method" - + if 'strongswan.conf' in template_name: assert 'charon' in output, "Missing charon configuration" - + print(f" ✅ {template_name} ({scenario})") - + except Exception as e: errors.append(f"{template_path} ({scenario}): {str(e)}") print(f" ❌ {template_name} ({scenario}): {str(e)}") - + if errors: print(f"\n❌ StrongSwan template tests failed with {len(errors)} errors") for error in errors[:5]: @@ -182,28 +181,28 @@ def test_openssl_template_constraints(): """Test the OpenSSL task template that had the inline comment issue.""" # This tests the actual openssl.yml task file to ensure our fix works import yaml - + openssl_path = 'roles/strongswan/tasks/openssl.yml' if not os.path.exists(openssl_path): print("⚠️ OpenSSL tasks file not found") return True - + try: - with open(openssl_path, 'r') as f: + with open(openssl_path) as f: content = yaml.safe_load(f) - + # Find the CA CSR task ca_csr_task = None for task in content: if isinstance(task, dict) and task.get('name', '').startswith('Create certificate signing request'): ca_csr_task = task break - + if ca_csr_task: # Check that name_constraints_permitted is properly formatted csr_module = ca_csr_task.get('community.crypto.openssl_csr_pipe', {}) constraints = csr_module.get('name_constraints_permitted', '') - + # The constraints should be a Jinja2 template without inline comments if '#' in str(constraints): # Check if the # is within {{ }} @@ -213,10 +212,10 @@ def test_openssl_template_constraints(): if '#' in block: print("❌ Found inline comment in Jinja2 expression") return False - + print("✅ OpenSSL template constraints validated") return True - + except Exception as e: print(f"⚠️ Error checking OpenSSL tasks: {e}") return True # Don't fail the test for this @@ -225,17 +224,17 @@ def test_openssl_template_constraints(): def test_mobileconfig_template(): """Test the mobileconfig template with various scenarios.""" template_path = 'roles/strongswan/templates/mobileconfig.j2' - + if not os.path.exists(template_path): print("⚠️ Mobileconfig template not found") return True - + # Skip this test - mobileconfig.j2 is too tightly coupled to Ansible runtime # It requires complex mock objects (item.1.stdout) and many dynamic variables # that are generated during playbook execution print("⚠️ Skipping mobileconfig template test (requires Ansible runtime context)") return True - + test_cases = [ { 'name': 'iPhone with cellular on-demand', @@ -254,7 +253,7 @@ def test_mobileconfig_template(): 'algo_ondemand_wifi': 'false', }, ] - + errors = [] for test_case in test_cases: test_vars = get_strongswan_test_variables() @@ -263,7 +262,7 @@ def test_mobileconfig_template(): class MockTaskResult: def __init__(self, content): self.stdout = content - + test_vars['item'] = ('testuser', MockTaskResult('TU9DS19QS0NTMTJfQ09OVEVOVA==')) # Tuple with mock result test_vars['PayloadContentCA_base64'] = 'TU9DS19DQV9DRVJUX0JBU0U2NA==' # Valid base64 test_vars['PayloadContentUser_base64'] = 'TU9DS19VU0VSX0NFUlRfQkFTRTY0' # Valid base64 @@ -273,62 +272,62 @@ def test_mobileconfig_template(): test_vars['VPN_PayloadIdentifier'] = str(uuid.uuid4()) test_vars['CA_PayloadIdentifier'] = str(uuid.uuid4()) test_vars['PayloadContentCA'] = 'TU9DS19DQV9DRVJUX0NPTlRFTlQ=' # Valid base64 - + try: env = Environment( loader=FileSystemLoader('roles/strongswan/templates'), undefined=StrictUndefined ) - + # Add mock filters env.filters['to_uuid'] = mock_to_uuid env.filters['b64encode'] = mock_b64encode env.filters['b64decode'] = mock_b64decode - + template = env.get_template('mobileconfig.j2') output = template.render(**test_vars) - + # Validate output assert ' List[Path]: +def find_jinja2_templates(root_dir: str = '.') -> list[Path]: """Find all Jinja2 template files in the project.""" templates = [] patterns = ['**/*.j2', '**/*.jinja2', '**/*.yml.j2', '**/*.conf.j2'] - + # Skip these directories skip_dirs = {'.git', '.venv', 'venv', '.env', 'configs', '__pycache__', '.cache'} - + for pattern in patterns: for path in Path(root_dir).glob(pattern): # Skip if in a directory we want to ignore if not any(skip_dir in path.parts for skip_dir in skip_dirs): templates.append(path) - + return sorted(templates) -def check_inline_comments_in_expressions(template_content: str, template_path: Path) -> List[str]: +def check_inline_comments_in_expressions(template_content: str, template_path: Path) -> list[str]: """ Check for inline comments (#) within Jinja2 expressions. This is the error we just fixed in openssl.yml. """ errors = [] - + # Pattern to find Jinja2 expressions jinja_pattern = re.compile(r'\{\{.*?\}\}|\{%.*?%\}', re.DOTALL) - + for match in jinja_pattern.finditer(template_content): expression = match.group() lines = expression.split('\n') - + for i, line in enumerate(lines): # Check for # that's not in a string # Simple heuristic: if # appears after non-whitespace and not in quotes @@ -54,7 +52,7 @@ def check_inline_comments_in_expressions(template_content: str, template_path: P # Remove quoted strings to avoid false positives cleaned = re.sub(r'"[^"]*"', '', line) cleaned = re.sub(r"'[^']*'", '', cleaned) - + if '#' in cleaned: # Check if it's likely a comment (has text after it) hash_pos = cleaned.index('#') @@ -64,25 +62,25 @@ def check_inline_comments_in_expressions(template_content: str, template_path: P f"{template_path}:{line_num}: Inline comment (#) found in Jinja2 expression. " f"Move comments outside the expression." ) - + return errors -def check_undefined_variables(template_path: Path) -> List[str]: +def check_undefined_variables(template_path: Path) -> list[str]: """ Parse template and extract all undefined variables. This helps identify what variables need to be provided. """ errors = [] - + try: - with open(template_path, 'r') as f: + with open(template_path) as f: template_content = f.read() - + env = Environment(undefined=StrictUndefined) ast = env.parse(template_content) undefined_vars = meta.find_undeclared_variables(ast) - + # Common Ansible variables that are always available ansible_builtins = { 'ansible_default_ipv4', 'ansible_default_ipv6', 'ansible_hostname', @@ -91,30 +89,30 @@ def check_undefined_variables(template_path: Path) -> List[str]: 'play_hosts', 'ansible_version', 'ansible_user', 'ansible_host', 'item', 'ansible_loop', 'ansible_index', 'lookup' } - + # Filter out known Ansible variables unknown_vars = undefined_vars - ansible_builtins - + # Only report if there are truly unknown variables if unknown_vars and len(unknown_vars) < 20: # Avoid noise from templates with many vars errors.append( f"{template_path}: Uses undefined variables: {', '.join(sorted(unknown_vars))}" ) - - except Exception as e: + + except Exception: # Don't report parse errors here, they're handled elsewhere pass - + return errors -def validate_template_syntax(template_path: Path) -> Tuple[bool, List[str]]: +def validate_template_syntax(template_path: Path) -> tuple[bool, list[str]]: """ Validate a single template for syntax errors. Returns (is_valid, list_of_errors) """ errors = [] - + # Skip full parsing for templates that use Ansible-specific features heavily # We still check for inline comments but skip full template parsing ansible_specific_templates = { @@ -122,30 +120,30 @@ def validate_template_syntax(template_path: Path) -> Tuple[bool, List[str]]: 'mobileconfig.j2', # Uses |to_uuid filter and complex item structures 'vpn-dict.j2', # Uses |to_uuid filter } - + if template_path.name in ansible_specific_templates: # Still check for inline comments but skip full parsing try: - with open(template_path, 'r') as f: + with open(template_path) as f: template_content = f.read() errors.extend(check_inline_comments_in_expressions(template_content, template_path)) except Exception: pass return len(errors) == 0, errors - + try: - with open(template_path, 'r') as f: + with open(template_path) as f: template_content = f.read() - + # Check for inline comments first (our custom check) errors.extend(check_inline_comments_in_expressions(template_content, template_path)) - + # Try to parse the template env = Environment( loader=FileSystemLoader(template_path.parent), undefined=StrictUndefined ) - + # Add mock Ansible filters to avoid syntax errors env.filters['bool'] = lambda x: x env.filters['to_uuid'] = lambda x: x @@ -153,89 +151,89 @@ def validate_template_syntax(template_path: Path) -> Tuple[bool, List[str]]: env.filters['b64decode'] = lambda x: x env.filters['regex_replace'] = lambda x, y, z: x env.filters['default'] = lambda x, d: x if x else d - + # This will raise TemplateSyntaxError if there's a syntax problem env.get_template(template_path.name) - + # Also check for undefined variables (informational) # Commenting out for now as it's too noisy, but useful for debugging # errors.extend(check_undefined_variables(template_path)) - + except TemplateSyntaxError as e: errors.append(f"{template_path}:{e.lineno}: Syntax error: {e.message}") except UnicodeDecodeError: errors.append(f"{template_path}: Unable to decode file (not UTF-8)") except Exception as e: errors.append(f"{template_path}: Error: {str(e)}") - + return len(errors) == 0, errors -def check_common_antipatterns(template_path: Path) -> List[str]: +def check_common_antipatterns(template_path: Path) -> list[str]: """Check for common Jinja2 anti-patterns.""" warnings = [] - + try: - with open(template_path, 'r') as f: + with open(template_path) as f: content = f.read() - + # Check for missing spaces around filters if re.search(r'\{\{[^}]+\|[^ ]', content): warnings.append(f"{template_path}: Missing space after filter pipe (|)") - + # Check for deprecated 'when' in Jinja2 (should use if) if re.search(r'\{%\s*when\s+', content): warnings.append(f"{template_path}: Use 'if' instead of 'when' in Jinja2 templates") - + # Check for extremely long expressions (harder to debug) for match in re.finditer(r'\{\{(.+?)\}\}', content, re.DOTALL): if len(match.group(1)) > 200: line_num = content[:match.start()].count('\n') + 1 warnings.append(f"{template_path}:{line_num}: Very long expression (>200 chars), consider breaking it up") - + except Exception: pass # Ignore errors in anti-pattern checking - + return warnings def main(): """Main validation function.""" print("🔍 Validating Jinja2 templates in Algo...\n") - + # Find all templates templates = find_jinja2_templates() print(f"Found {len(templates)} Jinja2 templates\n") - + all_errors = [] all_warnings = [] valid_count = 0 - + # Validate each template for template in templates: is_valid, errors = validate_template_syntax(template) warnings = check_common_antipatterns(template) - + if is_valid: valid_count += 1 else: all_errors.extend(errors) - + all_warnings.extend(warnings) - + # Report results print(f"✅ {valid_count}/{len(templates)} templates have valid syntax") - + if all_errors: print(f"\n❌ Found {len(all_errors)} errors:\n") for error in all_errors: print(f" ERROR: {error}") - + if all_warnings: print(f"\n⚠️ Found {len(all_warnings)} warnings:\n") for warning in all_warnings: print(f" WARN: {warning}") - + if all_errors: print("\n❌ Template validation FAILED") return 1 @@ -245,4 +243,4 @@ def main(): if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main())