aboutsummaryrefslogtreecommitdiff
path: root/modules/ircv3_sasl
diff options
context:
space:
mode:
authorGravatar jesopo2019-06-03 12:44:04 +0100
committerGravatar jesopo2019-06-03 12:44:04 +0100
commit9a8b345c53e852d7092197cee084d0d3c02bc0ff (patch)
tree408c6833c2f4de7198c354043c8ca265c0616901 /modules/ircv3_sasl
parentCheck from_self, not if target==is_own_nickname, use from_self when adding to (diff)
signature
Prefix names for all IRCv3 modules with "ircv3_"
Diffstat (limited to 'modules/ircv3_sasl')
-rw-r--r--modules/ircv3_sasl/README.md46
-rw-r--r--modules/ircv3_sasl/__init__.py147
-rw-r--r--modules/ircv3_sasl/scram.py130
3 files changed, 323 insertions, 0 deletions
diff --git a/modules/ircv3_sasl/README.md b/modules/ircv3_sasl/README.md
new file mode 100644
index 00000000..30a51e08
--- /dev/null
+++ b/modules/ircv3_sasl/README.md
@@ -0,0 +1,46 @@
+# Configuring SASL
+
+You can either configure SASL through `!serverset sasl` from an registered and identified admin account or directly through sqlite.
+
+## USERPASS Mechanism
+
+BitBot supports a special SASL mechanism name: `USERPASS`. This internally
+represents "pick the strongest username:password algorithm"
+
+## !serverset sasl
+
+These commands are to be executed from a registered admin account
+
+#### USERPASS
+> !serverset sasl userpass <username>:<password>
+
+#### PLAIN
+> !serverset sasl plain <username>:<password>
+
+#### SCRAM-SHA-1
+> !serverset sasl scram-sha-1 <username>:<password>
+
+#### SCRAM-SHA-256
+> !serverset sasl scram-sha-256 <username>:<password>
+
+#### EXTERNAL
+> !serverset sasl external
+
+## sqlite
+
+Execute these against the current bot database file (e.g. `$ sqlite3 databases/bot.db`)
+
+#### USERPASS
+> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "userpass", "args": "<username>:<password>"}');
+
+#### PLAIN
+> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "plain", "args": "<username>:<password>"}');
+
+#### SCRAM-SHA-1
+> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-1", "args": "<username>:<password>"}');
+
+#### SCRAM-SHA-256
+> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-256", "args": "<username>:<password>"}');
+
+#### external
+> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "external"}');
diff --git a/modules/ircv3_sasl/__init__.py b/modules/ircv3_sasl/__init__.py
new file mode 100644
index 00000000..b62309a6
--- /dev/null
+++ b/modules/ircv3_sasl/__init__.py
@@ -0,0 +1,147 @@
+#--depends-on config
+
+import base64, hashlib, hmac, uuid
+from src import ModuleManager, utils
+from . import scram
+
+CAP = utils.irc.Capability("sasl")
+
+USERPASS_MECHANISMS = [
+ "SCRAM-SHA-512",
+ "SCRAM-SHA-256",
+ "SCRAM-SHA-1",
+ "PLAIN"
+]
+
+def _validate(s):
+ mechanism, _, arguments = s.partition(" ")
+ return {"mechanism": mechanism, "args": arguments}
+
+@utils.export("serverset", {"setting": "sasl",
+ "help": "Set the sasl username/password for this server",
+ "validate": _validate, "example": "PLAIN BitBot:hunder2"})
+class Module(ModuleManager.BaseModule):
+ def _best_userpass_mechanism(self, mechanisms):
+ for potential_mechanism in USERPASS_MECHANISMS:
+ if potential_mechanism in mechanisms:
+ return potential_mechanism
+
+ @utils.hook("received.cap.new")
+ @utils.hook("received.cap.ls")
+ def on_cap(self, event):
+ has_sasl = "sasl" in event["capabilities"]
+ our_sasl = event["server"].get_setting("sasl", None)
+
+ do_sasl = False
+ if has_sasl and our_sasl:
+ if not event["capabilities"]["sasl"] == None:
+ our_mechanism = our_sasl["mechanism"].upper()
+ server_mechanisms = event["capabilities"]["sasl"].split(",")
+ if our_mechanism == "USERPASS":
+ our_mechanism = self._best_userpass_mechanism(
+ server_mechanisms)
+ do_sasl = our_mechanism in server_mechanisms
+ else:
+ do_sasl = True
+
+ if do_sasl:
+ cap = CAP.copy()
+ cap.on_ack(lambda: self._sasl_ack(event["server"]))
+ return cap
+
+ def _sasl_ack(self, server):
+ sasl = server.get_setting("sasl")
+ mechanism = sasl["mechanism"].upper()
+ if mechanism == "USERPASS":
+ server_mechanisms = server.server_capabilities["sasl"]
+ server_mechanisms = server_mechanisms or [
+ USERPASS_MECHANISMS[0]]
+ mechanism = self._best_userpass_mechanism(server_mechanisms)
+
+ server.send_authenticate(mechanism)
+ server.sasl_mechanism = mechanism
+ server.wait_for_capability("sasl")
+
+ @utils.hook("received.authenticate")
+ def on_authenticate(self, event):
+ sasl = event["server"].get_setting("sasl")
+ mechanism = event["server"].sasl_mechanism
+
+ auth_text = None
+ if mechanism == "PLAIN":
+ if event["message"] != "+":
+ event["server"].send_authenticate("*")
+ else:
+ sasl_username, sasl_password = sasl["args"].split(":", 1)
+ auth_text = ("%s\0%s\0%s" % (
+ sasl_username, sasl_username, sasl_password)).encode("utf8")
+
+ elif mechanism == "EXTERNAL":
+ if event["message"] != "+":
+ event["server"].send_authenticate("*")
+ else:
+ auth_text = "+"
+
+ elif mechanism.startswith("SCRAM-"):
+
+ if event["message"] == "+":
+ # start SCRAM handshake
+
+ # create SCRAM helper
+ sasl_username, sasl_password = sasl["args"].split(":", 1)
+ algo = mechanism.split("SCRAM-", 1)[1]
+ event["server"]._scram = scram.SCRAM(
+ algo, sasl_username, sasl_password)
+
+ # generate client-first-message
+ auth_text = event["server"]._scram.client_first()
+ else:
+ current_scram = event["server"]._scram
+ data = base64.b64decode(event["message"])
+ if current_scram.state == scram.SCRAMState.ClientFirst:
+ # use server-first-message to generate client-final-message
+ auth_text = current_scram.server_first(data)
+ elif current_scram.state == scram.SCRAMState.ClientFinal:
+ # use server-final-message to check server proof
+ verified = current_scram.server_final(data)
+ del event["server"]._scram
+
+ if verified:
+ auth_text = "+"
+ else:
+ if current_scram.state == scram.SCRAMState.VerifyFailed:
+ # server gave a bad verification so we should panic
+ event["server"].disconnect()
+ raise ValueError("Server SCRAM verification failed")
+
+ else:
+ raise ValueError("unknown sasl mechanism '%s'" % mechanism)
+
+ if not auth_text == None:
+ if not auth_text == "+":
+ auth_text = base64.b64encode(auth_text)
+ auth_text = auth_text.decode("utf8")
+ event["server"].send_authenticate(auth_text)
+
+ def _end_sasl(self, server):
+ server.capability_done("sasl")
+
+ @utils.hook("received.908")
+ def sasl_mechanisms(self, event):
+ server_mechanisms = event["args"][1].split(",")
+ mechanism = self._best_userpass_mechanism(server_mechanimsms)
+ event["server"].sasl_mechanism = mechanism
+ event["server"].send_authenticate(mechanism)
+
+ @utils.hook("received.903")
+ def sasl_success(self, event):
+ self._end_sasl(event["server"])
+ @utils.hook("received.904")
+ def sasl_failure(self, event):
+ self.log.warn("SASL failure for %s: %s",
+ [str(event["server"]), event["args"][1]])
+ self._end_sasl(event["server"])
+
+ @utils.hook("received.907")
+ def sasl_already(self, event):
+ self._end_sasl(event["server"])
diff --git a/modules/ircv3_sasl/scram.py b/modules/ircv3_sasl/scram.py
new file mode 100644
index 00000000..f243d1e6
--- /dev/null
+++ b/modules/ircv3_sasl/scram.py
@@ -0,0 +1,130 @@
+import base64, enum, hashlib, hmac, os, typing
+
+# IANA Hash Function Textual Names
+# https://tools.ietf.org/html/rfc5802#section-4
+# https://www.iana.org/assignments/hash-function-text-names/
+# MD2 has been removed as it's unacceptably weak
+ALGORITHMS = [
+ "MD5", "SHA-1", "SHA-224", "SHA-256", "SHA-384", "SHA-512"]
+
+SCRAM_ERRORS = [
+ "invalid-encoding",
+ "extensions-not-supported", # unrecognized 'm' value
+ "invalid-proof",
+ "channel-bindings-dont-match",
+ "server-does-support-channel-binding",
+ "channel-binding-not-supported",
+ "unsupported-channel-binding-type",
+ "unknown-user",
+ "invalid-username-encoding", # invalid utf8 or bad SASLprep
+ "no-resources"
+]
+
+def _scram_nonce() -> bytes:
+ return base64.b64encode(os.urandom(32))
+def _scram_escape(s: bytes) -> bytes:
+ return s.replace(b"=", b"=3D").replace(b",", b"=2C")
+def _scram_unescape(s: bytes) -> bytes:
+ return s.replace(b"=3D", b"=").replace(b"=2C", b",")
+def _scram_xor(s1: bytes, s2: bytes) -> bytes:
+ return bytes(a ^ b for a, b in zip(s1, s2))
+
+class SCRAMState(enum.Enum):
+ Uninitialised = 0
+ ClientFirst = 1
+ ClientFinal = 2
+ Success = 3
+ Failed = 4
+ VerifyFailed = 5
+
+class SCRAMError(Exception):
+ pass
+
+class SCRAM(object):
+ def __init__(self, algo: str, username: str, password: str):
+ if not algo in ALGORITHMS:
+ raise ValueError("Unknown SCRAM algorithm '%s'" % algo)
+
+ self._algo = algo.replace("-", "") # SHA-1 -> SHA1
+ self._username = username.encode("utf8")
+ self._password = password.encode("utf8")
+
+ self.state = SCRAMState.Uninitialised
+ self.error = ""
+ self.raw_error = ""
+
+ self._client_first = b""
+ self._salted_password = b""
+ self._auth_message = b""
+
+ def _get_pieces(self, data: bytes) -> typing.Dict[bytes, bytes]:
+ pieces = (piece.split(b"=", 1) for piece in data.split(b","))
+ return dict((piece[0], piece[1]) for piece in pieces)
+
+ def _hmac(self, key: bytes, msg: bytes) -> bytes:
+ return hmac.new(key, msg, self._algo).digest()
+ def _hash(self, msg: bytes) -> bytes:
+ return hashlib.new(self._algo, msg).digest()
+
+ def _constant_time_compare(self, b1: bytes, b2: bytes):
+ return hmac.compare_digest(b1, b2)
+
+ def client_first(self) -> bytes:
+ self.state = SCRAMState.ClientFirst
+ self._client_first = b"n=%s,r=%s" % (
+ _scram_escape(self._username), _scram_nonce())
+
+ # n,,n=<username>,r=<nonce>
+ return b"n,,%s" % self._client_first
+
+ def server_first(self, data: bytes) -> bytes:
+ self.state = SCRAMState.ClientFinal
+
+ pieces = self._get_pieces(data)
+ nonce = pieces[b"r"] # server combines your nonce with it's own
+ salt = base64.b64decode(pieces[b"s"]) # salt is b64encoded
+ iterations = int(pieces[b"i"])
+
+ salted_password = hashlib.pbkdf2_hmac(self._algo, self._password,
+ salt, iterations, dklen=None)
+ self._salted_password = salted_password
+
+ client_key = self._hmac(salted_password, b"Client Key")
+ stored_key = self._hash(client_key)
+
+ channel = base64.b64encode(b"n,,")
+ auth_noproof = b"c=%s,r=%s" % (channel, nonce)
+ auth_message = b"%s,%s,%s" % (self._client_first, data, auth_noproof)
+ self._auth_message = auth_message
+
+ client_signature = self._hmac(stored_key, auth_message)
+ client_proof_xor = _scram_xor(client_key, client_signature)
+ client_proof = base64.b64encode(client_proof_xor)
+
+ # c=<b64encode("n,,")>,r=<nonce>,p=<proof>
+ return b"%s,p=%s" % (auth_noproof, client_proof)
+
+ def server_final(self, data: bytes) -> bool:
+ pieces = self._get_pieces(data)
+ if b"e" in pieces:
+ error = pieces[b"e"].decode("utf8")
+ self.raw_error = error
+ if error in SCRAM_ERRORS:
+ self.error = error
+ else:
+ self.error = "other-error"
+
+ self.state = SCRAMState.Failed
+ return False
+
+ verifier = base64.b64decode(pieces[b"v"])
+
+ server_key = self._hmac(self._salted_password, b"Server Key")
+ server_signature = self._hmac(server_key, self._auth_message)
+
+ if server_signature == verifier:
+ self.state = SCRAMState.Success
+ return True
+ else:
+ self.state = SCRAMState.VerifyFailed
+ return False