aboutsummaryrefslogtreecommitdiff
path: root/modules/permissions/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/permissions/__init__.py')
-rw-r--r--modules/permissions/__init__.py357
1 files changed, 0 insertions, 357 deletions
diff --git a/modules/permissions/__init__.py b/modules/permissions/__init__.py
deleted file mode 100644
index 0559774c..00000000
--- a/modules/permissions/__init__.py
+++ /dev/null
@@ -1,357 +0,0 @@
-#--depends-on commands
-
-import base64, binascii, os
-import scrypt
-from src import ModuleManager, utils
-
-HOSTMASKS_SETTING = "hostmask-account"
-NO_PERMISSION = "You do not have permission to do that"
-
-class Module(ModuleManager.BaseModule):
- def on_load(self):
- self.exports.add("is-identified", self._is_identified)
- self.exports.add("account-name", self._account_name)
-
- @utils.hook("new.server")
- def new_server(self, event):
- event["server"]._hostmasks = {}
-
- for account, user_hostmasks in event["server"].get_all_user_settings(
- HOSTMASKS_SETTING):
- for hostmask in user_hostmasks:
- self._add_hostmask(event["server"],
- utils.irc.hostmask_parse(hostmask), account)
-
- def _add_hostmask(self, server, hostmask, account):
- server._hostmasks[hostmask.original] = (hostmask, account)
- def _remove_hostmask(self, server, hostmask):
- if hostmask in server._hostmasks:
- del server._hostmasks[hostmask]
-
- def _make_salt(self):
- return base64.b64encode(os.urandom(64)).decode("utf8")
-
- def _random_password(self):
- return binascii.hexlify(os.urandom(32)).decode("utf8")
-
- def _make_hash(self, password, salt=None):
- salt = salt or self._make_salt()
- hash = base64.b64encode(scrypt.hash(password, salt)).decode("utf8")
- return hash, salt
-
- def _get_hash(self, server, account):
- hash, salt = server.get_user(account).get_setting("authentication",
- (None, None))
- return hash, salt
-
- def _master_password(self):
- master_password = self._random_password()
- hash, salt = self._make_hash(master_password)
- self.bot.set_setting("master-password", [hash, salt])
- return master_password
- @utils.hook("control.master-password")
- def command_line(self, event):
- master_password = self._master_password()
- return "One-time master password: %s" % master_password
-
- def _has_identified(self, server, user, account):
- user._id_override = server.get_user_id(account)
- def _is_identified(self, user):
- return not user._id_override == None
- def _signout(self, user):
- user._id_override = None
-
- def _find_hostmask(self, server, user):
- user_hostmask = user.hostmask()
- for hostmask, (hostmask_pattern, account) in server._hostmasks.items():
- if utils.irc.hostmask_match(user_hostmask, hostmask_pattern):
- return (hostmask, account)
- def _specific_hostmask(self, server, hostmask, account):
- for user in server.users.values():
- if utils.irc.hostmask_match(user.hostmask(), hostmask):
- if account == None:
- user._hostmask_account = None
- self._signout(user)
- else:
- user._hostmask_account = (hostmask, account)
- self._has_identified(server, user, account)
-
- def _account_name(self, user):
- if not user.account == None:
- return user.account
- elif not user._account_override == None:
- return user._account_override
- elif not user._hostmask_account == None:
- return user._hostmask_account[1]
-
- @utils.hook("new.user")
- def new_user(self, event):
- event["user"]._hostmask_account = None
- event["user"]._account_override = None
- event["user"]._master_admin = False
-
- def _set_hostmask(self, server, user):
- account = self._find_hostmask(server, user)
- if not account == None:
- hostmask, account = account
- user._hostmask_account = (hostmask, account)
- self._has_identified(server, user, account)
-
- @utils.hook("received.chghost")
- @utils.hook("received.nick")
- @utils.hook("received.who")
- @utils.hook("received.whox")
- @utils.hook("received.message.private")
- def chghost(self, event):
- if not self._is_identified(event["user"]):
- self._set_hostmask(event["server"], event["user"])
- @utils.hook("received.whox")
- @utils.hook("received.account")
- @utils.hook("received.account.login")
- @utils.hook("received.account.logout")
- @utils.hook("received.join")
- def check_account(self, event):
- if not self._is_identified(event["user"]):
- if event["user"].account:
- self._has_identified(event["server"], event["user"],
- event["user"].account)
- else:
- self._set_hostmask(event["server"], event["user"])
-
- def _get_permissions(self, user):
- if self._is_identified(user):
- return user.get_setting("permissions", [])
- return []
-
- def _has_permission(self, user, permission):
- if user._master_admin:
- return True
-
- permissions = self._get_permissions(user)
- if permission in permissions:
- return True
- else:
- permission_parts = permission.split(".")
- for user_permission in permissions:
- user_permission_parts = user_permission.split(".")
- for i, part in enumerate(permission_parts):
- last = i==(len(permission_parts)-1)
- user_last = i==(len(user_permission_parts)-1)
- if not permission_parts[i] == user_permission_parts[i]:
- if user_permission_parts[i] == "*" and user_last:
- return True
- else:
- break
- else:
- if last and user_last:
- return True
- return False
-
- @utils.hook("received.command.masterlogin")
- @utils.kwarg("min_args", 1)
- @utils.kwarg("private_only", True)
- def master_login(self, event):
- saved_hash, saved_salt = self.bot.get_setting("master-password",
- (None, None))
- if saved_hash and saved_salt:
- given_hash, _ = self._make_hash(event["args"], saved_salt)
- if utils.security.constant_time_compare(given_hash, saved_hash):
- self.bot.del_setting("master-password")
- event["user"]._master_admin = True
- event["stdout"].write("Master login successful")
- return
- event["stderr"].write("Master login failed")
-
- @utils.hook("received.command.mypermissions")
- @utils.kwarg("authenticated", True)
- def my_permissions(self, event):
- """
- :help: Show your permissions
- """
- permissions = event["user"].get_setting("permissions", [])
- event["stdout"].write("Your permissions: %s" % ", ".join(permissions))
-
-
- @utils.hook("received.command.register", private_only=True, min_args=1)
- @utils.kwarg("min_args", 1)
- @utils.kwarg("private_only", True)
- @utils.kwarg("help", "Register your nickname")
- @utils.kwarg("usage", "<password>")
- def register(self, event):
- hash, salt = self._get_hash(event["server"], event["user"].nickname)
- if not hash and not salt:
- password = event["args"]
- hash, salt = self._make_hash(password)
- event["user"].set_setting("authentication", [hash, salt])
-
- event["user"]._account_override = event["user"].nickname
- self._has_identified(event["server"], event["user"],
- event["user"].nickname)
-
- event["stdout"].write("Nickname registered successfully")
- else:
- event["stderr"].write("This nickname is already registered")
-
- @utils.hook("received.command.identify", private_only=True, min_args=1)
- @utils.kwarg("min_args", 1)
- @utils.kwarg("private_only", True)
- @utils.kwarg("help", "Identify for your current nickname")
- @utils.kwarg("usage", "[account] <password>")
- def identify(self, event):
- if not event["user"].channels:
- raise utils.EventError("You must share at least one channel "
- "with me before you can identify")
-
- if not self._is_identified(event["user"]):
- if len(event["args_split"]) > 1:
- account = event["args_split"][0]
- password = " ".join(event["args_split"][1:])
- else:
- account = event["user"].nickname
- password = event["args"]
-
- hash, salt = self._get_hash(event["server"], account)
- if hash and salt:
- attempt, _ = self._make_hash(password, salt)
- if utils.security.constant_time_compare(attempt, hash):
- event["user"]._account_override = account
- self._has_identified(event["server"], event["user"], account)
-
- event["stdout"].write("Correct password, you have "
- "been identified as %s." % account)
- self.events.on("internal.identified").call(
- user=event["user"])
- else:
- event["stderr"].write("Incorrect password for '%s'" %
- account)
- else:
- event["stderr"].write("Account '%s' is not registered" %
- account)
- else:
- event["stderr"].write("You are already identified as %s" %
- self._account_name(event["user"]))
-
- @utils.hook("received.command.permission")
- @utils.kwarg("min_args", 2)
- @utils.kwarg("usage", "list <account>")
- @utils.kwarg("usage", "clear <account>")
- @utils.kwarg("usage", "add <account> <permission>")
- @utils.kwarg("usage", "remove <account> <permission>")
- @utils.kwarg("permission", "permissions.change")
- def permission(self, event):
- subcommand = event["args_split"][0].lower()
- account = event["args_split"][1]
- target_user = event["server"].get_user(account)
-
- if subcommand == "list":
- event["stdout"].write("Permissions for %s: %s" % (
- account, ", ".join(self._get_permissions(target_user))))
- elif subcommand == "clear":
- if not self._get_permissions(target_user):
- raise utils.EventError("%s has no permissions" % account)
- target_user.del_setting("permissions")
- event["stdout"].write("Cleared permissions for %s" % account)
- else:
- permissions = event["args_split"][2:]
- if not permissions:
- raise utils.EventError("Please provide at least 1 permission")
- user_permissions = self._get_permissions(target_user)
-
- if subcommand == "add":
- new = list(set(permissions)-set(user_permissions))
- if not new:
- raise utils.EventError("No new permissions to give")
- target_user.set_setting("permissions", user_permissions+new)
- event["stdout"].write("Gave %s new permissions: %s" %
- (account, ", ".join(new)))
- elif subcommand == "remove":
- permissions_set = set(permissions)
- user_permissions_set = set(user_permissions)
- removed = list(user_permissions_set&permissions_set)
- if not (user_permissions_set & permissions_set):
- raise utils.EventError("New permissions to remove")
- change = list(user_permissions_set - permissions_set)
-
- if not change:
- target_user.del_setting("permissions")
- else:
- target_user.set_setting("permissions", change)
- event["stdout"].write("Removed permissions from %s: %s" %
- (account, ", ".join(change)))
- else:
- raise utils.EventError("Unknown subcommand %s" % subcommand)
-
- @utils.hook("received.command.hostmask")
- @utils.kwarg("min_args", 1)
- @utils.kwarg("authenticated", True)
- @utils.kwarg("usage", "list")
- @utils.kwarg("usage", "add [hostmask]")
- @utils.kwarg("usage", "remove [hostmask]")
- def hostmask(self, event):
- subcommand = event["args_split"][0].lower()
- hostmasks = event["user"].get_setting(HOSTMASKS_SETTING, [])
-
- if subcommand == "list":
- event["stdout"].write("Your hostmasks: %s" % ", ".join(hostmasks))
- else:
- if event["args_split"][1:]:
- hostmask = event["args_split"][1]
- else:
- hostmask = "*!%s" % event["user"].userhost()
- account = self._account_name(event["user"])
-
- if subcommand == "add":
- if hostmask in hostmasks:
- raise utils.EventError(
- "Hostmask %s is already on your account" % hostmask)
- hostmasks.append(hostmask)
- event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
-
- hostmask_obj = utils.irc.hostmask_parse(hostmaks)
- self._specific_hostmask(event["server"], hostmask_obj, account)
- self._add_hostmask(event["server"], hostmask_obj, account)
-
- event["stdout"].write("Added %s to your hostmasks" % hostmask)
- elif subcommand == "remove":
- if not hostmask in hostmasks:
- raise utils.EventError("Hostmask %s is not on your account"
- % hostmask)
- while hostmask in hostmasks:
- hostmasks.remove(hostmask)
- event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
-
- self._specific_hostmask(event["server"], hostmask, None)
- self._remove_hostmask(event["server"], hostmask)
-
- event["stdout"].write("Removed %s from your hostmasks"
- % hostmask)
- else:
- raise utils.EventError("Unknown subcommand %s" % subcommand)
-
- def _assert(self, allowed):
- if allowed:
- return utils.consts.PERMISSION_FORCE_SUCCESS, None
- else:
- return utils.consts.PERMISSION_ERROR, NO_PERMISSION
-
- @utils.hook("preprocess.command")
- def preprocess_command(self, event):
- allowed = None
- permission = event["hook"].get_kwarg("permission", None)
- authenticated = event["hook"].get_kwarg("authenticated", False)
- if not permission == None:
- allowed = self._has_permission(event["user"], permission)
- elif authenticated:
- allowed = self._is_identified(event["user"])
- else:
- return
-
- return self._assert(allowed)
-
- @utils.hook("check.command.permission")
- def check_permission(self, event):
- return self._assert(
- self._has_permission(event["user"], event["request_args"][0]))
- @utils.hook("check.command.authenticated")
- def check_authenticated(self, event):
- return self._assert(self._is_identified(event["user"]))