wg-mesh-test/wireguard_mesh.py
2021-09-05 23:12:56 +02:00

129 lines
4.3 KiB
Python

import base64
import itertools
import secrets
from dataclasses import dataclass
from ipaddress import IPv4Network, IPv4Address
from typing import List, Optional, Dict, Iterator, Tuple
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
@dataclass
class MeshNodeConfig:
"""
Holds the config for a wireguard mesh node and relevant information.
Can also render the actual wireguard config into a string that can then be written out into a file.
"""
name: str
ip: str
peers: List['MeshNodeConfig'] = ()
private_key: str = base64.b64encode(
X25519PrivateKey.generate().private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
).decode()
public_key: str = base64.b64encode(
X25519PrivateKey.from_private_bytes(
base64.b64decode(
private_key.encode()
)
).public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
).decode()
psk: Optional[str] = None
endpoint: Optional[str] = None
listen_port: int = 51515
@property
def config(self) -> str:
"""
render a wireguard config file based on the variables set in this class.
:return: wireguard config as string
"""
config = '[Interface]\n' + \
f'Address = {self.ip}\n' + \
f'PrivateKey = {self.private_key}\n' + \
f'ListenPort = {self.listen_port}\n\n'
for peer in self.peers:
config += '[Peer]\n' + \
(f'Endpoint: {peer.endpoint}\n' if peer.endpoint is not None else '') + \
(f'PresharedKey: {peer.psk}\n' if peer.psk is not None else '') + \
f'PublicKey = {peer.public_key}\n' + \
f'AllowedIPs = {peer.ip}/32\n' + \
f'PersistentKeepalive = 25\n\n'
return config
def create_mesh(node_count=3,
ip_pool: Iterator[IPv4Address] = IPv4Network('10.0.0.0/24').hosts(),
) -> Dict[str, MeshNodeConfig]:
"""
Create a number of wireguard mesh configs,
that can then be rolled out on machines to use as a wireguard mesh network.
:param ip_pool: Iterator over IPv4Address objects to draw IP Addresses from
:param node_count: number of configs to generate
:return: a dict with names of instances as keys and their MeshNodeConfig objects as corresponding values
"""
node_names = (
f'node-{idx}' for idx in range(1, node_count + 1)
)
psk = base64.b64encode(secrets.token_bytes(32)).decode()
nodes = dict(
map(
_create_mesh_node,
node_names,
itertools.repeat(ip_pool),
itertools.repeat(psk)
)
)
_connect_nodes(nodes)
return nodes
# TODO this should take a graph / adjacency list as argument and set peers based on that.
# currently all nodes are connected to all other nodes, which scales with O(n!) and
# takes significant amounts of time and compute for even small n < 10.
def _connect_nodes(nodes: Dict[str, MeshNodeConfig]) -> None:
"""
This function implements the actual entanglement of nodes and thus decides for implementing
"who is able to talk to whom" rules within the wireguard mesh network.
:param nodes: dict with names of instances as keys and their MeshNodeConfig objects as corresponding values
:return: nothing
"""
for src, dst in itertools.permutations(nodes.keys(), 2):
nodes[dst].peers.append(nodes[src])
def _create_mesh_node(
node_name: str,
ip_pool: Iterator[IPv4Address],
psk: Optional[str] = None
) -> Tuple[str, MeshNodeConfig]:
"""
Create a single mesh node config
:param node_name: name of the node
:param ip_pool: iterator over IPv4Address objects to draw an IPv4Address for this node from
:param psk: preshared key as string. Optional.
:return: a tuple with name of the new node as first and the MeshNodeConfig as second element
"""
return node_name, MeshNodeConfig(
name=node_name,
peers=[],
ip=str(next(ip_pool)),
psk=psk
)