aboutsummaryrefslogtreecommitdiff
path: root/modules/ircv3_sasl
diff options
context:
space:
mode:
authorGravatar jesopo2019-12-10 05:27:35 +0000
committerGravatar jesopo2019-12-10 05:27:35 +0000
commit638eee0d685c06d258cb55287204ca97bca7c344 (patch)
tree33442439317ae2846f1efb7674b7a3758c8990a1 /modules/ircv3_sasl
parentmove sys.exit() codes to an enum in utils.consts (diff)
move core modules to src/core_modules, make them uneffected by white/black list
Diffstat (limited to 'modules/ircv3_sasl')
-rw-r--r--modules/ircv3_sasl/README.md46
-rw-r--r--modules/ircv3_sasl/__init__.py195
-rw-r--r--modules/ircv3_sasl/scram.py130
3 files changed, 0 insertions, 371 deletions
diff --git a/modules/ircv3_sasl/README.md b/modules/ircv3_sasl/README.md
deleted file mode 100644
index 30a51e08..00000000
--- a/modules/ircv3_sasl/README.md
+++ /dev/null
@@ -1,46 +0,0 @@
-# Configuring SASL
-
-You can either configure SASL through `!serverset sasl` from an registered and identified admin account or directly through sqlite.
-
-## USERPASS Mechanism
-
-BitBot supports a special SASL mechanism name: `USERPASS`. This internally
-represents "pick the strongest username:password algorithm"
-
-## !serverset sasl
-
-These commands are to be executed from a registered admin account
-
-#### USERPASS
-> !serverset sasl userpass <username>:<password>
-
-#### PLAIN
-> !serverset sasl plain <username>:<password>
-
-#### SCRAM-SHA-1
-> !serverset sasl scram-sha-1 <username>:<password>
-
-#### SCRAM-SHA-256
-> !serverset sasl scram-sha-256 <username>:<password>
-
-#### EXTERNAL
-> !serverset sasl external
-
-## sqlite
-
-Execute these against the current bot database file (e.g. `$ sqlite3 databases/bot.db`)
-
-#### USERPASS
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "userpass", "args": "<username>:<password>"}');
-
-#### PLAIN
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "plain", "args": "<username>:<password>"}');
-
-#### SCRAM-SHA-1
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-1", "args": "<username>:<password>"}');
-
-#### SCRAM-SHA-256
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "scram-sha-256", "args": "<username>:<password>"}');
-
-#### external
-> INSERT INTO server_settings (<serverid>, 'sasl', '{"mechanism": "external"}');
diff --git a/modules/ircv3_sasl/__init__.py b/modules/ircv3_sasl/__init__.py
deleted file mode 100644
index 9f7fac5f..00000000
--- a/modules/ircv3_sasl/__init__.py
+++ /dev/null
@@ -1,195 +0,0 @@
-#--depends-on config
-
-import base64, hashlib, hmac, typing, uuid
-from src import ModuleManager, utils
-from . import scram
-
-CAP = utils.irc.Capability("sasl")
-
-USERPASS_MECHANISMS = [
- "SCRAM-SHA-512",
- "SCRAM-SHA-256",
- "SCRAM-SHA-1",
- "PLAIN"
-]
-ALL_MECHANISMS = USERPASS_MECHANISMS+["EXTERNAL"]
-
-def _parse(value):
- mechanism, _, arguments = value.partition(" ")
- mechanism = mechanism.upper()
-
- if mechanism in ALL_MECHANISMS:
- return {"mechanism": mechanism.upper(), "args": arguments}
- else:
- raise utils.settings.SettingParseException(
- "Unknown SASL mechanism '%s'" % mechanism)
-
-SASL_TIMEOUT = 15 # 15 seconds
-
-HARDFAIL = utils.BoolSetting("sasl-hard-fail",
- "Set whether a SASL failure should cause a disconnect")
-
-@utils.export("serverset", utils.FunctionSetting(_parse, "sasl",
- "Set the sasl username/password for this server",
- example="PLAIN BitBot:hunter2", format=utils.sensitive_format))
-@utils.export("serverset", HARDFAIL)
-@utils.export("botset", HARDFAIL)
-class Module(ModuleManager.BaseModule):
- @utils.hook("new.server")
- def new_server(self, event):
- event["server"]._sasl_timeout = None
- event["server"]._sasl_retry = False
-
- def _best_userpass_mechanism(self, mechanisms):
- for potential_mechanism in USERPASS_MECHANISMS:
- if potential_mechanism in mechanisms:
- return potential_mechanism
-
- def _mech_match(self, server, server_mechanisms):
- our_sasl = server.get_setting("sasl", None)
- if not our_sasl:
- return None
-
- our_mechanism = our_sasl["mechanism"].upper()
-
- if not server_mechanisms and our_mechanism in ALL_MECHANISMS:
- return our_mechanism
- elif our_mechanism in server_mechanisms:
- return our_mechanism
- elif our_mechanism == "USERPASS":
- if server_mechanisms:
- return self._best_userpass_mechanism(server_mechanisms)
- else:
- return USERPASS_MECHANISMS[0]
- return None
-
- @utils.hook("received.cap.new")
- @utils.hook("received.cap.ls")
- def on_cap(self, event):
- has_sasl = "sasl" in event["capabilities"]
- if has_sasl:
- server_mechanisms = event["capabilities"]["sasl"]
- if server_mechanisms:
- server_mechanisms = server_mechanisms.split(",")
- else:
- server_mechanisms = []
-
- mechanism = self._mech_match(event["server"], server_mechanisms)
-
- if mechanism:
- cap = CAP.copy()
- cap.on_ack(
- lambda: self._sasl_ack(event["server"], mechanism))
- return cap
-
- def _sasl_ack(self, server, mechanism):
- server.send_authenticate(mechanism)
- server._sasl_timeout = self.timers.add("sasl-timeout",
- self._sasl_timeout, SASL_TIMEOUT, server=server)
- server._sasl_mechanism = mechanism
-
- server.wait_for_capability("sasl")
-
- def _sasl_timeout(self, timer):
- server = timer.kwargs["server"]
- self._panic(server, "SASL handshake timed out")
-
- @utils.hook("received.authenticate")
- def on_authenticate(self, event):
- sasl = event["server"].get_setting("sasl")
- mechanism = event["server"]._sasl_mechanism
-
- auth_text = None
- if mechanism == "PLAIN":
- if event["message"] != "+":
- event["server"].send_authenticate("*")
- else:
- sasl_username, sasl_password = sasl["args"].split(":", 1)
- auth_text = ("%s\0%s\0%s" % (
- sasl_username, sasl_username, sasl_password)).encode("utf8")
-
- elif mechanism == "EXTERNAL":
- if event["message"] != "+":
- event["server"].send_authenticate("*")
- else:
- auth_text = "+"
-
- elif mechanism.startswith("SCRAM-"):
-
- if event["message"] == "+":
- # start SCRAM handshake
-
- # create SCRAM helper
- sasl_username, sasl_password = sasl["args"].split(":", 1)
- algo = mechanism.split("SCRAM-", 1)[1]
- event["server"]._scram = scram.SCRAM(
- algo, sasl_username, sasl_password)
-
- # generate client-first-message
- auth_text = event["server"]._scram.client_first()
- else:
- current_scram = event["server"]._scram
- data = base64.b64decode(event["message"])
- if current_scram.state == scram.SCRAMState.ClientFirst:
- # use server-first-message to generate client-final-message
- auth_text = current_scram.server_first(data)
- elif current_scram.state == scram.SCRAMState.ClientFinal:
- # use server-final-message to check server proof
- verified = current_scram.server_final(data)
- del event["server"]._scram
-
- if verified:
- auth_text = "+"
- else:
- if current_scram.state == scram.SCRAMState.VerifyFailed:
- # server gave a bad verification so we should panic
- self._panic(event["server"], "SCRAM VerifyFailed")
-
- else:
- raise ValueError("unknown sasl mechanism '%s'" % mechanism)
-
- if not auth_text == None:
- if not auth_text == "+":
- auth_text = base64.b64encode(auth_text)
- auth_text = auth_text.decode("utf8")
- event["server"].send_authenticate(auth_text)
-
- def _end_sasl(self, server):
- server.capability_done("sasl")
- if not server._sasl_timeout == None:
- server._sasl_timeout.cancel()
- server._sasl_timeout = None
-
- @utils.hook("received.908")
- def sasl_mechanisms(self, event):
- server_mechanisms = event["line"].args[1].split(",")
- mechanism = self._mech_match(event["server"], server_mechanisms)
- if mechanism:
- event["server"]._sasl_mechanism = mechanism
- event["server"].send_authenticate(mechanism)
- event["server"]._sasl_retry = True
-
- @utils.hook("received.903")
- def sasl_success(self, event):
- self._end_sasl(event["server"])
- @utils.hook("received.904")
- def sasl_failure(self, event):
- if not event["server"]._sasl_retry:
- self._panic(event["server"], "ERR_SASLFAIL (%s)" %
- event["line"].args[1])
- else:
- event["server"]._sasl_retry = False
-
- @utils.hook("received.907")
- def sasl_already(self, event):
- self._end_sasl(event["server"])
-
- def _panic(self, server, message):
- if server.get_setting("sasl-hard-fail",
- self.bot.get_setting("sasl-hard-fail", False)):
- message = "SASL panic for %s: %s" % (str(server), message)
- self.log.error(message)
- self.bot.disconnect(server)
- else:
- self.log.warn("SASL failure for %s: %s" % (str(server), message))
- self._end_sasl(server)
diff --git a/modules/ircv3_sasl/scram.py b/modules/ircv3_sasl/scram.py
deleted file mode 100644
index f243d1e6..00000000
--- a/modules/ircv3_sasl/scram.py
+++ /dev/null
@@ -1,130 +0,0 @@
-import base64, enum, hashlib, hmac, os, typing
-
-# IANA Hash Function Textual Names
-# https://tools.ietf.org/html/rfc5802#section-4
-# https://www.iana.org/assignments/hash-function-text-names/
-# MD2 has been removed as it's unacceptably weak
-ALGORITHMS = [
- "MD5", "SHA-1", "SHA-224", "SHA-256", "SHA-384", "SHA-512"]
-
-SCRAM_ERRORS = [
- "invalid-encoding",
- "extensions-not-supported", # unrecognized 'm' value
- "invalid-proof",
- "channel-bindings-dont-match",
- "server-does-support-channel-binding",
- "channel-binding-not-supported",
- "unsupported-channel-binding-type",
- "unknown-user",
- "invalid-username-encoding", # invalid utf8 or bad SASLprep
- "no-resources"
-]
-
-def _scram_nonce() -> bytes:
- return base64.b64encode(os.urandom(32))
-def _scram_escape(s: bytes) -> bytes:
- return s.replace(b"=", b"=3D").replace(b",", b"=2C")
-def _scram_unescape(s: bytes) -> bytes:
- return s.replace(b"=3D", b"=").replace(b"=2C", b",")
-def _scram_xor(s1: bytes, s2: bytes) -> bytes:
- return bytes(a ^ b for a, b in zip(s1, s2))
-
-class SCRAMState(enum.Enum):
- Uninitialised = 0
- ClientFirst = 1
- ClientFinal = 2
- Success = 3
- Failed = 4
- VerifyFailed = 5
-
-class SCRAMError(Exception):
- pass
-
-class SCRAM(object):
- def __init__(self, algo: str, username: str, password: str):
- if not algo in ALGORITHMS:
- raise ValueError("Unknown SCRAM algorithm '%s'" % algo)
-
- self._algo = algo.replace("-", "") # SHA-1 -> SHA1
- self._username = username.encode("utf8")
- self._password = password.encode("utf8")
-
- self.state = SCRAMState.Uninitialised
- self.error = ""
- self.raw_error = ""
-
- self._client_first = b""
- self._salted_password = b""
- self._auth_message = b""
-
- def _get_pieces(self, data: bytes) -> typing.Dict[bytes, bytes]:
- pieces = (piece.split(b"=", 1) for piece in data.split(b","))
- return dict((piece[0], piece[1]) for piece in pieces)
-
- def _hmac(self, key: bytes, msg: bytes) -> bytes:
- return hmac.new(key, msg, self._algo).digest()
- def _hash(self, msg: bytes) -> bytes:
- return hashlib.new(self._algo, msg).digest()
-
- def _constant_time_compare(self, b1: bytes, b2: bytes):
- return hmac.compare_digest(b1, b2)
-
- def client_first(self) -> bytes:
- self.state = SCRAMState.ClientFirst
- self._client_first = b"n=%s,r=%s" % (
- _scram_escape(self._username), _scram_nonce())
-
- # n,,n=<username>,r=<nonce>
- return b"n,,%s" % self._client_first
-
- def server_first(self, data: bytes) -> bytes:
- self.state = SCRAMState.ClientFinal
-
- pieces = self._get_pieces(data)
- nonce = pieces[b"r"] # server combines your nonce with it's own
- salt = base64.b64decode(pieces[b"s"]) # salt is b64encoded
- iterations = int(pieces[b"i"])
-
- salted_password = hashlib.pbkdf2_hmac(self._algo, self._password,
- salt, iterations, dklen=None)
- self._salted_password = salted_password
-
- client_key = self._hmac(salted_password, b"Client Key")
- stored_key = self._hash(client_key)
-
- channel = base64.b64encode(b"n,,")
- auth_noproof = b"c=%s,r=%s" % (channel, nonce)
- auth_message = b"%s,%s,%s" % (self._client_first, data, auth_noproof)
- self._auth_message = auth_message
-
- client_signature = self._hmac(stored_key, auth_message)
- client_proof_xor = _scram_xor(client_key, client_signature)
- client_proof = base64.b64encode(client_proof_xor)
-
- # c=<b64encode("n,,")>,r=<nonce>,p=<proof>
- return b"%s,p=%s" % (auth_noproof, client_proof)
-
- def server_final(self, data: bytes) -> bool:
- pieces = self._get_pieces(data)
- if b"e" in pieces:
- error = pieces[b"e"].decode("utf8")
- self.raw_error = error
- if error in SCRAM_ERRORS:
- self.error = error
- else:
- self.error = "other-error"
-
- self.state = SCRAMState.Failed
- return False
-
- verifier = base64.b64decode(pieces[b"v"])
-
- server_key = self._hmac(self._salted_password, b"Server Key")
- server_signature = self._hmac(server_key, self._auth_message)
-
- if server_signature == verifier:
- self.state = SCRAMState.Success
- return True
- else:
- self.state = SCRAMState.VerifyFailed
- return False