"""Peer records plus BIRD and WireGuard config rendering for BGP peerings.

``DB`` and the ``MY_*`` constants used below are not defined in this file;
they are presumably supplied by the star import from ``config``.
"""
__all__ = 'Peerdata _get_peer_info _get_my_info _bird_config _wg_config'.split()

# Import order is kept as in the original: names bound by the later import
# win over anything ``config`` happens to export under the same name.
from config import *
from collections import namedtuple

# One side of a peering.  ``defaults=(None,)`` applies to the right-most
# field only, so ``privkey`` may be omitted (it is for remote peers, whose
# database row carries just the first five fields).
Peerdata = namedtuple(
    'Peerdata',
    'asn ipll endpoint port pubkey privkey',
    defaults=(None,),
)


def _get_peer_info(name, asn):
    """Fetch the stored record of the peer identified by *name* and *asn*.

    Returns:
        A ``Peerdata`` built from the ``asn, ipll, endpoint, port, pubkey``
        columns of the first matching row of ``peers``; ``privkey`` is left
        at its ``None`` default.

    Raises:
        LookupError: no row matches.  (The original raised a bare
            ``Exception`` with the same message; ``LookupError`` is still
            caught by ``except Exception``.)
    """
    # TODO: carried over from the original, which had a bare TODO marker
    # here without saying what remains to be done.
    cursor = DB.execute(
        'SELECT asn, ipll, endpoint, port, pubkey FROM peers WHERE name=:name AND asn=:asn',
        {'name': name, 'asn': asn},
    )
    row = cursor.fetchone()
    if row is None:
        raise LookupError(f'{asn}-{name} not found in database')
    return Peerdata(*row)


def _get_my_info(asn):
    """Build the local side's ``Peerdata`` for a session with peer *asn*.

    Every field except ``port`` comes from the ``MY_*`` constants.  The
    listen port is derived from the peer's ASN so that each peer gets its
    own port: the ASN (int or numeric string) modulo 100000.
    """
    # TODO: better port logic
    # NOTE(review): the modulo can exceed 65535 (e.g. ASN 199999 -> 99999),
    # which is not a valid UDP port -- confirm the ASN range callers use.
    listen_port = int(asn) % 100000
    return Peerdata(
        asn=MY_ASN,
        ipll=MY_IPLL,
        endpoint=MY_ENDPOINT,
        port=listen_port,
        pubkey=MY_PUBKEY,
        privkey=MY_PRIVKEY,
    )


def _bird_config(name, far_side, local_side):
    """Render the BIRD ``protocol bgp`` stanza for the session to *far_side*.

    Args:
        name: Peer name; appended to the protocol and interface names.
        far_side: ``Peerdata`` of the neighbor (``asn`` and ``ipll`` are used).
        local_side: ``Peerdata`` of the side the config is rendered for; only
            its ``asn`` is read, to choose the filter arguments.

    Returns:
        The stanza as a string with a leading and a trailing newline.
    """
    # The filter functions receive '9,25,34' when rendering for our own ASN
    # and no arguments otherwise.  What the numbers mean is defined by the
    # dn42_*_filter functions in the BIRD config, not visible here.
    if local_side.asn == MY_ASN:
        filter_args = '9,25,34'
    else:
        filter_args = ''

    # Interface name uses the last four decimal digits of the neighbor ASN,
    # zero-padded.  int() lets a numeric-string ASN work too (the bare ``%``
    # would be treated as string formatting and raise TypeError).
    asn_suffix = int(far_side.asn) % 10000

    return f'''
protocol bgp as{far_side.asn}{name} from dnpeers {{
    enable extended messages on;
    neighbor {far_side.ipll} as {far_side.asn};
    ipv4 {{
        extended next hop on;
        import where dn42_import_filter({filter_args});
        export where dn42_export_filter({filter_args});
    }};
    ipv6 {{
        import where dn42_import_filter({filter_args});
        export where dn42_export_filter({filter_args});
    }};
    interface "wg{asn_suffix:04}{name}";
}};
'''


def _wg_config(name, far_side, local_side):
    """Render the WireGuard (wg-quick style) config for the tunnel to *far_side*.

    Args:
        name: Unused here; kept so the signature parallels ``_bird_config``.
        far_side: ``Peerdata`` of the remote end (``ipll``, ``pubkey``,
            ``endpoint`` and ``port`` are used).
        local_side: ``Peerdata`` of the end the config is rendered for
            (``ipll``, ``port`` and ``privkey`` are used).

    Returns:
        The config as a string, one setting per line.  ``PrivateKey`` is left
        empty when ``local_side.privkey`` is ``None``.
    """
    privkey = local_side.privkey if local_side.privkey is not None else ''

    # NOTE(review): if far_side.endpoint or far_side.port is None the last
    # line renders as "Endpoint = None:None" -- confirm the database always
    # stores both.
    return f'''
[Interface]
Address = {local_side.ipll}/128
Table = off
PostUp = /sbin/ip addr replace dev %i {local_side.ipll}/128 peer {far_side.ipll}/128
ListenPort = {local_side.port}
PrivateKey = {privkey}

[Peer]
PublicKey = {far_side.pubkey}
AllowedIPs = fd00::/8, 172.20.0.0/14, fe80::/64, 10.0.0.0/8, 172.31.0.0/16
Endpoint = {far_side.endpoint}:{far_side.port}
'''