129 lines
4.3 KiB
Python
129 lines
4.3 KiB
Python
import base64
|
|
import itertools
|
|
import secrets
|
|
from dataclasses import dataclass
|
|
from ipaddress import IPv4Network, IPv4Address
|
|
from typing import List, Optional, Dict, Iterator, Tuple
|
|
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
|
|
|
|
|
|
@dataclass
|
|
class MeshNodeConfig:
|
|
"""
|
|
Holds the config for a wireguard mesh node and relevant information.
|
|
Can also render the actual wireguard config into a string that can then be written out into a file.
|
|
"""
|
|
|
|
name: str
|
|
ip: str
|
|
peers: List['MeshNodeConfig'] = ()
|
|
private_key: str = base64.b64encode(
|
|
X25519PrivateKey.generate().private_bytes(
|
|
encoding=serialization.Encoding.Raw,
|
|
format=serialization.PrivateFormat.Raw,
|
|
encryption_algorithm=serialization.NoEncryption(),
|
|
)
|
|
).decode()
|
|
public_key: str = base64.b64encode(
|
|
X25519PrivateKey.from_private_bytes(
|
|
base64.b64decode(
|
|
private_key.encode()
|
|
)
|
|
).public_key().public_bytes(
|
|
encoding=serialization.Encoding.Raw,
|
|
format=serialization.PublicFormat.Raw,
|
|
)
|
|
).decode()
|
|
psk: Optional[str] = None
|
|
endpoint: Optional[str] = None
|
|
listen_port: int = 51515
|
|
|
|
@property
|
|
def config(self) -> str:
|
|
"""
|
|
render a wireguard config file based on the variables set in this class.
|
|
:return: wireguard config as string
|
|
"""
|
|
|
|
config = '[Interface]\n' + \
|
|
f'Address = {self.ip}\n' + \
|
|
f'PrivateKey = {self.private_key}\n' + \
|
|
f'ListenPort = {self.listen_port}\n\n'
|
|
|
|
for peer in self.peers:
|
|
config += '[Peer]\n' + \
|
|
(f'Endpoint: {peer.endpoint}\n' if peer.endpoint is not None else '') + \
|
|
(f'PresharedKey: {peer.psk}\n' if peer.psk is not None else '') + \
|
|
f'PublicKey = {peer.public_key}\n' + \
|
|
f'AllowedIPs = {peer.ip}/32\n' + \
|
|
f'PersistentKeepalive = 25\n\n'
|
|
|
|
return config
|
|
|
|
|
|
def create_mesh(node_count=3,
|
|
ip_pool: Iterator[IPv4Address] = IPv4Network('10.0.0.0/24').hosts(),
|
|
) -> Dict[str, MeshNodeConfig]:
|
|
"""
|
|
Create a number of wireguard mesh configs,
|
|
that can then be rolled out on machines to use as a wireguard mesh network.
|
|
|
|
:param ip_pool: Iterator over IPv4Address objects to draw IP Addresses from
|
|
:param node_count: number of configs to generate
|
|
:return: a dict with names of instances as keys and their MeshNodeConfig objects as corresponding values
|
|
"""
|
|
|
|
node_names = (
|
|
f'node-{idx}' for idx in range(1, node_count + 1)
|
|
)
|
|
psk = base64.b64encode(secrets.token_bytes(32)).decode()
|
|
|
|
nodes = dict(
|
|
map(
|
|
_create_mesh_node,
|
|
node_names,
|
|
itertools.repeat(ip_pool),
|
|
itertools.repeat(psk)
|
|
)
|
|
)
|
|
|
|
_connect_nodes(nodes)
|
|
|
|
return nodes
|
|
|
|
|
|
# TODO this should take a graph / adjacency list as argument and set peers based on that.
|
|
# currently all nodes are connected to all other nodes, which scales with O(n!) and
|
|
# takes significant amounts of time and compute for even small n < 10.
|
|
def _connect_nodes(nodes: Dict[str, MeshNodeConfig]) -> None:
|
|
"""
|
|
This function implements the actual entanglement of nodes and thus decides for implementing
|
|
"who is able to talk to whom" rules within the wireguard mesh network.
|
|
:param nodes: dict with names of instances as keys and their MeshNodeConfig objects as corresponding values
|
|
:return: nothing
|
|
"""
|
|
for src, dst in itertools.permutations(nodes.keys(), 2):
|
|
nodes[dst].peers.append(nodes[src])
|
|
|
|
|
|
def _create_mesh_node(
|
|
node_name: str,
|
|
ip_pool: Iterator[IPv4Address],
|
|
psk: Optional[str] = None
|
|
) -> Tuple[str, MeshNodeConfig]:
|
|
"""
|
|
Create a single mesh node config
|
|
:param node_name: name of the node
|
|
:param ip_pool: iterator over IPv4Address objects to draw an IPv4Address for this node from
|
|
:param psk: preshared key as string. Optional.
|
|
:return: a tuple with name of the new node as first and the MeshNodeConfig as second element
|
|
"""
|
|
return node_name, MeshNodeConfig(
|
|
name=node_name,
|
|
peers=[],
|
|
ip=str(next(ip_pool)),
|
|
psk=psk
|
|
)
|