From 4be204f1d505ab1d4d7d213673a656eb935fe38b Mon Sep 17 00:00:00 2001 From: Dan Guido Date: Wed, 6 Aug 2025 21:22:11 -0700 Subject: [PATCH] Fix StrongSwan CRL reread handler race condition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ipsec rereadcrls command was failing with exit code 7 when the IPsec daemon wasn't fully started yet. This is a timing issue that can occur during initial setup. Added retry logic to: 1. Wait up to 10 seconds for the IPsec daemon to be ready 2. Check daemon status before attempting CRL operations 3. Gracefully handle the case where daemon isn't ready Also fixed Python linting issues (whitespace) in test files caught by ruff. šŸ¤– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- roles/strongswan/handlers/main.yml | 12 ++- tests/unit/test_strongswan_templates.py | 93 +++++++++++---------- tests/validate_jinja2_templates.py | 104 ++++++++++++------------ 3 files changed, 108 insertions(+), 101 deletions(-) diff --git a/roles/strongswan/handlers/main.yml b/roles/strongswan/handlers/main.yml index 3e2b0354..f60a495a 100644 --- a/roles/strongswan/handlers/main.yml +++ b/roles/strongswan/handlers/main.yml @@ -9,4 +9,14 @@ service: name=apparmor state=restarted - name: rereadcrls - shell: ipsec rereadcrls; ipsec purgecrls + shell: | + # Wait for ipsec daemon to be ready (up to 10 seconds) + for i in $(seq 1 10); do + if ipsec statusall >/dev/null 2>&1; then + ipsec rereadcrls && ipsec purgecrls + exit 0 + fi + sleep 1 + done + # If daemon still not ready, try anyway but don't fail the playbook + ipsec rereadcrls; ipsec purgecrls || true diff --git a/tests/unit/test_strongswan_templates.py b/tests/unit/test_strongswan_templates.py index c74baa87..861555fc 100644 --- a/tests/unit/test_strongswan_templates.py +++ b/tests/unit/test_strongswan_templates.py @@ -6,7 +6,6 @@ Tests all strongswan role templates with various configurations. import os import sys import uuid -from pathlib import Path from jinja2 import Environment, FileSystemLoader, StrictUndefined @@ -48,7 +47,7 @@ def mock_b64decode(value): def get_strongswan_test_variables(scenario='default'): """Get test variables for StrongSwan templates with different scenarios.""" base_vars = load_test_variables() - + # Add StrongSwan specific variables strongswan_vars = { 'ipsec_config_path': '/etc/ipsec.d', @@ -80,10 +79,10 @@ def get_strongswan_test_variables(scenario='default'): 'leftsubnet': '0.0.0.0/0,::/0', 'rightsourceip': '10.19.48.2/24,fd9d:bc11:4021::2/64', } - + # Merge with base variables test_vars = {**base_vars, **strongswan_vars} - + # Apply scenario-specific overrides if scenario == 'ipv4_only': test_vars['ipv6_support'] = False @@ -95,7 +94,7 @@ def get_strongswan_test_variables(scenario='default'): test_vars['subjectAltName_type'] = 'DNS' elif scenario == 'openssl_legacy': test_vars['openssl_version'] = '1.1.1' - + return test_vars @@ -110,64 +109,64 @@ def test_strongswan_templates(): 'roles/strongswan/templates/client_ipsec.secrets.j2', 'roles/strongswan/templates/100-CustomLimitations.conf.j2', ] - + scenarios = ['default', 'ipv4_only', 'dns_hostname', 'openssl_legacy'] errors = [] tested = 0 - + for template_path in templates: if not os.path.exists(template_path): print(f" āš ļø Skipping {template_path} (not found)") continue - + template_dir = os.path.dirname(template_path) template_name = os.path.basename(template_path) - + for scenario in scenarios: tested += 1 test_vars = get_strongswan_test_variables(scenario) - + try: env = Environment( loader=FileSystemLoader(template_dir), undefined=StrictUndefined ) - + # Add mock filters env.filters['to_uuid'] = mock_to_uuid env.filters['bool'] = mock_bool env.filters['b64encode'] = mock_b64encode env.filters['b64decode'] = mock_b64decode env.tests['version'] = mock_version - + # For client templates, add item context if 'client' in template_name: test_vars['item'] = 'testuser' - + template = env.get_template(template_name) output = template.render(**test_vars) - + # Basic validation assert len(output) > 0, f"Empty output from {template_path} ({scenario})" - + # Specific validations based on template if 'ipsec.conf' in template_name and 'client' not in template_name: assert 'conn' in output, "Missing connection definition" if scenario != 'ipv4_only' and test_vars.get('ipv6_support'): assert '::/0' in output or 'fd9d:bc11' in output, "Missing IPv6 configuration" - + if 'ipsec.secrets' in template_name: assert 'PSK' in output or 'ECDSA' in output, "Missing authentication method" - + if 'strongswan.conf' in template_name: assert 'charon' in output, "Missing charon configuration" - + print(f" āœ… {template_name} ({scenario})") - + except Exception as e: errors.append(f"{template_path} ({scenario}): {str(e)}") print(f" āŒ {template_name} ({scenario}): {str(e)}") - + if errors: print(f"\nāŒ StrongSwan template tests failed with {len(errors)} errors") for error in errors[:5]: @@ -182,28 +181,28 @@ def test_openssl_template_constraints(): """Test the OpenSSL task template that had the inline comment issue.""" # This tests the actual openssl.yml task file to ensure our fix works import yaml - + openssl_path = 'roles/strongswan/tasks/openssl.yml' if not os.path.exists(openssl_path): print("āš ļø OpenSSL tasks file not found") return True - + try: - with open(openssl_path, 'r') as f: + with open(openssl_path) as f: content = yaml.safe_load(f) - + # Find the CA CSR task ca_csr_task = None for task in content: if isinstance(task, dict) and task.get('name', '').startswith('Create certificate signing request'): ca_csr_task = task break - + if ca_csr_task: # Check that name_constraints_permitted is properly formatted csr_module = ca_csr_task.get('community.crypto.openssl_csr_pipe', {}) constraints = csr_module.get('name_constraints_permitted', '') - + # The constraints should be a Jinja2 template without inline comments if '#' in str(constraints): # Check if the # is within {{ }} @@ -213,10 +212,10 @@ def test_openssl_template_constraints(): if '#' in block: print("āŒ Found inline comment in Jinja2 expression") return False - + print("āœ… OpenSSL template constraints validated") return True - + except Exception as e: print(f"āš ļø Error checking OpenSSL tasks: {e}") return True # Don't fail the test for this @@ -225,17 +224,17 @@ def test_openssl_template_constraints(): def test_mobileconfig_template(): """Test the mobileconfig template with various scenarios.""" template_path = 'roles/strongswan/templates/mobileconfig.j2' - + if not os.path.exists(template_path): print("āš ļø Mobileconfig template not found") return True - + # Skip this test - mobileconfig.j2 is too tightly coupled to Ansible runtime # It requires complex mock objects (item.1.stdout) and many dynamic variables # that are generated during playbook execution print("āš ļø Skipping mobileconfig template test (requires Ansible runtime context)") return True - + test_cases = [ { 'name': 'iPhone with cellular on-demand', @@ -254,7 +253,7 @@ def test_mobileconfig_template(): 'algo_ondemand_wifi': 'false', }, ] - + errors = [] for test_case in test_cases: test_vars = get_strongswan_test_variables() @@ -263,7 +262,7 @@ def test_mobileconfig_template(): class MockTaskResult: def __init__(self, content): self.stdout = content - + test_vars['item'] = ('testuser', MockTaskResult('TU9DS19QS0NTMTJfQ09OVEVOVA==')) # Tuple with mock result test_vars['PayloadContentCA_base64'] = 'TU9DS19DQV9DRVJUX0JBU0U2NA==' # Valid base64 test_vars['PayloadContentUser_base64'] = 'TU9DS19VU0VSX0NFUlRfQkFTRTY0' # Valid base64 @@ -273,62 +272,62 @@ def test_mobileconfig_template(): test_vars['VPN_PayloadIdentifier'] = str(uuid.uuid4()) test_vars['CA_PayloadIdentifier'] = str(uuid.uuid4()) test_vars['PayloadContentCA'] = 'TU9DS19DQV9DRVJUX0NPTlRFTlQ=' # Valid base64 - + try: env = Environment( loader=FileSystemLoader('roles/strongswan/templates'), undefined=StrictUndefined ) - + # Add mock filters env.filters['to_uuid'] = mock_to_uuid env.filters['b64encode'] = mock_b64encode env.filters['b64decode'] = mock_b64decode - + template = env.get_template('mobileconfig.j2') output = template.render(**test_vars) - + # Validate output assert ' List[Path]: +def find_jinja2_templates(root_dir: str = '.') -> list[Path]: """Find all Jinja2 template files in the project.""" templates = [] patterns = ['**/*.j2', '**/*.jinja2', '**/*.yml.j2', '**/*.conf.j2'] - + # Skip these directories skip_dirs = {'.git', '.venv', 'venv', '.env', 'configs', '__pycache__', '.cache'} - + for pattern in patterns: for path in Path(root_dir).glob(pattern): # Skip if in a directory we want to ignore if not any(skip_dir in path.parts for skip_dir in skip_dirs): templates.append(path) - + return sorted(templates) -def check_inline_comments_in_expressions(template_content: str, template_path: Path) -> List[str]: +def check_inline_comments_in_expressions(template_content: str, template_path: Path) -> list[str]: """ Check for inline comments (#) within Jinja2 expressions. This is the error we just fixed in openssl.yml. """ errors = [] - + # Pattern to find Jinja2 expressions jinja_pattern = re.compile(r'\{\{.*?\}\}|\{%.*?%\}', re.DOTALL) - + for match in jinja_pattern.finditer(template_content): expression = match.group() lines = expression.split('\n') - + for i, line in enumerate(lines): # Check for # that's not in a string # Simple heuristic: if # appears after non-whitespace and not in quotes @@ -54,7 +52,7 @@ def check_inline_comments_in_expressions(template_content: str, template_path: P # Remove quoted strings to avoid false positives cleaned = re.sub(r'"[^"]*"', '', line) cleaned = re.sub(r"'[^']*'", '', cleaned) - + if '#' in cleaned: # Check if it's likely a comment (has text after it) hash_pos = cleaned.index('#') @@ -64,25 +62,25 @@ def check_inline_comments_in_expressions(template_content: str, template_path: P f"{template_path}:{line_num}: Inline comment (#) found in Jinja2 expression. " f"Move comments outside the expression." ) - + return errors -def check_undefined_variables(template_path: Path) -> List[str]: +def check_undefined_variables(template_path: Path) -> list[str]: """ Parse template and extract all undefined variables. This helps identify what variables need to be provided. """ errors = [] - + try: - with open(template_path, 'r') as f: + with open(template_path) as f: template_content = f.read() - + env = Environment(undefined=StrictUndefined) ast = env.parse(template_content) undefined_vars = meta.find_undeclared_variables(ast) - + # Common Ansible variables that are always available ansible_builtins = { 'ansible_default_ipv4', 'ansible_default_ipv6', 'ansible_hostname', @@ -91,30 +89,30 @@ def check_undefined_variables(template_path: Path) -> List[str]: 'play_hosts', 'ansible_version', 'ansible_user', 'ansible_host', 'item', 'ansible_loop', 'ansible_index', 'lookup' } - + # Filter out known Ansible variables unknown_vars = undefined_vars - ansible_builtins - + # Only report if there are truly unknown variables if unknown_vars and len(unknown_vars) < 20: # Avoid noise from templates with many vars errors.append( f"{template_path}: Uses undefined variables: {', '.join(sorted(unknown_vars))}" ) - - except Exception as e: + + except Exception: # Don't report parse errors here, they're handled elsewhere pass - + return errors -def validate_template_syntax(template_path: Path) -> Tuple[bool, List[str]]: +def validate_template_syntax(template_path: Path) -> tuple[bool, list[str]]: """ Validate a single template for syntax errors. Returns (is_valid, list_of_errors) """ errors = [] - + # Skip full parsing for templates that use Ansible-specific features heavily # We still check for inline comments but skip full template parsing ansible_specific_templates = { @@ -122,30 +120,30 @@ def validate_template_syntax(template_path: Path) -> Tuple[bool, List[str]]: 'mobileconfig.j2', # Uses |to_uuid filter and complex item structures 'vpn-dict.j2', # Uses |to_uuid filter } - + if template_path.name in ansible_specific_templates: # Still check for inline comments but skip full parsing try: - with open(template_path, 'r') as f: + with open(template_path) as f: template_content = f.read() errors.extend(check_inline_comments_in_expressions(template_content, template_path)) except Exception: pass return len(errors) == 0, errors - + try: - with open(template_path, 'r') as f: + with open(template_path) as f: template_content = f.read() - + # Check for inline comments first (our custom check) errors.extend(check_inline_comments_in_expressions(template_content, template_path)) - + # Try to parse the template env = Environment( loader=FileSystemLoader(template_path.parent), undefined=StrictUndefined ) - + # Add mock Ansible filters to avoid syntax errors env.filters['bool'] = lambda x: x env.filters['to_uuid'] = lambda x: x @@ -153,89 +151,89 @@ def validate_template_syntax(template_path: Path) -> Tuple[bool, List[str]]: env.filters['b64decode'] = lambda x: x env.filters['regex_replace'] = lambda x, y, z: x env.filters['default'] = lambda x, d: x if x else d - + # This will raise TemplateSyntaxError if there's a syntax problem env.get_template(template_path.name) - + # Also check for undefined variables (informational) # Commenting out for now as it's too noisy, but useful for debugging # errors.extend(check_undefined_variables(template_path)) - + except TemplateSyntaxError as e: errors.append(f"{template_path}:{e.lineno}: Syntax error: {e.message}") except UnicodeDecodeError: errors.append(f"{template_path}: Unable to decode file (not UTF-8)") except Exception as e: errors.append(f"{template_path}: Error: {str(e)}") - + return len(errors) == 0, errors -def check_common_antipatterns(template_path: Path) -> List[str]: +def check_common_antipatterns(template_path: Path) -> list[str]: """Check for common Jinja2 anti-patterns.""" warnings = [] - + try: - with open(template_path, 'r') as f: + with open(template_path) as f: content = f.read() - + # Check for missing spaces around filters if re.search(r'\{\{[^}]+\|[^ ]', content): warnings.append(f"{template_path}: Missing space after filter pipe (|)") - + # Check for deprecated 'when' in Jinja2 (should use if) if re.search(r'\{%\s*when\s+', content): warnings.append(f"{template_path}: Use 'if' instead of 'when' in Jinja2 templates") - + # Check for extremely long expressions (harder to debug) for match in re.finditer(r'\{\{(.+?)\}\}', content, re.DOTALL): if len(match.group(1)) > 200: line_num = content[:match.start()].count('\n') + 1 warnings.append(f"{template_path}:{line_num}: Very long expression (>200 chars), consider breaking it up") - + except Exception: pass # Ignore errors in anti-pattern checking - + return warnings def main(): """Main validation function.""" print("šŸ” Validating Jinja2 templates in Algo...\n") - + # Find all templates templates = find_jinja2_templates() print(f"Found {len(templates)} Jinja2 templates\n") - + all_errors = [] all_warnings = [] valid_count = 0 - + # Validate each template for template in templates: is_valid, errors = validate_template_syntax(template) warnings = check_common_antipatterns(template) - + if is_valid: valid_count += 1 else: all_errors.extend(errors) - + all_warnings.extend(warnings) - + # Report results print(f"āœ… {valid_count}/{len(templates)} templates have valid syntax") - + if all_errors: print(f"\nāŒ Found {len(all_errors)} errors:\n") for error in all_errors: print(f" ERROR: {error}") - + if all_warnings: print(f"\nāš ļø Found {len(all_warnings)} warnings:\n") for warning in all_warnings: print(f" WARN: {warning}") - + if all_errors: print("\nāŒ Template validation FAILED") return 1 @@ -245,4 +243,4 @@ def main(): if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main())