summaryrefslogtreecommitdiff
path: root/autopeer_shell.py
blob: 4fe30b463f3c2dd70e30e1287fcc2bf96b5e66fe (about) (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#!/usr/bin/python3

import cmd, functools, getpass, os, re, subprocess, sys, time, traceback
try:
	import readline
except: pass

from config import *
from lib_autopeer import *

NAME_REGEX = r'^[a-zA-Z][a-zA-Z0-9]{0,8}$'

def as_from_user(which=None):
	if which is not None:
		try:
			which = int(which)
		except:
			raise Exception('your AS number has to be a number, silly')

	user_ases=[]
	if not re.match(r'^[a-zA-Z0-9-]+$', MNTNER):
		raise Exception('your mntner name has bad characters')
	filenames = subprocess.run(['grep', '-l', '-r', '-P', fr'^\s*mnt-by:\s*{MNTNER}\s*$', '/opt/autopeer/dn42-registry/data/aut-num'], capture_output=True, text=True).stdout
	for filename in filenames.split('\n'):
		if not len(filename):
			continue
		if (mo := re.match(r'^.*/?AS(\d+)$', filename)):
			user_ases.append(int(mo.group(1)))
		else:
			print(f"oops, something went wrong getting your ASes, specifically: {filename}", file=sys.stderr)

	if MY_ASN in user_ases and which: # allow owner to operate as anyone
		return which
	elif which is None: # at startup, use the first found
		return user_ases[0]
	elif which in user_ases: # if the user picked an AS in their list
		return which
	else:
		raise Exception('not yours')

def parse(num_args):
	def _decorator(f):
		@functools.wraps(f)
		def real_func(self, arg):
			args = arg.split()
			if len(args) != num_args:
				# TODO: prompt user for each arg that's missing
				if f.__doc__:
					doc = f.__doc__.split('\n', 1)[0]
				else:
					doc = '(undocumented args)'
				print(f'Error: wrong number of args', file=sys.stderr)
				print(f'Expected {num_args}: {doc}', file=sys.stderr)
				print(f'Got {len(args)}: {args}', file=sys.stderr)
				return
			return f(self, *arg.split())
		return real_func
	return _decorator

USER = getpass.getuser()
MNTNER = USER.upper() + '-MNT'
SELECTED_ASN = as_from_user()

class AutopeerShell(cmd.Cmd):
	def preloop(self):
		self.intro = f'Welcome to the autopeer shell.   Type help or ? to list commands.\nSelected AS: {SELECTED_ASN}'
		self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> '

	def postcmd(self, stop, line):
		self.prompt = f'{getpass.getuser()}@{socket.gethostname()}:{SELECTED_ASN}> '
		return stop

	def onecmd(self, line):
		try:
			return super().onecmd(line)
		except Exception as e:
			traceback.print_exc()
			return False

	def emptyline(self):
		return False

	@parse(0)
	def do_ls(self):
		'''(no args)
			List your peers'''
		curs = DB.execute('SELECT name, endpoint, port FROM peers WHERE asn=:asn', dict(asn=SELECTED_ASN))
		print(f'Active peerings for {SELECTED_ASN}:')
		while row := curs.fetchone():
			print(f'- {row[0]} ({row[1]} {row[2]})')
		print()

	def help_addpeer(self):
		print(self.do_addpeer.__doc__)
		me = _get_my_info(SELECTED_ASN)
		print(f'''
My endpoint: {me.endpoint}:{me.port}
My ASN: {me.asn}
My Wireguard Public Key: {me.pubkey}
My Tunnel IPv6LL: {me.ipll}
			''')

	@parse(5)
	def do_addpeer(self, name, pubkey, endpoint, port, ipll):
		'''<name> <wg pubkey> <wg address> <wg port> <ipv6 link local>
			Add a new peer'''
		if not re.match(NAME_REGEX, name):
			print(f'Error: name must match {NAME_REGEX}', file=sys.stderr)
			return
		PUBKEY_REGEX = r'^[=a-zA-Z0-9+/-]+$'
		if not re.match(PUBKEY_REGEX, pubkey):
			print(f'Error: pubkey must match {PUBKEY_REGEX}', file=sys.stderr)
			return
		ENDPOINT_REGEX = r'^((?#IPv4)[0-9.]+|(?#IPv6)[0-9a-fA-F:]+|(?#DNS)(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]))$'
		# (([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]) via https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address
		if not re.match(ENDPOINT_REGEX, endpoint):
			print(f'Error: wg address must match {ENDPOINT_REGEX}', file=sys.stderr)
			return
		PORT_REGEX = r'^\d{1,5}$'
		if not re.match(PORT_REGEX, port):
			print(f'Error: port must match {PORT_REGEX}', file=sys.stderr)
			return
		IPLL_REGEX = r'^[0-9a-fA-F:]+$'
		if not re.match(IPLL_REGEX, ipll):
			print(f'Error: ipv6 link local must match {IPLL_REGEX}', file=sys.stderr)
			return

		sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] Created new peering {name!r} for AS{SELECTED_ASN} by {USER}", text=True)

		try:
			curs = DB.execute(
				'INSERT INTO peers (name, asn, pubkey, endpoint, port, ipll, creator_ip, creator_name, creator_date, deleted) VALUES (:name, :asn, :pubkey, :endpoint, :port, :ipll, :creator_ip, :creator_name, :creator_date, 0)',
				dict(name=name, asn=SELECTED_ASN, pubkey=pubkey, endpoint=endpoint, port=port, ipll=ipll, creator_ip=os.getenv('SSH_CONNECTION'), creator_name=USER, creator_date=time.time())
			)
		except sqlite3.IntegrityError as e:
			print("Something went wrong! Maybe try using a name you haven't used already...", file=sys.stderr)
			return
		if not curs.rowcount:
			print("Something went wrong! The database didn't give us an error but no row was inserted!", file=sys.stderr)
			return
		print('🎉')
		print()
		self.do_showpeer(name)
		print()
		self.do_showtemplates(name)
		print()
		print()

	@parse(1)
	def do_delpeer(self, name):
		'''<name>
			Delete your peering'''
		sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] Deleted peering {name!r} for AS{SELECTED_ASN} by {USER}", text=True)
		curs = DB.execute(
			'UPDATE peers SET deleted = 1 WHERE name = :name AND asn = :asn AND deleted = 0',
			dict(name=name, asn=SELECTED_ASN)
		)
		if curs.rowcount:
			print(f'Deleted "{name}"')
		else:
			print(f'"{name}" (AS {SELECTED_ASN}) not found', file=sys.stderr)

	@parse(1)
	def do_showpeer(self, name):
		'''<name>
			Show textual info about a peering'''
		me = _get_my_info(SELECTED_ASN)
		you = _get_peer_info(name, SELECTED_ASN)
		print(f'''
Link name: {name}

My endpoint: {me.endpoint}:{me.port}
My ASN: {me.asn}
My Wireguard Public Key: {me.pubkey}
My Tunnel IPv6LL: {me.ipll}

Your endpoint: {you.endpoint}:{you.port}
Your ASN: {you.asn}
Your Wireguard Public Key: {you.pubkey}
Your Tunnel IPv6LL: {you.ipll}
		''')

	@parse(1)
	def do_showtemplates(self, name):
		'''<name>
			Show basic config templates for your side'''
		print(_bird_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN)))
		print(_wg_config(name, _get_my_info(SELECTED_ASN), _get_peer_info(name, SELECTED_ASN)))

	@parse(1)
	def do_showconf(self, name):
		'''<name>
			Show my side config
		'''
		print(_bird_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN)))
		print(_wg_config(name, _get_peer_info(name, SELECTED_ASN), _get_my_info(SELECTED_ASN)))

	@parse(1)
	def do_as(self, asn):
		'''<as#>
			Select another AS you own'''
		global SELECTED_ASN
		try:
			SELECTED_ASN = as_from_user(asn)
		except:
			print("No way! Try an AS you're mntner of!", file=sys.stderr)

	@parse(0)
	def do_birdc(self):
		'''Run birdc'''
		os.system('/usr/sbin/birdc -r')

	@parse(1)
	def do_stat(self, name):
		'''<name>
			Show link stats'''
		if not re.match(NAME_REGEX, name):
			print(f'Error: name must match {NAME_REGEX}', file=sys.stderr)
			return
		os.system(f'ip stat show dev wg{SELECTED_ASN%10000:04}{name} group link')

	@parse(0)
	def do_exit(self):
		return True

	def do_EOF(self, arg):
		print("^D")
		return True

	@parse(0)
	def do_reload(self):
		os.execvp(sys.argv[0], sys.argv)

	@parse(0)
	def do_version(self):
		print("autopeer version:")
		os.system("git -c safe.directory=/opt/autopeer -C /opt/autopeer/ describe --always")
		print("by steering7253 https://steering.dn42 https://st33ri.ng")



if __name__ == '__main__':
	shell = AutopeerShell()
	command = os.getenv('SSH_ORIGINAL_COMMAND', '')
	if len(command):
		sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] {USER} ran {command} via {os.getenv('SSH_CONNECTION')}", text=True)
		shell.onecmd(command)
	else:
		sp = subprocess.run(['socat', 'stdio', NOTIFY_TO], input=f"[autopeer {socket.gethostname()}] {USER} logged in via {os.getenv('SSH_CONNECTION')}", text=True)
		shell.cmdloop()