diff options
| author | 2019-06-03 12:44:04 +0100 | |
|---|---|---|
| committer | 2019-06-03 12:44:04 +0100 | |
| commit | 9a8b345c53e852d7092197cee084d0d3c02bc0ff (patch) | |
| tree | 408c6833c2f4de7198c354043c8ca265c0616901 /modules/sasl | |
| parent | Check from_self, not if target==is_own_nickname, use from_self when adding to (diff) | |
| signature | ||
Prefix names for all IRCv3 modules with "ircv3_"
Diffstat (limited to 'modules/sasl')
| -rw-r--r-- | modules/sasl/README.md | 46 | ||||
| -rw-r--r-- | modules/sasl/__init__.py | 147 | ||||
| -rw-r--r-- | modules/sasl/scram.py | 130 |
3 files changed, 0 insertions, 323 deletions
diff --git a/modules/sasl/README.md b/modules/sasl/README.md deleted file mode 100644 index 30a51e08..00000000 --- a/modules/sasl/README.md +++ /dev/null @@ -1,46 +0,0 @@ -# Configuring SASL - -You can either configure SASL through `!serverset sasl` from an registered and identified admin account or directly through sqlite. - -## USERPASS Mechanism - -BitBot supports a special SASL mechanism name: `USERPASS`. This internally -represents "pick the strongest username:password algorithm" - -## !serverset sasl - -These commands are to be executed from a registered admin account - -#### USERPASS -> !serverset sasl userpass <username>:<password> - -#### PLAIN -> !serverset sasl plain <username>:<password> - -#### SCRAM-SHA-1 -> !serverset sasl scram-sha-1 <username>:<password> - -#### SCRAM-SHA-256 -> !serverset sasl scram-sha-256 <username>:<password> - -#### EXTERNAL -> !serverset sasl external - -## sqlite - -Execute these against the current bot database file (e.g. `$ sqlite3 databases/bot.db`) - -#### USERPASS -> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "userpass", "args": "<username>:<password>"}'); - -#### PLAIN -> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "plain", "args": "<username>:<password>"}'); - -#### SCRAM-SHA-1 -> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-1", "args": "<username>:<password>"}'); - -#### SCRAM-SHA-256 -> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-256", "args": "<username>:<password>"}'); - -#### external -> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "external"}'); diff --git a/modules/sasl/__init__.py b/modules/sasl/__init__.py deleted file mode 100644 index b62309a6..00000000 --- a/modules/sasl/__init__.py +++ /dev/null @@ -1,147 +0,0 @@ -#--depends-on config - -import base64, hashlib, hmac, uuid -from src import ModuleManager, utils -from . import scram - -CAP = utils.irc.Capability("sasl") - -USERPASS_MECHANISMS = [ - "SCRAM-SHA-512", - "SCRAM-SHA-256", - "SCRAM-SHA-1", - "PLAIN" -] - -def _validate(s): - mechanism, _, arguments = s.partition(" ") - return {"mechanism": mechanism, "args": arguments} - -@utils.export("serverset", {"setting": "sasl", - "help": "Set the sasl username/password for this server", - "validate": _validate, "example": "PLAIN BitBot:hunder2"}) -class Module(ModuleManager.BaseModule): - def _best_userpass_mechanism(self, mechanisms): - for potential_mechanism in USERPASS_MECHANISMS: - if potential_mechanism in mechanisms: - return potential_mechanism - - @utils.hook("received.cap.new") - @utils.hook("received.cap.ls") - def on_cap(self, event): - has_sasl = "sasl" in event["capabilities"] - our_sasl = event["server"].get_setting("sasl", None) - - do_sasl = False - if has_sasl and our_sasl: - if not event["capabilities"]["sasl"] == None: - our_mechanism = our_sasl["mechanism"].upper() - server_mechanisms = event["capabilities"]["sasl"].split(",") - if our_mechanism == "USERPASS": - our_mechanism = self._best_userpass_mechanism( - server_mechanisms) - do_sasl = our_mechanism in server_mechanisms - else: - do_sasl = True - - if do_sasl: - cap = CAP.copy() - cap.on_ack(lambda: self._sasl_ack(event["server"])) - return cap - - def _sasl_ack(self, server): - sasl = server.get_setting("sasl") - mechanism = sasl["mechanism"].upper() - if mechanism == "USERPASS": - server_mechanisms = server.server_capabilities["sasl"] - server_mechanisms = server_mechanisms or [ - USERPASS_MECHANISMS[0]] - mechanism = self._best_userpass_mechanism(server_mechanisms) - - server.send_authenticate(mechanism) - server.sasl_mechanism = mechanism - server.wait_for_capability("sasl") - - @utils.hook("received.authenticate") - def on_authenticate(self, event): - sasl = event["server"].get_setting("sasl") - mechanism = event["server"].sasl_mechanism - - auth_text = None - if mechanism == "PLAIN": - if event["message"] != "+": - event["server"].send_authenticate("*") - else: - sasl_username, sasl_password = sasl["args"].split(":", 1) - auth_text = ("%s\0%s\0%s" % ( - sasl_username, sasl_username, sasl_password)).encode("utf8") - - elif mechanism == "EXTERNAL": - if event["message"] != "+": - event["server"].send_authenticate("*") - else: - auth_text = "+" - - elif mechanism.startswith("SCRAM-"): - - if event["message"] == "+": - # start SCRAM handshake - - # create SCRAM helper - sasl_username, sasl_password = sasl["args"].split(":", 1) - algo = mechanism.split("SCRAM-", 1)[1] - event["server"]._scram = scram.SCRAM( - algo, sasl_username, sasl_password) - - # generate client-first-message - auth_text = event["server"]._scram.client_first() - else: - current_scram = event["server"]._scram - data = base64.b64decode(event["message"]) - if current_scram.state == scram.SCRAMState.ClientFirst: - # use server-first-message to generate client-final-message - auth_text = current_scram.server_first(data) - elif current_scram.state == scram.SCRAMState.ClientFinal: - # use server-final-message to check server proof - verified = current_scram.server_final(data) - del event["server"]._scram - - if verified: - auth_text = "+" - else: - if current_scram.state == scram.SCRAMState.VerifyFailed: - # server gave a bad verification so we should panic - event["server"].disconnect() - raise ValueError("Server SCRAM verification failed") - - else: - raise ValueError("unknown sasl mechanism '%s'" % mechanism) - - if not auth_text == None: - if not auth_text == "+": - auth_text = base64.b64encode(auth_text) - auth_text = auth_text.decode("utf8") - event["server"].send_authenticate(auth_text) - - def _end_sasl(self, server): - server.capability_done("sasl") - - @utils.hook("received.908") - def sasl_mechanisms(self, event): - server_mechanisms = event["args"][1].split(",") - mechanism = self._best_userpass_mechanism(server_mechanimsms) - event["server"].sasl_mechanism = mechanism - event["server"].send_authenticate(mechanism) - - @utils.hook("received.903") - def sasl_success(self, event): - self._end_sasl(event["server"]) - @utils.hook("received.904") - def sasl_failure(self, event): - self.log.warn("SASL failure for %s: %s", - [str(event["server"]), event["args"][1]]) - self._end_sasl(event["server"]) - - @utils.hook("received.907") - def sasl_already(self, event): - self._end_sasl(event["server"]) diff --git a/modules/sasl/scram.py b/modules/sasl/scram.py deleted file mode 100644 index f243d1e6..00000000 --- a/modules/sasl/scram.py +++ /dev/null @@ -1,130 +0,0 @@ -import base64, enum, hashlib, hmac, os, typing - -# IANA Hash Function Textual Names -# https://tools.ietf.org/html/rfc5802#section-4 -# https://www.iana.org/assignments/hash-function-text-names/ -# MD2 has been removed as it's unacceptably weak -ALGORITHMS = [ - "MD5", "SHA-1", "SHA-224", "SHA-256", "SHA-384", "SHA-512"] - -SCRAM_ERRORS = [ - "invalid-encoding", - "extensions-not-supported", # unrecognized 'm' value - "invalid-proof", - "channel-bindings-dont-match", - "server-does-support-channel-binding", - "channel-binding-not-supported", - "unsupported-channel-binding-type", - "unknown-user", - "invalid-username-encoding", # invalid utf8 or bad SASLprep - "no-resources" -] - -def _scram_nonce() -> bytes: - return base64.b64encode(os.urandom(32)) -def _scram_escape(s: bytes) -> bytes: - return s.replace(b"=", b"=3D").replace(b",", b"=2C") -def _scram_unescape(s: bytes) -> bytes: - return s.replace(b"=3D", b"=").replace(b"=2C", b",") -def _scram_xor(s1: bytes, s2: bytes) -> bytes: - return bytes(a ^ b for a, b in zip(s1, s2)) - -class SCRAMState(enum.Enum): - Uninitialised = 0 - ClientFirst = 1 - ClientFinal = 2 - Success = 3 - Failed = 4 - VerifyFailed = 5 - -class SCRAMError(Exception): - pass - -class SCRAM(object): - def __init__(self, algo: str, username: str, password: str): - if not algo in ALGORITHMS: - raise ValueError("Unknown SCRAM algorithm '%s'" % algo) - - self._algo = algo.replace("-", "") # SHA-1 -> SHA1 - self._username = username.encode("utf8") - self._password = password.encode("utf8") - - self.state = SCRAMState.Uninitialised - self.error = "" - self.raw_error = "" - - self._client_first = b"" - self._salted_password = b"" - self._auth_message = b"" - - def _get_pieces(self, data: bytes) -> typing.Dict[bytes, bytes]: - pieces = (piece.split(b"=", 1) for piece in data.split(b",")) - return dict((piece[0], piece[1]) for piece in pieces) - - def _hmac(self, key: bytes, msg: bytes) -> bytes: - return hmac.new(key, msg, self._algo).digest() - def _hash(self, msg: bytes) -> bytes: - return hashlib.new(self._algo, msg).digest() - - def _constant_time_compare(self, b1: bytes, b2: bytes): - return hmac.compare_digest(b1, b2) - - def client_first(self) -> bytes: - self.state = SCRAMState.ClientFirst - self._client_first = b"n=%s,r=%s" % ( - _scram_escape(self._username), _scram_nonce()) - - # n,,n=<username>,r=<nonce> - return b"n,,%s" % self._client_first - - def server_first(self, data: bytes) -> bytes: - self.state = SCRAMState.ClientFinal - - pieces = self._get_pieces(data) - nonce = pieces[b"r"] # server combines your nonce with it's own - salt = base64.b64decode(pieces[b"s"]) # salt is b64encoded - iterations = int(pieces[b"i"]) - - salted_password = hashlib.pbkdf2_hmac(self._algo, self._password, - salt, iterations, dklen=None) - self._salted_password = salted_password - - client_key = self._hmac(salted_password, b"Client Key") - stored_key = self._hash(client_key) - - channel = base64.b64encode(b"n,,") - auth_noproof = b"c=%s,r=%s" % (channel, nonce) - auth_message = b"%s,%s,%s" % (self._client_first, data, auth_noproof) - self._auth_message = auth_message - - client_signature = self._hmac(stored_key, auth_message) - client_proof_xor = _scram_xor(client_key, client_signature) - client_proof = base64.b64encode(client_proof_xor) - - # c=<b64encode("n,,")>,r=<nonce>,p=<proof> - return b"%s,p=%s" % (auth_noproof, client_proof) - - def server_final(self, data: bytes) -> bool: - pieces = self._get_pieces(data) - if b"e" in pieces: - error = pieces[b"e"].decode("utf8") - self.raw_error = error - if error in SCRAM_ERRORS: - self.error = error - else: - self.error = "other-error" - - self.state = SCRAMState.Failed - return False - - verifier = base64.b64decode(pieces[b"v"]) - - server_key = self._hmac(self._salted_password, b"Server Key") - server_signature = self._hmac(server_key, self._auth_message) - - if server_signature == verifier: - self.state = SCRAMState.Success - return True - else: - self.state = SCRAMState.VerifyFailed - return False |
