#!/usr/bin/python3 import cmd, functools, getpass, os, re, subprocess, sys, time, traceback try: import readline except: pass from config import * from lib_autopeer import * NAME_REGEX = r'^[a-zA-Z][a-zA-Z0-9]{0,8}$' def as_from_user(which=None): if which is not None: try: which = int(which) except: raise Exception('your AS number has to be a number, silly') user_ases=[] if not re.match(r'^[a-zA-Z0-9-]+$', MNTNER): raise Exception('your mntner name has bad characters') filenames = subprocess.run(['grep', '-l', '-r', '-P', fr'^\s*mnt-by:\s*{MNTNER}\s*$', '/opt/autopeer/dn42-registry/data/aut-num'], capture_output=True, text=True).stdout for filename in filenames.split('\n'): if not len(filename): continue if (mo := re.match(r'^.*/?AS(\d+)$', filename)): user_ases.append(int(mo.group(1))) else: print(f"oops, something went wrong getting your ASes, specifically: {filename}", file=sys.stderr) if MY_ASN in user_ases and which: # allow owner to operate as anyone return which elif which is None: # at startup, use the first found return user_ases[0] elif which in user_ases: # if the user picked an AS in their list return which else: raise Exception('not yours') def parse(num_args): def _decorator(f): @functools.wraps(f) def real_func(self, arg): args = arg.split() if len(args) != num_args: # TODO: prompt user for each arg that's missing if f.__doc__: doc = f.__doc__.split('\n', 1)[0] else: doc = '(undocumented args)' print(f'Error: wrong number of args', file=sys.stderr) print(f'Expected {num_args}: {doc}', file=sys.stderr) print(f'Got {len(args)}: {args}', file=sys.stderr) return return f(self, *arg.split()) return real_func return _decorator USER = getpass.getuser() MNTNER = USER.upper() + '-MNT' SELECTED_ASN = as_from_user() class AutopeerShell(cmd.Cmd): def preloop(self): self.intro = f'Welcome to the autopeer shell. Type help or ? to list commands.\nSelected AS: {SELECTED_ASN}' self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> ' def postcmd(self, stop, line): self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> ' return stop def onecmd(self, line): try: return super().onecmd(line) except Exception as e: traceback.print_exc() return False def emptyline(self): return False @parse(0) def do_ls(self): '''(no args) List your peers''' curs = DB.execute('SELECT name, endpoint, port FROM peers WHERE asn=:asn', dict(asn=SELECTED_ASN)) print(f'Active peerings for {SELECTED_ASN}:') while row := curs.fetchone(): print(f'- {row[0]} ({row[1]} {row[2]})') print() def help_addpeer(self): print(self.do_addpeer.__doc__) me = _get_my_info(SELECTED_ASN) print(f''' My endpoint: {me.endpoint}:{me.port} My ASN: {me.asn} My Wireguard Public Key: {me.pubkey} My Tunnel IPv6LL: {me.ipll} ''') @parse(5) def do_addpeer(self, name, pubkey, endpoint, port, ipll): ''' Add a new peer''' if not re.match(NAME_REGEX, name): print(f'Error: name must match {NAME_REGEX}', file=sys.stderr) return PUBKEY_REGEX = r'^[=a-zA-Z0-9+/-]+$' if not re.match(PUBKEY_REGEX, pubkey): print(f'Error: pubkey must match {PUBKEY_REGEX}', file=sys.stderr) return ENDPOINT_REGEX = r'^((?#IPv4)[0-9.]+|(?#IPv6)[0-9a-fA-F:]+|(?#DNS)(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]))$' # (([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]) via https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address if not re.match(ENDPOINT_REGEX, endpoint): print(f'Error: wg address must match {ENDPOINT_REGEX}', file=sys.stderr) return PORT_REGEX = r'^\d{1,5}$' if not re.match(PORT_REGEX, port): print(f'Error: port must match {PORT_REGEX}', file=sys.stderr) return IPLL_REGEX = r'^[0-9a-fA-F:]+$' if not re.match(IPLL_REGEX, ipll): print(f'Error: ipv6 link local must match {IPLL_REGEX}', file=sys.stderr) return sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] Created new peering {name!r} for AS{SELECTED_ASN} by {USER}", text=True) try: curs = DB.execute( 'INSERT INTO peers (name, asn, pubkey, endpoint, port, ipll, creator_ip, creator_name, creator_date, deleted) VALUES (:name, :asn, :pubkey, :endpoint, :port, :ipll, :creator_ip, :creator_name, :creator_date, 0)', dict(name=name, asn=SELECTED_ASN, pubkey=pubkey, endpoint=endpoint, port=port, ipll=ipll, creator_ip=os.getenv('SSH_CONNECTION'), creator_name=USER, creator_date=time.time()) ) except sqlite3.IntegrityError as e: print("Something went wrong! Maybe try using a name you haven't used already...", file=sys.stderr) return if not curs.rowcount: print("Something went wrong! The database didn't give us an error but no row was inserted!", file=sys.stderr) return print('🎉') print() self.do_showpeer(name) print() self.do_showtemplates(name) print() print() @parse(1) def do_delpeer(self, name): ''' Delete your peering''' sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] Deleted peering {name!r} for AS{SELECTED_ASN} by {USER}", text=True) curs = DB.execute( 'UPDATE peers SET deleted = 1 WHERE name = :name AND asn = :asn AND deleted = 0', dict(name=name, asn=SELECTED_ASN) ) if curs.rowcount: print(f'Deleted "{name}"') else: print(f'"{name}" (AS {SELECTED_ASN}) not found', file=sys.stderr) @parse(1) def do_showpeer(self, name): ''' Show textual info about a peering''' me = _get_my_info(SELECTED_ASN) you = _get_peer_info(name, SELECTED_ASN) print(f''' Link name: {name} My endpoint: {me.endpoint}:{me.port} My ASN: {me.asn} My Wireguard Public Key: {me.pubkey} My Tunnel IPv6LL: {me.ipll} Your endpoint: {you.endpoint}:{you.port} Your ASN: {you.asn} Your Wireguard Public Key: {you.pubkey} Your Tunnel IPv6LL: {you.ipll} ''') @parse(1) def do_showtemplates(self, name): ''' Show basic config templates for your side''' print(_bird_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN))) print(_wg_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN))) @parse(1) def do_showconf(self, name): ''' Show my side config ''' print(_bird_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN))) print(_wg_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN))) @parse(1) def do_as(self, asn): ''' Select another AS you own''' global SELECTED_ASN try: SELECTED_ASN = as_from_user(asn) except: print("No way! Try an AS you're mntner of!", file=sys.stderr) @parse(0) def do_birdc(self): '''Run birdc''' os.system('/usr/sbin/birdc -r') @parse(1) def do_stat(self, name): ''' Show link stats''' if not re.match(NAME_REGEX, name): print(f'Error: name must match {NAME_REGEX}', file=sys.stderr) return os.system(f'ip stat show dev wg{SELECTED_ASN%10000:04}{name} group link') @parse(0) def do_exit(self): return True def do_EOF(self, arg): print("^D") return True @parse(0) def do_reload(self): os.execvp(sys.argv[0], sys.argv) @parse(0) def do_version(self): print("autopeer version:") os.system("git -c safe.directory=/opt/autopeer -C /opt/autopeer/ describe --always") print("by steering7253 https://steering.dn42 https://st33ri.ng") if __name__ == '__main__': shell = AutopeerShell() command = os.getenv('SSH_ORIGINAL_COMMAND', '') if len(command): sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] {USER} ran {command} via {os.getenv('SSH_CONNECTION')}", text=True) shell.onecmd(command) else: sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] {USER} logged in via {os.getenv('SSH_CONNECTION')}", text=True) shell.cmdloop()