chore: Conservative dependency updates for Jinja2 security fix (#14792)

* chore: Conservative dependency updates for security

- Update Ansible from 9.1.0 to 9.2.0 (one minor version bump only)
- Update Jinja2 to ~=3.1.6 to fix CVE-2025-27516 (critical security fix; see the version-check sketch below)
- Pin netaddr to 1.3.0 (current stable version)

This is a minimal, conservative update focused on:
1. Critical security fix for Jinja2
2. Minor ansible update for bug fixes
3. Pinning netaddr to prevent unexpected version changes

No changes to Ansible collections - keeping them unpinned for now.
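
For reviewers who want to confirm the bump locally, a quick sanity check (not part of this PR) that the installed Jinja2 actually includes the CVE-2025-27516 fix could look like this:

    import importlib.metadata

    version = importlib.metadata.version("jinja2")
    parts = tuple(int(p) for p in version.split(".")[:3])
    # 3.1.6 is the first release carrying the CVE-2025-27516 fix
    assert parts >= (3, 1, 6), f"Jinja2 {version} predates the CVE-2025-27516 fix"
    print(f"Jinja2 {version} is new enough")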

* fix: Address linter issues (ruff, yamllint, shellcheck)

- Fixed ruff configuration by moving linter settings to the [tool.ruff.lint] section
- Fixed ruff code issues (see the before/after sketch below):
  - Moved imports to top of files (E402)
  - Removed unused variables or commented them out
  - Updated string formatting from % to .format()
  - Replaced dict() calls with literals
  - Fixed assert False usage in tests
- Fixed yamllint issues:
  - Added missing newlines at end of files
  - Removed trailing spaces
  - Added document start markers (---) to YAML files
  - Fixed 'on:' truthy warnings in GitHub workflows
- Fixed shellcheck issues:
  - Properly quoted variables in shell scripts
  - Replaced the A && B || C pattern with an explicit if/then/else
  - Improved FreeBSD rc script quoting

All linters now pass without errors related to our code changes.
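
As a rough before/after sketch of the ruff-driven rewrites listed above (illustrative values, not lines copied from the diff):

    status_code, message = 422, "droplet not found"

    # Before: percent formatting and dict() calls, both flagged by ruff
    old_msg = "Error assigning floating ip [%s: %s]" % (status_code, message)
    old_spec = dict(state=dict(choices=["present", "absent"], default="present"))

    # After: str.format() and plain dict literals
    new_msg = "Error assigning floating ip [{}: {}]".format(status_code, message)
    new_spec = {"state": {"choices": ["present", "absent"], "default": "present"}}

    assert old_msg == new_msg and old_spec == new_spec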

* fix: Additional yamllint fixes for GitHub workflows

- Added document start markers (---) to test-effectiveness.yml
- Fixed the 'on:' truthy warning by quoting the key as 'on': (see the PyYAML sketch below)
- Removed trailing spaces from main.yml
- Added missing newline at end of test-effectiveness.yml
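
The 'on': quoting matters because YAML 1.1 parsers resolve a bare on key to a boolean, which is what yamllint's truthy rule warns about. A small PyYAML demonstration (illustrative, not taken from the repo):

    import yaml

    # Unquoted, the bare key `on` is resolved to the boolean True:
    print(yaml.safe_load("on: [push, pull_request]"))    # {True: ['push', 'pull_request']}

    # Quoted, the key stays the string 'on':
    print(yaml.safe_load("'on': [push, pull_request]"))  # {'on': ['push', 'pull_request']}
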
Author: Dan Guido, 2025-08-03 07:45:26 -04:00 (committed via GitHub, GPG key ID B5690EEEBB952194)
parent 49aa9c49a4
commit be744b16a2
21 changed files with 266 additions and 266 deletions

.github/FUNDING.yml

@ -1,3 +1,4 @@
---
# These are supported funding model platforms
github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]


@ -1,3 +1,4 @@
---
version: 2
updates:
# Maintain dependencies for GitHub Actions


@ -1,6 +1,7 @@
---
name: Integration Tests
on:
'on':
pull_request:
types: [opened, synchronize, reopened]
paths:
@ -28,12 +29,12 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
persist-credentials: false
- uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0
with:
python-version: '3.11'
cache: 'pip'
- name: Install system dependencies
run: |
sudo apt-get update
@ -46,12 +47,12 @@ jobs:
qrencode \
openssl \
linux-headers-$(uname -r)
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Create test configuration
run: |
cat > integration-test.cfg << EOF
@ -87,7 +88,7 @@ jobs:
endpoint: 127.0.0.1
ssh_port: 4160
EOF
- name: Run Algo deployment
run: |
sudo ansible-playbook main.yml \
@ -96,7 +97,7 @@ jobs:
-e @integration-test.cfg \
-e "provider=local" \
-vv
- name: Verify services are running
run: |
# Check WireGuard
@ -109,7 +110,7 @@ jobs:
fi
echo "✓ WireGuard is running"
fi
# Check StrongSwan
if [[ "${{ matrix.vpn_type }}" == "ipsec" || "${{ matrix.vpn_type }}" == "both" ]]; then
echo "Checking StrongSwan..."
@ -120,18 +121,18 @@ jobs:
fi
echo "✓ StrongSwan is running"
fi
# Check dnsmasq
if ! sudo systemctl is-active --quiet dnsmasq; then
echo "⚠️ dnsmasq not running (may be expected)"
else
echo "✓ dnsmasq is running"
fi
- name: Verify generated configs
run: |
echo "Checking generated configuration files..."
# WireGuard configs
if [[ "${{ matrix.vpn_type }}" == "wireguard" || "${{ matrix.vpn_type }}" == "both" ]]; then
for user in alice bob; do
@ -146,7 +147,7 @@ jobs:
done
echo "✓ All WireGuard configs generated"
fi
# IPsec configs
if [[ "${{ matrix.vpn_type }}" == "ipsec" || "${{ matrix.vpn_type }}" == "both" ]]; then
for user in alice bob; do
@ -161,28 +162,28 @@ jobs:
done
echo "✓ All IPsec configs generated"
fi
- name: Test VPN connectivity
run: |
echo "Testing basic VPN connectivity..."
# Test WireGuard
if [[ "${{ matrix.vpn_type }}" == "wireguard" || "${{ matrix.vpn_type }}" == "both" ]]; then
# Get server's WireGuard public key
SERVER_PUBKEY=$(sudo wg show wg0 public-key)
echo "Server public key: $SERVER_PUBKEY"
# Check if interface has peers
PEER_COUNT=$(sudo wg show wg0 peers | wc -l)
echo "✓ WireGuard has $PEER_COUNT peer(s) configured"
fi
# Test StrongSwan
if [[ "${{ matrix.vpn_type }}" == "ipsec" || "${{ matrix.vpn_type }}" == "both" ]]; then
# Check IPsec policies
sudo ipsec statusall | grep -E "INSTALLED|ESTABLISHED" || echo "No active IPsec connections (expected)"
fi
- name: Upload configs as artifacts
if: always()
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1
@ -190,7 +191,7 @@ jobs:
name: vpn-configs-${{ matrix.vpn_type }}-${{ github.run_id }}
path: configs/
retention-days: 7
- name: Upload logs on failure
if: failure()
run: |
@ -211,21 +212,21 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
persist-credentials: false
- name: Build Algo Docker image
run: |
docker build -t algo:ci-test .
- name: Test Docker image
run: |
# Test that the image can run and show help
docker run --rm --entrypoint /bin/sh algo:ci-test -c "cd /algo && ./algo --help" || true
# Test that required binaries exist in the virtual environment
docker run --rm --entrypoint /bin/sh algo:ci-test -c "cd /algo && source .env/bin/activate && which ansible"
docker run --rm --entrypoint /bin/sh algo:ci-test -c "which python3"
docker run --rm --entrypoint /bin/sh algo:ci-test -c "which rsync"
- name: Test Docker config validation
run: |
# Create a minimal valid config
@ -242,9 +243,9 @@ jobs:
dns_encryption: true
algo_provider: ec2
EOF
# Test that config is readable
docker run --rm --entrypoint cat -v $(pwd)/test-data:/data algo:ci-test /data/config.cfg
echo "✓ Docker image built and basic tests passed"


@ -1,6 +1,7 @@
---
name: Lint
on: [push, pull_request]
'on': [push, pull_request]
permissions:
contents: read


@ -183,6 +183,6 @@ jobs:
--diff \
-vv \
--skip-tags "facts,tests,local,update-alternatives,cloud_api" || true
# The || true is because check mode will fail on some tasks
# but we're looking for syntax/undefined variable errors


@ -1,6 +1,7 @@
---
name: Smart Test Selection
on:
'on':
pull_request:
types: [opened, synchronize, reopened]
@ -25,7 +26,7 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
persist-credentials: false
- uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2.11.1
id: filter
with:
@ -118,7 +119,7 @@ jobs:
run: |
# Always run basic sanity
python tests/unit/test_basic_sanity.py
# Run other tests based on what changed
if [[ "${{ needs.changed-files.outputs.run_basic_tests }}" == "true" ]]; then
python tests/unit/test_config_validation.py
@ -127,7 +128,7 @@ jobs:
python tests/unit/test_cloud_provider_configs.py
python tests/unit/test_generated_configs.py
fi
if [[ "${{ needs.changed-files.outputs.run_template_tests }}" == "true" ]]; then
python tests/unit/test_template_rendering.py
fi
@ -208,7 +209,7 @@ jobs:
server: test-server
endpoint: 10.0.0.1
EOF
ansible-playbook main.yml \
-i "localhost," \
-c local \
@ -251,7 +252,7 @@ jobs:
ruff check . || true
yamllint . || true
ansible-lint || true
# Check shell scripts if any changed
if git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.sha }} | grep -q '\.sh$'; then
find . -name "*.sh" -type f -exec shellcheck {} + || true
@ -290,4 +291,4 @@ jobs:
run: |
echo "Integration tests should be triggered for this PR"
echo "Changed files indicate potential breaking changes"
echo "Run workflow manually: .github/workflows/integration-tests.yml"
echo "Run workflow manually: .github/workflows/integration-tests.yml"


@ -1,6 +1,7 @@
---
name: Test Effectiveness Tracking
on:
'on':
schedule:
- cron: '0 0 * * 0' # Weekly on Sunday
workflow_dispatch: # Allow manual runs
@ -19,23 +20,23 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
persist-credentials: true
- uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0
with:
python-version: '3.11'
- name: Analyze test effectiveness
env:
GH_TOKEN: ${{ github.token }}
run: |
python scripts/track-test-effectiveness.py
- name: Upload metrics
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1
with:
name: test-effectiveness-metrics
path: .metrics/
- name: Create issue if tests are ineffective
env:
GH_TOKEN: ${{ github.token }}
@ -44,7 +45,7 @@ jobs:
if grep -q "⚠️" .metrics/test-effectiveness-report.md; then
# Check if issue already exists
existing=$(gh issue list --label "test-effectiveness" --state open --json number --jq '.[0].number')
if [ -z "$existing" ]; then
gh issue create \
--title "Test Effectiveness Review Needed" \
@ -55,14 +56,14 @@ jobs:
gh issue comment $existing --body-file .metrics/test-effectiveness-report.md
fi
fi
- name: Commit metrics if changed
run: |
git config --local user.email "github-actions[bot]@users.noreply.github.com"
git config --local user.name "github-actions[bot]"
if [[ -n $(git status -s .metrics/) ]]; then
git add .metrics/
git commit -m "chore: Update test effectiveness metrics [skip ci]"
git push
fi
fi


@ -11,9 +11,9 @@ usage() {
retcode="${1:-0}"
echo "To run algo from Docker:"
echo ""
echo "docker run --cap-drop=all -it -v <path to configurations>:"${DATA_DIR}" ghcr.io/trailofbits/algo:latest"
echo "docker run --cap-drop=all -it -v <path to configurations>:${DATA_DIR} ghcr.io/trailofbits/algo:latest"
echo ""
exit ${retcode}
exit "${retcode}"
}
if [ ! -f "${DATA_DIR}"/config.cfg ] ; then
@ -25,7 +25,7 @@ fi
if [ ! -e /dev/console ] ; then
echo "Looks like you're trying to run this container without a TTY."
echo "If you don't pass `-t`, you can't interact with the algo script."
echo "If you don't pass -t, you can't interact with the algo script."
echo ""
usage -1
fi
@ -41,4 +41,4 @@ test -d "${DATA_DIR}"/configs && rsync -qLktr --delete "${DATA_DIR}"/configs "${
retcode=${?}
rsync -qLktr --delete "${ALGO_DIR}"/configs "${DATA_DIR}"/
exit ${retcode}
exit "${retcode}"


@ -138,9 +138,8 @@ def wait_action(module, rest, ip, action_id, timeout=10):
end_time = time.time() + 10
while time.time() < end_time:
response = rest.get(f'floating_ips/{ip}/actions/{action_id}')
status_code = response.status_code
# status_code = response.status_code # TODO: check status_code == 200?
status = response.json['action']['status']
# TODO: check status_code == 200?
if status == 'completed':
return True
elif status == 'errored':
@ -150,7 +149,7 @@ def wait_action(module, rest, ip, action_id, timeout=10):
def core(module):
api_token = module.params['oauth_token']
# api_token = module.params['oauth_token'] # unused for now
state = module.params['state']
ip = module.params['ip']
droplet_id = module.params['droplet_id']
@ -185,7 +184,7 @@ def get_floating_ip_details(module, rest):
if status_code == 200:
return json_data['floating_ip']
else:
module.fail_json(msg="Error assigning floating ip [{0}: {1}]".format(
module.fail_json(msg="Error assigning floating ip [{}: {}]".format(
status_code, json_data["message"]), region=module.params['region'])
@ -205,7 +204,7 @@ def assign_floating_id_to_droplet(module, rest):
module.exit_json(changed=True, data=json_data)
else:
module.fail_json(msg="Error creating floating ip [{0}: {1}]".format(
module.fail_json(msg="Error creating floating ip [{}: {}]".format(
status_code, json_data["message"]), region=module.params['region'])
@ -247,26 +246,26 @@ def create_floating_ips(module, rest):
if status_code == 202:
module.exit_json(changed=True, data=json_data)
else:
module.fail_json(msg="Error creating floating ip [{0}: {1}]".format(
module.fail_json(msg="Error creating floating ip [{}: {}]".format(
status_code, json_data["message"]), region=module.params['region'])
def main():
module = AnsibleModule(
argument_spec=dict(
state=dict(choices=['present', 'absent'], default='present'),
ip=dict(aliases=['id'], required=False),
region=dict(required=False),
droplet_id=dict(required=False, type='int'),
oauth_token=dict(
no_log=True,
argument_spec={
'state': {'choices': ['present', 'absent'], 'default': 'present'},
'ip': {'aliases': ['id'], 'required': False},
'region': {'required': False},
'droplet_id': {'required': False, 'type': 'int'},
'oauth_token': {
'no_log': True,
# Support environment variable for DigitalOcean OAuth Token
fallback=(env_fallback, ['DO_API_TOKEN', 'DO_API_KEY', 'DO_OAUTH_TOKEN']),
required=True,
),
validate_certs=dict(type='bool', default=True),
timeout=dict(type='int', default=30),
),
'fallback': (env_fallback, ['DO_API_TOKEN', 'DO_API_KEY', 'DO_OAUTH_TOKEN']),
'required': True,
},
'validate_certs': {'type': 'bool', 'default': True},
'timeout': {'type': 'int', 'default': 30},
},
required_if=[
('state', 'delete', ['ip'])
],
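
For context on the wait_action() hunk above: it polls the floating-IP actions endpoint until the action completes, errors, or the timeout expires. A self-contained sketch of that polling pattern, using a stand-in status source instead of the real DigitalOcean API:

    import time

    def wait_for_completed(fetch_status, timeout=10, interval=1):
        """Poll fetch_status() until it reports 'completed'/'errored' or the timeout expires."""
        end_time = time.time() + timeout
        while time.time() < end_time:
            status = fetch_status()
            if status == 'completed':
                return True
            if status == 'errored':
                return False
            time.sleep(interval)
        return False

    # Canned statuses standing in for the API responses:
    statuses = iter(['in-progress', 'in-progress', 'completed'])
    print(wait_for_completed(lambda: next(statuses), interval=0))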


@ -2,26 +2,23 @@
import json
from ansible.module_utils.gcp_utils import GcpModule, GcpSession, navigate_hash
################################################################################
# Documentation
################################################################################
ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ["preview"], 'supported_by': 'community'}
################################################################################
# Imports
################################################################################
import json
from ansible.module_utils.gcp_utils import GcpModule, GcpSession, navigate_hash
################################################################################
# Main
################################################################################
def main():
module = GcpModule(argument_spec=dict(filters=dict(type='list', elements='str'), scope=dict(required=True, type='str')))
module = GcpModule(argument_spec={'filters': {'type': 'list', 'elements': 'str'}, 'scope': {'required': True, 'type': 'str'}})
if module._name == 'gcp_compute_image_facts':
module.deprecate("The 'gcp_compute_image_facts' module has been renamed to 'gcp_compute_regions_info'", version='2.13')
@ -59,7 +56,7 @@ def query_options(filters):
for f in filters:
# For multiple queries, all queries should have ()
if f[0] != '(' and f[-1] != ')':
queries.append("(%s)" % ''.join(f))
queries.append("({})".format(''.join(f)))
else:
queries.append(f)
@ -79,7 +76,7 @@ def return_if_object(module, response):
module.raise_for_status(response)
result = response.json()
except getattr(json.decoder, 'JSONDecodeError', ValueError) as inst:
module.fail_json(msg="Invalid JSON response with error: %s" % inst)
module.fail_json(msg="Invalid JSON response with error: {}".format(inst))
if navigate_hash(result, ['error', 'errors']):
module.fail_json(msg=navigate_hash(result, ['error', 'errors']))


@ -2,6 +2,8 @@
# Ruff configuration
target-version = "py310"
line-length = 120
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings


@ -1,3 +1,3 @@
ansible==9.1.0
jinja2~=3.1.3
netaddr
ansible==9.2.0
jinja2~=3.1.6
netaddr==1.3.0
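
The ~= pin is a compatible-release specifier: it allows 3.1.x releases at or above 3.1.6 but excludes 3.2.0. A quick illustration, assuming the packaging library is available:

    from packaging.specifiers import SpecifierSet

    spec = SpecifierSet("~=3.1.6")   # equivalent to >=3.1.6, ==3.1.*
    for candidate in ["3.1.5", "3.1.6", "3.1.9", "3.2.0"]:
        print(candidate, candidate in spec)
    # 3.1.5 False, 3.1.6 True, 3.1.9 True, 3.2.0 False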


@ -17,24 +17,28 @@ status_cmd=wg_status
pidfile="/var/run/$name.pid"
load_rc_config "$name"
: ${wg_enable="NO"}
: ${wg_interface="wg0"}
: "${wg_enable=NO}"
: "${wg_interface=wg0}"
wg_up() {
echo "Starting WireGuard..."
/usr/sbin/daemon -cS -p ${pidfile} ${command} up ${wg_interface}
/usr/sbin/daemon -cS -p "${pidfile}" "${command}" up "${wg_interface}"
}
wg_down() {
echo "Stopping WireGuard..."
${command} down ${wg_interface}
"${command}" down "${wg_interface}"
}
wg_status () {
not_running () {
echo "WireGuard is not running on $wg_interface" && exit 1
}
/usr/local/bin/wg show wg0 && echo "WireGuard is running on $wg_interface" || not_running
if /usr/local/bin/wg show wg0; then
echo "WireGuard is running on $wg_interface"
else
not_running
fi
}
run_rc_command "$1"


@ -5,9 +5,8 @@ This helps identify which tests actually catch bugs vs just failing randomly
"""
import json
import subprocess
import sys
from datetime import datetime, timedelta
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
@ -24,26 +23,26 @@ def get_github_api_data(endpoint):
def analyze_workflow_runs(repo_owner, repo_name, days_back=30):
"""Analyze workflow runs to find test failures"""
since = (datetime.now() - timedelta(days=days_back)).isoformat()
# Get workflow runs
runs = get_github_api_data(
f'/repos/{repo_owner}/{repo_name}/actions/runs?created=>{since}&status=failure'
)
if not runs:
return {}
test_failures = defaultdict(list)
for run in runs.get('workflow_runs', []):
# Get jobs for this run
jobs = get_github_api_data(
f'/repos/{repo_owner}/{repo_name}/actions/runs/{run["id"]}/jobs'
)
if not jobs:
continue
for job in jobs.get('jobs', []):
if job['conclusion'] == 'failure':
# Try to extract which test failed from logs
@ -60,7 +59,7 @@ def analyze_workflow_runs(repo_owner, repo_name, days_back=30):
'commit': run['head_sha'][:7],
'pr': extract_pr_number(run)
})
return test_failures
@ -87,7 +86,7 @@ def extract_pr_number(run):
def correlate_with_issues(repo_owner, repo_name, test_failures):
"""Correlate test failures with issues/PRs that fixed them"""
correlations = defaultdict(lambda: {'caught_bugs': 0, 'false_positives': 0})
for test_name, failures in test_failures.items():
for failure in failures:
if failure['pr']:
@ -95,21 +94,21 @@ def correlate_with_issues(repo_owner, repo_name, test_failures):
pr = get_github_api_data(
f'/repos/{repo_owner}/{repo_name}/pulls/{failure["pr"]}'
)
if pr and pr.get('merged'):
# Check PR title/body for bug indicators
title = pr.get('title', '').lower()
body = pr.get('body', '').lower()
bug_keywords = ['fix', 'bug', 'error', 'issue', 'broken', 'fail']
is_bug_fix = any(keyword in title or keyword in body
is_bug_fix = any(keyword in title or keyword in body
for keyword in bug_keywords)
if is_bug_fix:
correlations[test_name]['caught_bugs'] += 1
else:
correlations[test_name]['false_positives'] += 1
return correlations
@ -118,48 +117,48 @@ def generate_effectiveness_report(test_failures, correlations):
report = []
report.append("# Test Effectiveness Report")
report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# Summary
report.append("## Summary")
total_failures = sum(len(f) for f in test_failures.values())
report.append(f"- Total test failures: {total_failures}")
report.append(f"- Unique tests that failed: {len(test_failures)}")
report.append("")
# Effectiveness scores
report.append("## Test Effectiveness Scores")
report.append("| Test | Failures | Caught Bugs | False Positives | Effectiveness |")
report.append("|------|----------|-------------|-----------------|---------------|")
scores = []
for test_name, failures in test_failures.items():
failure_count = len(failures)
caught = correlations[test_name]['caught_bugs']
false_pos = correlations[test_name]['false_positives']
# Calculate effectiveness (bugs caught / total failures)
if failure_count > 0:
effectiveness = caught / failure_count
else:
effectiveness = 0
scores.append((test_name, failure_count, caught, false_pos, effectiveness))
# Sort by effectiveness
scores.sort(key=lambda x: x[4], reverse=True)
for test_name, failures, caught, false_pos, effectiveness in scores:
report.append(f"| {test_name} | {failures} | {caught} | {false_pos} | {effectiveness:.1%} |")
# Recommendations
report.append("\n## Recommendations")
for test_name, failures, caught, false_pos, effectiveness in scores:
if effectiveness < 0.2 and failures > 5:
report.append(f"- ⚠️ Consider improving or removing `{test_name}` (only {effectiveness:.0%} effective)")
elif effectiveness > 0.8:
report.append(f"- ✅ `{test_name}` is highly effective ({effectiveness:.0%})")
return '\n'.join(report)
@ -167,14 +166,14 @@ def save_metrics(test_failures, correlations):
"""Save metrics to JSON for historical tracking"""
metrics_file = Path('.metrics/test-effectiveness.json')
metrics_file.parent.mkdir(exist_ok=True)
# Load existing metrics
if metrics_file.exists():
with open(metrics_file) as f:
historical = json.load(f)
else:
historical = []
# Add current metrics
current = {
'date': datetime.now().isoformat(),
@ -191,16 +190,16 @@ def save_metrics(test_failures, correlations):
for test, data in correlations.items()
}
}
historical.append(current)
# Keep last 12 months of data
cutoff = datetime.now() - timedelta(days=365)
historical = [
h for h in historical
h for h in historical
if datetime.fromisoformat(h['date']) > cutoff
]
with open(metrics_file, 'w') as f:
json.dump(historical, f, indent=2)
@ -209,27 +208,27 @@ if __name__ == '__main__':
# Configure these for your repo
REPO_OWNER = 'trailofbits'
REPO_NAME = 'algo'
print("Analyzing test effectiveness...")
# Analyze last 30 days of CI runs
test_failures = analyze_workflow_runs(REPO_OWNER, REPO_NAME, days_back=30)
# Correlate with issues/PRs
correlations = correlate_with_issues(REPO_OWNER, REPO_NAME, test_failures)
# Generate report
report = generate_effectiveness_report(test_failures, correlations)
print("\n" + report)
# Save report
report_file = Path('.metrics/test-effectiveness-report.md')
report_file.parent.mkdir(exist_ok=True)
with open(report_file, 'w') as f:
f.write(report)
print(f"\nReport saved to: {report_file}")
# Save metrics for tracking
save_metrics(test_failures, correlations)
print("Metrics saved to: .metrics/test-effectiveness.json")
print("Metrics saved to: .metrics/test-effectiveness.json")
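
The effectiveness score this script reports is simply bugs caught divided by total failures for each test. A tiny sketch of that calculation with hypothetical numbers (the data shapes mirror the script's, the values do not come from the project):

    from collections import defaultdict

    test_failures = {"test_template_rendering": ["run-1", "run-2", "run-3", "run-4"]}
    correlations = defaultdict(lambda: {"caught_bugs": 0, "false_positives": 0})
    correlations["test_template_rendering"]["caught_bugs"] = 3
    correlations["test_template_rendering"]["false_positives"] = 1

    for test, failures in test_failures.items():
        caught = correlations[test]["caught_bugs"]
        effectiveness = caught / len(failures) if failures else 0.0
        print(f"{test}: {effectiveness:.1%} effective ({caught} of {len(failures)} failures were real bugs)")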


@ -1,8 +1,9 @@
"""Test fixtures for Algo unit tests"""
import os
import yaml
from pathlib import Path
import yaml
def load_test_variables():
"""Load test variables from YAML fixture"""
@ -16,4 +17,4 @@ def get_test_config(overrides=None):
config = load_test_variables()
if overrides:
config.update(overrides)
return config
return config

View file

@ -115,4 +115,4 @@ hostvars:
groups:
vpn-host:
- localhost
omit: OMIT_PLACEHOLDER
omit: OMIT_PLACEHOLDER


@ -12,7 +12,7 @@ def test_hetzner_server_types():
# Hetzner deprecated cx11 and cpx11 - smallest is now cx22
deprecated_types = ['cx11', 'cpx11']
current_types = ['cx22', 'cpx22', 'cx32', 'cpx32', 'cx42', 'cpx42']
# Test that we're not using deprecated types in any configs
test_config = {
'cloud_providers': {
@ -23,13 +23,13 @@ def test_hetzner_server_types():
}
}
}
hetzner = test_config['cloud_providers']['hetzner']
assert hetzner['size'] not in deprecated_types, \
f"Using deprecated Hetzner type: {hetzner['size']}"
assert hetzner['size'] in current_types, \
f"Unknown Hetzner type: {hetzner['size']}"
print("✓ Hetzner server types test passed")
@ -38,11 +38,11 @@ def test_digitalocean_instance_types():
# DigitalOcean uses format like s-1vcpu-1gb
valid_sizes = ['s-1vcpu-1gb', 's-2vcpu-2gb', 's-2vcpu-4gb', 's-4vcpu-8gb']
deprecated_sizes = ['512mb', '1gb', '2gb'] # Old naming scheme
test_size = 's-2vcpu-2gb'
assert test_size in valid_sizes, f"Invalid DO size: {test_size}"
assert test_size not in deprecated_sizes, f"Using deprecated DO size: {test_size}"
print("✓ DigitalOcean instance types test passed")
@ -51,11 +51,11 @@ def test_aws_instance_types():
# Common valid instance types
valid_types = ['t2.micro', 't3.micro', 't3.small', 't3.medium', 'm5.large']
deprecated_types = ['t1.micro', 'm1.small'] # Very old types
test_type = 't3.micro'
assert test_type in valid_types, f"Unknown EC2 type: {test_type}"
assert test_type not in deprecated_types, f"Using deprecated EC2 type: {test_type}"
print("✓ AWS instance types test passed")
@ -63,11 +63,11 @@ def test_vultr_instance_types():
"""Test Vultr instance type naming"""
# Vultr uses format like vc2-1c-1gb
valid_types = ['vc2-1c-1gb', 'vc2-2c-4gb', 'vhf-1c-1gb', 'vhf-2c-2gb']
test_type = 'vc2-1c-1gb'
assert any(test_type.startswith(prefix) for prefix in ['vc2-', 'vhf-', 'vhp-']), \
f"Invalid Vultr type format: {test_type}"
print("✓ Vultr instance types test passed")
@ -78,7 +78,7 @@ if __name__ == "__main__":
test_aws_instance_types,
test_vultr_instance_types,
]
failed = 0
for test in tests:
try:
@ -89,9 +89,9 @@ if __name__ == "__main__":
except Exception as e:
print(f"{test.__name__} error: {e}")
failed += 1
if failed > 0:
print(f"\n{failed} tests failed")
sys.exit(1)
else:
print(f"\nAll {len(tests)} tests passed!")
print(f"\nAll {len(tests)} tests passed!")


@ -4,11 +4,8 @@ Simplified Docker-based localhost deployment tests
Verifies services can start and config files exist in expected locations
"""
import os
import sys
import subprocess
import time
import tempfile
from pathlib import Path
import sys
def check_docker_available():
@ -32,21 +29,21 @@ ListenPort = 51820
PublicKey = lIiWMxCWtXG5hqZECMXm7mA/4pNKKqtJIBZ5Fc1SeHg=
AllowedIPs = 10.19.49.2/32,fd9d:bc11:4020::2/128
"""
# Just validate the format
required_sections = ['[Interface]', '[Peer]']
required_fields = ['PrivateKey', 'Address', 'PublicKey', 'AllowedIPs']
for section in required_sections:
if section not in config:
print(f"✗ Missing {section} section")
return False
for field in required_fields:
if field not in config:
print(f"✗ Missing {field} field")
return False
print("✓ WireGuard config format is valid")
return True
@ -69,20 +66,20 @@ conn ikev2-pubkey
right=%any
rightauth=pubkey
"""
# Validate format
if 'config setup' not in config:
print("✗ Missing 'config setup' section")
return False
if 'conn %default' not in config:
print("✗ Missing 'conn %default' section")
return False
if 'keyexchange=ikev2' not in config:
print("✗ Missing IKEv2 configuration")
return False
print("✓ StrongSwan config format is valid")
return True
@ -93,27 +90,27 @@ def test_docker_algo_image():
if not os.path.exists('Dockerfile'):
print("✗ Dockerfile not found")
return False
# Read Dockerfile and validate basic structure
with open('Dockerfile', 'r') as f:
with open('Dockerfile') as f:
dockerfile_content = f.read()
required_elements = [
'FROM', # Base image
'RUN', # Build commands
'COPY', # Copy Algo files
'python' # Python dependency
]
missing = []
for element in required_elements:
if element not in dockerfile_content:
missing.append(element)
if missing:
print(f"✗ Dockerfile missing elements: {', '.join(missing)}")
return False
print("✓ Dockerfile structure is valid")
return True
@ -129,7 +126,7 @@ def test_localhost_deployment_requirements():
'Requirements file exists': os.path.exists('requirements.txt'),
'Config template exists': os.path.exists('config.cfg.example') or os.path.exists('config.cfg'),
}
all_met = True
for req, met in requirements.items():
if met:
@ -137,7 +134,7 @@ def test_localhost_deployment_requirements():
else:
print(f"{req}")
all_met = False
return all_met
@ -148,19 +145,19 @@ def test_localhost_deployment_requirements():
if __name__ == "__main__":
print("Running Docker localhost deployment tests...")
print("=" * 50)
# First check if Docker is available
docker_available = check_docker_available()
if not docker_available:
print("⚠ Docker not available - some tests will be limited")
tests = [
test_wireguard_config_validation,
test_strongswan_config_validation,
test_docker_algo_image,
test_localhost_deployment_requirements,
]
failed = 0
for test in tests:
print(f"\n{test.__name__}:")
@ -170,10 +167,10 @@ if __name__ == "__main__":
except Exception as e:
print(f"{test.__name__} error: {e}")
failed += 1
print("\n" + "=" * 50)
if failed > 0:
print(f"{failed} tests failed")
sys.exit(1)
else:
print(f"✅ All {len(tests)} tests passed!")
print(f"✅ All {len(tests)} tests passed!")


@ -3,12 +3,9 @@
Test that generated configuration files have valid syntax
This validates WireGuard, StrongSwan, SSH, and other configs
"""
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path
def check_command_available(cmd):
@ -35,27 +32,27 @@ AllowedIPs = 0.0.0.0/0,::/0
Endpoint = 10.0.0.1:51820
PersistentKeepalive = 25
"""
# Validate config structure
errors = []
# Check for required sections
if '[Interface]' not in sample_config:
errors.append("Missing [Interface] section")
if '[Peer]' not in sample_config:
errors.append("Missing [Peer] section")
# Validate Interface section
interface_match = re.search(r'\[Interface\](.*?)\[Peer\]', sample_config, re.DOTALL)
if interface_match:
interface_section = interface_match.group(1)
# Check required fields
if not re.search(r'Address\s*=', interface_section):
errors.append("Missing Address in Interface section")
if not re.search(r'PrivateKey\s*=', interface_section):
errors.append("Missing PrivateKey in Interface section")
# Validate IP addresses
address_match = re.search(r'Address\s*=\s*([^\n]+)', interface_section)
if address_match:
@ -66,12 +63,12 @@ PersistentKeepalive = 25
if not re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', addr) and \
not re.match(r'^[0-9a-fA-F:]+/\d+$', addr):
errors.append(f"Invalid IP address format: {addr}")
# Validate Peer section
peer_match = re.search(r'\[Peer\](.*)', sample_config, re.DOTALL)
if peer_match:
peer_section = peer_match.group(1)
# Check required fields
if not re.search(r'PublicKey\s*=', peer_section):
errors.append("Missing PublicKey in Peer section")
@ -79,16 +76,16 @@ PersistentKeepalive = 25
errors.append("Missing AllowedIPs in Peer section")
if not re.search(r'Endpoint\s*=', peer_section):
errors.append("Missing Endpoint in Peer section")
# Validate endpoint format
endpoint_match = re.search(r'Endpoint\s*=\s*([^\n]+)', peer_section)
if endpoint_match:
endpoint = endpoint_match.group(1).strip()
if not re.match(r'^[\d\.\:]+:\d+$', endpoint):
errors.append(f"Invalid Endpoint format: {endpoint}")
if errors:
print(f"✗ WireGuard config validation failed:")
print("✗ WireGuard config validation failed:")
for error in errors:
print(f" - {error}")
assert False, "WireGuard config validation failed"
@ -131,28 +128,28 @@ conn ikev2-pubkey
rightsourceip=10.19.49.0/24,fd9d:bc11:4020::/64
rightdns=1.1.1.1,1.0.0.1
"""
errors = []
# Check for required sections
if 'config setup' not in sample_config:
errors.append("Missing 'config setup' section")
if 'conn %default' not in sample_config:
errors.append("Missing 'conn %default' section")
# Validate connection settings
conn_pattern = re.compile(r'conn\s+(\S+)')
connections = conn_pattern.findall(sample_config)
if len(connections) < 2: # Should have at least %default and one other
errors.append("Not enough connection definitions")
# Check for required parameters in connections
required_params = ['keyexchange', 'left', 'right']
for param in required_params:
if f'{param}=' not in sample_config:
errors.append(f"Missing required parameter: {param}")
# Validate IP subnet formats
subnet_pattern = re.compile(r'(left|right)subnet\s*=\s*([^\n]+)')
for match in subnet_pattern.finditer(sample_config):
@ -163,9 +160,9 @@ conn ikev2-pubkey
if not re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', subnet) and \
not re.match(r'^[0-9a-fA-F:]+/\d+$', subnet):
errors.append(f"Invalid subnet format: {subnet}")
if errors:
print(f"✗ StrongSwan ipsec.conf validation failed:")
print("✗ StrongSwan ipsec.conf validation failed:")
for error in errors:
print(f" - {error}")
assert False, "ipsec.conf validation failed"
@ -187,23 +184,23 @@ def test_ssh_config_syntax():
ServerAliveCountMax 3
LocalForward 1080 127.0.0.1:1080
"""
errors = []
# Parse SSH config format
lines = sample_config.strip().split('\n')
current_host = None
for line in lines:
line = line.strip()
if not line or line.startswith('#'):
continue
if line.startswith('Host '):
current_host = line.split()[1]
elif current_host and ' ' in line:
key, value = line.split(None, 1)
# Validate common SSH options
if key == 'Port':
try:
@ -212,18 +209,18 @@ def test_ssh_config_syntax():
errors.append(f"Invalid port number: {port}")
except ValueError:
errors.append(f"Port must be a number: {value}")
elif key == 'LocalForward':
# Format: LocalForward [bind_address:]port host:hostport
parts = value.split()
if len(parts) != 2:
errors.append(f"Invalid LocalForward format: {value}")
if not current_host:
errors.append("No Host definition found")
if errors:
print(f"✗ SSH config validation failed:")
print("✗ SSH config validation failed:")
for error in errors:
print(f" - {error}")
assert False, "SSH config validation failed"
@ -255,43 +252,43 @@ COMMIT
-A FORWARD -s 10.19.49.0/24 -j ACCEPT
COMMIT
"""
errors = []
# Check table definitions
tables = re.findall(r'\*(\w+)', sample_rules)
if 'filter' not in tables:
errors.append("Missing *filter table")
if 'nat' not in tables:
errors.append("Missing *nat table")
# Check for COMMIT statements
commit_count = sample_rules.count('COMMIT')
if commit_count != len(tables):
errors.append(f"Number of COMMIT statements ({commit_count}) doesn't match tables ({len(tables)})")
# Validate chain policies
chain_pattern = re.compile(r'^:(\w+)\s+(ACCEPT|DROP|REJECT)\s+\[\d+:\d+\]', re.MULTILINE)
chains = chain_pattern.findall(sample_rules)
required_chains = [('INPUT', 'DROP'), ('FORWARD', 'DROP'), ('OUTPUT', 'ACCEPT')]
for chain, policy in required_chains:
if not any(c[0] == chain for c in chains):
errors.append(f"Missing required chain: {chain}")
# Validate rule syntax
rule_pattern = re.compile(r'^-[AI]\s+(\w+)', re.MULTILINE)
rules = rule_pattern.findall(sample_rules)
if len(rules) < 5:
errors.append("Insufficient firewall rules")
# Check for essential security rules
if '-A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT' not in sample_rules:
errors.append("Missing stateful connection tracking rule")
if errors:
print(f"✗ iptables rules validation failed:")
print("✗ iptables rules validation failed:")
for error in errors:
print(f" - {error}")
assert False, "iptables rules validation failed"
@ -319,30 +316,30 @@ log-facility=/var/log/dnsmasq.log
conf-dir=/etc/dnsmasq.d/,*.conf
addn-hosts=/var/lib/algo/dns/adblock.hosts
"""
errors = []
# Parse config
for line in sample_config.strip().split('\n'):
line = line.strip()
if not line or line.startswith('#'):
continue
# Most dnsmasq options are key=value or just key
if '=' in line:
key, value = line.split('=', 1)
# Validate specific options
if key == 'interface':
if not re.match(r'^[a-zA-Z0-9\-_]+$', value):
errors.append(f"Invalid interface name: {value}")
elif key == 'server':
# Basic IP validation
if not re.match(r'^\d+\.\d+\.\d+\.\d+$', value) and \
not re.match(r'^[0-9a-fA-F:]+$', value):
errors.append(f"Invalid DNS server IP: {value}")
elif key == 'cache-size':
try:
size = int(value)
@ -350,15 +347,15 @@ addn-hosts=/var/lib/algo/dns/adblock.hosts
errors.append(f"Invalid cache size: {size}")
except ValueError:
errors.append(f"Cache size must be a number: {value}")
# Check for required options
required = ['interface', 'server']
for req in required:
if f'{req}=' not in sample_config:
errors.append(f"Missing required option: {req}")
if errors:
print(f"✗ dnsmasq config validation failed:")
print("✗ dnsmasq config validation failed:")
for error in errors:
print(f" - {error}")
assert False, "dnsmasq config validation failed"
@ -374,7 +371,7 @@ if __name__ == "__main__":
test_iptables_rules_syntax,
test_dns_config_syntax,
]
failed = 0
for test in tests:
try:
@ -385,9 +382,9 @@ if __name__ == "__main__":
except Exception as e:
print(f"{test.__name__} error: {e}")
failed += 1
if failed > 0:
print(f"\n{failed} tests failed")
sys.exit(1)
else:
print(f"\nAll {len(tests)} config syntax tests passed!")
print(f"\nAll {len(tests)} config syntax tests passed!")
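
The config checks above follow one pattern: collect human-readable errors into a list, then assert that the list is empty. A minimal standalone sketch using the same WireGuard Address/Endpoint regexes (sample values are made up, IPv4-only for brevity):

    import re

    sample = {"Address": "10.19.49.2/32", "Endpoint": "10.0.0.1:51820"}
    errors = []

    if not re.match(r'^\d+\.\d+\.\d+\.\d+/\d+$', sample["Address"]):
        errors.append(f"Invalid IP address format: {sample['Address']}")
    if not re.match(r'^[\d\.\:]+:\d+$', sample["Endpoint"]):
        errors.append(f"Invalid Endpoint format: {sample['Endpoint']}")

    assert not errors, f"WireGuard config validation failed: {errors}"
    print("sample WireGuard fields look valid")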


@ -96,4 +96,4 @@ if __name__ == "__main__":
print(f"\n{failed} tests failed")
sys.exit(1)
else:
print(f"\nAll {len(tests)} tests passed!")
print(f"\nAll {len(tests)} tests passed!")


@ -5,11 +5,9 @@ This catches undefined variables, syntax errors, and logic bugs
"""
import os
import sys
import tempfile
from pathlib import Path
import yaml
from jinja2 import Environment, FileSystemLoader, StrictUndefined, UndefinedError, TemplateSyntaxError
from jinja2 import Environment, FileSystemLoader, StrictUndefined, TemplateSyntaxError, UndefinedError
# Add parent directory to path for fixtures
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@ -57,40 +55,40 @@ def find_templates():
def test_template_syntax():
"""Test that all templates have valid Jinja2 syntax"""
templates = find_templates()
# Skip some paths that aren't real templates
skip_paths = ['.git/', 'venv/', '.env/', 'configs/']
# Skip templates that use Ansible-specific filters
skip_templates = ['vpn-dict.j2', 'mobileconfig.j2', 'dnscrypt-proxy.toml.j2']
errors = []
skipped = 0
for template_path in templates:
# Skip unwanted paths
if any(skip in str(template_path) for skip in skip_paths):
continue
# Skip templates with Ansible-specific features
if any(skip in str(template_path) for skip in skip_templates):
skipped += 1
continue
try:
template_dir = template_path.parent
env = Environment(
loader=FileSystemLoader(template_dir),
undefined=StrictUndefined
)
# Just try to load the template - this checks syntax
template = env.get_template(template_path.name)
except TemplateSyntaxError as e:
errors.append(f"{template_path}: Syntax error - {e}")
except Exception as e:
errors.append(f"{template_path}: Error loading - {e}")
if errors:
print(f"✗ Template syntax check failed with {len(errors)} errors:")
for error in errors[:10]: # Show first 10 errors
@ -113,47 +111,47 @@ def test_critical_templates():
'roles/common/templates/rules.v4.j2',
'roles/common/templates/rules.v6.j2',
]
test_vars = get_test_variables()
errors = []
for template_path in critical_templates:
if not os.path.exists(template_path):
continue # Skip if template doesn't exist
try:
template_dir = os.path.dirname(template_path)
template_name = os.path.basename(template_path)
env = Environment(
loader=FileSystemLoader(template_dir),
undefined=StrictUndefined
)
# Add mock functions
env.globals['lookup'] = mock_lookup
env.filters['to_uuid'] = mock_to_uuid
env.filters['bool'] = mock_bool
template = env.get_template(template_name)
# Add item context for templates that use loops
if 'client' in template_name:
test_vars['item'] = ('test-user', 'test-user')
# Try to render
output = template.render(**test_vars)
# Basic validation - should produce some output
assert len(output) > 0, f"Empty output from {template_path}"
except UndefinedError as e:
errors.append(f"{template_path}: Missing variable - {e}")
except Exception as e:
errors.append(f"{template_path}: Render error - {e}")
if errors:
print(f"✗ Critical template rendering failed:")
print("✗ Critical template rendering failed:")
for error in errors:
print(f" - {error}")
assert False, "Critical template rendering errors"
@ -172,21 +170,21 @@ def test_variable_consistency():
'dns_servers',
'users',
]
# Check if main.yml defines these
if os.path.exists('main.yml'):
with open('main.yml') as f:
content = f.read()
missing = []
for var in common_vars:
# Simple check - could be improved
if var not in content:
missing.append(var)
if missing:
print(f"⚠ Variables possibly not defined in main.yml: {missing}")
print("✓ Variable consistency check completed")
@ -214,31 +212,31 @@ def test_wireguard_ipv6_endpoints():
'expected_endpoint': 'Endpoint = [fe80::1%eth0]:51820'
},
]
template_path = 'roles/wireguard/templates/client.conf.j2'
if not os.path.exists(template_path):
print(f"⚠ Skipping IPv6 endpoint test - {template_path} not found")
return
base_vars = get_test_variables()
errors = []
for test_case in test_cases:
try:
# Set up test variables
test_vars = {**base_vars, **test_case}
test_vars['item'] = ('test-user', 'test-user')
# Render template
env = Environment(
loader=FileSystemLoader('roles/wireguard/templates'),
undefined=StrictUndefined
)
env.globals['lookup'] = mock_lookup
template = env.get_template('client.conf.j2')
output = template.render(**test_vars)
# Check if the expected endpoint format is in the output
if test_case['expected_endpoint'] not in output:
errors.append(f"Expected '{test_case['expected_endpoint']}' for IP '{test_case['IP_subject_alt_name']}' but not found in output")
@ -246,10 +244,10 @@ def test_wireguard_ipv6_endpoints():
for line in output.split('\n'):
if 'Endpoint' in line:
errors.append(f" Found: {line.strip()}")
except Exception as e:
errors.append(f"Error testing {test_case['IP_subject_alt_name']}: {e}")
if errors:
print("✗ WireGuard IPv6 endpoint test failed:")
for error in errors:
@ -270,7 +268,7 @@ def test_template_conditionals():
'dns_adblocking': True,
'algo_ssh_tunneling': False,
},
# IPsec enabled, WireGuard disabled
# IPsec enabled, WireGuard disabled
{
'wireguard_enabled': False,
'ipsec_enabled': True,
@ -287,48 +285,48 @@ def test_template_conditionals():
'algo_ssh_tunneling': True,
},
]
base_vars = get_test_variables()
for i, test_case in enumerate(test_cases):
# Merge test case with base vars
test_vars = {**base_vars, **test_case}
# Test a few templates that have conditionals
conditional_templates = [
'roles/common/templates/rules.v4.j2',
]
for template_path in conditional_templates:
if not os.path.exists(template_path):
continue
try:
template_dir = os.path.dirname(template_path)
template_name = os.path.basename(template_path)
env = Environment(
loader=FileSystemLoader(template_dir),
undefined=StrictUndefined
)
# Add mock functions
env.globals['lookup'] = mock_lookup
env.filters['to_uuid'] = mock_to_uuid
env.filters['bool'] = mock_bool
template = env.get_template(template_name)
output = template.render(**test_vars)
# Verify conditionals work
if test_case.get('wireguard_enabled'):
assert str(test_vars['wireguard_port']) in output, \
f"WireGuard port missing when enabled (case {i})"
except Exception as e:
print(f"✗ Conditional test failed for {template_path} case {i}: {e}")
raise
print("✓ Template conditional tests passed")
@ -340,7 +338,7 @@ if __name__ == "__main__":
print("⚠ Skipping template tests - jinja2 not installed")
print(" Run: pip install jinja2")
sys.exit(0)
tests = [
test_template_syntax,
test_critical_templates,
@ -348,7 +346,7 @@ if __name__ == "__main__":
test_wireguard_ipv6_endpoints,
test_template_conditionals,
]
failed = 0
for test in tests:
try:
@ -359,9 +357,9 @@ if __name__ == "__main__":
except Exception as e:
print(f"{test.__name__} error: {e}")
failed += 1
if failed > 0:
print(f"\n{failed} tests failed")
sys.exit(1)
else:
print(f"\nAll {len(tests)} template tests passed!")
print(f"\nAll {len(tests)} template tests passed!")
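
To illustrate why these template tests render with StrictUndefined: any variable missing from the test context raises immediately instead of silently rendering as an empty string. A minimal sketch (template text and variable names are illustrative):

    from jinja2 import Environment, StrictUndefined, UndefinedError

    env = Environment(undefined=StrictUndefined)
    template = env.from_string("Endpoint = {{ IP_subject_alt_name }}:{{ wireguard_port }}")

    try:
        template.render(IP_subject_alt_name="10.0.0.1")   # wireguard_port deliberately omitted
    except UndefinedError as exc:
        print(f"caught missing variable: {exc}")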