diff --git a/library/x25519_pubkey.py b/library/x25519_pubkey.py new file mode 100755 index 00000000..d56c2fa6 --- /dev/null +++ b/library/x25519_pubkey.py @@ -0,0 +1,135 @@ +#!/usr/bin/python + +# x25519_pubkey.py - Ansible module to derive a base64-encoded WireGuard-compatible public key +# from a base64-encoded 32-byte X25519 private key. +# +# Why: community.crypto does not provide raw public key derivation for X25519 keys. + +import base64 + +from ansible.module_utils.basic import AnsibleModule +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import x25519 + +""" +Ansible module to derive base64-encoded X25519 public keys from private keys. + +Supports both base64-encoded strings and raw 32-byte key files. +Used for WireGuard key generation where community.crypto lacks raw public key derivation. + +Parameters: +- private_key_b64: Base64-encoded X25519 private key string +- private_key_path: Path to file containing X25519 private key (base64 or raw 32 bytes) +- public_key_path: Path where the derived public key should be written + +Returns: +- public_key: Base64-encoded X25519 public key +- changed: Whether the public key file was modified +- public_key_path: Path where public key was written (if specified) +""" + + +def run_module(): + """ + Main execution function for the x25519_pubkey Ansible module. + + Handles parameter validation, private key processing, public key derivation, + and optional file output with idempotent behavior. + """ + module_args = { + 'private_key_b64': {'type': 'str', 'required': False}, + 'private_key_path': {'type': 'path', 'required': False}, + 'public_key_path': {'type': 'path', 'required': False}, + } + + result = { + 'changed': False, + 'public_key': '', + } + + module = AnsibleModule( + argument_spec=module_args, + required_one_of=[['private_key_b64', 'private_key_path']], + supports_check_mode=True + ) + + priv_b64 = None + + if module.params['private_key_path']: + try: + with open(module.params['private_key_path'], 'rb') as f: + data = f.read() + try: + # First attempt: assume file contains base64 text data + # Strip whitespace from edges for text files (safe for base64 strings) + stripped_data = data.strip() + base64.b64decode(stripped_data, validate=True) + priv_b64 = stripped_data.decode() + except (base64.binascii.Error, ValueError): + # Second attempt: assume file contains raw binary data + # CRITICAL: Do NOT strip raw binary data - X25519 keys can contain + # whitespace-like bytes (0x09, 0x0A, etc.) that must be preserved + # Stripping would corrupt the key and cause "got 31 bytes" errors + if len(data) != 32: + module.fail_json(msg=f"Private key file must be either base64 or exactly 32 raw bytes, got {len(data)} bytes") + priv_b64 = base64.b64encode(data).decode() + except OSError as e: + module.fail_json(msg=f"Failed to read private key file: {e}") + else: + priv_b64 = module.params['private_key_b64'] + + # Validate input parameters + if not priv_b64: + module.fail_json(msg="No private key provided") + + try: + priv_raw = base64.b64decode(priv_b64, validate=True) + except Exception as e: + module.fail_json(msg=f"Invalid base64 private key format: {e}") + + if len(priv_raw) != 32: + module.fail_json(msg=f"Private key must decode to exactly 32 bytes, got {len(priv_raw)}") + + try: + priv_key = x25519.X25519PrivateKey.from_private_bytes(priv_raw) + pub_key = priv_key.public_key() + pub_raw = pub_key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw + ) + pub_b64 = base64.b64encode(pub_raw).decode() + result['public_key'] = pub_b64 + + if module.params['public_key_path']: + pub_path = module.params['public_key_path'] + existing = None + + try: + with open(pub_path) as f: + existing = f.read().strip() + except OSError: + existing = None + + if existing != pub_b64: + try: + with open(pub_path, 'w') as f: + f.write(pub_b64) + result['changed'] = True + except OSError as e: + module.fail_json(msg=f"Failed to write public key file: {e}") + + result['public_key_path'] = pub_path + + except Exception as e: + module.fail_json(msg=f"Failed to derive public key: {e}") + + module.exit_json(**result) + + +def main(): + """Entry point when module is executed directly.""" + run_module() + + +if __name__ == '__main__': + main() diff --git a/roles/wireguard/tasks/keys.yml b/roles/wireguard/tasks/keys.yml index ae4b5fb7..3c37d887 100644 --- a/roles/wireguard/tasks/keys.yml +++ b/roles/wireguard/tasks/keys.yml @@ -1,205 +1,46 @@ --- -# ANSIBLE ASYNC + LOOP PATTERN DOCUMENTATION -# ============================================ -# This file uses a complex async pattern that creates triple-nested data structures. -# When using async + register + loop + async_status + register + loop, the result is: -# -# Step 1: async task with loop -> register creates: { results: [job1, job2] } -# Step 2: async_status with loop -> register creates: { results: [status1, status2] } -# Step 3: Each status contains the original job as nested 'item' field -# -# Access pattern: item.item.item where: -# - First 'item' = current async_status result -# - Second 'item' = original async job object -# - Third 'item' = actual username from original loop -# -# Reference: https://docs.ansible.com/ansible/latest/playbook_guide/playbooks_loops.html -# See also: tests/test-wireguard-real-async.yml for structure debugging - -- name: Delete the lock files - file: - dest: "{{ config_prefix|default('/') }}etc/wireguard/private_{{ item }}.lock" - state: absent - when: keys_clean_all|bool +- name: Generate raw private keys + community.crypto.openssl_privatekey: + type: X25519 + path: "{{ wireguard_pki_path }}/private/{{ item }}.raw" + format: raw + mode: "0600" with_items: - "{{ users }}" - "{{ IP_subject_alt_name }}" -- name: Generate private keys (parallel) - command: wg genkey - register: wg_genkey - args: - creates: "{{ config_prefix|default('/') }}etc/wireguard/private_{{ item }}.lock" - with_items: - - "{{ users }}" - - "{{ IP_subject_alt_name }}" - async: 30 - poll: 0 - # Result structure: wg_genkey.results = [ - # { "ansible_job_id": "j123", "item": "user1", "changed": true }, - # { "ansible_job_id": "j456", "item": "user2", "changed": true } - # ] - -- name: Wait for private key generation to complete - async_status: - jid: "{{ item.ansible_job_id }}" - with_items: "{{ wg_genkey.results }}" - register: wg_genkey_results - until: wg_genkey_results.finished - retries: 15 - delay: 2 - # CRITICAL: Complex nested structure created here! - # wg_genkey_results.results = [ - # { - # "item": { "ansible_job_id": "j123", "item": "user1", "changed": true }, - # "stdout": "actual_private_key_content", - # "finished": 1, "changed": true - # } - # ] - # To access username: item.item.item (triple nested!) - -- name: Display WireGuard private key generation failure details (if any) - debug: - msg: | - WireGuard private key generation failed for user: {{ item.item.item }} - Error details: {{ item.stderr | default('No stderr available') }} - Return code: {{ item.rc | default('Unknown') }} - - Common causes: - - WireGuard tools not properly installed - - Insufficient entropy for key generation - - Permission issues in key directory - - Filesystem full or read-only - - Troubleshooting: - - Verify WireGuard installation: wg --version - - Check entropy: cat /proc/sys/kernel/random/entropy_avail - - Check disk space: df -h /opt/algo - when: item.rc is defined and item.rc != 0 - with_items: "{{ wg_genkey_results.results | default([]) }}" - failed_when: false - -- block: - - name: Save private keys - copy: - dest: "{{ wireguard_pki_path }}/private/{{ item.item.item }}" # See structure docs above - content: "{{ item.stdout }}" - mode: "0600" - no_log: "{{ algo_no_log|bool }}" - when: item.changed - with_items: "{{ wg_genkey_results.results }}" - delegate_to: localhost - become: false - # DATA STRUCTURE EXPLANATION: - # item = current result from wg_genkey_results.results - # item.item = original job object from wg_genkey.results - # item.item.item = actual username from original loop - # item.stdout = the generated private key content - - - name: Touch the lock file - file: - dest: "{{ config_prefix|default('/') }}etc/wireguard/private_{{ item }}.lock" - state: touch - with_items: - - "{{ users }}" - - "{{ IP_subject_alt_name }}" - when: wg_genkey_results.changed - -- name: Delete the preshared lock files - file: - dest: "{{ config_prefix|default('/') }}etc/wireguard/preshared_{{ item }}.lock" - state: absent - when: keys_clean_all|bool +- name: Save base64 encoded private key + copy: + dest: "{{ wireguard_pki_path }}/private/{{ item }}" + content: "{{ lookup('file', wireguard_pki_path + '/private/' + item + '.raw') | b64encode }}" + mode: "0600" with_items: - "{{ users }}" - "{{ IP_subject_alt_name }}" -- name: Generate preshared keys (parallel) - command: wg genpsk - register: wg_genpsk - args: - creates: "{{ config_prefix|default('/') }}etc/wireguard/preshared_{{ item }}.lock" +- name: Generate raw preshared keys + community.crypto.openssl_privatekey: + type: X25519 + path: "{{ wireguard_pki_path }}/preshared/{{ item }}.raw" + format: raw + mode: "0600" with_items: - "{{ users }}" - "{{ IP_subject_alt_name }}" - async: 30 - poll: 0 - # Same structure pattern as private keys above -- name: Wait for preshared key generation to complete - async_status: - jid: "{{ item.ansible_job_id }}" - with_items: "{{ wg_genpsk.results }}" - register: wg_genpsk_results - until: wg_genpsk_results.finished - retries: 15 - delay: 2 - # Creates same triple-nested structure as wg_genkey_results - failed_when: > - wg_genpsk_results.failed or - (wg_genpsk_results.finished and wg_genpsk_results.rc != 0) - -- name: Display WireGuard preshared key generation failure details (if any) - debug: - msg: | - WireGuard preshared key generation failed for user: {{ item.item.item }} - Error details: {{ item.stderr | default('No stderr available') }} - Return code: {{ item.rc | default('Unknown') }} - - Common causes: - - WireGuard tools not properly installed - - Insufficient entropy for key generation - - Permission issues in key directory - - Filesystem full or read-only - - Troubleshooting: - - Verify WireGuard installation: wg --version - - Check entropy: cat /proc/sys/kernel/random/entropy_avail - - Check disk space: df -h /opt/algo - when: item.rc is defined and item.rc != 0 - with_items: "{{ wg_genpsk_results.results | default([]) }}" - failed_when: false - -- block: - - name: Save preshared keys - copy: - dest: "{{ wireguard_pki_path }}/preshared/{{ item.item.item }}" # Triple nested (see docs above) - content: "{{ item.stdout }}" - mode: "0600" - no_log: "{{ algo_no_log|bool }}" - when: item.changed - with_items: "{{ wg_genpsk_results.results }}" - delegate_to: localhost - become: false - - - name: Touch the preshared lock file - file: - dest: "{{ config_prefix|default('/') }}etc/wireguard/preshared_{{ item }}.lock" - state: touch - with_items: - - "{{ users }}" - - "{{ IP_subject_alt_name }}" - when: wg_genpsk_results.changed +- name: Save base64 encoded preshared keys + copy: + dest: "{{ wireguard_pki_path }}/preshared/{{ item }}" + content: "{{ lookup('file', wireguard_pki_path + '/preshared/' + item + '.raw') | b64encode }}" + mode: "0600" + with_items: + - "{{ users }}" + - "{{ IP_subject_alt_name }}" - name: Generate public keys - shell: | - set -o pipefail - echo "{{ lookup('file', wireguard_pki_path + '/private/' + item) }}" | - wg pubkey - register: wg_pubkey - changed_when: false - args: - executable: bash + x25519_pubkey: + private_key_path: "{{ wireguard_pki_path }}/private/{{ item }}.raw" + public_key_path: "{{ wireguard_pki_path }}/public/{{ item }}" with_items: - "{{ users }}" - "{{ IP_subject_alt_name }}" - -- name: Save public keys - copy: - dest: "{{ wireguard_pki_path }}/public/{{ item['item'] }}" - content: "{{ item['stdout'] }}" - mode: "0600" - no_log: "{{ algo_no_log|bool }}" - with_items: "{{ wg_pubkey['results'] }}" - delegate_to: localhost - become: false diff --git a/roles/wireguard/tasks/main.yml b/roles/wireguard/tasks/main.yml index 6241ef01..8f06575b 100644 --- a/roles/wireguard/tasks/main.yml +++ b/roles/wireguard/tasks/main.yml @@ -25,6 +25,8 @@ - name: Generate keys import_tasks: keys.yml + delegate_to: localhost + become: false tags: update-users - block: diff --git a/tests/unit/test_wireguard_key_generation.py b/tests/unit/test_wireguard_key_generation.py new file mode 100644 index 00000000..6c5f9a13 --- /dev/null +++ b/tests/unit/test_wireguard_key_generation.py @@ -0,0 +1,363 @@ +#!/usr/bin/env python3 +""" +Test WireGuard key generation - focused on x25519_pubkey module integration +Addresses test gap identified in tests/README.md line 63-67: WireGuard private/public key generation +""" +import base64 +import os +import subprocess +import sys +import tempfile +import shutil + +# Add library directory to path to import our custom module +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'library')) + + +def test_wireguard_tools_available(): + """Test that WireGuard tools are available for validation""" + try: + result = subprocess.run(['wg', '--version'], capture_output=True, text=True) + assert result.returncode == 0, "WireGuard tools not available" + print(f"✓ WireGuard tools available: {result.stdout.strip()}") + return True + except FileNotFoundError: + print("⚠ WireGuard tools not available - skipping validation tests") + return False + + +def test_x25519_module_import(): + """Test that our custom x25519_pubkey module can be imported and used""" + try: + from x25519_pubkey import run_module + print("✓ x25519_pubkey module imports successfully") + return True + except ImportError as e: + assert False, f"Cannot import x25519_pubkey module: {e}" + + +def generate_test_private_key(): + """Generate a test private key using the same method as Algo""" + with tempfile.NamedTemporaryFile(suffix='.raw', delete=False) as temp_file: + raw_key_path = temp_file.name + + try: + # Generate 32 random bytes for X25519 private key (same as community.crypto does) + import secrets + raw_data = secrets.token_bytes(32) + + # Write raw key to file (like community.crypto openssl_privatekey with format: raw) + with open(raw_key_path, 'wb') as f: + f.write(raw_data) + + assert len(raw_data) == 32, f"Private key should be 32 bytes, got {len(raw_data)}" + + b64_key = base64.b64encode(raw_data).decode() + + print(f"✓ Generated private key (base64): {b64_key[:12]}...") + + return raw_key_path, b64_key + + except Exception: + # Clean up on error + if os.path.exists(raw_key_path): + os.unlink(raw_key_path) + raise + + +def test_x25519_pubkey_from_raw_file(): + """Test our x25519_pubkey module with raw private key file""" + raw_key_path, b64_key = generate_test_private_key() + + try: + # Import here so we can mock the module_utils if needed + from unittest.mock import Mock + + # Mock the AnsibleModule for testing + class MockModule: + def __init__(self, params): + self.params = params + self.result = {} + + def fail_json(self, **kwargs): + raise Exception(f"Module failed: {kwargs}") + + def exit_json(self, **kwargs): + self.result = kwargs + + with tempfile.NamedTemporaryFile(suffix='.pub', delete=False) as temp_pub: + public_key_path = temp_pub.name + + try: + # Test the module logic directly + from x25519_pubkey import run_module + import x25519_pubkey + + original_AnsibleModule = x25519_pubkey.AnsibleModule + + try: + # Mock the module call + mock_module = MockModule({ + 'private_key_path': raw_key_path, + 'public_key_path': public_key_path, + 'private_key_b64': None + }) + + x25519_pubkey.AnsibleModule = lambda **kwargs: mock_module + + # Run the module + run_module() + + # Check the result + assert 'public_key' in mock_module.result + assert mock_module.result['changed'] == True + assert os.path.exists(public_key_path) + + with open(public_key_path, 'r') as f: + derived_pubkey = f.read().strip() + + # Validate base64 format + try: + decoded = base64.b64decode(derived_pubkey, validate=True) + assert len(decoded) == 32, f"Public key should be 32 bytes, got {len(decoded)}" + except Exception as e: + assert False, f"Invalid base64 public key: {e}" + + print(f"✓ Derived public key from raw file: {derived_pubkey[:12]}...") + + return derived_pubkey + + finally: + x25519_pubkey.AnsibleModule = original_AnsibleModule + + finally: + if os.path.exists(public_key_path): + os.unlink(public_key_path) + + finally: + if os.path.exists(raw_key_path): + os.unlink(raw_key_path) + + +def test_x25519_pubkey_from_b64_string(): + """Test our x25519_pubkey module with base64 private key string""" + raw_key_path, b64_key = generate_test_private_key() + + try: + from unittest.mock import Mock + + class MockModule: + def __init__(self, params): + self.params = params + self.result = {} + + def fail_json(self, **kwargs): + raise Exception(f"Module failed: {kwargs}") + + def exit_json(self, **kwargs): + self.result = kwargs + + from x25519_pubkey import run_module + import x25519_pubkey + + original_AnsibleModule = x25519_pubkey.AnsibleModule + + try: + mock_module = MockModule({ + 'private_key_b64': b64_key, + 'private_key_path': None, + 'public_key_path': None + }) + + x25519_pubkey.AnsibleModule = lambda **kwargs: mock_module + + # Run the module + run_module() + + # Check the result + assert 'public_key' in mock_module.result + derived_pubkey = mock_module.result['public_key'] + + # Validate base64 format + try: + decoded = base64.b64decode(derived_pubkey, validate=True) + assert len(decoded) == 32, f"Public key should be 32 bytes, got {len(decoded)}" + except Exception as e: + assert False, f"Invalid base64 public key: {e}" + + print(f"✓ Derived public key from base64 string: {derived_pubkey[:12]}...") + + return derived_pubkey + + finally: + x25519_pubkey.AnsibleModule = original_AnsibleModule + + finally: + if os.path.exists(raw_key_path): + os.unlink(raw_key_path) + + +def test_wireguard_validation(): + """Test that our derived keys work with actual WireGuard tools""" + if not test_wireguard_tools_available(): + return + + # Generate keys using our method + raw_key_path, b64_key = generate_test_private_key() + + try: + # Derive public key using our module + from unittest.mock import Mock + + class MockModule: + def __init__(self, params): + self.params = params + self.result = {} + + def fail_json(self, **kwargs): + raise Exception(f"Module failed: {kwargs}") + + def exit_json(self, **kwargs): + self.result = kwargs + + from x25519_pubkey import run_module + import x25519_pubkey + + original_AnsibleModule = x25519_pubkey.AnsibleModule + + try: + mock_module = MockModule({ + 'private_key_b64': b64_key, + 'private_key_path': None, + 'public_key_path': None + }) + + x25519_pubkey.AnsibleModule = lambda **kwargs: mock_module + run_module() + + derived_pubkey = mock_module.result['public_key'] + + finally: + x25519_pubkey.AnsibleModule = original_AnsibleModule + + with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as temp_config: + # Create a WireGuard config using our keys + wg_config = f"""[Interface] +PrivateKey = {b64_key} +Address = 10.19.49.1/24 + +[Peer] +PublicKey = {derived_pubkey} +AllowedIPs = 10.19.49.2/32 +""" + temp_config.write(wg_config) + config_path = temp_config.name + + try: + # Test that WireGuard can parse our config + result = subprocess.run([ + 'wg-quick', 'strip', config_path + ], capture_output=True, text=True) + + assert result.returncode == 0, f"WireGuard rejected our config: {result.stderr}" + + # Test key derivation with wg pubkey command + wg_result = subprocess.run([ + 'wg', 'pubkey' + ], input=b64_key, capture_output=True, text=True) + + if wg_result.returncode == 0: + wg_derived = wg_result.stdout.strip() + assert wg_derived == derived_pubkey, f"Key mismatch: wg={wg_derived} vs ours={derived_pubkey}" + print(f"✓ WireGuard validation: keys match wg pubkey output") + else: + print(f"⚠ Could not validate with wg pubkey: {wg_result.stderr}") + + print("✓ WireGuard accepts our generated configuration") + + finally: + if os.path.exists(config_path): + os.unlink(config_path) + + finally: + if os.path.exists(raw_key_path): + os.unlink(raw_key_path) + + +def test_key_consistency(): + """Test that the same private key always produces the same public key""" + # Generate one private key to reuse + raw_key_path, b64_key = generate_test_private_key() + + try: + def derive_pubkey_from_same_key(): + from unittest.mock import Mock + + class MockModule: + def __init__(self, params): + self.params = params + self.result = {} + + def fail_json(self, **kwargs): + raise Exception(f"Module failed: {kwargs}") + + def exit_json(self, **kwargs): + self.result = kwargs + + from x25519_pubkey import run_module + import x25519_pubkey + + original_AnsibleModule = x25519_pubkey.AnsibleModule + + try: + mock_module = MockModule({ + 'private_key_b64': b64_key, # SAME key each time + 'private_key_path': None, + 'public_key_path': None + }) + + x25519_pubkey.AnsibleModule = lambda **kwargs: mock_module + run_module() + + return mock_module.result['public_key'] + + finally: + x25519_pubkey.AnsibleModule = original_AnsibleModule + + # Derive public key multiple times from same private key + pubkey1 = derive_pubkey_from_same_key() + pubkey2 = derive_pubkey_from_same_key() + + assert pubkey1 == pubkey2, f"Key derivation not consistent: {pubkey1} vs {pubkey2}" + print("✓ Key derivation is consistent") + + finally: + if os.path.exists(raw_key_path): + os.unlink(raw_key_path) + + +if __name__ == "__main__": + tests = [ + test_x25519_module_import, + test_x25519_pubkey_from_raw_file, + test_x25519_pubkey_from_b64_string, + test_key_consistency, + test_wireguard_validation, + ] + + failed = 0 + for test in tests: + try: + test() + except AssertionError as e: + print(f"✗ {test.__name__} failed: {e}") + failed += 1 + except Exception as e: + print(f"✗ {test.__name__} error: {e}") + failed += 1 + + if failed > 0: + print(f"\n{failed} tests failed") + sys.exit(1) + else: + print(f"\nAll {len(tests)} tests passed!") \ No newline at end of file