aboutsummaryrefslogtreecommitdiff
path: root/modules/sasl/__init__.py
diff options
context:
space:
mode:
authorGravatar jesopo2019-06-03 12:44:04 +0100
committerGravatar jesopo2019-06-03 12:44:04 +0100
commit9a8b345c53e852d7092197cee084d0d3c02bc0ff (patch)
tree408c6833c2f4de7198c354043c8ca265c0616901 /modules/sasl/__init__.py
parentCheck from_self, not if target==is_own_nickname, use from_self when adding to (diff)
Prefix names for all IRCv3 modules with "ircv3_"
Diffstat (limited to 'modules/sasl/__init__.py')
-rw-r--r--modules/sasl/__init__.py147
1 files changed, 0 insertions, 147 deletions
diff --git a/modules/sasl/__init__.py b/modules/sasl/__init__.py
deleted file mode 100644
index b62309a6..00000000
--- a/modules/sasl/__init__.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#--depends-on config
-
-import base64, hashlib, hmac, uuid
-from src import ModuleManager, utils
-from . import scram
-
-CAP = utils.irc.Capability("sasl")
-
-USERPASS_MECHANISMS = [
- "SCRAM-SHA-512",
- "SCRAM-SHA-256",
- "SCRAM-SHA-1",
- "PLAIN"
-]
-
-def _validate(s):
- mechanism, _, arguments = s.partition(" ")
- return {"mechanism": mechanism, "args": arguments}
-
-@utils.export("serverset", {"setting": "sasl",
- "help": "Set the sasl username/password for this server",
- "validate": _validate, "example": "PLAIN BitBot:hunder2"})
-class Module(ModuleManager.BaseModule):
- def _best_userpass_mechanism(self, mechanisms):
- for potential_mechanism in USERPASS_MECHANISMS:
- if potential_mechanism in mechanisms:
- return potential_mechanism
-
- @utils.hook("received.cap.new")
- @utils.hook("received.cap.ls")
- def on_cap(self, event):
- has_sasl = "sasl" in event["capabilities"]
- our_sasl = event["server"].get_setting("sasl", None)
-
- do_sasl = False
- if has_sasl and our_sasl:
- if not event["capabilities"]["sasl"] == None:
- our_mechanism = our_sasl["mechanism"].upper()
- server_mechanisms = event["capabilities"]["sasl"].split(",")
- if our_mechanism == "USERPASS":
- our_mechanism = self._best_userpass_mechanism(
- server_mechanisms)
- do_sasl = our_mechanism in server_mechanisms
- else:
- do_sasl = True
-
- if do_sasl:
- cap = CAP.copy()
- cap.on_ack(lambda: self._sasl_ack(event["server"]))
- return cap
-
- def _sasl_ack(self, server):
- sasl = server.get_setting("sasl")
- mechanism = sasl["mechanism"].upper()
- if mechanism == "USERPASS":
- server_mechanisms = server.server_capabilities["sasl"]
- server_mechanisms = server_mechanisms or [
- USERPASS_MECHANISMS[0]]
- mechanism = self._best_userpass_mechanism(server_mechanisms)
-
- server.send_authenticate(mechanism)
- server.sasl_mechanism = mechanism
- server.wait_for_capability("sasl")
-
- @utils.hook("received.authenticate")
- def on_authenticate(self, event):
- sasl = event["server"].get_setting("sasl")
- mechanism = event["server"].sasl_mechanism
-
- auth_text = None
- if mechanism == "PLAIN":
- if event["message"] != "+":
- event["server"].send_authenticate("*")
- else:
- sasl_username, sasl_password = sasl["args"].split(":", 1)
- auth_text = ("%s\0%s\0%s" % (
- sasl_username, sasl_username, sasl_password)).encode("utf8")
-
- elif mechanism == "EXTERNAL":
- if event["message"] != "+":
- event["server"].send_authenticate("*")
- else:
- auth_text = "+"
-
- elif mechanism.startswith("SCRAM-"):
-
- if event["message"] == "+":
- # start SCRAM handshake
-
- # create SCRAM helper
- sasl_username, sasl_password = sasl["args"].split(":", 1)
- algo = mechanism.split("SCRAM-", 1)[1]
- event["server"]._scram = scram.SCRAM(
- algo, sasl_username, sasl_password)
-
- # generate client-first-message
- auth_text = event["server"]._scram.client_first()
- else:
- current_scram = event["server"]._scram
- data = base64.b64decode(event["message"])
- if current_scram.state == scram.SCRAMState.ClientFirst:
- # use server-first-message to generate client-final-message
- auth_text = current_scram.server_first(data)
- elif current_scram.state == scram.SCRAMState.ClientFinal:
- # use server-final-message to check server proof
- verified = current_scram.server_final(data)
- del event["server"]._scram
-
- if verified:
- auth_text = "+"
- else:
- if current_scram.state == scram.SCRAMState.VerifyFailed:
- # server gave a bad verification so we should panic
- event["server"].disconnect()
- raise ValueError("Server SCRAM verification failed")
-
- else:
- raise ValueError("unknown sasl mechanism '%s'" % mechanism)
-
- if not auth_text == None:
- if not auth_text == "+":
- auth_text = base64.b64encode(auth_text)
- auth_text = auth_text.decode("utf8")
- event["server"].send_authenticate(auth_text)
-
- def _end_sasl(self, server):
- server.capability_done("sasl")
-
- @utils.hook("received.908")
- def sasl_mechanisms(self, event):
- server_mechanisms = event["args"][1].split(",")
- mechanism = self._best_userpass_mechanism(server_mechanimsms)
- event["server"].sasl_mechanism = mechanism
- event["server"].send_authenticate(mechanism)
-
- @utils.hook("received.903")
- def sasl_success(self, event):
- self._end_sasl(event["server"])
- @utils.hook("received.904")
- def sasl_failure(self, event):
- self.log.warn("SASL failure for %s: %s",
- [str(event["server"]), event["args"][1]])
- self._end_sasl(event["server"])
-
- @utils.hook("received.907")
- def sasl_already(self, event):
- self._end_sasl(event["server"])