#!/usr/bin/python3
import cmd, functools, getpass, os, re, subprocess, sys, time, traceback
try:
import readline
except: pass
from config import *
from lib_autopeer import *
# Allowed peer names: one ASCII letter followed by up to eight ASCII letters
# or digits (1-9 characters in total). Checked by do_addpeer and do_stat.
# NOTE(review): the 9-character cap presumably keeps "wg" + 4 digits + name
# within the 15-character Linux interface-name limit (see do_stat) -- confirm.
NAME_REGEX = r'^[a-zA-Z][a-zA-Z0-9]{0,8}$'
def as_from_user(which=None):
    '''Resolve the AS number the current user may operate on.

    The user's ASes are discovered by grepping the local dn42 registry
    checkout for aut-num objects whose ``mnt-by:`` line equals ``MNTNER``.

    which -- requested AS number (int or numeric string), or None to pick
             the first AS found for this maintainer.

    Returns the AS number as an int.

    Raises Exception when ``which`` is not numeric, when ``MNTNER`` contains
    unexpected characters, when the maintainer owns no AS, or when the
    requested AS is not one of theirs.
    '''
    if which is not None:
        try:
            which = int(which)
        except (TypeError, ValueError):
            # Only conversion failures are translated; KeyboardInterrupt and
            # friends must not be swallowed here.
            raise Exception('your AS number has to be a number, silly') from None

    # MNTNER is interpolated into a grep pattern below, so insist on a strict
    # character set. fullmatch (unlike a '$' anchor) also rejects a trailing
    # newline.
    if not re.fullmatch(r'[a-zA-Z0-9-]+', MNTNER):
        raise Exception('your mntner name has bad characters')

    grep = subprocess.run(
        ['grep', '-l', '-r', '-P', fr'^\s*mnt-by:\s*{MNTNER}\s*$', '/opt/autopeer/dn42-registry/data/aut-num'],
        capture_output=True,
        text=True,
    )

    user_ases = []
    for filename in grep.stdout.split('\n'):
        if not filename:
            continue
        # grep -l prints one matching path per line; the file name is "AS<number>".
        if (mo := re.match(r'^.*/?AS(\d+)$', filename)):
            user_ases.append(int(mo.group(1)))
        else:
            print(f"oops, something went wrong getting your ASes, specifically: {filename}", file=sys.stderr)

    if not user_ases:
        raise Exception("you don't mnt any AS?!")

    if MY_ASN in user_ases and which:  # allow owner to operate as anyone
        return which
    if which is None:  # at startup, use the first found
        return user_ases[0]
    if which in user_ases:  # the user picked an AS in their list
        return which
    raise Exception('not yours')
def parse(num_args):
    '''Decorator factory for ``cmd.Cmd`` ``do_*`` handlers.

    The wrapped handler receives the raw argument string from ``cmd``; it is
    split on whitespace and passed to the original function as positional
    arguments.

    num_args -- exact number of whitespace-separated arguments required.

    If the count does not match, an error (including the first line of the
    handler's docstring as a usage hint) is printed to stderr and the wrapper
    returns None without calling the handler.
    '''
    def _decorator(f):
        @functools.wraps(f)
        def real_func(self, arg):
            args = arg.split()
            if len(args) != num_args:
                # TODO: prompt user for each arg that's missing
                # The first docstring line doubles as the usage synopsis.
                doc = f.__doc__.split('\n', 1)[0] if f.__doc__ else '(undocumented args)'
                print('Error: wrong number of args', file=sys.stderr)
                print(f'Expected {num_args}: {doc}', file=sys.stderr)
                print(f'Got {len(args)}: {args}', file=sys.stderr)
                return None
            return f(self, *args)
        return real_func
    return _decorator
