aboutsummaryrefslogtreecommitdiff
path: root/src/core_modules/permissions
diff options
context:
space:
mode:
authorGravatar jesopo2019-12-10 05:27:35 +0000
committerGravatar jesopo2019-12-10 05:27:35 +0000
commit638eee0d685c06d258cb55287204ca97bca7c344 (patch)
tree33442439317ae2846f1efb7674b7a3758c8990a1 /src/core_modules/permissions
parentmove sys.exit() codes to an enum in utils.consts (diff)
signature
move core modules to src/core_modules, make them uneffected by white/black list
Diffstat (limited to 'src/core_modules/permissions')
-rw-r--r--src/core_modules/permissions/__init__.py357
1 files changed, 357 insertions, 0 deletions
diff --git a/src/core_modules/permissions/__init__.py b/src/core_modules/permissions/__init__.py
new file mode 100644
index 00000000..0559774c
--- /dev/null
+++ b/src/core_modules/permissions/__init__.py
@@ -0,0 +1,357 @@
+#--depends-on commands
+
+import base64, binascii, os
+import scrypt
+from src import ModuleManager, utils
+
+HOSTMASKS_SETTING = "hostmask-account"
+NO_PERMISSION = "You do not have permission to do that"
+
+class Module(ModuleManager.BaseModule):
+ def on_load(self):
+ self.exports.add("is-identified", self._is_identified)
+ self.exports.add("account-name", self._account_name)
+
+ @utils.hook("new.server")
+ def new_server(self, event):
+ event["server"]._hostmasks = {}
+
+ for account, user_hostmasks in event["server"].get_all_user_settings(
+ HOSTMASKS_SETTING):
+ for hostmask in user_hostmasks:
+ self._add_hostmask(event["server"],
+ utils.irc.hostmask_parse(hostmask), account)
+
+ def _add_hostmask(self, server, hostmask, account):
+ server._hostmasks[hostmask.original] = (hostmask, account)
+ def _remove_hostmask(self, server, hostmask):
+ if hostmask in server._hostmasks:
+ del server._hostmasks[hostmask]
+
+ def _make_salt(self):
+ return base64.b64encode(os.urandom(64)).decode("utf8")
+
+ def _random_password(self):
+ return binascii.hexlify(os.urandom(32)).decode("utf8")
+
+ def _make_hash(self, password, salt=None):
+ salt = salt or self._make_salt()
+ hash = base64.b64encode(scrypt.hash(password, salt)).decode("utf8")
+ return hash, salt
+
+ def _get_hash(self, server, account):
+ hash, salt = server.get_user(account).get_setting("authentication",
+ (None, None))
+ return hash, salt
+
+ def _master_password(self):
+ master_password = self._random_password()
+ hash, salt = self._make_hash(master_password)
+ self.bot.set_setting("master-password", [hash, salt])
+ return master_password
+ @utils.hook("control.master-password")
+ def command_line(self, event):
+ master_password = self._master_password()
+ return "One-time master password: %s" % master_password
+
+ def _has_identified(self, server, user, account):
+ user._id_override = server.get_user_id(account)
+ def _is_identified(self, user):
+ return not user._id_override == None
+ def _signout(self, user):
+ user._id_override = None
+
+ def _find_hostmask(self, server, user):
+ user_hostmask = user.hostmask()
+ for hostmask, (hostmask_pattern, account) in server._hostmasks.items():
+ if utils.irc.hostmask_match(user_hostmask, hostmask_pattern):
+ return (hostmask, account)
+ def _specific_hostmask(self, server, hostmask, account):
+ for user in server.users.values():
+ if utils.irc.hostmask_match(user.hostmask(), hostmask):
+ if account == None:
+ user._hostmask_account = None
+ self._signout(user)
+ else:
+ user._hostmask_account = (hostmask, account)
+ self._has_identified(server, user, account)
+
+ def _account_name(self, user):
+ if not user.account == None:
+ return user.account
+ elif not user._account_override == None:
+ return user._account_override
+ elif not user._hostmask_account == None:
+ return user._hostmask_account[1]
+
+ @utils.hook("new.user")
+ def new_user(self, event):
+ event["user"]._hostmask_account = None
+ event["user"]._account_override = None
+ event["user"]._master_admin = False
+
+ def _set_hostmask(self, server, user):
+ account = self._find_hostmask(server, user)
+ if not account == None:
+ hostmask, account = account
+ user._hostmask_account = (hostmask, account)
+ self._has_identified(server, user, account)
+
+ @utils.hook("received.chghost")
+ @utils.hook("received.nick")
+ @utils.hook("received.who")
+ @utils.hook("received.whox")
+ @utils.hook("received.message.private")
+ def chghost(self, event):
+ if not self._is_identified(event["user"]):
+ self._set_hostmask(event["server"], event["user"])
+ @utils.hook("received.whox")
+ @utils.hook("received.account")
+ @utils.hook("received.account.login")
+ @utils.hook("received.account.logout")
+ @utils.hook("received.join")
+ def check_account(self, event):
+ if not self._is_identified(event["user"]):
+ if event["user"].account:
+ self._has_identified(event["server"], event["user"],
+ event["user"].account)
+ else:
+ self._set_hostmask(event["server"], event["user"])
+
+ def _get_permissions(self, user):
+ if self._is_identified(user):
+ return user.get_setting("permissions", [])
+ return []
+
+ def _has_permission(self, user, permission):
+ if user._master_admin:
+ return True
+
+ permissions = self._get_permissions(user)
+ if permission in permissions:
+ return True
+ else:
+ permission_parts = permission.split(".")
+ for user_permission in permissions:
+ user_permission_parts = user_permission.split(".")
+ for i, part in enumerate(permission_parts):
+ last = i==(len(permission_parts)-1)
+ user_last = i==(len(user_permission_parts)-1)
+ if not permission_parts[i] == user_permission_parts[i]:
+ if user_permission_parts[i] == "*" and user_last:
+ return True
+ else:
+ break
+ else:
+ if last and user_last:
+ return True
+ return False
+
+ @utils.hook("received.command.masterlogin")
+ @utils.kwarg("min_args", 1)
+ @utils.kwarg("private_only", True)
+ def master_login(self, event):
+ saved_hash, saved_salt = self.bot.get_setting("master-password",
+ (None, None))
+ if saved_hash and saved_salt:
+ given_hash, _ = self._make_hash(event["args"], saved_salt)
+ if utils.security.constant_time_compare(given_hash, saved_hash):
+ self.bot.del_setting("master-password")
+ event["user"]._master_admin = True
+ event["stdout"].write("Master login successful")
+ return
+ event["stderr"].write("Master login failed")
+
+ @utils.hook("received.command.mypermissions")
+ @utils.kwarg("authenticated", True)
+ def my_permissions(self, event):
+ """
+ :help: Show your permissions
+ """
+ permissions = event["user"].get_setting("permissions", [])
+ event["stdout"].write("Your permissions: %s" % ", ".join(permissions))
+
+
+ @utils.hook("received.command.register", private_only=True, min_args=1)
+ @utils.kwarg("min_args", 1)
+ @utils.kwarg("private_only", True)
+ @utils.kwarg("help", "Register your nickname")
+ @utils.kwarg("usage", "<password>")
+ def register(self, event):
+ hash, salt = self._get_hash(event["server"], event["user"].nickname)
+ if not hash and not salt:
+ password = event["args"]
+ hash, salt = self._make_hash(password)
+ event["user"].set_setting("authentication", [hash, salt])
+
+ event["user"]._account_override = event["user"].nickname
+ self._has_identified(event["server"], event["user"],
+ event["user"].nickname)
+
+ event["stdout"].write("Nickname registered successfully")
+ else:
+ event["stderr"].write("This nickname is already registered")
+
+ @utils.hook("received.command.identify", private_only=True, min_args=1)
+ @utils.kwarg("min_args", 1)
+ @utils.kwarg("private_only", True)
+ @utils.kwarg("help", "Identify for your current nickname")
+ @utils.kwarg("usage", "[account] <password>")
+ def identify(self, event):
+ if not event["user"].channels:
+ raise utils.EventError("You must share at least one channel "
+ "with me before you can identify")
+
+ if not self._is_identified(event["user"]):
+ if len(event["args_split"]) > 1:
+ account = event["args_split"][0]
+ password = " ".join(event["args_split"][1:])
+ else:
+ account = event["user"].nickname
+ password = event["args"]
+
+ hash, salt = self._get_hash(event["server"], account)
+ if hash and salt:
+ attempt, _ = self._make_hash(password, salt)
+ if utils.security.constant_time_compare(attempt, hash):
+ event["user"]._account_override = account
+ self._has_identified(event["server"], event["user"], account)
+
+ event["stdout"].write("Correct password, you have "
+ "been identified as %s." % account)
+ self.events.on("internal.identified").call(
+ user=event["user"])
+ else:
+ event["stderr"].write("Incorrect password for '%s'" %
+ account)
+ else:
+ event["stderr"].write("Account '%s' is not registered" %
+ account)
+ else:
+ event["stderr"].write("You are already identified as %s" %
+ self._account_name(event["user"]))
+
+ @utils.hook("received.command.permission")
+ @utils.kwarg("min_args", 2)
+ @utils.kwarg("usage", "list <account>")
+ @utils.kwarg("usage", "clear <account>")
+ @utils.kwarg("usage", "add <account> <permission>")
+ @utils.kwarg("usage", "remove <account> <permission>")
+ @utils.kwarg("permission", "permissions.change")
+ def permission(self, event):
+ subcommand = event["args_split"][0].lower()
+ account = event["args_split"][1]
+ target_user = event["server"].get_user(account)
+
+ if subcommand == "list":
+ event["stdout"].write("Permissions for %s: %s" % (
+ account, ", ".join(self._get_permissions(target_user))))
+ elif subcommand == "clear":
+ if not self._get_permissions(target_user):
+ raise utils.EventError("%s has no permissions" % account)
+ target_user.del_setting("permissions")
+ event["stdout"].write("Cleared permissions for %s" % account)
+ else:
+ permissions = event["args_split"][2:]
+ if not permissions:
+ raise utils.EventError("Please provide at least 1 permission")
+ user_permissions = self._get_permissions(target_user)
+
+ if subcommand == "add":
+ new = list(set(permissions)-set(user_permissions))
+ if not new:
+ raise utils.EventError("No new permissions to give")
+ target_user.set_setting("permissions", user_permissions+new)
+ event["stdout"].write("Gave %s new permissions: %s" %
+ (account, ", ".join(new)))
+ elif subcommand == "remove":
+ permissions_set = set(permissions)
+ user_permissions_set = set(user_permissions)
+ removed = list(user_permissions_set&permissions_set)
+ if not (user_permissions_set & permissions_set):
+ raise utils.EventError("New permissions to remove")
+ change = list(user_permissions_set - permissions_set)
+
+ if not change:
+ target_user.del_setting("permissions")
+ else:
+ target_user.set_setting("permissions", change)
+ event["stdout"].write("Removed permissions from %s: %s" %
+ (account, ", ".join(change)))
+ else:
+ raise utils.EventError("Unknown subcommand %s" % subcommand)
+
+ @utils.hook("received.command.hostmask")
+ @utils.kwarg("min_args", 1)
+ @utils.kwarg("authenticated", True)
+ @utils.kwarg("usage", "list")
+ @utils.kwarg("usage", "add [hostmask]")
+ @utils.kwarg("usage", "remove [hostmask]")
+ def hostmask(self, event):
+ subcommand = event["args_split"][0].lower()
+ hostmasks = event["user"].get_setting(HOSTMASKS_SETTING, [])
+
+ if subcommand == "list":
+ event["stdout"].write("Your hostmasks: %s" % ", ".join(hostmasks))
+ else:
+ if event["args_split"][1:]:
+ hostmask = event["args_split"][1]
+ else:
+ hostmask = "*!%s" % event["user"].userhost()
+ account = self._account_name(event["user"])
+
+ if subcommand == "add":
+ if hostmask in hostmasks:
+ raise utils.EventError(
+ "Hostmask %s is already on your account" % hostmask)
+ hostmasks.append(hostmask)
+ event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
+
+ hostmask_obj = utils.irc.hostmask_parse(hostmaks)
+ self._specific_hostmask(event["server"], hostmask_obj, account)
+ self._add_hostmask(event["server"], hostmask_obj, account)
+
+ event["stdout"].write("Added %s to your hostmasks" % hostmask)
+ elif subcommand == "remove":
+ if not hostmask in hostmasks:
+ raise utils.EventError("Hostmask %s is not on your account"
+ % hostmask)
+ while hostmask in hostmasks:
+ hostmasks.remove(hostmask)
+ event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
+
+ self._specific_hostmask(event["server"], hostmask, None)
+ self._remove_hostmask(event["server"], hostmask)
+
+ event["stdout"].write("Removed %s from your hostmasks"
+ % hostmask)
+ else:
+ raise utils.EventError("Unknown subcommand %s" % subcommand)
+
+ def _assert(self, allowed):
+ if allowed:
+ return utils.consts.PERMISSION_FORCE_SUCCESS, None
+ else:
+ return utils.consts.PERMISSION_ERROR, NO_PERMISSION
+
+ @utils.hook("preprocess.command")
+ def preprocess_command(self, event):
+ allowed = None
+ permission = event["hook"].get_kwarg("permission", None)
+ authenticated = event["hook"].get_kwarg("authenticated", False)
+ if not permission == None:
+ allowed = self._has_permission(event["user"], permission)
+ elif authenticated:
+ allowed = self._is_identified(event["user"])
+ else:
+ return
+
+ return self._assert(allowed)
+
+ @utils.hook("check.command.permission")
+ def check_permission(self, event):
+ return self._assert(
+ self._has_permission(event["user"], event["request_args"][0]))
+ @utils.hook("check.command.authenticated")
+ def check_authenticated(self, event):
+ return self._assert(self._is_identified(event["user"]))