diff options
Diffstat (limited to 'src/core_modules/commands')
| -rw-r--r-- | src/core_modules/commands/__init__.py | 424 | ||||
| -rw-r--r-- | src/core_modules/commands/outs.py | 28 |
2 files changed, 452 insertions, 0 deletions
diff --git a/src/core_modules/commands/__init__.py b/src/core_modules/commands/__init__.py new file mode 100644 index 00000000..5a750a9b --- /dev/null +++ b/src/core_modules/commands/__init__.py @@ -0,0 +1,424 @@ +#--depends-on config +#--depends-on permissions + +import enum, re, shlex, string, traceback, typing +from src import EventManager, IRCLine, ModuleManager, utils +from . import outs + +COMMAND_METHOD = "command-method" +COMMAND_METHODS = ["PRIVMSG", "NOTICE"] + +STR_MORE = " (more...)" +STR_MORE_LEN = len(STR_MORE.encode("utf8")) +STR_CONTINUED = "(...continued)" +WORD_BOUNDARIES = [" "] + +NON_ALPHANUMERIC = [char for char in string.printable if not char.isalnum()] + +class OutType(enum.Enum): + OUT = 1 + ERR = 2 + +class BadContextException(Exception): + def __init__(self, required_context): + self.required_context = required_context + Exception.__init__(self) + +class CommandEvent(object): + def __init__(self, command, args): + self.command = command + self.args = args + +SETTING_COMMANDMETHOD = utils.OptionsSetting(COMMAND_METHODS, COMMAND_METHOD, + "Set the method used to respond to commands") + +@utils.export("channelset", utils.Setting("command-prefix", + "Set the command prefix used in this channel", example="!")) +@utils.export("serverset", utils.Setting("command-prefix", + "Set the command prefix used on this server", example="!")) +@utils.export("serverset", SETTING_COMMANDMETHOD) +@utils.export("channelset", SETTING_COMMANDMETHOD) +@utils.export("botset", SETTING_COMMANDMETHOD) +@utils.export("channelset", utils.BoolSetting("hide-prefix", + "Disable/enable hiding prefix in command reponses")) +@utils.export("channelset", utils.BoolSetting("commands", + "Disable/enable responding to commands in-channel")) +@utils.export("channelset", utils.BoolSetting("prefixed-commands", + "Disable/enable responding to prefixed commands in-channel")) +class Module(ModuleManager.BaseModule): + @utils.hook("new.user") + @utils.hook("new.channel") + def new(self, event): + if "user" in event: + target = event["user"] + else: + target = event["channel"] + + def has_command(self, command): + return command.lower() in self.events.on("received").on( + "command").get_children() + def get_hooks(self, command): + return self.events.on("received.command").on(command + ).get_hooks() + + def is_highlight(self, server, s): + if s and s[-1] in [":", ","]: + return server.is_own_nickname(s[:-1]) + + def _command_method(self, server, target): + return target.get_setting(COMMAND_METHOD, + server.get_setting(COMMAND_METHOD, + self.bot.get_setting(COMMAND_METHOD, "PRIVMSG"))).upper() + + def _find_command_hook(self, server, target, is_channel, command, args): + if not self.has_command(command): + command_event = CommandEvent(command, args) + self.events.on("get.command").call(command=command_event, + server=server, target=target, is_channel=is_channel) + + command = command_event.command + args = command_event.args + + hook = None + args_split = [] + channel_skip = False + private_skip = False + if self.has_command(command): + for potential_hook in self.get_hooks(command): + alias_of = self._get_alias_of(potential_hook) + if alias_of: + if self.has_command(alias_of): + potential_hook = self.get_hooks(alias_of)[0] + else: + raise ValueError( + "'%s' is an alias of unknown command '%s'" + % (command.lower(), alias_of.lower())) + + if not is_channel and potential_hook.get_kwarg("channel_only", + False): + channel_skip = True + continue + if is_channel and potential_hook.get_kwarg("private_only", + False): + private_skip = True + continue + + hook = potential_hook + + if args: + argparse = hook.get_kwarg("argparse", "plain") + if argparse == "shlex": + args_split = shlex.split(args) + elif argparse == "plain": + args_split = args.split(" ") + + break + + if not hook and (private_skip or channel_skip): + raise BadContextException("channel" if channel_skip else "private") + + return hook, command, args_split + + def _check(self, context, kwargs, requests=[]): + event_hook = self.events.on(context).on("command") + + returns = [] + if requests: + for request, request_args in requests: + returns.append(event_hook.on(request).call_for_result_unsafe( + **kwargs, request_args=request_args)) + else: + returns = event_hook.call_unsafe(**kwargs) + + hard_fail = False + force_success = False + error = None + for returned in returns: + if returned: + type, message = returned + if type == utils.consts.PERMISSION_HARD_FAIL: + error = message + hard_fail = True + break + elif type == utils.consts.PERMISSION_FORCE_SUCCESS: + force_success = True + break + elif type == utils.consts.PERMISSION_ERROR: + error = message + + if hard_fail: + return False, error + elif not force_success and error: + return False, error + else: + return True, None + + + def _check_assert(self, check_kwargs, user, + check: typing.Union[utils.Check, utils.MultiCheck]): + checks = check.to_multi() # both Check and MultiCheck has this func + is_success, message = self._check("check", check_kwargs, + checks.requests()) + if not is_success: + raise utils.EventError("%s: %s" % (user.nickname, message)) + + def command(self, server, target, target_str, is_channel, user, command, + args_split, line, hook, **kwargs): + module_name = (self._get_prefix(hook) or + self.bot.modules.from_context(hook.context).title) + + stdout = outs.StdOut(module_name) + stderr = outs.StdOut(module_name) + + ret = False + has_out = False + + if hook.get_kwarg("remove_empty", True): + args_split = list(filter(None, args_split)) + + event_kwargs = {"hook": hook, "user": user, "server": server, + "target": target, "target_str": target_str, + "is_channel": is_channel, "line": line, "args_split": args_split, + "command": command, "args": " ".join(args_split), "stdout": stdout, + "stderr": stderr, "tags": {}} + event_kwargs.update(kwargs) + + check_assert = lambda check: self._check_assert(event_kwargs, user, + check) + event_kwargs["check_assert"] = check_assert + + eaten = False + + check_success, check_message = self._check("preprocess", event_kwargs) + if check_success: + new_event = self.events.on(hook.event_name).make_event(**event_kwargs) + self.log.trace("calling command '%s': %s", [command, new_event.kwargs]) + + try: + hook.call(new_event) + except utils.EventError as e: + stderr.write(str(e)) + eaten = new_event.eaten + else: + if check_message: + stderr.write("%s: %s" % (user.nickname, check_message)) + + self._check("postprocess", event_kwargs) + # postprocess - send stdout/stderr and typing tag + + return eaten + + @utils.hook("postprocess.command") + @utils.kwarg("priority", EventManager.PRIORITY_LOW) + def postprocess(self, event): + type = None + obj = None + if event["stdout"].has_text(): + type = OutType.OUT + obj = event["stdout"] + elif event["stderr"].has_text(): + type = OutType.ERR + obj = event["stderr"] + else: + return + self._out(event["server"], event["target"], event["target_str"], obj, + type, event["tags"]) + + def _out(self, server, target, target_str, obj, type, tags): + if type == OutType.OUT: + color = utils.consts.GREEN + else: + color = utils.consts.RED + + line_str = obj.pop() + if obj.prefix: + line_str = "[%s] %s" % ( + utils.irc.color(obj.prefix, color), line_str) + method = self._command_method(server, target) + + if not method in ["PRIVMSG", "NOTICE"]: + raise ValueError("Unknown command-method '%s'" % method) + + line = IRCLine.ParsedLine(method, [target_str, line_str], + tags=tags) + valid, trunc = line.truncate(server.hostmask(), + margin=STR_MORE_LEN) + + if trunc: + if not trunc[0] in WORD_BOUNDARIES: + for boundary in WORD_BOUNDARIES: + left, *right = valid.rsplit(boundary, 1) + if right: + valid = left + trunc = right[0]+trunc + obj.insert("%s %s" % (STR_CONTINUED, trunc)) + valid = valid+STR_MORE + line = IRCLine.parse_line(valid) + server.send(line) + + @utils.hook("preprocess.command") + def _check_min_args(self, event): + min_args = event["hook"].get_kwarg("min_args") + if min_args and len(event["args_split"]) < min_args: + usage = self._get_usage(event["hook"], event["command"], + event["command_prefix"]) + error = None + if usage: + error = "Not enough arguments, usage: %s" % usage + else: + error = "Not enough arguments (minimum: %d)" % min_args + return utils.consts.PERMISSION_HARD_FAIL, error + + def _command_prefix(self, server, channel): + return channel.get_setting("command-prefix", + server.get_setting("command-prefix", "!")) + + @utils.hook("received.message.channel", priority=EventManager.PRIORITY_LOW) + def channel_message(self, event): + commands_enabled = event["channel"].get_setting("commands", True) + if not commands_enabled: + return + + command_prefix = self._command_prefix(event["server"], event["channel"]) + command = None + args = "" + if event["message_split"][0].startswith(command_prefix): + if not event["channel"].get_setting("prefixed-commands",True): + return + command = event["message_split"][0].replace( + command_prefix, "", 1).lower() + if " " in event["message"]: + args = event["message"].split(" ", 1)[1] + elif len(event["message_split"]) > 1 and self.is_highlight( + event["server"], event["message_split"][0]): + command = event["message_split"][1].lower() + if event["message"].count(" ") > 1: + args = event["message"].split(" ", 2)[2] + + hook = None + args_split = [] + if command: + try: + hook, command, args_split = self._find_command_hook( + event["server"], event["channel"], True, command, args) + except BadContextException: + event["channel"].send_message( + "%s: That command is not valid in a channel" % + event["user"].nickname) + return + + if hook: + if event["action"]: + return + + if hook: + self.command(event["server"], event["channel"], + event["target_str"], True, event["user"], command, + args_split, event["line"], hook, + command_prefix=command_prefix, + buffer_line=event["buffer_line"]) + else: + self.events.on("unknown.command").call(server=event["server"], + target=event["channel"], user=event["user"], + command=command, command_prefix=command_prefix, + is_channel=True) + else: + regex_hooks = self.events.on("command.regex").get_hooks() + for hook in regex_hooks: + if event["action"] and hook.get_kwarg("ignore_action", True): + continue + + pattern = hook.get_kwarg("pattern", None) + if pattern: + match = re.search(pattern, event["message"]) + if match: + command = hook.get_kwarg("command", "") + res = self.command(event["server"], event["channel"], + event["target_str"], True, event["user"], command, + "", event["line"], hook, match=match, + message=event["message"], command_prefix="", + action=event["action"], + buffer_line=event["buffer_line"]) + + if res: + break + + @utils.hook("received.message.private", priority=EventManager.PRIORITY_LOW) + def private_message(self, event): + if event["message_split"] and not event["action"]: + command = event["message_split"][0].lower() + + # this should help catch commands when people try to do prefixed + # commands ('!help' rather than 'help') in PM + command = command.lstrip("".join(NON_ALPHANUMERIC)) + + args = "" + if " " in event["message"]: + args = event["message"].split(" ", 1)[1] + + try: + hook, command, args_split = self._find_command_hook( + event["server"], event["user"], False, command, args) + except BadContextException: + event["user"].send_message( + "That command is not valid in a PM") + return + + if hook: + self.command(event["server"], event["user"], + event["user"].nickname, False, event["user"], command, + args_split, event["line"], hook, command_prefix="", + buffer_line=event["buffer_line"]) + else: + self.events.on("unknown.command").call(server=event["server"], + target=event["user"], user=event["user"], command=command, + command_prefix="", is_channel=False) + + def _get_usage(self, hook, command, command_prefix=""): + command = "%s%s" % (command_prefix, command) + usages = hook.get_kwargs("usage") + + if usages: + return " | ".join( + "%s %s" % (command, usage) for usage in usages) + return None + + def _get_prefix(self, hook): + return hook.get_kwarg("prefix", None) + def _get_alias_of(self, hook): + return hook.get_kwarg("alias_of", None) + + @utils.hook("send.stdout") + def _stdout(self, event): + self._send_out(event, OutType.OUT) + @utils.hook("send.stderr") + def _stderr(self, event): + self._send_out(event, OutType.ERR) + + def _send_out(self, event, type): + target = event["target"] + stdout = outs.StdOut(event["module_name"]) + stdout.write(event["message"]) + if event.get("hide_prefix", False): + stdout.prefix = None + + target_str = event.get("target_str", target.name) + self._out(event["server"], target, target_str, stdout, + type, {}) + + @utils.hook("check.command.self") + def check_command_self(self, event): + if event["server"].irc_lower(event["request_args"][0] + ) == event["user"].name: + return utils.consts.PERMISSION_FORCE_SUCCESS, None + else: + return (utils.consts.PERMISSION_ERROR, + "You do not have permission to do this") + + @utils.hook("check.command.is-channel") + def check_command_is_channel(self, event): + if event["is_channel"]: + return utils.consts.PERMISSION_FORCE_SUCCESS, None + else: + return (utils.consts.PERMISSION_ERROR, + "This command can only be used in-channel") diff --git a/src/core_modules/commands/outs.py b/src/core_modules/commands/outs.py new file mode 100644 index 00000000..e82ceefd --- /dev/null +++ b/src/core_modules/commands/outs.py @@ -0,0 +1,28 @@ +import re +from src import IRCLine, utils + +class StdOut(object): + def __init__(self, prefix): + self.prefix = prefix + self._lines = [] + self._assured = False + + def assure(self): + self._assured = True + + def write(self, text): + self.write_lines( + text.replace("\r", "").replace("\n\n", "\n").split("\n")) + def write_lines(self, lines): + self._lines += list(filter(None, lines)) + + def get_all(self): + return self._lines.copy() + def pop(self): + return self._lines.pop(0) + def insert(self, text): + self._lines.insert(0, text) + + def has_text(self): + return bool(self._lines) + |