# Login name of whoever is running the shell.
# NOTE(review): getpass.getuser() consults the LOGNAME/USER/LNAME/USERNAME
# environment variables before the password database -- confirm the
# connecting user cannot set these, since MNTNER is derived from it.
USER = getpass.getuser()
# Registry maintainer handle derived from the login name: upper-cased + "-MNT".
MNTNER = USER.upper() + '-MNT'
# AS currently being operated on. Called with no argument, as_from_user()
# returns the first AS it finds for MNTNER and raises (at import time) if
# there is none. do_as rebinds this global.
SELECTED_ASN = as_from_user()
class AutopeerShell(cmd.Cmd):
    '''Interactive shell for managing the peerings of the selected AS.

    Every command operates on the module-level ``SELECTED_ASN``; ``do_as``
    rebinds it. Exceptions raised by a command are printed as a traceback
    and do not terminate the shell (see ``onecmd``).
    '''

    @staticmethod
    def _build_prompt():
        '''Return the prompt ``user@fqdn:ASN> `` for the current selection.'''
        return f'{getpass.getuser()}@{socket.getfqdn()}:{SELECTED_ASN}> '

    @staticmethod
    def _notify(message):
        '''Send ``message``, prefixed with this host's name, to NOTIFY_TO via socat.'''
        subprocess.run(
            ['socat', 'stdio', NOTIFY_TO],
            input=f"[autopeer {socket.getfqdn()}] {message}",
            text=True,
        )

    def preloop(self):
        # cmdloop() calls preloop() before printing self.intro, so both can be
        # computed here once SELECTED_ASN is known.
        self.intro = f'Welcome to the autopeer shell. Type help or ? to list commands.\nSelected AS: {SELECTED_ASN}'
        self.prompt = self._build_prompt()

    def postcmd(self, stop, line):
        # Refresh the prompt after every command because do_as may have
        # changed SELECTED_ASN.
        self.prompt = self._build_prompt()
        return stop

    def onecmd(self, line):
        # Keep the shell alive on command failures: show the traceback and
        # report "do not stop" to the command loop.
        try:
            return super().onecmd(line)
        except Exception:
            traceback.print_exc()
            return False

    def emptyline(self):
        # Override cmd.Cmd's default of repeating the last command.
        return False

    @parse(0)
    def do_ls(self):
        '''(no args)
        List your peers'''
        curs = DB.execute(
            'SELECT name, endpoint, port, deleted FROM peers WHERE asn=:asn',
            dict(asn=SELECTED_ASN),
        )
        print(f'Active peerings for {SELECTED_ASN}:')
        while row := curs.fetchone():
            name, endpoint, port, deleted = row
            if int(deleted):
                print(f'- {name} (pending deletion)')
            else:
                print(f'- {name} ({endpoint} {port})')
        print()

    def help_addpeer(self):
        # Extend the regular docstring help with this side's connection details.
        print(self.do_addpeer.__doc__)
        me = _get_my_info(SELECTED_ASN)
        print(
            '\n'
            f'My endpoint: {me.endpoint}:{me.port}\n'
            f'My ASN: {me.asn}\n'
            f'My Wireguard Public Key: {me.pubkey}\n'
            f'My Tunnel IPv6LL: {me.ipll}\n'
        )

    @parse(5)
    def do_addpeer(self, name, pubkey, endpoint, port, ipll):
        '''<name> <wg pubkey> <wg address> <wg port> <ipv6 link local>
        Add a new peer.
        <name> is an opaque identifier and must be unique for your ASN
        <wg pubkey> is your WireGuard public key
        <wg address> is your WireGuard IP or hostname
        <wg port> is your WireGuard port
        <ipv6 link local> is your WireGuard IPv6LL
        Note: if you have a dynamic IP, specify the port as 0 (and some valid garbage for the IP).
        The Endpoint (IP and port) will then be omitted from my side, so WireGuard will wait to receive packets from you first.'''
        if not re.match(NAME_REGEX, name):
            print(f'Error: name must match {NAME_REGEX}', file=sys.stderr)
            return
        PUBKEY_REGEX = r'^[=a-zA-Z0-9+/-]+$'
        if not re.match(PUBKEY_REGEX, pubkey):
            print(f'Error: pubkey must match {PUBKEY_REGEX}', file=sys.stderr)
            return
        ENDPOINT_REGEX = r'^((?#IPv4)[0-9.]+|(?#IPv6)[0-9a-fA-F:]+|(?#DNS)(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]))$'
        # (([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]) via https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address
        if not re.match(ENDPOINT_REGEX, endpoint):
            print(f'Error: wg address must match {ENDPOINT_REGEX}', file=sys.stderr)
            return
        # [0-9] rather than \d: \d also matches non-ASCII Unicode digits.
        PORT_REGEX = r'^[0-9]{1,5}$'
        if not re.match(PORT_REGEX, port):
            print(f'Error: port must match {PORT_REGEX}', file=sys.stderr)
            return
        # Five digits can still exceed the valid port range.
        if int(port) > 65535:
            print('Error: port must be between 0 and 65535', file=sys.stderr)
            return
        IPLL_REGEX = r'^[0-9a-fA-F:]+$'
        if not re.match(IPLL_REGEX, ipll):
            print(f'Error: ipv6 link local must match {IPLL_REGEX}', file=sys.stderr)
            return

        existing = DB.execute(
            'SELECT deleted FROM peers WHERE asn=:asn AND name=:name',
            dict(asn=SELECTED_ASN, name=name),
        ).fetchone()
        if existing:
            if int(existing[0]):
                print(f'Error: {name} already exists and is still pending deletion. Try again in 5 minutes.', file=sys.stderr)
            else:
                print(f'Error: {name} already exists', file=sys.stderr)
            return

        # NOTE(review): the notification is sent before the INSERT, so it also
        # fires if the insert fails below; kept in this order so a broken
        # notifier still blocks peer creation -- confirm that is intended.
        self._notify(f"Created new peering {name!r} for AS{SELECTED_ASN} by {USER}")
        try:
            curs = DB.execute(
                'INSERT INTO peers (name, asn, pubkey, endpoint, port, ipll, creator_ip, creator_name, creator_date, deleted) VALUES (:name, :asn, :pubkey, :endpoint, :port, :ipll, :creator_ip, :creator_name, :creator_date, 0)',
                dict(name=name, asn=SELECTED_ASN, pubkey=pubkey, endpoint=endpoint, port=port, ipll=ipll, creator_ip=os.getenv('SSH_CONNECTION'), creator_name=USER, creator_date=time.time())
            )
        except sqlite3.IntegrityError:
            print("Something went wrong! Maybe try using a name you haven't used already...", file=sys.stderr)
            return
        if not curs.rowcount:
            print("Something went wrong! The database didn't give us an error but no row was inserted!", file=sys.stderr)
            return

        print('🎉')
        print()
        self.do_showpeer(name)
        print()
        self.do_showtemplates(name)
        print()
        print()

    @parse(1)
    def do_delpeer(self, name):
        '''<name>
        Delete your peering.
        Note: The deletion will be processed the next time the cronjob runs, at which time the name will become available.'''
        curs = DB.execute(
            'UPDATE peers SET deleted = 1 WHERE name = :name AND asn = :asn AND deleted = 0',
            dict(name=name, asn=SELECTED_ASN)
        )
        if not curs.rowcount:
            print(f'"{name}" (AS {SELECTED_ASN}) not found', file=sys.stderr)
            return
        # Notify only once a row was actually flagged, so a mistyped name
        # does not produce a false "Deleted peering" message.
        self._notify(f"Deleted peering {name!r} for AS{SELECTED_ASN} by {USER}")
        print(f'Deleted "{name}"')

    @parse(1)
    def do_showpeer(self, name):
        '''<name>
        Show textual info about a peering'''
        me = _get_my_info(SELECTED_ASN)
        you = _get_peer_info(name, SELECTED_ASN)
        print(
            '\n'
            f'Link name: {name}\n'
            f'My endpoint: {me.endpoint}:{me.port}\n'
            f'My ASN: {me.asn}\n'
            f'My Wireguard Public Key: {me.pubkey}\n'
            f'My Tunnel IPv6LL: {me.ipll}\n'
            f'Your endpoint: {you.endpoint}:{you.port}\n'
            f'Your ASN: {you.asn}\n'
            f'Your Wireguard Public Key: {you.pubkey}\n'
            f'Your Tunnel IPv6LL: {you.ipll}\n'
        )

    @parse(1)
    def do_showtemplates(self, name):
        '''<name>
        Show basic config templates for your side'''
        print('### USE THESE CONFIGS ON YOUR NODE')
        print(_bird_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN)))
        print(_wg_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN)))

    @parse(1)
    def do_showconf(self, name):
        '''<name>
        Show my side config
        '''
        # Same generators as do_showtemplates, with the two sides swapped.
        print('### MY SIDE CONFIGS')
        print(_bird_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN)))
        print(_wg_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN)))

    @parse(1)
    def do_as(self, asn):
        '''<as#>
        Select another AS you own'''
        global SELECTED_ASN
        try:
            SELECTED_ASN = as_from_user(asn)
        except Exception:
            # as_from_user signals every refusal with a plain Exception;
            # Ctrl-C and SystemExit are deliberately not caught.
            print("No way! Try an AS you're mntner of!", file=sys.stderr)

    @parse(0)
    def do_birdc(self):
        '''(no args)
        Run birdc'''
        # Fixed command string, so nothing user-supplied reaches the shell.
        os.system('/usr/sbin/birdc -r')

    @parse(1)
    def do_stat(self, name):
        '''<name>
        Show link stats'''
        if not re.match(NAME_REGEX, name):
            print(f'Error: name must match {NAME_REGEX}', file=sys.stderr)
            return
        # Device name is "wg" + last four digits of the ASN + peer name.
        # An argv list keeps the name out of any shell parsing.
        subprocess.run(['ip', 'stat', 'show', 'dev', f'wg{SELECTED_ASN % 10000:04}{name}', 'group', 'link'])

    @parse(0)
    def do_exit(self):
        '''(no args)
        Exit the shell'''
        return True

    def do_EOF(self, arg):
        '''Exit the shell on end-of-file (Ctrl-D)'''
        print("^D")
        return True

    @parse(0)
    def do_reload(self):
        '''(no args)
        Restart the shell by re-executing the current program'''
        os.execvp(sys.argv[0], sys.argv)

    @parse(0)
    def do_version(self):
        '''(no args)
        Show the autopeer version and where to find the source'''
        print("autopeer version:", end=' ')
        # Flush so our prefix appears before git's own output.
        sys.stdout.flush()
        os.system("git -c safe.directory=/opt/autopeer -C /opt/autopeer/ describe --always")
        print("by steering7253 https://steering.dn42 https://st33ri.ng")
        print("Source available at https://cgit.dn42/steering/autopeer.git/ https://cgit.space/steering/autopeer.git/")
# Entry point: run the single command passed over SSH (SSH_ORIGINAL_COMMAND),
# or start the interactive loop when none was given. Either way a
# notification is sent to NOTIFY_TO first.
if __name__ == '__main__':
    shell = AutopeerShell()
    command = os.getenv('SSH_ORIGINAL_COMMAND', '')
    action = f'ran {command!r}' if command else 'logged in'
    subprocess.run(
        ['socat', 'stdio', NOTIFY_TO],
        input=f"[autopeer {socket.getfqdn()}] {USER} {action} via {os.getenv('SSH_CONNECTION')}",
        text=True,
    )
    if command:
        shell.onecmd(command)
    else:
        shell.cmdloop()