aboutsummaryrefslogtreecommitdiff
path: root/modules/sasl
diff options
context:
space:
mode:
authorGravatar jesopo2019-06-03 12:44:04 +0100
committerGravatar jesopo2019-06-03 12:44:04 +0100
commit9a8b345c53e852d7092197cee084d0d3c02bc0ff (patch)
tree408c6833c2f4de7198c354043c8ca265c0616901 /modules/sasl
parentCheck from_self, not if target==is_own_nickname, use from_self when adding to (diff)
signature
Prefix names for all IRCv3 modules with "ircv3_"
Diffstat (limited to 'modules/sasl')
-rw-r--r--modules/sasl/README.md46
-rw-r--r--modules/sasl/__init__.py147
-rw-r--r--modules/sasl/scram.py130
3 files changed, 0 insertions, 323 deletions
diff --git a/modules/sasl/README.md b/modules/sasl/README.md
deleted file mode 100644
index 30a51e08..00000000
--- a/modules/sasl/README.md
+++ /dev/null
@@ -1,46 +0,0 @@
-# Configuring SASL
-
-You can either configure SASL through `!serverset sasl` from an registered and identified admin account or directly through sqlite.
-
-## USERPASS Mechanism
-
-BitBot supports a special SASL mechanism name: `USERPASS`. This internally
-represents "pick the strongest username:password algorithm"
-
-## !serverset sasl
-
-These commands are to be executed from a registered admin account
-
-#### USERPASS
-> !serverset sasl userpass <username>:<password>
-
-#### PLAIN
-> !serverset sasl plain <username>:<password>
-
-#### SCRAM-SHA-1
-> !serverset sasl scram-sha-1 <username>:<password>
-
-#### SCRAM-SHA-256
-> !serverset sasl scram-sha-256 <username>:<password>
-
-#### EXTERNAL
-> !serverset sasl external
-
-## sqlite
-
-Execute these against the current bot database file (e.g. `$ sqlite3 databases/bot.db`)
-
-#### USERPASS
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "userpass", "args": "<username>:<password>"}');
-
-#### PLAIN
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "plain", "args": "<username>:<password>"}');
-
-#### SCRAM-SHA-1
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-1", "args": "<username>:<password>"}');
-
-#### SCRAM-SHA-256
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-256", "args": "<username>:<password>"}');
-
-#### external
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "external"}');
diff --git a/modules/sasl/__init__.py b/modules/sasl/__init__.py
deleted file mode 100644
index b62309a6..00000000
--- a/modules/sasl/__init__.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#--depends-on config
-
-import base64, hashlib, hmac, uuid
-from src import ModuleManager, utils
-from . import scram
-
-CAP = utils.irc.Capability("sasl")
-
-USERPASS_MECHANISMS = [
- "SCRAM-SHA-512",
- "SCRAM-SHA-256",
- "SCRAM-SHA-1",
- "PLAIN"
-]
-
-def _validate(s):
- mechanism, _, arguments = s.partition(" ")
- return {"mechanism": mechanism, "args": arguments}
-
-@utils.export("serverset", {"setting": "sasl",
- "help": "Set the sasl username/password for this server",
- "validate": _validate, "example": "PLAIN BitBot:hunder2"})
-class Module(ModuleManager.BaseModule):
- def _best_userpass_mechanism(self, mechanisms):
- for potential_mechanism in USERPASS_MECHANISMS:
- if potential_mechanism in mechanisms:
- return potential_mechanism
-
- @utils.hook("received.cap.new")
- @utils.hook("received.cap.ls")
- def on_cap(self, event):
- has_sasl = "sasl" in event["capabilities"]
- our_sasl = event["server"].get_setting("sasl", None)
-
- do_sasl = False
- if has_sasl and our_sasl:
- if not event["capabilities"]["sasl"] == None:
- our_mechanism = our_sasl["mechanism"].upper()
- server_mechanisms = event["capabilities"]["sasl"].split(",")
- if our_mechanism == "USERPASS":
- our_mechanism = self._best_userpass_mechanism(
- server_mechanisms)
- do_sasl = our_mechanism in server_mechanisms
- else:
- do_sasl = True
-
- if do_sasl:
- cap = CAP.copy()
- cap.on_ack(lambda: self._sasl_ack(event["server"]))
- return cap
-
- def _sasl_ack(self, server):
- sasl = server.get_setting("sasl")
- mechanism = sasl["mechanism"].upper()
- if mechanism == "USERPASS":
- server_mechanisms = server.server_capabilities["sasl"]
- server_mechanisms = server_mechanisms or [
- USERPASS_MECHANISMS[0]]
- mechanism = self._best_userpass_mechanism(server_mechanisms)
-
- server.send_authenticate(mechanism)
- server.sasl_mechanism = mechanism
- server.wait_for_capability("sasl")
-
- @utils.hook("received.authenticate")
- def on_authenticate(self, event):
- sasl = event["server"].get_setting("sasl")
- mechanism = event["server"].sasl_mechanism
-
- auth_text = None
- if mechanism == "PLAIN":
- if event["message"] != "+":
- event["server"].send_authenticate("*")
- else:
- sasl_username, sasl_password = sasl["args"].split(":", 1)
- auth_text = ("%s\0%s\0%s" % (
- sasl_username, sasl_username, sasl_password)).encode("utf8")
-
- elif mechanism == "EXTERNAL":
- if event["message"] != "+":
- event["server"].send_authenticate("*")
- else:
- auth_text = "+"
-
- elif mechanism.startswith("SCRAM-"):
-
- if event["message"] == "+":
- # start SCRAM handshake
-
- # create SCRAM helper
- sasl_username, sasl_password = sasl["args"].split(":", 1)
- algo = mechanism.split("SCRAM-", 1)[1]
- event["server"]._scram = scram.SCRAM(
- algo, sasl_username, sasl_password)
-
- # generate client-first-message
- auth_text = event["server"]._scram.client_first()
- else:
- current_scram = event["server"]._scram
- data = base64.b64decode(event["message"])
- if current_scram.state == scram.SCRAMState.ClientFirst:
- # use server-first-message to generate client-final-message
- auth_text = current_scram.server_first(data)
- elif current_scram.state == scram.SCRAMState.ClientFinal:
- # use server-final-message to check server proof
- verified = current_scram.server_final(data)
- del event["server"]._scram
-
- if verified:
- auth_text = "+"
- else:
- if current_scram.state == scram.SCRAMState.VerifyFailed:
- # server gave a bad verification so we should panic
- event["server"].disconnect()
- raise ValueError("Server SCRAM verification failed")
-
- else:
- raise ValueError("unknown sasl mechanism '%s'" % mechanism)
-
- if not auth_text == None:
- if not auth_text == "+":
- auth_text = base64.b64encode(auth_text)
- auth_text = auth_text.decode("utf8")
- event["server"].send_authenticate(auth_text)
-
- def _end_sasl(self, server):
- server.capability_done("sasl")
-
- @utils.hook("received.908")
- def sasl_mechanisms(self, event):
- server_mechanisms = event["args"][1].split(",")
- mechanism = self._best_userpass_mechanism(server_mechanimsms)
- event["server"].sasl_mechanism = mechanism
- event["server"].send_authenticate(mechanism)
-
- @utils.hook("received.903")
- def sasl_success(self, event):
- self._end_sasl(event["server"])
- @utils.hook("received.904")
- def sasl_failure(self, event):
- self.log.warn("SASL failure for %s: %s",
- [str(event["server"]), event["args"][1]])
- self._end_sasl(event["server"])
-
- @utils.hook("received.907")
- def sasl_already(self, event):
- self._end_sasl(event["server"])
diff --git a/modules/sasl/scram.py b/modules/sasl/scram.py
deleted file mode 100644
index f243d1e6..00000000
--- a/modules/sasl/scram.py
+++ /dev/null
@@ -1,130 +0,0 @@
-import base64, enum, hashlib, hmac, os, typing
-
-# IANA Hash Function Textual Names
-# https://tools.ietf.org/html/rfc5802#section-4
-# https://www.iana.org/assignments/hash-function-text-names/
-# MD2 has been removed as it's unacceptably weak
-ALGORITHMS = [
- "MD5", "SHA-1", "SHA-224", "SHA-256", "SHA-384", "SHA-512"]
-
-SCRAM_ERRORS = [
- "invalid-encoding",
- "extensions-not-supported", # unrecognized 'm' value
- "invalid-proof",
- "channel-bindings-dont-match",
- "server-does-support-channel-binding",
- "channel-binding-not-supported",
- "unsupported-channel-binding-type",
- "unknown-user",
- "invalid-username-encoding", # invalid utf8 or bad SASLprep
- "no-resources"
-]
-
-def _scram_nonce() -> bytes:
- return base64.b64encode(os.urandom(32))
-def _scram_escape(s: bytes) -> bytes:
- return s.replace(b"=", b"=3D").replace(b",", b"=2C")
-def _scram_unescape(s: bytes) -> bytes:
- return s.replace(b"=3D", b"=").replace(b"=2C", b",")
-def _scram_xor(s1: bytes, s2: bytes) -> bytes:
- return bytes(a ^ b for a, b in zip(s1, s2))
-
-class SCRAMState(enum.Enum):
- Uninitialised = 0
- ClientFirst = 1
- ClientFinal = 2
- Success = 3
- Failed = 4
- VerifyFailed = 5
-
-class SCRAMError(Exception):
- pass
-
-class SCRAM(object):
- def __init__(self, algo: str, username: str, password: str):
- if not algo in ALGORITHMS:
- raise ValueError("Unknown SCRAM algorithm '%s'" % algo)
-
- self._algo = algo.replace("-", "") # SHA-1 -> SHA1
- self._username = username.encode("utf8")
- self._password = password.encode("utf8")
-
- self.state = SCRAMState.Uninitialised
- self.error = ""
- self.raw_error = ""
-
- self._client_first = b""
- self._salted_password = b""
- self._auth_message = b""
-
- def _get_pieces(self, data: bytes) -> typing.Dict[bytes, bytes]:
- pieces = (piece.split(b"=", 1) for piece in data.split(b","))
- return dict((piece[0], piece[1]) for piece in pieces)
-
- def _hmac(self, key: bytes, msg: bytes) -> bytes:
- return hmac.new(key, msg, self._algo).digest()
- def _hash(self, msg: bytes) -> bytes:
- return hashlib.new(self._algo, msg).digest()
-
- def _constant_time_compare(self, b1: bytes, b2: bytes):
- return hmac.compare_digest(b1, b2)
-
- def client_first(self) -> bytes:
- self.state = SCRAMState.ClientFirst
- self._client_first = b"n=%s,r=%s" % (
- _scram_escape(self._username), _scram_nonce())
-
- # n,,n=<username>,r=<nonce>
- return b"n,,%s" % self._client_first
-
- def server_first(self, data: bytes) -> bytes:
- self.state = SCRAMState.ClientFinal
-
- pieces = self._get_pieces(data)
- nonce = pieces[b"r"] # server combines your nonce with it's own
- salt = base64.b64decode(pieces[b"s"]) # salt is b64encoded
- iterations = int(pieces[b"i"])
-
- salted_password = hashlib.pbkdf2_hmac(self._algo, self._password,
- salt, iterations, dklen=None)
- self._salted_password = salted_password
-
- client_key = self._hmac(salted_password, b"Client Key")
- stored_key = self._hash(client_key)
-
- channel = base64.b64encode(b"n,,")
- auth_noproof = b"c=%s,r=%s" % (channel, nonce)
- auth_message = b"%s,%s,%s" % (self._client_first, data, auth_noproof)
- self._auth_message = auth_message
-
- client_signature = self._hmac(stored_key, auth_message)
- client_proof_xor = _scram_xor(client_key, client_signature)
- client_proof = base64.b64encode(client_proof_xor)
-
- # c=<b64encode("n,,")>,r=<nonce>,p=<proof>
- return b"%s,p=%s" % (auth_noproof, client_proof)
-
- def server_final(self, data: bytes) -> bool:
- pieces = self._get_pieces(data)
- if b"e" in pieces:
- error = pieces[b"e"].decode("utf8")
- self.raw_error = error
- if error in SCRAM_ERRORS:
- self.error = error
- else:
- self.error = "other-error"
-
- self.state = SCRAMState.Failed
- return False
-
- verifier = base64.b64decode(pieces[b"v"])
-
- server_key = self._hmac(self._salted_password, b"Server Key")
- server_signature = self._hmac(server_key, self._auth_message)
-
- if server_signature == verifier:
- self.state = SCRAMState.Success
- return True
- else:
- self.state = SCRAMState.VerifyFailed
- return False