From f827bdce7fd1295b395f8350d9694d67b825f51d Mon Sep 17 00:00:00 2001 From: jesopo Date: Fri, 14 Feb 2020 21:57:06 +0000 Subject: split out command_spec module --- src/core_modules/command_spec.py | 186 ------------------------------ src/core_modules/command_spec/__init__.py | 127 ++++++++++++++++++++ src/core_modules/command_spec/types.py | 99 ++++++++++++++++ 3 files changed, 226 insertions(+), 186 deletions(-) delete mode 100644 src/core_modules/command_spec.py create mode 100644 src/core_modules/command_spec/__init__.py create mode 100644 src/core_modules/command_spec/types.py (limited to 'src/core_modules') diff --git a/src/core_modules/command_spec.py b/src/core_modules/command_spec.py deleted file mode 100644 index 68313fd6..00000000 --- a/src/core_modules/command_spec.py +++ /dev/null @@ -1,186 +0,0 @@ -from src import EventManager, ModuleManager, utils - -# describing command arg specifications, to centralise parsing and validating. -# -# format: -# ! = required -# ? = optional -# -# if "name" contains "~", it will be marked as an "important" spec -# this means that, e.g. "!r~channel" will be: -# - marked as important -# - name split to everything after ~ -# - the name, and it's value, will be offered to other preprocessors. -# -# this means, in practice, that "!r~channel" is a: -# - "revelant" channel (current if in channel, explicit arg otherwise) -# - will be used to check if a user has permissions -# -# spec types: -# - "time" - +1w2d3h4m5s format time -# - "rchannel" - relevant channel. current channel if we're in channel, -# otherwise an explicit channel name argument -# - "channel" - an argument of a channel name -# - "cuser" - a nickname but only if they are in the current channel -# - "ruser" - revlevant user. either current user if no arguments, otherwise -# take nickname for user from given args -# - "user" - an argument of a user's nickname -# - "ouser" - an argument of a potentially offline user's nickname -# - "word" - one word from arguments -# - "string" - collect all remaining args in to a string - -class Module(ModuleManager.BaseModule): - def _spec_value(self, server, channel, user, argument_types, args): - options = [] - first_error = None - for argument_type in argument_types: - value = None - n = 0 - error = None - - simple_value, simple_count = argument_type.simple(args) - if not simple_count == -1: - value = simple_value - n = simple_count - error = argument_type.error() - elif argument_type.type == "rchannel": - if channel: - value = channel - elif args: - n = 1 - if args[0] in server.channels: - value = server.channels.get(args[0]) - error = "No such channel" - else: - error = "No channel provided" - elif argument_type.type == "channel" and args: - if args[0] in server.channels: - value = server.channels.get(args[0]) - n = 1 - error = "No such channel" - elif argument_type.type == "cuser" and args: - tuser = server.get_user(args[0], create=False) - if tuser and channel.has_user(tuser): - value = tuser - n = 1 - error = "That user is not in this channel" - elif argument_type.type == "ruser": - if args: - value = server.get_user(args[0], create=False) - n = 1 - else: - value = user - error = "No such user" - elif argument_type.type == "user": - if args: - value = server.get_user(args[0], create=False) - n = 1 - error = "No such user" - else: - error = "No user provided" - elif argument_type.type == "ouser": - if args: - if server.has_user_id(args[0]): - value = server.get_user(args[0], create=True) - error = "Unknown nickname" - n = 1 - elif argument_type.type == "nuser": - if args: - value = server.get_user(args[0], create=True) - n = 1 - elif argument_type.type == "lstring": - if args: - value = " ".join(args) - n = len(args) - else: - last_message = (channel or user).buffer.get() - if last_message: - value = last_message.message - n = 0 - else: - n = 1 - elif argument_type.type == "channelonly": - if channel: - value = True - n = 0 - error = "Command not valid in PM" - elif argument_type.type == "privateonly": - if not channel: - value = True - n = 0 - error = "Command not valid in-channel" - - options.append([argument_type, value, n, error]) - return options - - def _argument_types(self, options, args): - current_error = None - for argument_type, value, n, error in options: - if not value == None: - return [argument_type, n, value] - elif error: - current_error = error - elif n > len(args): - current_error = "Not enough arguments" - return [None, -1, current_error or "Invalid arguments"] - - @utils.hook("preprocess.command") - @utils.kwarg("priority", EventManager.PRIORITY_HIGH) - def preprocess(self, event): - specs = event["hook"].get_kwargs("spec") - if specs: - server = event["server"] - channel = event["target"] if event["is_channel"] else None - user = event["user"] - - overall_error = None - best_count = 0 - for spec_arguments in specs: - out = {} - args = event["args_split"].copy() - kwargs = {"channel": channel} - failed = False - - current_error = None - count = 0 - spec_index = 0 - for spec_argument in spec_arguments: - argument_type_multi = len(set( - t.type for t in spec_argument.types)) > 1 - options = self._spec_value(server, kwargs["channel"], user, - spec_argument.types, args) - - argument_type, n, value = self._argument_types(options, args) - if n > -1: - args = args[n:] - - if argument_type.exported: - kwargs[argument_type.exported] = value - - if argument_type_multi: - value = [argument_type.type, value] - - elif not spec_argument.optional: - failed = True - current_error = value - break - else: - value = None - - count += 1 - if spec_argument.consume: - out[spec_index] = value - spec_index += 1 - if argument_type: - key = argument_type.name() or argument_type.type - out[key] = value - - if not failed: - kwargs["spec"] = out - event["kwargs"].update(kwargs) - return - else: - if count >= best_count: - overall_error = current_error - - return utils.consts.PERMISSION_HARD_FAIL, overall_error diff --git a/src/core_modules/command_spec/__init__.py b/src/core_modules/command_spec/__init__.py new file mode 100644 index 00000000..c28976a8 --- /dev/null +++ b/src/core_modules/command_spec/__init__.py @@ -0,0 +1,127 @@ +from src import EventManager, ModuleManager, utils +from . import types + +# describing command arg specifications, to centralise parsing and validating. +# +# format: +# ! = required +# ? = optional +# +# if "name" contains "~", it will be marked as an "important" spec +# this means that, e.g. "!r~channel" will be: +# - marked as important +# - name split to everything after ~ +# - the name, and it's value, will be offered to other preprocessors. +# +# this means, in practice, that "!r~channel" is a: +# - "revelant" channel (current if in channel, explicit arg otherwise) +# - will be used to check if a user has permissions +# +# spec types: +# - "time" - +1w2d3h4m5s format time +# - "rchannel" - relevant channel. current channel if we're in channel, +# otherwise an explicit channel name argument +# - "channel" - an argument of a channel name +# - "cuser" - a nickname but only if they are in the current channel +# - "ruser" - revlevant user. either current user if no arguments, otherwise +# take nickname for user from given args +# - "user" - an argument of a user's nickname +# - "ouser" - an argument of a potentially offline user's nickname +# - "word" - one word from arguments +# - "string" - collect all remaining args in to a string + +class Module(ModuleManager.BaseModule): + def _spec_value(self, server, channel, user, argument_types, args): + options = [] + first_error = None + for argument_type in argument_types: + value = None + n = 0 + error = None + + simple_value, simple_count = argument_type.simple(args) + if not simple_count == -1: + value = simple_value + n = simple_count + error = argument_type.error() + elif argument_type.type in types.TYPES: + func = types.TYPES[argument_type.type] + try: + value, n = func(server, channel, user, args) + except types.SpecTypeError as e: + error = e.message + + options.append([argument_type, value, n, error]) + return options + + def _argument_types(self, options, args): + current_error = None + for argument_type, value, n, error in options: + if not value == None: + return [argument_type, n, value] + elif error: + current_error = error + elif n > len(args): + current_error = "Not enough arguments" + return [None, -1, current_error or "Invalid arguments"] + + @utils.hook("preprocess.command") + @utils.kwarg("priority", EventManager.PRIORITY_HIGH) + def preprocess(self, event): + specs = event["hook"].get_kwargs("spec") + if specs: + server = event["server"] + channel = event["target"] if event["is_channel"] else None + user = event["user"] + + overall_error = None + best_count = 0 + for spec_arguments in specs: + out = {} + args = event["args_split"].copy() + kwargs = {"channel": channel} + failed = False + + current_error = None + count = 0 + spec_index = 0 + for spec_argument in spec_arguments: + argument_type_multi = len(set( + t.type for t in spec_argument.types)) > 1 + options = self._spec_value(server, kwargs["channel"], user, + spec_argument.types, args) + + argument_type, n, value = self._argument_types(options, args) + if n > -1: + args = args[n:] + + if argument_type.exported: + kwargs[argument_type.exported] = value + + if argument_type_multi: + value = [argument_type.type, value] + + elif not spec_argument.optional: + failed = True + current_error = value + break + else: + value = None + + count += 1 + if spec_argument.consume: + out[spec_index] = value + spec_index += 1 + if argument_type: + key = argument_type.name() or argument_type.type + out[key] = value + + if not failed: + kwargs["spec"] = out + event["kwargs"].update(kwargs) + return + else: + if count >= best_count: + overall_error = current_error + + return utils.consts.PERMISSION_HARD_FAIL, overall_error diff --git a/src/core_modules/command_spec/types.py b/src/core_modules/command_spec/types.py new file mode 100644 index 00000000..cb7c841e --- /dev/null +++ b/src/core_modules/command_spec/types.py @@ -0,0 +1,99 @@ + +class SpecTypeError(Exception): + def __init__(self, message: str, arg_count: int=1): + self.message = message + self.arg_count = arg_count + +TYPES = {} +def _type(func): + TYPES[func.__name__] = func + +def _assert_args(args, type): + if not args: + raise SpecTypeError("No %s provided" % type) + +@_type +def rchannel(server, channel, user, args): + if channel: + return channel, 0 + elif args: + if args[0] in server.channels: + return server.channels.get(args[0]), 1 + else: + raise SpecTypeError("No such channel") + else: + raise SpecTypeError("No channel provided") + +@_type +def channel(server, channel, user, args): + _assert_args(args, "channel") + if args[0] in server.channels: + return server.channels.get(args[0]), 1 + else: + raise SpecTypeError("No such channel") + +@_type +def cuser(server, channel, user, args): + _assert_args(args, "user") + target_user = server.get_user(args[0], create=False) + if target_user and channel.has_user(target_user): + return target_user, 1 + else: + raise SpecTypeError("That user is not in this channel") + +@_type +def ruser(server, channel, user, args): + if args: + target_user = server.get_user(args[0], create=False) + if target_user: + return target_user, 1 + else: + raise SpecTypeError("No such user") + else: + return user, 0 + +@_type +def user(server, channel, user, args): + _assert_args(args, "user") + target_user = server.get_user(args[0], create=False) + if target_user: + return target_user, 1 + else: + raise SpecTypeError("No such user") + +@_type +def ouser(server, channel, user, args): + _assert_args(args, "user") + if server.has_user_id(args[0]): + return server.get_user(args[0], create=True), 1 + else: + raise SpecTypeError("No such user") + +@_type +def nuser(server, channel, user, args): + _assert_args(args, "user") + return server.get_user(args[0], create=True), 1 + +@_type +def lstring(server, channel, user, args): + if args: + return " ".join(args), len(args) + else: + last_message = (channel or user).buffer.get() + if last_message: + return last_message.message, 0 + else: + raise SpecTypeError("No message found") + +@_type +def channelonly(server, channel, user, args): + if channel: + return True, 0 + else: + raise SpecTypeError("Command not valid in PM") +@_type +def privateonly(server, channel, user, args): + if not channel: + return True, 0 + else: + raise SpecTypeError("Command not valid in channel") -- cgit v1.3.1-10-gc9f91