diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml index 29fd4188..6911a51a 100644 --- a/.github/FUNDING.yml +++ b/.github/FUNDING.yml @@ -1,3 +1,4 @@ +--- # These are supported funding model platforms github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] diff --git a/.github/dependabot.yml b/.github/dependabot.yml index e5be1c71..70091b76 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -1,3 +1,4 @@ +--- version: 2 updates: # Maintain dependencies for GitHub Actions diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index a02949e4..1ec09175 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -1,6 +1,7 @@ +--- name: Integration Tests -on: +'on': pull_request: types: [opened, synchronize, reopened] paths: @@ -28,12 +29,12 @@ jobs: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 with: persist-credentials: false - + - uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: '3.11' cache: 'pip' - + - name: Install system dependencies run: | sudo apt-get update @@ -46,12 +47,12 @@ jobs: qrencode \ openssl \ linux-headers-$(uname -r) - + - name: Install Python dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt - + - name: Create test configuration run: | cat > integration-test.cfg << EOF @@ -87,7 +88,7 @@ jobs: endpoint: 127.0.0.1 ssh_port: 4160 EOF - + - name: Run Algo deployment run: | sudo ansible-playbook main.yml \ @@ -96,7 +97,7 @@ jobs: -e @integration-test.cfg \ -e "provider=local" \ -vv - + - name: Verify services are running run: | # Check WireGuard @@ -109,7 +110,7 @@ jobs: fi echo "✓ WireGuard is running" fi - + # Check StrongSwan if [[ "${{ matrix.vpn_type }}" == "ipsec" || "${{ matrix.vpn_type }}" == "both" ]]; then echo "Checking StrongSwan..." @@ -120,18 +121,18 @@ jobs: fi echo "✓ StrongSwan is running" fi - + # Check dnsmasq if ! sudo systemctl is-active --quiet dnsmasq; then echo "⚠️ dnsmasq not running (may be expected)" else echo "✓ dnsmasq is running" fi - + - name: Verify generated configs run: | echo "Checking generated configuration files..." - + # WireGuard configs if [[ "${{ matrix.vpn_type }}" == "wireguard" || "${{ matrix.vpn_type }}" == "both" ]]; then for user in alice bob; do @@ -146,7 +147,7 @@ jobs: done echo "✓ All WireGuard configs generated" fi - + # IPsec configs if [[ "${{ matrix.vpn_type }}" == "ipsec" || "${{ matrix.vpn_type }}" == "both" ]]; then for user in alice bob; do @@ -161,28 +162,28 @@ jobs: done echo "✓ All IPsec configs generated" fi - + - name: Test VPN connectivity run: | echo "Testing basic VPN connectivity..." - + # Test WireGuard if [[ "${{ matrix.vpn_type }}" == "wireguard" || "${{ matrix.vpn_type }}" == "both" ]]; then # Get server's WireGuard public key SERVER_PUBKEY=$(sudo wg show wg0 public-key) echo "Server public key: $SERVER_PUBKEY" - + # Check if interface has peers PEER_COUNT=$(sudo wg show wg0 peers | wc -l) echo "✓ WireGuard has $PEER_COUNT peer(s) configured" fi - + # Test StrongSwan if [[ "${{ matrix.vpn_type }}" == "ipsec" || "${{ matrix.vpn_type }}" == "both" ]]; then # Check IPsec policies sudo ipsec statusall | grep -E "INSTALLED|ESTABLISHED" || echo "No active IPsec connections (expected)" fi - + - name: Upload configs as artifacts if: always() uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 @@ -190,7 +191,7 @@ jobs: name: vpn-configs-${{ matrix.vpn_type }}-${{ github.run_id }} path: configs/ retention-days: 7 - + - name: Upload logs on failure if: failure() run: | @@ -211,21 +212,21 @@ jobs: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 with: persist-credentials: false - + - name: Build Algo Docker image run: | docker build -t algo:ci-test . - + - name: Test Docker image run: | # Test that the image can run and show help docker run --rm --entrypoint /bin/sh algo:ci-test -c "cd /algo && ./algo --help" || true - + # Test that required binaries exist in the virtual environment docker run --rm --entrypoint /bin/sh algo:ci-test -c "cd /algo && source .env/bin/activate && which ansible" docker run --rm --entrypoint /bin/sh algo:ci-test -c "which python3" docker run --rm --entrypoint /bin/sh algo:ci-test -c "which rsync" - + - name: Test Docker config validation run: | # Create a minimal valid config @@ -242,9 +243,9 @@ jobs: dns_encryption: true algo_provider: ec2 EOF - + # Test that config is readable docker run --rm --entrypoint cat -v $(pwd)/test-data:/data algo:ci-test /data/config.cfg - + echo "✓ Docker image built and basic tests passed" diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 3a953050..7be96145 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -1,6 +1,7 @@ +--- name: Lint -on: [push, pull_request] +'on': [push, pull_request] permissions: contents: read diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cd46c569..29bbc703 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -183,6 +183,6 @@ jobs: --diff \ -vv \ --skip-tags "facts,tests,local,update-alternatives,cloud_api" || true - + # The || true is because check mode will fail on some tasks # but we're looking for syntax/undefined variable errors diff --git a/.github/workflows/smart-tests.yml b/.github/workflows/smart-tests.yml index 7e3c2bd1..dd9ea33b 100644 --- a/.github/workflows/smart-tests.yml +++ b/.github/workflows/smart-tests.yml @@ -1,6 +1,7 @@ +--- name: Smart Test Selection -on: +'on': pull_request: types: [opened, synchronize, reopened] @@ -25,7 +26,7 @@ jobs: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 with: persist-credentials: false - + - uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2.11.1 id: filter with: @@ -118,7 +119,7 @@ jobs: run: | # Always run basic sanity python tests/unit/test_basic_sanity.py - + # Run other tests based on what changed if [[ "${{ needs.changed-files.outputs.run_basic_tests }}" == "true" ]]; then python tests/unit/test_config_validation.py @@ -127,7 +128,7 @@ jobs: python tests/unit/test_cloud_provider_configs.py python tests/unit/test_generated_configs.py fi - + if [[ "${{ needs.changed-files.outputs.run_template_tests }}" == "true" ]]; then python tests/unit/test_template_rendering.py fi @@ -208,7 +209,7 @@ jobs: server: test-server endpoint: 10.0.0.1 EOF - + ansible-playbook main.yml \ -i "localhost," \ -c local \ @@ -251,7 +252,7 @@ jobs: ruff check . || true yamllint . || true ansible-lint || true - + # Check shell scripts if any changed if git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.sha }} | grep -q '\.sh$'; then find . -name "*.sh" -type f -exec shellcheck {} + || true @@ -290,4 +291,4 @@ jobs: run: | echo "Integration tests should be triggered for this PR" echo "Changed files indicate potential breaking changes" - echo "Run workflow manually: .github/workflows/integration-tests.yml" \ No newline at end of file + echo "Run workflow manually: .github/workflows/integration-tests.yml" diff --git a/.github/workflows/test-effectiveness.yml b/.github/workflows/test-effectiveness.yml index 7f860fbf..8e7c8025 100644 --- a/.github/workflows/test-effectiveness.yml +++ b/.github/workflows/test-effectiveness.yml @@ -1,6 +1,7 @@ +--- name: Test Effectiveness Tracking -on: +'on': schedule: - cron: '0 0 * * 0' # Weekly on Sunday workflow_dispatch: # Allow manual runs @@ -19,23 +20,23 @@ jobs: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 with: persist-credentials: true - + - uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: '3.11' - + - name: Analyze test effectiveness env: GH_TOKEN: ${{ github.token }} run: | python scripts/track-test-effectiveness.py - + - name: Upload metrics uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 with: name: test-effectiveness-metrics path: .metrics/ - + - name: Create issue if tests are ineffective env: GH_TOKEN: ${{ github.token }} @@ -44,7 +45,7 @@ jobs: if grep -q "⚠️" .metrics/test-effectiveness-report.md; then # Check if issue already exists existing=$(gh issue list --label "test-effectiveness" --state open --json number --jq '.[0].number') - + if [ -z "$existing" ]; then gh issue create \ --title "Test Effectiveness Review Needed" \ @@ -55,14 +56,14 @@ jobs: gh issue comment $existing --body-file .metrics/test-effectiveness-report.md fi fi - + - name: Commit metrics if changed run: | git config --local user.email "github-actions[bot]@users.noreply.github.com" git config --local user.name "github-actions[bot]" - + if [[ -n $(git status -s .metrics/) ]]; then git add .metrics/ git commit -m "chore: Update test effectiveness metrics [skip ci]" git push - fi \ No newline at end of file + fi diff --git a/algo-docker.sh b/algo-docker.sh index 62c6ccff..1c477d73 100644 --- a/algo-docker.sh +++ b/algo-docker.sh @@ -11,9 +11,9 @@ usage() { retcode="${1:-0}" echo "To run algo from Docker:" echo "" - echo "docker run --cap-drop=all -it -v :"${DATA_DIR}" ghcr.io/trailofbits/algo:latest" + echo "docker run --cap-drop=all -it -v :${DATA_DIR} ghcr.io/trailofbits/algo:latest" echo "" - exit ${retcode} + exit "${retcode}" } if [ ! -f "${DATA_DIR}"/config.cfg ] ; then @@ -25,7 +25,7 @@ fi if [ ! -e /dev/console ] ; then echo "Looks like you're trying to run this container without a TTY." - echo "If you don't pass `-t`, you can't interact with the algo script." + echo "If you don't pass -t, you can't interact with the algo script." echo "" usage -1 fi @@ -41,4 +41,4 @@ test -d "${DATA_DIR}"/configs && rsync -qLktr --delete "${DATA_DIR}"/configs "${ retcode=${?} rsync -qLktr --delete "${ALGO_DIR}"/configs "${DATA_DIR}"/ -exit ${retcode} +exit "${retcode}" diff --git a/library/digital_ocean_floating_ip.py b/library/digital_ocean_floating_ip.py index 8dd949e7..89ed4e56 100644 --- a/library/digital_ocean_floating_ip.py +++ b/library/digital_ocean_floating_ip.py @@ -138,9 +138,8 @@ def wait_action(module, rest, ip, action_id, timeout=10): end_time = time.time() + 10 while time.time() < end_time: response = rest.get(f'floating_ips/{ip}/actions/{action_id}') - status_code = response.status_code + # status_code = response.status_code # TODO: check status_code == 200? status = response.json['action']['status'] - # TODO: check status_code == 200? if status == 'completed': return True elif status == 'errored': @@ -150,7 +149,7 @@ def wait_action(module, rest, ip, action_id, timeout=10): def core(module): - api_token = module.params['oauth_token'] + # api_token = module.params['oauth_token'] # unused for now state = module.params['state'] ip = module.params['ip'] droplet_id = module.params['droplet_id'] @@ -185,7 +184,7 @@ def get_floating_ip_details(module, rest): if status_code == 200: return json_data['floating_ip'] else: - module.fail_json(msg="Error assigning floating ip [{0}: {1}]".format( + module.fail_json(msg="Error assigning floating ip [{}: {}]".format( status_code, json_data["message"]), region=module.params['region']) @@ -205,7 +204,7 @@ def assign_floating_id_to_droplet(module, rest): module.exit_json(changed=True, data=json_data) else: - module.fail_json(msg="Error creating floating ip [{0}: {1}]".format( + module.fail_json(msg="Error creating floating ip [{}: {}]".format( status_code, json_data["message"]), region=module.params['region']) @@ -247,26 +246,26 @@ def create_floating_ips(module, rest): if status_code == 202: module.exit_json(changed=True, data=json_data) else: - module.fail_json(msg="Error creating floating ip [{0}: {1}]".format( + module.fail_json(msg="Error creating floating ip [{}: {}]".format( status_code, json_data["message"]), region=module.params['region']) def main(): module = AnsibleModule( - argument_spec=dict( - state=dict(choices=['present', 'absent'], default='present'), - ip=dict(aliases=['id'], required=False), - region=dict(required=False), - droplet_id=dict(required=False, type='int'), - oauth_token=dict( - no_log=True, + argument_spec={ + 'state': {'choices': ['present', 'absent'], 'default': 'present'}, + 'ip': {'aliases': ['id'], 'required': False}, + 'region': {'required': False}, + 'droplet_id': {'required': False, 'type': 'int'}, + 'oauth_token': { + 'no_log': True, # Support environment variable for DigitalOcean OAuth Token - fallback=(env_fallback, ['DO_API_TOKEN', 'DO_API_KEY', 'DO_OAUTH_TOKEN']), - required=True, - ), - validate_certs=dict(type='bool', default=True), - timeout=dict(type='int', default=30), - ), + 'fallback': (env_fallback, ['DO_API_TOKEN', 'DO_API_KEY', 'DO_OAUTH_TOKEN']), + 'required': True, + }, + 'validate_certs': {'type': 'bool', 'default': True}, + 'timeout': {'type': 'int', 'default': 30}, + }, required_if=[ ('state', 'delete', ['ip']) ], diff --git a/library/gcp_compute_location_info.py b/library/gcp_compute_location_info.py index 8f3c03e3..8daa1b79 100644 --- a/library/gcp_compute_location_info.py +++ b/library/gcp_compute_location_info.py @@ -2,26 +2,23 @@ +import json + +from ansible.module_utils.gcp_utils import GcpModule, GcpSession, navigate_hash + ################################################################################ # Documentation ################################################################################ ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ["preview"], 'supported_by': 'community'} -################################################################################ -# Imports -################################################################################ -import json - -from ansible.module_utils.gcp_utils import GcpModule, GcpSession, navigate_hash - ################################################################################ # Main ################################################################################ def main(): - module = GcpModule(argument_spec=dict(filters=dict(type='list', elements='str'), scope=dict(required=True, type='str'))) + module = GcpModule(argument_spec={'filters': {'type': 'list', 'elements': 'str'}, 'scope': {'required': True, 'type': 'str'}}) if module._name == 'gcp_compute_image_facts': module.deprecate("The 'gcp_compute_image_facts' module has been renamed to 'gcp_compute_regions_info'", version='2.13') @@ -59,7 +56,7 @@ def query_options(filters): for f in filters: # For multiple queries, all queries should have () if f[0] != '(' and f[-1] != ')': - queries.append("(%s)" % ''.join(f)) + queries.append("({})".format(''.join(f))) else: queries.append(f) @@ -79,7 +76,7 @@ def return_if_object(module, response): module.raise_for_status(response) result = response.json() except getattr(json.decoder, 'JSONDecodeError', ValueError) as inst: - module.fail_json(msg="Invalid JSON response with error: %s" % inst) + module.fail_json(msg="Invalid JSON response with error: {}".format(inst)) if navigate_hash(result, ['error', 'errors']): module.fail_json(msg=navigate_hash(result, ['error', 'errors'])) diff --git a/pyproject.toml b/pyproject.toml index 46c8400c..ca486cf3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,6 +2,8 @@ # Ruff configuration target-version = "py310" line-length = 120 + +[tool.ruff.lint] select = [ "E", # pycodestyle errors "W", # pycodestyle warnings diff --git a/requirements.txt b/requirements.txt index f21152c5..b7cb6bf2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -ansible==9.1.0 -jinja2~=3.1.3 -netaddr +ansible==9.2.0 +jinja2~=3.1.6 +netaddr==1.3.0 diff --git a/roles/wireguard/files/wireguard.sh b/roles/wireguard/files/wireguard.sh index efcde0e3..aefd44c5 100644 --- a/roles/wireguard/files/wireguard.sh +++ b/roles/wireguard/files/wireguard.sh @@ -17,24 +17,28 @@ status_cmd=wg_status pidfile="/var/run/$name.pid" load_rc_config "$name" -: ${wg_enable="NO"} -: ${wg_interface="wg0"} +: "${wg_enable=NO}" +: "${wg_interface=wg0}" wg_up() { echo "Starting WireGuard..." - /usr/sbin/daemon -cS -p ${pidfile} ${command} up ${wg_interface} + /usr/sbin/daemon -cS -p "${pidfile}" "${command}" up "${wg_interface}" } wg_down() { echo "Stopping WireGuard..." - ${command} down ${wg_interface} + "${command}" down "${wg_interface}" } wg_status () { not_running () { echo "WireGuard is not running on $wg_interface" && exit 1 } - /usr/local/bin/wg show wg0 && echo "WireGuard is running on $wg_interface" || not_running + if /usr/local/bin/wg show wg0; then + echo "WireGuard is running on $wg_interface" + else + not_running + fi } run_rc_command "$1" diff --git a/scripts/track-test-effectiveness.py b/scripts/track-test-effectiveness.py index e055776d..d6d68de5 100755 --- a/scripts/track-test-effectiveness.py +++ b/scripts/track-test-effectiveness.py @@ -5,9 +5,8 @@ This helps identify which tests actually catch bugs vs just failing randomly """ import json import subprocess -import sys -from datetime import datetime, timedelta from collections import defaultdict +from datetime import datetime, timedelta from pathlib import Path @@ -24,26 +23,26 @@ def get_github_api_data(endpoint): def analyze_workflow_runs(repo_owner, repo_name, days_back=30): """Analyze workflow runs to find test failures""" since = (datetime.now() - timedelta(days=days_back)).isoformat() - + # Get workflow runs runs = get_github_api_data( f'/repos/{repo_owner}/{repo_name}/actions/runs?created=>{since}&status=failure' ) - + if not runs: return {} - + test_failures = defaultdict(list) - + for run in runs.get('workflow_runs', []): # Get jobs for this run jobs = get_github_api_data( f'/repos/{repo_owner}/{repo_name}/actions/runs/{run["id"]}/jobs' ) - + if not jobs: continue - + for job in jobs.get('jobs', []): if job['conclusion'] == 'failure': # Try to extract which test failed from logs @@ -60,7 +59,7 @@ def analyze_workflow_runs(repo_owner, repo_name, days_back=30): 'commit': run['head_sha'][:7], 'pr': extract_pr_number(run) }) - + return test_failures @@ -87,7 +86,7 @@ def extract_pr_number(run): def correlate_with_issues(repo_owner, repo_name, test_failures): """Correlate test failures with issues/PRs that fixed them""" correlations = defaultdict(lambda: {'caught_bugs': 0, 'false_positives': 0}) - + for test_name, failures in test_failures.items(): for failure in failures: if failure['pr']: @@ -95,21 +94,21 @@ def correlate_with_issues(repo_owner, repo_name, test_failures): pr = get_github_api_data( f'/repos/{repo_owner}/{repo_name}/pulls/{failure["pr"]}' ) - + if pr and pr.get('merged'): # Check PR title/body for bug indicators title = pr.get('title', '').lower() body = pr.get('body', '').lower() - + bug_keywords = ['fix', 'bug', 'error', 'issue', 'broken', 'fail'] - is_bug_fix = any(keyword in title or keyword in body + is_bug_fix = any(keyword in title or keyword in body for keyword in bug_keywords) - + if is_bug_fix: correlations[test_name]['caught_bugs'] += 1 else: correlations[test_name]['false_positives'] += 1 - + return correlations @@ -118,48 +117,48 @@ def generate_effectiveness_report(test_failures, correlations): report = [] report.append("# Test Effectiveness Report") report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - + # Summary report.append("## Summary") total_failures = sum(len(f) for f in test_failures.values()) report.append(f"- Total test failures: {total_failures}") report.append(f"- Unique tests that failed: {len(test_failures)}") report.append("") - + # Effectiveness scores report.append("## Test Effectiveness Scores") report.append("| Test | Failures | Caught Bugs | False Positives | Effectiveness |") report.append("|------|----------|-------------|-----------------|---------------|") - + scores = [] for test_name, failures in test_failures.items(): failure_count = len(failures) caught = correlations[test_name]['caught_bugs'] false_pos = correlations[test_name]['false_positives'] - + # Calculate effectiveness (bugs caught / total failures) if failure_count > 0: effectiveness = caught / failure_count else: effectiveness = 0 - + scores.append((test_name, failure_count, caught, false_pos, effectiveness)) - + # Sort by effectiveness scores.sort(key=lambda x: x[4], reverse=True) - + for test_name, failures, caught, false_pos, effectiveness in scores: report.append(f"| {test_name} | {failures} | {caught} | {false_pos} | {effectiveness:.1%} |") - + # Recommendations report.append("\n## Recommendations") - + for test_name, failures, caught, false_pos, effectiveness in scores: if effectiveness < 0.2 and failures > 5: report.append(f"- ⚠️ Consider improving or removing `{test_name}` (only {effectiveness:.0%} effective)") elif effectiveness > 0.8: report.append(f"- ✅ `{test_name}` is highly effective ({effectiveness:.0%})") - + return '\n'.join(report) @@ -167,14 +166,14 @@ def save_metrics(test_failures, correlations): """Save metrics to JSON for historical tracking""" metrics_file = Path('.metrics/test-effectiveness.json') metrics_file.parent.mkdir(exist_ok=True) - + # Load existing metrics if metrics_file.exists(): with open(metrics_file) as f: historical = json.load(f) else: historical = [] - + # Add current metrics current = { 'date': datetime.now().isoformat(), @@ -191,16 +190,16 @@ def save_metrics(test_failures, correlations): for test, data in correlations.items() } } - + historical.append(current) - + # Keep last 12 months of data cutoff = datetime.now() - timedelta(days=365) historical = [ - h for h in historical + h for h in historical if datetime.fromisoformat(h['date']) > cutoff ] - + with open(metrics_file, 'w') as f: json.dump(historical, f, indent=2) @@ -209,27 +208,27 @@ if __name__ == '__main__': # Configure these for your repo REPO_OWNER = 'trailofbits' REPO_NAME = 'algo' - + print("Analyzing test effectiveness...") - + # Analyze last 30 days of CI runs test_failures = analyze_workflow_runs(REPO_OWNER, REPO_NAME, days_back=30) - + # Correlate with issues/PRs correlations = correlate_with_issues(REPO_OWNER, REPO_NAME, test_failures) - + # Generate report report = generate_effectiveness_report(test_failures, correlations) - + print("\n" + report) - + # Save report report_file = Path('.metrics/test-effectiveness-report.md') report_file.parent.mkdir(exist_ok=True) with open(report_file, 'w') as f: f.write(report) print(f"\nReport saved to: {report_file}") - + # Save metrics for tracking save_metrics(test_failures, correlations) - print("Metrics saved to: .metrics/test-effectiveness.json") \ No newline at end of file + print("Metrics saved to: .metrics/test-effectiveness.json") diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py index daa424e7..e64bf23a 100644 --- a/tests/fixtures/__init__.py +++ b/tests/fixtures/__init__.py @@ -1,8 +1,9 @@ """Test fixtures for Algo unit tests""" import os -import yaml from pathlib import Path +import yaml + def load_test_variables(): """Load test variables from YAML fixture""" @@ -16,4 +17,4 @@ def get_test_config(overrides=None): config = load_test_variables() if overrides: config.update(overrides) - return config \ No newline at end of file + return config diff --git a/tests/fixtures/test_variables.yml b/tests/fixtures/test_variables.yml index aef84c5c..0af0a1fc 100644 --- a/tests/fixtures/test_variables.yml +++ b/tests/fixtures/test_variables.yml @@ -115,4 +115,4 @@ hostvars: groups: vpn-host: - localhost -omit: OMIT_PLACEHOLDER \ No newline at end of file +omit: OMIT_PLACEHOLDER diff --git a/tests/unit/test_cloud_provider_configs.py b/tests/unit/test_cloud_provider_configs.py index dabd1968..8cac5274 100644 --- a/tests/unit/test_cloud_provider_configs.py +++ b/tests/unit/test_cloud_provider_configs.py @@ -12,7 +12,7 @@ def test_hetzner_server_types(): # Hetzner deprecated cx11 and cpx11 - smallest is now cx22 deprecated_types = ['cx11', 'cpx11'] current_types = ['cx22', 'cpx22', 'cx32', 'cpx32', 'cx42', 'cpx42'] - + # Test that we're not using deprecated types in any configs test_config = { 'cloud_providers': { @@ -23,13 +23,13 @@ def test_hetzner_server_types(): } } } - + hetzner = test_config['cloud_providers']['hetzner'] assert hetzner['size'] not in deprecated_types, \ f"Using deprecated Hetzner type: {hetzner['size']}" assert hetzner['size'] in current_types, \ f"Unknown Hetzner type: {hetzner['size']}" - + print("✓ Hetzner server types test passed") @@ -38,11 +38,11 @@ def test_digitalocean_instance_types(): # DigitalOcean uses format like s-1vcpu-1gb valid_sizes = ['s-1vcpu-1gb', 's-2vcpu-2gb', 's-2vcpu-4gb', 's-4vcpu-8gb'] deprecated_sizes = ['512mb', '1gb', '2gb'] # Old naming scheme - + test_size = 's-2vcpu-2gb' assert test_size in valid_sizes, f"Invalid DO size: {test_size}" assert test_size not in deprecated_sizes, f"Using deprecated DO size: {test_size}" - + print("✓ DigitalOcean instance types test passed") @@ -51,11 +51,11 @@ def test_aws_instance_types(): # Common valid instance types valid_types = ['t2.micro', 't3.micro', 't3.small', 't3.medium', 'm5.large'] deprecated_types = ['t1.micro', 'm1.small'] # Very old types - + test_type = 't3.micro' assert test_type in valid_types, f"Unknown EC2 type: {test_type}" assert test_type not in deprecated_types, f"Using deprecated EC2 type: {test_type}" - + print("✓ AWS instance types test passed") @@ -63,11 +63,11 @@ def test_vultr_instance_types(): """Test Vultr instance type naming""" # Vultr uses format like vc2-1c-1gb valid_types = ['vc2-1c-1gb', 'vc2-2c-4gb', 'vhf-1c-1gb', 'vhf-2c-2gb'] - + test_type = 'vc2-1c-1gb' assert any(test_type.startswith(prefix) for prefix in ['vc2-', 'vhf-', 'vhp-']), \ f"Invalid Vultr type format: {test_type}" - + print("✓ Vultr instance types test passed") @@ -78,7 +78,7 @@ if __name__ == "__main__": test_aws_instance_types, test_vultr_instance_types, ] - + failed = 0 for test in tests: try: @@ -89,9 +89,9 @@ if __name__ == "__main__": except Exception as e: print(f"✗ {test.__name__} error: {e}") failed += 1 - + if failed > 0: print(f"\n{failed} tests failed") sys.exit(1) else: - print(f"\nAll {len(tests)} tests passed!") \ No newline at end of file + print(f"\nAll {len(tests)} tests passed!") diff --git a/tests/unit/test_docker_localhost_deployment.py b/tests/unit/test_docker_localhost_deployment.py index ebd8e868..a01e6b6b 100755 --- a/tests/unit/test_docker_localhost_deployment.py +++ b/tests/unit/test_docker_localhost_deployment.py @@ -4,11 +4,8 @@ Simplified Docker-based localhost deployment tests Verifies services can start and config files exist in expected locations """ import os -import sys import subprocess -import time -import tempfile -from pathlib import Path +import sys def check_docker_available(): @@ -32,21 +29,21 @@ ListenPort = 51820 PublicKey = lIiWMxCWtXG5hqZECMXm7mA/4pNKKqtJIBZ5Fc1SeHg= AllowedIPs = 10.19.49.2/32,fd9d:bc11:4020::2/128 """ - + # Just validate the format required_sections = ['[Interface]', '[Peer]'] required_fields = ['PrivateKey', 'Address', 'PublicKey', 'AllowedIPs'] - + for section in required_sections: if section not in config: print(f"✗ Missing {section} section") return False - + for field in required_fields: if field not in config: print(f"✗ Missing {field} field") return False - + print("✓ WireGuard config format is valid") return True @@ -69,20 +66,20 @@ conn ikev2-pubkey right=%any rightauth=pubkey """ - + # Validate format if 'config setup' not in config: print("✗ Missing 'config setup' section") return False - + if 'conn %default' not in config: print("✗ Missing 'conn %default' section") return False - + if 'keyexchange=ikev2' not in config: print("✗ Missing IKEv2 configuration") return False - + print("✓ StrongSwan config format is valid") return True @@ -93,27 +90,27 @@ def test_docker_algo_image(): if not os.path.exists('Dockerfile'): print("✗ Dockerfile not found") return False - + # Read Dockerfile and validate basic structure - with open('Dockerfile', 'r') as f: + with open('Dockerfile') as f: dockerfile_content = f.read() - + required_elements = [ 'FROM', # Base image 'RUN', # Build commands 'COPY', # Copy Algo files 'python' # Python dependency ] - + missing = [] for element in required_elements: if element not in dockerfile_content: missing.append(element) - + if missing: print(f"✗ Dockerfile missing elements: {', '.join(missing)}") return False - + print("✓ Dockerfile structure is valid") return True @@ -129,7 +126,7 @@ def test_localhost_deployment_requirements(): 'Requirements file exists': os.path.exists('requirements.txt'), 'Config template exists': os.path.exists('config.cfg.example') or os.path.exists('config.cfg'), } - + all_met = True for req, met in requirements.items(): if met: @@ -137,7 +134,7 @@ def test_localhost_deployment_requirements(): else: print(f"✗ {req}") all_met = False - + return all_met @@ -148,19 +145,19 @@ def test_localhost_deployment_requirements(): if __name__ == "__main__": print("Running Docker localhost deployment tests...") print("=" * 50) - + # First check if Docker is available docker_available = check_docker_available() if not docker_available: print("⚠ Docker not available - some tests will be limited") - + tests = [ test_wireguard_config_validation, test_strongswan_config_validation, test_docker_algo_image, test_localhost_deployment_requirements, ] - + failed = 0 for test in tests: print(f"\n{test.__name__}:") @@ -170,10 +167,10 @@ if __name__ == "__main__": except Exception as e: print(f"✗ {test.__name__} error: {e}") failed += 1 - + print("\n" + "=" * 50) if failed > 0: print(f"❌ {failed} tests failed") sys.exit(1) else: - print(f"✅ All {len(tests)} tests passed!") \ No newline at end of file + print(f"✅ All {len(tests)} tests passed!") diff --git a/tests/unit/test_generated_configs.py b/tests/unit/test_generated_configs.py index b45e5211..df805ca5 100644 --- a/tests/unit/test_generated_configs.py +++ b/tests/unit/test_generated_configs.py @@ -3,12 +3,9 @@ Test that generated configuration files have valid syntax This validates WireGuard, StrongSwan, SSH, and other configs """ -import os import re import subprocess import sys -import tempfile -from pathlib import Path def check_command_available(cmd): @@ -35,27 +32,27 @@ AllowedIPs = 0.0.0.0/0,::/0 Endpoint = 10.0.0.1:51820 PersistentKeepalive = 25 """ - + # Validate config structure errors = [] - + # Check for required sections if '[Interface]' not in sample_config: errors.append("Missing [Interface] section") if '[Peer]' not in sample_config: errors.append("Missing [Peer] section") - + # Validate Interface section interface_match = re.search(r'\[Interface\](.*?)\[Peer\]', sample_config, re.DOTALL) if interface_match: interface_section = interface_match.group(1) - + # Check required fields if not re.search(r'Address\s*=', interface_section): errors.append("Missing Address in Interface section") if not re.search(r'PrivateKey\s*=', interface_section): errors.append("Missing PrivateKey in Interface section") - + # Validate IP addresses address_match = re.search(r'Address\s*=\s*([^\n]+)', interface_section) if address_match: @@ -66,12 +63,12 @@ PersistentKeepalive = 25 if not re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', addr) and \ not re.match(r'^[0-9a-fA-F:]+/\d+$', addr): errors.append(f"Invalid IP address format: {addr}") - + # Validate Peer section peer_match = re.search(r'\[Peer\](.*)', sample_config, re.DOTALL) if peer_match: peer_section = peer_match.group(1) - + # Check required fields if not re.search(r'PublicKey\s*=', peer_section): errors.append("Missing PublicKey in Peer section") @@ -79,16 +76,16 @@ PersistentKeepalive = 25 errors.append("Missing AllowedIPs in Peer section") if not re.search(r'Endpoint\s*=', peer_section): errors.append("Missing Endpoint in Peer section") - + # Validate endpoint format endpoint_match = re.search(r'Endpoint\s*=\s*([^\n]+)', peer_section) if endpoint_match: endpoint = endpoint_match.group(1).strip() if not re.match(r'^[\d\.\:]+:\d+$', endpoint): errors.append(f"Invalid Endpoint format: {endpoint}") - + if errors: - print(f"✗ WireGuard config validation failed:") + print("✗ WireGuard config validation failed:") for error in errors: print(f" - {error}") assert False, "WireGuard config validation failed" @@ -131,28 +128,28 @@ conn ikev2-pubkey rightsourceip=10.19.49.0/24,fd9d:bc11:4020::/64 rightdns=1.1.1.1,1.0.0.1 """ - + errors = [] - + # Check for required sections if 'config setup' not in sample_config: errors.append("Missing 'config setup' section") if 'conn %default' not in sample_config: errors.append("Missing 'conn %default' section") - + # Validate connection settings conn_pattern = re.compile(r'conn\s+(\S+)') connections = conn_pattern.findall(sample_config) - + if len(connections) < 2: # Should have at least %default and one other errors.append("Not enough connection definitions") - + # Check for required parameters in connections required_params = ['keyexchange', 'left', 'right'] for param in required_params: if f'{param}=' not in sample_config: errors.append(f"Missing required parameter: {param}") - + # Validate IP subnet formats subnet_pattern = re.compile(r'(left|right)subnet\s*=\s*([^\n]+)') for match in subnet_pattern.finditer(sample_config): @@ -163,9 +160,9 @@ conn ikev2-pubkey if not re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', subnet) and \ not re.match(r'^[0-9a-fA-F:]+/\d+$', subnet): errors.append(f"Invalid subnet format: {subnet}") - + if errors: - print(f"✗ StrongSwan ipsec.conf validation failed:") + print("✗ StrongSwan ipsec.conf validation failed:") for error in errors: print(f" - {error}") assert False, "ipsec.conf validation failed" @@ -187,23 +184,23 @@ def test_ssh_config_syntax(): ServerAliveCountMax 3 LocalForward 1080 127.0.0.1:1080 """ - + errors = [] - + # Parse SSH config format lines = sample_config.strip().split('\n') current_host = None - + for line in lines: line = line.strip() if not line or line.startswith('#'): continue - + if line.startswith('Host '): current_host = line.split()[1] elif current_host and ' ' in line: key, value = line.split(None, 1) - + # Validate common SSH options if key == 'Port': try: @@ -212,18 +209,18 @@ def test_ssh_config_syntax(): errors.append(f"Invalid port number: {port}") except ValueError: errors.append(f"Port must be a number: {value}") - + elif key == 'LocalForward': # Format: LocalForward [bind_address:]port host:hostport parts = value.split() if len(parts) != 2: errors.append(f"Invalid LocalForward format: {value}") - + if not current_host: errors.append("No Host definition found") - + if errors: - print(f"✗ SSH config validation failed:") + print("✗ SSH config validation failed:") for error in errors: print(f" - {error}") assert False, "SSH config validation failed" @@ -255,43 +252,43 @@ COMMIT -A FORWARD -s 10.19.49.0/24 -j ACCEPT COMMIT """ - + errors = [] - + # Check table definitions tables = re.findall(r'\*(\w+)', sample_rules) if 'filter' not in tables: errors.append("Missing *filter table") if 'nat' not in tables: errors.append("Missing *nat table") - + # Check for COMMIT statements commit_count = sample_rules.count('COMMIT') if commit_count != len(tables): errors.append(f"Number of COMMIT statements ({commit_count}) doesn't match tables ({len(tables)})") - + # Validate chain policies chain_pattern = re.compile(r'^:(\w+)\s+(ACCEPT|DROP|REJECT)\s+\[\d+:\d+\]', re.MULTILINE) chains = chain_pattern.findall(sample_rules) - + required_chains = [('INPUT', 'DROP'), ('FORWARD', 'DROP'), ('OUTPUT', 'ACCEPT')] for chain, policy in required_chains: if not any(c[0] == chain for c in chains): errors.append(f"Missing required chain: {chain}") - + # Validate rule syntax rule_pattern = re.compile(r'^-[AI]\s+(\w+)', re.MULTILINE) rules = rule_pattern.findall(sample_rules) - + if len(rules) < 5: errors.append("Insufficient firewall rules") - + # Check for essential security rules if '-A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT' not in sample_rules: errors.append("Missing stateful connection tracking rule") - + if errors: - print(f"✗ iptables rules validation failed:") + print("✗ iptables rules validation failed:") for error in errors: print(f" - {error}") assert False, "iptables rules validation failed" @@ -319,30 +316,30 @@ log-facility=/var/log/dnsmasq.log conf-dir=/etc/dnsmasq.d/,*.conf addn-hosts=/var/lib/algo/dns/adblock.hosts """ - + errors = [] - + # Parse config for line in sample_config.strip().split('\n'): line = line.strip() if not line or line.startswith('#'): continue - + # Most dnsmasq options are key=value or just key if '=' in line: key, value = line.split('=', 1) - + # Validate specific options if key == 'interface': if not re.match(r'^[a-zA-Z0-9\-_]+$', value): errors.append(f"Invalid interface name: {value}") - + elif key == 'server': # Basic IP validation if not re.match(r'^\d+\.\d+\.\d+\.\d+$', value) and \ not re.match(r'^[0-9a-fA-F:]+$', value): errors.append(f"Invalid DNS server IP: {value}") - + elif key == 'cache-size': try: size = int(value) @@ -350,15 +347,15 @@ addn-hosts=/var/lib/algo/dns/adblock.hosts errors.append(f"Invalid cache size: {size}") except ValueError: errors.append(f"Cache size must be a number: {value}") - + # Check for required options required = ['interface', 'server'] for req in required: if f'{req}=' not in sample_config: errors.append(f"Missing required option: {req}") - + if errors: - print(f"✗ dnsmasq config validation failed:") + print("✗ dnsmasq config validation failed:") for error in errors: print(f" - {error}") assert False, "dnsmasq config validation failed" @@ -374,7 +371,7 @@ if __name__ == "__main__": test_iptables_rules_syntax, test_dns_config_syntax, ] - + failed = 0 for test in tests: try: @@ -385,9 +382,9 @@ if __name__ == "__main__": except Exception as e: print(f"✗ {test.__name__} error: {e}") failed += 1 - + if failed > 0: print(f"\n{failed} tests failed") sys.exit(1) else: - print(f"\nAll {len(tests)} config syntax tests passed!") \ No newline at end of file + print(f"\nAll {len(tests)} config syntax tests passed!") diff --git a/tests/unit/test_openssl_compatibility.py b/tests/unit/test_openssl_compatibility.py index 4518ce65..5bb991b0 100644 --- a/tests/unit/test_openssl_compatibility.py +++ b/tests/unit/test_openssl_compatibility.py @@ -96,4 +96,4 @@ if __name__ == "__main__": print(f"\n{failed} tests failed") sys.exit(1) else: - print(f"\nAll {len(tests)} tests passed!") \ No newline at end of file + print(f"\nAll {len(tests)} tests passed!") diff --git a/tests/unit/test_template_rendering.py b/tests/unit/test_template_rendering.py index e0b4bfa6..3368f5e8 100644 --- a/tests/unit/test_template_rendering.py +++ b/tests/unit/test_template_rendering.py @@ -5,11 +5,9 @@ This catches undefined variables, syntax errors, and logic bugs """ import os import sys -import tempfile from pathlib import Path -import yaml -from jinja2 import Environment, FileSystemLoader, StrictUndefined, UndefinedError, TemplateSyntaxError +from jinja2 import Environment, FileSystemLoader, StrictUndefined, TemplateSyntaxError, UndefinedError # Add parent directory to path for fixtures sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -57,40 +55,40 @@ def find_templates(): def test_template_syntax(): """Test that all templates have valid Jinja2 syntax""" templates = find_templates() - + # Skip some paths that aren't real templates skip_paths = ['.git/', 'venv/', '.env/', 'configs/'] - + # Skip templates that use Ansible-specific filters skip_templates = ['vpn-dict.j2', 'mobileconfig.j2', 'dnscrypt-proxy.toml.j2'] - + errors = [] skipped = 0 for template_path in templates: # Skip unwanted paths if any(skip in str(template_path) for skip in skip_paths): continue - + # Skip templates with Ansible-specific features if any(skip in str(template_path) for skip in skip_templates): skipped += 1 continue - + try: template_dir = template_path.parent env = Environment( loader=FileSystemLoader(template_dir), undefined=StrictUndefined ) - + # Just try to load the template - this checks syntax template = env.get_template(template_path.name) - + except TemplateSyntaxError as e: errors.append(f"{template_path}: Syntax error - {e}") except Exception as e: errors.append(f"{template_path}: Error loading - {e}") - + if errors: print(f"✗ Template syntax check failed with {len(errors)} errors:") for error in errors[:10]: # Show first 10 errors @@ -113,47 +111,47 @@ def test_critical_templates(): 'roles/common/templates/rules.v4.j2', 'roles/common/templates/rules.v6.j2', ] - + test_vars = get_test_variables() errors = [] - + for template_path in critical_templates: if not os.path.exists(template_path): continue # Skip if template doesn't exist - + try: template_dir = os.path.dirname(template_path) template_name = os.path.basename(template_path) - + env = Environment( loader=FileSystemLoader(template_dir), undefined=StrictUndefined ) - + # Add mock functions env.globals['lookup'] = mock_lookup env.filters['to_uuid'] = mock_to_uuid env.filters['bool'] = mock_bool - + template = env.get_template(template_name) - + # Add item context for templates that use loops if 'client' in template_name: test_vars['item'] = ('test-user', 'test-user') - + # Try to render output = template.render(**test_vars) - + # Basic validation - should produce some output assert len(output) > 0, f"Empty output from {template_path}" - + except UndefinedError as e: errors.append(f"{template_path}: Missing variable - {e}") except Exception as e: errors.append(f"{template_path}: Render error - {e}") - + if errors: - print(f"✗ Critical template rendering failed:") + print("✗ Critical template rendering failed:") for error in errors: print(f" - {error}") assert False, "Critical template rendering errors" @@ -172,21 +170,21 @@ def test_variable_consistency(): 'dns_servers', 'users', ] - + # Check if main.yml defines these if os.path.exists('main.yml'): with open('main.yml') as f: content = f.read() - + missing = [] for var in common_vars: # Simple check - could be improved if var not in content: missing.append(var) - + if missing: print(f"⚠ Variables possibly not defined in main.yml: {missing}") - + print("✓ Variable consistency check completed") @@ -214,31 +212,31 @@ def test_wireguard_ipv6_endpoints(): 'expected_endpoint': 'Endpoint = [fe80::1%eth0]:51820' }, ] - + template_path = 'roles/wireguard/templates/client.conf.j2' if not os.path.exists(template_path): print(f"⚠ Skipping IPv6 endpoint test - {template_path} not found") return - + base_vars = get_test_variables() errors = [] - + for test_case in test_cases: try: # Set up test variables test_vars = {**base_vars, **test_case} test_vars['item'] = ('test-user', 'test-user') - + # Render template env = Environment( loader=FileSystemLoader('roles/wireguard/templates'), undefined=StrictUndefined ) env.globals['lookup'] = mock_lookup - + template = env.get_template('client.conf.j2') output = template.render(**test_vars) - + # Check if the expected endpoint format is in the output if test_case['expected_endpoint'] not in output: errors.append(f"Expected '{test_case['expected_endpoint']}' for IP '{test_case['IP_subject_alt_name']}' but not found in output") @@ -246,10 +244,10 @@ def test_wireguard_ipv6_endpoints(): for line in output.split('\n'): if 'Endpoint' in line: errors.append(f" Found: {line.strip()}") - + except Exception as e: errors.append(f"Error testing {test_case['IP_subject_alt_name']}: {e}") - + if errors: print("✗ WireGuard IPv6 endpoint test failed:") for error in errors: @@ -270,7 +268,7 @@ def test_template_conditionals(): 'dns_adblocking': True, 'algo_ssh_tunneling': False, }, - # IPsec enabled, WireGuard disabled + # IPsec enabled, WireGuard disabled { 'wireguard_enabled': False, 'ipsec_enabled': True, @@ -287,48 +285,48 @@ def test_template_conditionals(): 'algo_ssh_tunneling': True, }, ] - + base_vars = get_test_variables() - + for i, test_case in enumerate(test_cases): # Merge test case with base vars test_vars = {**base_vars, **test_case} - + # Test a few templates that have conditionals conditional_templates = [ 'roles/common/templates/rules.v4.j2', ] - + for template_path in conditional_templates: if not os.path.exists(template_path): continue - + try: template_dir = os.path.dirname(template_path) template_name = os.path.basename(template_path) - + env = Environment( loader=FileSystemLoader(template_dir), undefined=StrictUndefined ) - + # Add mock functions env.globals['lookup'] = mock_lookup env.filters['to_uuid'] = mock_to_uuid env.filters['bool'] = mock_bool - + template = env.get_template(template_name) output = template.render(**test_vars) - + # Verify conditionals work if test_case.get('wireguard_enabled'): assert str(test_vars['wireguard_port']) in output, \ f"WireGuard port missing when enabled (case {i})" - + except Exception as e: print(f"✗ Conditional test failed for {template_path} case {i}: {e}") raise - + print("✓ Template conditional tests passed") @@ -340,7 +338,7 @@ if __name__ == "__main__": print("⚠ Skipping template tests - jinja2 not installed") print(" Run: pip install jinja2") sys.exit(0) - + tests = [ test_template_syntax, test_critical_templates, @@ -348,7 +346,7 @@ if __name__ == "__main__": test_wireguard_ipv6_endpoints, test_template_conditionals, ] - + failed = 0 for test in tests: try: @@ -359,9 +357,9 @@ if __name__ == "__main__": except Exception as e: print(f"✗ {test.__name__} error: {e}") failed += 1 - + if failed > 0: print(f"\n{failed} tests failed") sys.exit(1) else: - print(f"\nAll {len(tests)} template tests passed!") \ No newline at end of file + print(f"\nAll {len(tests)} template tests passed!")