diff options
| author | 2019-06-03 12:44:04 +0100 | |
|---|---|---|
| committer | 2019-06-03 12:44:04 +0100 | |
| commit | 9a8b345c53e852d7092197cee084d0d3c02bc0ff (patch) | |
| tree | 408c6833c2f4de7198c354043c8ca265c0616901 /modules/ircv3_sasl | |
| parent | Check from_self, not if target==is_own_nickname, use from_self when adding to (diff) | |
| signature | ||
Prefix names for all IRCv3 modules with "ircv3_"
Diffstat (limited to 'modules/ircv3_sasl')
| -rw-r--r-- | modules/ircv3_sasl/README.md | 46 | ||||
| -rw-r--r-- | modules/ircv3_sasl/__init__.py | 147 | ||||
| -rw-r--r-- | modules/ircv3_sasl/scram.py | 130 |
3 files changed, 323 insertions, 0 deletions
diff --git a/modules/ircv3_sasl/README.md b/modules/ircv3_sasl/README.md new file mode 100644 index 00000000..30a51e08 --- /dev/null +++ b/modules/ircv3_sasl/README.md @@ -0,0 +1,46 @@ +# Configuring SASL + +You can either configure SASL through `!serverset sasl` from an registered and identified admin account or directly through sqlite. + +## USERPASS Mechanism + +BitBot supports a special SASL mechanism name: `USERPASS`. This internally +represents "pick the strongest username:password algorithm" + +## !serverset sasl + +These commands are to be executed from a registered admin account + +#### USERPASS +> !serverset sasl userpass <username>:<password> + +#### PLAIN +> !serverset sasl plain <username>:<password> + +#### SCRAM-SHA-1 +> !serverset sasl scram-sha-1 <username>:<password> + +#### SCRAM-SHA-256 +> !serverset sasl scram-sha-256 <username>:<password> + +#### EXTERNAL +> !serverset sasl external + +## sqlite + +Execute these against the current bot database file (e.g. `$ sqlite3 databases/bot.db`) + +#### USERPASS +> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "userpass", "args": "<username>:<password>"}'); + +#### PLAIN +> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "plain", "args": "<username>:<password>"}'); + +#### SCRAM-SHA-1 +> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-1", "args": "<username>:<password>"}'); + +#### SCRAM-SHA-256 +> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-256", "args": "<username>:<password>"}'); + +#### external +> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "external"}'); diff --git a/modules/ircv3_sasl/__init__.py b/modules/ircv3_sasl/__init__.py new file mode 100644 index 00000000..b62309a6 --- /dev/null +++ b/modules/ircv3_sasl/__init__.py @@ -0,0 +1,147 @@ +#--depends-on config + +import base64, hashlib, hmac, uuid +from src import ModuleManager, utils +from . import scram + +CAP = utils.irc.Capability("sasl") + +USERPASS_MECHANISMS = [ + "SCRAM-SHA-512", + "SCRAM-SHA-256", + "SCRAM-SHA-1", + "PLAIN" +] + +def _validate(s): + mechanism, _, arguments = s.partition(" ") + return {"mechanism": mechanism, "args": arguments} + +@utils.export("serverset", {"setting": "sasl", + "help": "Set the sasl username/password for this server", + "validate": _validate, "example": "PLAIN BitBot:hunder2"}) +class Module(ModuleManager.BaseModule): + def _best_userpass_mechanism(self, mechanisms): + for potential_mechanism in USERPASS_MECHANISMS: + if potential_mechanism in mechanisms: + return potential_mechanism + + @utils.hook("received.cap.new") + @utils.hook("received.cap.ls") + def on_cap(self, event): + has_sasl = "sasl" in event["capabilities"] + our_sasl = event["server"].get_setting("sasl", None) + + do_sasl = False + if has_sasl and our_sasl: + if not event["capabilities"]["sasl"] == None: + our_mechanism = our_sasl["mechanism"].upper() + server_mechanisms = event["capabilities"]["sasl"].split(",") + if our_mechanism == "USERPASS": + our_mechanism = self._best_userpass_mechanism( + server_mechanisms) + do_sasl = our_mechanism in server_mechanisms + else: + do_sasl = True + + if do_sasl: + cap = CAP.copy() + cap.on_ack(lambda: self._sasl_ack(event["server"])) + return cap + + def _sasl_ack(self, server): + sasl = server.get_setting("sasl") + mechanism = sasl["mechanism"].upper() + if mechanism == "USERPASS": + server_mechanisms = server.server_capabilities["sasl"] + server_mechanisms = server_mechanisms or [ + USERPASS_MECHANISMS[0]] + mechanism = self._best_userpass_mechanism(server_mechanisms) + + server.send_authenticate(mechanism) + server.sasl_mechanism = mechanism + server.wait_for_capability("sasl") + + @utils.hook("received.authenticate") + def on_authenticate(self, event): + sasl = event["server"].get_setting("sasl") + mechanism = event["server"].sasl_mechanism + + auth_text = None + if mechanism == "PLAIN": + if event["message"] != "+": + event["server"].send_authenticate("*") + else: + sasl_username, sasl_password = sasl["args"].split(":", 1) + auth_text = ("%s\0%s\0%s" % ( + sasl_username, sasl_username, sasl_password)).encode("utf8") + + elif mechanism == "EXTERNAL": + if event["message"] != "+": + event["server"].send_authenticate("*") + else: + auth_text = "+" + + elif mechanism.startswith("SCRAM-"): + + if event["message"] == "+": + # start SCRAM handshake + + # create SCRAM helper + sasl_username, sasl_password = sasl["args"].split(":", 1) + algo = mechanism.split("SCRAM-", 1)[1] + event["server"]._scram = scram.SCRAM( + algo, sasl_username, sasl_password) + + # generate client-first-message + auth_text = event["server"]._scram.client_first() + else: + current_scram = event["server"]._scram + data = base64.b64decode(event["message"]) + if current_scram.state == scram.SCRAMState.ClientFirst: + # use server-first-message to generate client-final-message + auth_text = current_scram.server_first(data) + elif current_scram.state == scram.SCRAMState.ClientFinal: + # use server-final-message to check server proof + verified = current_scram.server_final(data) + del event["server"]._scram + + if verified: + auth_text = "+" + else: + if current_scram.state == scram.SCRAMState.VerifyFailed: + # server gave a bad verification so we should panic + event["server"].disconnect() + raise ValueError("Server SCRAM verification failed") + + else: + raise ValueError("unknown sasl mechanism '%s'" % mechanism) + + if not auth_text == None: + if not auth_text == "+": + auth_text = base64.b64encode(auth_text) + auth_text = auth_text.decode("utf8") + event["server"].send_authenticate(auth_text) + + def _end_sasl(self, server): + server.capability_done("sasl") + + @utils.hook("received.908") + def sasl_mechanisms(self, event): + server_mechanisms = event["args"][1].split(",") + mechanism = self._best_userpass_mechanism(server_mechanimsms) + event["server"].sasl_mechanism = mechanism + event["server"].send_authenticate(mechanism) + + @utils.hook("received.903") + def sasl_success(self, event): + self._end_sasl(event["server"]) + @utils.hook("received.904") + def sasl_failure(self, event): + self.log.warn("SASL failure for %s: %s", + [str(event["server"]), event["args"][1]]) + self._end_sasl(event["server"]) + + @utils.hook("received.907") + def sasl_already(self, event): + self._end_sasl(event["server"]) diff --git a/modules/ircv3_sasl/scram.py b/modules/ircv3_sasl/scram.py new file mode 100644 index 00000000..f243d1e6 --- /dev/null +++ b/modules/ircv3_sasl/scram.py @@ -0,0 +1,130 @@ +import base64, enum, hashlib, hmac, os, typing + +# IANA Hash Function Textual Names +# https://tools.ietf.org/html/rfc5802#section-4 +# https://www.iana.org/assignments/hash-function-text-names/ +# MD2 has been removed as it's unacceptably weak +ALGORITHMS = [ + "MD5", "SHA-1", "SHA-224", "SHA-256", "SHA-384", "SHA-512"] + +SCRAM_ERRORS = [ + "invalid-encoding", + "extensions-not-supported", # unrecognized 'm' value + "invalid-proof", + "channel-bindings-dont-match", + "server-does-support-channel-binding", + "channel-binding-not-supported", + "unsupported-channel-binding-type", + "unknown-user", + "invalid-username-encoding", # invalid utf8 or bad SASLprep + "no-resources" +] + +def _scram_nonce() -> bytes: + return base64.b64encode(os.urandom(32)) +def _scram_escape(s: bytes) -> bytes: + return s.replace(b"=", b"=3D").replace(b",", b"=2C") +def _scram_unescape(s: bytes) -> bytes: + return s.replace(b"=3D", b"=").replace(b"=2C", b",") +def _scram_xor(s1: bytes, s2: bytes) -> bytes: + return bytes(a ^ b for a, b in zip(s1, s2)) + +class SCRAMState(enum.Enum): + Uninitialised = 0 + ClientFirst = 1 + ClientFinal = 2 + Success = 3 + Failed = 4 + VerifyFailed = 5 + +class SCRAMError(Exception): + pass + +class SCRAM(object): + def __init__(self, algo: str, username: str, password: str): + if not algo in ALGORITHMS: + raise ValueError("Unknown SCRAM algorithm '%s'" % algo) + + self._algo = algo.replace("-", "") # SHA-1 -> SHA1 + self._username = username.encode("utf8") + self._password = password.encode("utf8") + + self.state = SCRAMState.Uninitialised + self.error = "" + self.raw_error = "" + + self._client_first = b"" + self._salted_password = b"" + self._auth_message = b"" + + def _get_pieces(self, data: bytes) -> typing.Dict[bytes, bytes]: + pieces = (piece.split(b"=", 1) for piece in data.split(b",")) + return dict((piece[0], piece[1]) for piece in pieces) + + def _hmac(self, key: bytes, msg: bytes) -> bytes: + return hmac.new(key, msg, self._algo).digest() + def _hash(self, msg: bytes) -> bytes: + return hashlib.new(self._algo, msg).digest() + + def _constant_time_compare(self, b1: bytes, b2: bytes): + return hmac.compare_digest(b1, b2) + + def client_first(self) -> bytes: + self.state = SCRAMState.ClientFirst + self._client_first = b"n=%s,r=%s" % ( + _scram_escape(self._username), _scram_nonce()) + + # n,,n=<username>,r=<nonce> + return b"n,,%s" % self._client_first + + def server_first(self, data: bytes) -> bytes: + self.state = SCRAMState.ClientFinal + + pieces = self._get_pieces(data) + nonce = pieces[b"r"] # server combines your nonce with it's own + salt = base64.b64decode(pieces[b"s"]) # salt is b64encoded + iterations = int(pieces[b"i"]) + + salted_password = hashlib.pbkdf2_hmac(self._algo, self._password, + salt, iterations, dklen=None) + self._salted_password = salted_password + + client_key = self._hmac(salted_password, b"Client Key") + stored_key = self._hash(client_key) + + channel = base64.b64encode(b"n,,") + auth_noproof = b"c=%s,r=%s" % (channel, nonce) + auth_message = b"%s,%s,%s" % (self._client_first, data, auth_noproof) + self._auth_message = auth_message + + client_signature = self._hmac(stored_key, auth_message) + client_proof_xor = _scram_xor(client_key, client_signature) + client_proof = base64.b64encode(client_proof_xor) + + # c=<b64encode("n,,")>,r=<nonce>,p=<proof> + return b"%s,p=%s" % (auth_noproof, client_proof) + + def server_final(self, data: bytes) -> bool: + pieces = self._get_pieces(data) + if b"e" in pieces: + error = pieces[b"e"].decode("utf8") + self.raw_error = error + if error in SCRAM_ERRORS: + self.error = error + else: + self.error = "other-error" + + self.state = SCRAMState.Failed + return False + + verifier = base64.b64decode(pieces[b"v"]) + + server_key = self._hmac(self._salted_password, b"Server Key") + server_signature = self._hmac(server_key, self._auth_message) + + if server_signature == verifier: + self.state = SCRAMState.Success + return True + else: + self.state = SCRAMState.VerifyFailed + return False |
