aboutsummaryrefslogtreecommitdiff
path: root/src/core_modules/command_spec
diff options
context:
space:
mode:
authorGravatar jesopo2020-02-14 21:57:06 +0000
committerGravatar jesopo2020-02-14 21:59:41 +0000
commitf827bdce7fd1295b395f8350d9694d67b825f51d (patch)
tree7e3519717147bdd41b86d96bc832809a79a38443 /src/core_modules/command_spec
parent_schedule_match_part doesn't take an array (diff)
signature
split out command_spec module
Diffstat (limited to 'src/core_modules/command_spec')
-rw-r--r--src/core_modules/command_spec/__init__.py127
-rw-r--r--src/core_modules/command_spec/types.py99
2 files changed, 226 insertions, 0 deletions
diff --git a/src/core_modules/command_spec/__init__.py b/src/core_modules/command_spec/__init__.py
new file mode 100644
index 00000000..c28976a8
--- /dev/null
+++ b/src/core_modules/command_spec/__init__.py
@@ -0,0 +1,127 @@
+from src import EventManager, ModuleManager, utils
+from . import types
+
+# describing command arg specifications, to centralise parsing and validating.
+#
+# format: <!|?><name>
+# ! = required
+# ? = optional
+#
+# if "name" contains "~", it will be marked as an "important" spec
+# this means that, e.g. "!r~channel" will be:
+# - marked as important
+# - name split to everything after ~
+# - the name, and it's value, will be offered to other preprocessors.
+#
+# this means, in practice, that "!r~channel" is a:
+# - "revelant" channel (current if in channel, explicit arg otherwise)
+# - will be used to check if a user has permissions
+#
+# spec types:
+# - "time" - +1w2d3h4m5s format time
+# - "rchannel" - relevant channel. current channel if we're in channel,
+# otherwise an explicit channel name argument
+# - "channel" - an argument of a channel name
+# - "cuser" - a nickname but only if they are in the current channel
+# - "ruser" - revlevant user. either current user if no arguments, otherwise
+# take nickname for user from given args
+# - "user" - an argument of a user's nickname
+# - "ouser" - an argument of a potentially offline user's nickname
+# - "word" - one word from arguments
+# - "string" - collect all remaining args in to a string
+
+class Module(ModuleManager.BaseModule):
+ def _spec_value(self, server, channel, user, argument_types, args):
+ options = []
+ first_error = None
+ for argument_type in argument_types:
+ value = None
+ n = 0
+ error = None
+
+ simple_value, simple_count = argument_type.simple(args)
+ if not simple_count == -1:
+ value = simple_value
+ n = simple_count
+ error = argument_type.error()
+ elif argument_type.type in types.TYPES:
+ func = types.TYPES[argument_type.type]
+ try:
+ value, n = func(server, channel, user, args)
+ except types.SpecTypeError as e:
+ error = e.message
+
+ options.append([argument_type, value, n, error])
+ return options
+
+ def _argument_types(self, options, args):
+ current_error = None
+ for argument_type, value, n, error in options:
+ if not value == None:
+ return [argument_type, n, value]
+ elif error:
+ current_error = error
+ elif n > len(args):
+ current_error = "Not enough arguments"
+ return [None, -1, current_error or "Invalid arguments"]
+
+ @utils.hook("preprocess.command")
+ @utils.kwarg("priority", EventManager.PRIORITY_HIGH)
+ def preprocess(self, event):
+ specs = event["hook"].get_kwargs("spec")
+ if specs:
+ server = event["server"]
+ channel = event["target"] if event["is_channel"] else None
+ user = event["user"]
+
+ overall_error = None
+ best_count = 0
+ for spec_arguments in specs:
+ out = {}
+ args = event["args_split"].copy()
+ kwargs = {"channel": channel}
+ failed = False
+
+ current_error = None
+ count = 0
+ spec_index = 0
+ for spec_argument in spec_arguments:
+ argument_type_multi = len(set(
+ t.type for t in spec_argument.types)) > 1
+ options = self._spec_value(server, kwargs["channel"], user,
+ spec_argument.types, args)
+
+ argument_type, n, value = self._argument_types(options, args)
+ if n > -1:
+ args = args[n:]
+
+ if argument_type.exported:
+ kwargs[argument_type.exported] = value
+
+ if argument_type_multi:
+ value = [argument_type.type, value]
+
+ elif not spec_argument.optional:
+ failed = True
+ current_error = value
+ break
+ else:
+ value = None
+
+ count += 1
+ if spec_argument.consume:
+ out[spec_index] = value
+ spec_index += 1
+ if argument_type:
+ key = argument_type.name() or argument_type.type
+ out[key] = value
+
+ if not failed:
+ kwargs["spec"] = out
+ event["kwargs"].update(kwargs)
+ return
+ else:
+ if count >= best_count:
+ overall_error = current_error
+
+ return utils.consts.PERMISSION_HARD_FAIL, overall_error
diff --git a/src/core_modules/command_spec/types.py b/src/core_modules/command_spec/types.py
new file mode 100644
index 00000000..cb7c841e
--- /dev/null
+++ b/src/core_modules/command_spec/types.py
@@ -0,0 +1,99 @@
+
+class SpecTypeError(Exception):
+ def __init__(self, message: str, arg_count: int=1):
+ self.message = message
+ self.arg_count = arg_count
+
+TYPES = {}
+def _type(func):
+ TYPES[func.__name__] = func
+
+def _assert_args(args, type):
+ if not args:
+ raise SpecTypeError("No %s provided" % type)
+
+@_type
+def rchannel(server, channel, user, args):
+ if channel:
+ return channel, 0
+ elif args:
+ if args[0] in server.channels:
+ return server.channels.get(args[0]), 1
+ else:
+ raise SpecTypeError("No such channel")
+ else:
+ raise SpecTypeError("No channel provided")
+
+@_type
+def channel(server, channel, user, args):
+ _assert_args(args, "channel")
+ if args[0] in server.channels:
+ return server.channels.get(args[0]), 1
+ else:
+ raise SpecTypeError("No such channel")
+
+@_type
+def cuser(server, channel, user, args):
+ _assert_args(args, "user")
+ target_user = server.get_user(args[0], create=False)
+ if target_user and channel.has_user(target_user):
+ return target_user, 1
+ else:
+ raise SpecTypeError("That user is not in this channel")
+
+@_type
+def ruser(server, channel, user, args):
+ if args:
+ target_user = server.get_user(args[0], create=False)
+ if target_user:
+ return target_user, 1
+ else:
+ raise SpecTypeError("No such user")
+ else:
+ return user, 0
+
+@_type
+def user(server, channel, user, args):
+ _assert_args(args, "user")
+ target_user = server.get_user(args[0], create=False)
+ if target_user:
+ return target_user, 1
+ else:
+ raise SpecTypeError("No such user")
+
+@_type
+def ouser(server, channel, user, args):
+ _assert_args(args, "user")
+ if server.has_user_id(args[0]):
+ return server.get_user(args[0], create=True), 1
+ else:
+ raise SpecTypeError("No such user")
+
+@_type
+def nuser(server, channel, user, args):
+ _assert_args(args, "user")
+ return server.get_user(args[0], create=True), 1
+
+@_type
+def lstring(server, channel, user, args):
+ if args:
+ return " ".join(args), len(args)
+ else:
+ last_message = (channel or user).buffer.get()
+ if last_message:
+ return last_message.message, 0
+ else:
+ raise SpecTypeError("No message found")
+
+@_type
+def channelonly(server, channel, user, args):
+ if channel:
+ return True, 0
+ else:
+ raise SpecTypeError("Command not valid in PM")
+@_type
+def privateonly(server, channel, user, args):
+ if not channel:
+ return True, 0
+ else:
+ raise SpecTypeError("Command not valid in channel")