aboutsummaryrefslogtreecommitdiff
path: root/modules/permissions/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/permissions/__init__.py')
-rw-r--r--modules/permissions/__init__.py454
1 files changed, 226 insertions, 228 deletions
diff --git a/modules/permissions/__init__.py b/modules/permissions/__init__.py
index 639c1b01..1be221c4 100644
--- a/modules/permissions/__init__.py
+++ b/modules/permissions/__init__.py
@@ -1,52 +1,20 @@
-#--depends-on commands
-#--depends-on config
-
import base64, binascii, os
import scrypt
from src import ModuleManager, utils
-REQUIRES_IDENTIFY = "You need to be identified to use that command"
-REQUIRES_IDENTIFY_INTERNAL = ("You need to be identified to use that command "
- "(/msg %s register | /msg %s identify)")
+HOSTMASKS_SETTING = "hostmask-account"
+NO_PERMISSION = "You do not have permission to do that"
-@utils.export("serverset", utils.OptionsSetting(["internal", "ircv3-account"],
- "identity-mechanism", "Set the identity mechanism for this server"))
class Module(ModuleManager.BaseModule):
- @utils.hook("new.user")
- def new_user(self, event):
- self._logout(event["user"])
- event["user"].admin_master = False
-
- def _master_password(self):
- master_password = self._random_password()
- hash, salt = self._make_hash(master_password)
- self.bot.set_setting("master-password", [hash, salt])
- return master_password
-
- @utils.hook("control.master-password")
- def command_line(self, event):
- master_password = self._master_password()
- return "One-time master password: %s" % master_password
- @utils.hook("received.command.masterpassword", private_only=True)
- def master_password(self, event):
- """
- :permission: master-password
- """
- master_password = self._master_password()
- event["stdout"].write("One-time master password: %s" %
- master_password)
+ @utils.hook("new.server")
+ def new_server(self, event):
+ hostmasks = {}
- @utils.hook("received.part")
- def on_part(self, event):
- if len(event["user"].channels) == 0 and event["user"
- ].identified_account_override:
- event["user"].send_notice("You no longer share any channels "
- "with me so you have been signed out")
-
- def _get_hash(self, server, account):
- hash, salt = server.get_user(account).get_setting("authentication",
- (None, None))
- return hash, salt
+ for account, user_hostmasks in event["server"].get_all_user_settings(
+ HOSTMASKS_SETTING):
+ for hostmask in user_hostmasks:
+ hostmasks[hostmask] = account
+ event["server"]._hostmasks = hostmasks
def _make_salt(self):
return base64.b64encode(os.urandom(64)).decode("utf8")
@@ -59,46 +27,135 @@ class Module(ModuleManager.BaseModule):
hash = base64.b64encode(scrypt.hash(password, salt)).decode("utf8")
return hash, salt
- def _identified(self, server, user, account):
- user.identified_account_override = account
- user.identified_account_id_override = server.get_user(account).get_id()
+ def _get_hash(self, server, account):
+ hash, salt = server.get_user(account).get_setting("authentication",
+ (None, None))
+ return hash, salt
- def _logout(self, user):
- user.identified_account_override = None
- user.identified_account_id_override = None
+ def _has_identified(self, server, user, account):
+ user._id_override = server.get_user_id(account)
+ def _is_identified(self, user):
+ return not user._id_override == None
+ def _signout(self, user):
+ user._id_override = None
- @utils.hook("received.command.masterlogin", private_only=True, min_args=1)
- def master_login(self, event):
- saved_hash, saved_salt = self.bot.get_setting("master-password",
- (None, None))
+ def _find_hostmask(self, server, user):
+ user_hostmask = user.hostmask()
+ for hostmask in server._hostmasks.keys():
+ if utils.irc.hostmask_match(user_hostmask, hostmask):
+ return (hostmask, server._hostmasks[hostmask])
+ def _specific_hostmask(self, server, hostmask, account):
+ for user in server.users.values():
+ if utils.irc.hostmask_match(user.hostmask(), hostmask):
+ if account == None:
+ user._hostmask_account = None
+ self._signout(user)
+ else:
+ user._hostmask_account = (hostmask, account)
+ self._has_identified(server, user, account)
+
+ def _account_name(self, user):
+ if not user.account == None:
+ return user.account
+ elif not user._account_override == None:
+ return user._account_override
+ elif not user._hostmask_account == None:
+ return user._hostmask_account[1]
- if saved_hash and saved_salt:
- given_hash, _ = self._make_hash(event["args"], saved_salt)
- if utils.security.constant_time_compare(given_hash, saved_hash):
- self.bot.del_setting("master-password")
- event["user"].admin_master = True
- event["stdout"].write("Master login successful")
- return
- event["stderr"].write("Master login failed")
+ @utils.hook("new.user")
+ def new_user(self, event):
+ event["user"]._hostmask_account = None
+ event["user"]._account_override = None
+ def _set_hostmask(self, server, user):
+ account = self._find_hostmask(server, user)
+ if not account == None:
+ hostmask, account = account
+ user._hostmask_account = (hostmask, account)
+ self._has_identified(server, user, account)
- @utils.hook("received.command.identify", private_only=True, min_args=1)
- def identify(self, event):
+ @utils.hook("received.chghost")
+ @utils.hook("received.nick")
+ def chghost(self, event):
+ if not self._is_identified(event["user"]):
+ self._set_hostmask(event["server"], event["user"])
+ @utils.hook("received.whox")
+ @utils.hook("received.account")
+ @utils.hook("received.account.login")
+ @utils.hook("received.account.logout")
+ @utils.hook("received.join")
+ def check_account(self, event):
+ if not self._is_identified(event["user"]):
+ if event["user"].account:
+ self._has_identified(event["server"], event["user"],
+ event["user"].account)
+ else:
+ self._set_hostmask(event["server"], event["user"])
+
+ def _get_permissions(self, user):
+ if self._is_identified(user):
+ return user.get_setting("permissions", [])
+ return []
+
+ def _has_permission(self, user, permission):
+ permissions = self._get_permissions(user)
+ if permission in permissions:
+ return True
+ else:
+ permission_parts = permission.split(".")
+ for user_permission in permissions:
+ user_permission_parts = user_permission.split(".")
+ for i, part in enumerate(permission_parts):
+ last = i==(len(permission_parts)-1)
+ user_last = i==(len(user_permission_parts)-1)
+ if not permission_parts[i] == user_permission_parts[i]:
+ if user_permission_parts == "*" and user_last:
+ return True
+ else:
+ break
+ else:
+ if last and user_last:
+ return True
+ return False
+
+ @utils.hook("received.command.mypermissions")
+ @utils.kwarg("authenticated", True)
+ def my_permissions(self, event):
"""
- :help: Identify yourself
- :usage: [account] <password>
+ :help: Show your permissions
"""
- identity_mechanism = event["server"].get_setting("identity-mechanism",
- "internal")
- if not identity_mechanism == "internal":
- raise utils.EventError("The 'identify' command isn't available "
- "on this network")
+ permissions = event["user"].get_setting("permissions", [])
+ event["stdout"].write("Your permissions: %s" % ", ".join(permissions))
+
+ @utils.hook("received.command.register", private_only=True, min_args=1)
+ @utils.kwarg("min_args", 1)
+ @utils.kwarg("private_only", True)
+ @utils.kwarg("help", "Register your nickname")
+ @utils.kwarg("usage", "<password>")
+ def register(self, event):
+ hash, salt = self._get_hash(event["server"], event["user"].nickname)
+ if not hash and not salt:
+ password = event["args"]
+ hash, salt = self._make_hash(password)
+ event["user"].set_setting("authentication", [hash, salt])
+ self._has_identified(event["server"], event["user"],
+ event["user"].nickname)
+ event["stdout"].write("Nickname registered successfully")
+ else:
+ event["stderr"].write("This nickname is already registered")
+
+ @utils.hook("received.command.identify", private_only=True, min_args=1)
+ @utils.kwarg("min_args", 1)
+ @utils.kwarg("private_only", True)
+ @utils.kwarg("help", "Identify for your current nickname")
+ @utils.kwarg("usage", "[account] <password>")
+ def identify(self, event):
if not event["user"].channels:
raise utils.EventError("You must share at least one channel "
"with me before you can identify")
- if not event["user"].identified_account_override:
+ if not self._is_identified(event["user"]):
if len(event["args_split"]) > 1:
account = event["args_split"][0]
password = " ".join(event["args_split"][1:])
@@ -110,9 +167,11 @@ class Module(ModuleManager.BaseModule):
if hash and salt:
attempt, _ = self._make_hash(password, salt)
if utils.security.constant_time_compare(attempt, hash):
- self._identified(event["server"], event["user"], account)
+ event["user"]._account_override = account
+ self._has_identified(event["server"], event["user"], account)
+
event["stdout"].write("Correct password, you have "
- "been identified as '%s'." % account)
+ "been identified as %s." % account)
self.events.on("internal.identified").call(
user=event["user"])
else:
@@ -122,179 +181,118 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write("Account '%s' is not registered" %
account)
else:
- event["stderr"].write("You are already identified")
+ event["stderr"].write("You are already identified as %s" %
+ self._account_name(event["user"]))
- @utils.hook("received.command.register", private_only=True, min_args=1)
- def register(self, event):
- """
- :help: Register yourself
- :usage: <password>
- """
- identity_mechanism = event["server"].get_setting("identity-mechanism",
- "internal")
- if not identity_mechanism == "internal":
- raise utils.EventError("The 'identify' command isn't available "
- "on this network")
+ @utils.hook("received.command.permission")
+ @utils.kwarg("min_args", 2)
+ @utils.kwarg("usage", "list <account>")
+ @utils.kwarg("usage", "clear <account>")
+ @utils.kwarg("usage", "add <account> <permission>")
+ @utils.kwarg("usage", "remove <account> <permission>")
+ @utils.kwarg("permission", "permissions.change")
+ def permission(self, event):
+ subcommand = event["args_split"][0].lower()
+ account = event["args_split"][1]
+ target_user = event["server"].get_user(account)
- hash, salt = self._get_hash(event["server"], event["user"].nickname)
- if not hash and not salt:
- password = event["args"]
- hash, salt = self._make_hash(password)
- event["user"].set_setting("authentication", [hash, salt])
- self._identified(event["server"], event["user"],
- event["user"].nickname)
- event["stdout"].write("Nickname registered successfully")
+ if subcommand == "list":
+ event["stdout"].write("Permissions for %s: %s" % (
+ account, ", ".join(self._get_permissions(target_user))))
+ elif subcommand == "clear":
+ if not self._get_permissions(target_user):
+ raise utils.EventError("%s has no permissions" % account)
+ target_user.del_setting("permissions")
+ event["stdout"].write("Cleared permissions for %s" % account)
else:
- event["stderr"].write("This nickname is already registered")
+ permissions = event["args_split"][2:]
+ if not permissions:
+ raise utils.EventError("Please provide at least 1 permission")
+ user_permissions = self._get_permissions(target_user)
- @utils.hook("received.command.setpassword", authenticated=True, min_args=1)
- def set_password(self, event):
- """
- :help: Change your password
- :usage: <password>
- """
- hash, salt = self._make_hash(event["args"])
- event["user"].set_setting("authentication", [hash, salt])
- event["stdout"].write("Set your password")
+ if subcommand == "add":
+ new = list(set(permissions)-set(user_permissions))
+ if not new:
+ raise utils.EventError("No new permissions to give")
+ target_user.set_setting("permissions", user_permissions+new)
+ event["stdout"].write("Gave %s new permissions: %s" %
+ (account, ", ".join(new)))
+ elif subcommand == "remove":
+ permissions_set = set(permissions)
+ user_permissions_set = set(user_permissions)
+ removed = list(user_permissions_set&permissions_set)
+ if not (user_permissions_set & permissions_set):
+ raise utils.EventError("New permissions to remove")
+ change = list(user_permissions_set - permissions_set)
- @utils.hook("received.command.logout", private_only=True)
- def logout(self, event):
- """
- :help: Logout from your identified account
- """
- if event["user"].identified_account_override:
- self._logout(event["user"])
- event["stdout"].write("You have been logged out")
- else:
- event["stderr"].write("You are not logged in")
+ if not change:
+ target_user.del_setting("permissions")
+ else:
+ target_user.set_setting("permissions", change)
+ event["stdout"].write("Removed permissions from %s: %s" %
+ (account, ", ".join(change)))
+ else:
+ raise utils.EventError("Unknown subcommand %s" % subcommand)
- @utils.hook("received.command.resetpassword", private_only=True,
- min_args=2)
- def reset_password(self, event):
- """
- :help: Reset a given user's password
- :usage: <nickname> <password>
- :permission: resetpassword
- """
- target = event["server"].get_user(event["args_split"][0])
- password = " ".join(event["args_split"][1:])
- registered = target.get_setting("authentication", None)
+ @utils.hook("received.command.hostmask")
+ @utils.kwarg("min_args", 1)
+ @utils.kwarg("authenticated", True)
+ @utils.kwarg("usage", "list")
+ @utils.kwarg("usage", "add [hostmask]")
+ @utils.kwarg("usage", "remove [hostmask]")
+ def hostmask(self, event):
+ subcommand = event["args_split"][0].lower()
+ hostmasks = event["user"].get_setting(HOSTMASKS_SETTING, [])
- if registered == None:
- event["stderr"].write("'%s' isn't registered" % target.nickname)
+ if subcommand == "list":
+ event["stdout"].write("Your hostmasks: %s" % ", ".join(hostmasks))
else:
- hash, salt = self._make_hash(password)
- target.set_setting("authentication", [hash, salt])
- event["stdout"].write("Reset password for '%s'" %
- target.nickname)
+ if event["args_split"][1:]:
+ hostmask = event["args_split"][1]
+ else:
+ hostmask = "*!%s" % event["user"].userhost()
+ account = self._account_name(event["user"])
- def _check_command(self, event, permission, authenticated):
- if event["user"].admin_master and (permission or authenticated):
- return utils.consts.PERMISSION_FORCE_SUCCESS, None
+ if subcommand == "add":
+ if hostmask in hostmasks:
+ raise utils.EventError(
+ "Hostmask %s is already on your account" % hostmask)
+ hostmasks.append(hostmask)
+ event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
- identity_mechanism = event["server"].get_setting("identity-mechanism",
- "internal")
- identified_account = None
- if identity_mechanism == "internal":
- identified_account = event["user"].identified_account_override
- elif identity_mechanism == "ircv3-account":
- identified_account = (event["user"].identified_account or
- event["tags"].get("account", None))
+ self._specific_hostmask(event["server"], hostmask, account)
+ event["server"]._hostmasks[hostmask] = account
- identified_user = None
- permissions = []
- if identified_account:
- identified_user = event["server"].get_user(identified_account)
- permissions = identified_user.get_setting("permissions", [])
+ event["stdout"].write("Added %s to your hostmasks" % hostmask)
+ elif subcommand == "remove":
+ if not hostmask in hostmasks:
+ raise utils.EventError("Hostmask %s is not on your account"
+ % hostmask)
+ while hostmask in hostmasks:
+ hostmasks.remove(hostmask)
+ event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
- if permission:
- has_permission = permission and (
- permission in permissions or "*" in permissions)
- if not identified_account or not has_permission:
- return (utils.consts.PERMISSION_ERROR,
- "You do not have permission to do that")
- else:
- return utils.consts.PERMISSION_FORCE_SUCCESS, None
- elif authenticated:
- if not identified_account:
- error = None
- if identity_mechanism == "internal":
- error = REQUIRES_IDENTIFY_INTERNAL % (
- event["server"].nickname, event["server"].nickname)
- else:
- error = REQUIRES_IDENTIFY
- return utils.consts.PERMISSION_ERROR, error
+ self._specific_hostmask(event["server"], hostmask, None)
+ if hostmask in event["server"]._hostmasks:
+ del event["server"]._hostmasks[hostmask]
+
+ event["stdout"].write("Removed %s from your hostmasks"
+ % hostmask)
else:
- return utils.consts.PERMISSION_FORCE_SUCCESS, None
+ raise utils.EventError("Unknown subcommand %s" % subcommand)
@utils.hook("preprocess.command")
def preprocess_command(self, event):
+ allowed = None
permission = event["hook"].get_kwarg("permission", None)
authenticated = event["hook"].get_kwarg("authenticated", False)
- return self._check_command(event, permission, authenticated)
-
- @utils.hook("check.command.permission")
- def check_command(self, event):
- return self._check_command(event, event["request_args"][0], False)
+ if not permission == None:
+ allowed = self._has_permission(event["user"], permission)
+ elif not authenticated == None:
+ allowed = self._is_identified(event["user"])
- @utils.hook("received.command.mypermissions", authenticated=True)
- def my_permissions(self, event):
- """
- :help: Show your permissions
- """
- permissions = event["user"].get_setting("permissions", [])
- event["stdout"].write("Your permissions: %s" % ", ".join(permissions))
-
- def _get_user_details(self, server, nickname):
- target = server.get_user(nickname)
- registered = bool(target.get_setting("authentication", None))
- permissions = target.get_setting("permissions", [])
- return [target, registered, permissions]
-
- @utils.hook("received.command.givepermission", min_args=2)
- def give_permission(self, event):
- """
- :help: Give a given permission to a given user
- :usage: <nickname> <permission>
- :permission: givepermission
- """
- permission = event["args_split"][1].lower()
- target, registered, permissions = self._get_user_details(
- event["server"], event["args_split"][0])
-
- if target.get_identified_account() == None:
- raise utils.EventError("%s isn't registered" % target.nickname)
-
- if permission in permissions:
- event["stderr"].write("%s already has permission '%s'" % (
- target.nickname, permission))
- else:
- permissions.append(permission)
- target.set_setting("permissions", permissions)
- event["stdout"].write("Gave permission '%s' to %s" % (
- permission, target.nickname))
- @utils.hook("received.command.removepermission", min_args=2)
- def remove_permission(self, event):
- """
- :help: Remove a given permission from a given user
- :usage: <nickname> <permission>
- :permission: removepermission
- """
- permission = event["args_split"][1].lower()
- target, registered, permissions = self._get_user_details(
- event["server"], event["args_split"][0])
-
- if target.identified_account == None:
- raise utils.EventError("%s isn't registered" % target.nickname)
-
- if permission not in permissions:
- event["stderr"].write("%s doesn't have permission '%s'" % (
- target.nickname, permission))
- else:
- permissions.remove(permission)
- if not permissions:
- target.del_setting("permissions")
+ if not allowed == None:
+ if allowed:
+ return utils.consts.PERMISSION_FORCE_SUCCESS, None
else:
- target.set_setting("permissions", permissions)
- event["stdout"].write("Removed permission '%s' from %s" % (
- permission, target.nickname))
+ return utils.consts.PERMISSION_ERROR, NO_PERMISSION