aboutsummaryrefslogtreecommitdiff
path: root/src/core_modules/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/core_modules/commands')
-rw-r--r--src/core_modules/commands/__init__.py424
-rw-r--r--src/core_modules/commands/outs.py28
2 files changed, 452 insertions, 0 deletions
diff --git a/src/core_modules/commands/__init__.py b/src/core_modules/commands/__init__.py
new file mode 100644
index 00000000..5a750a9b
--- /dev/null
+++ b/src/core_modules/commands/__init__.py
@@ -0,0 +1,424 @@
+#--depends-on config
+#--depends-on permissions
+
+import enum, re, shlex, string, traceback, typing
+from src import EventManager, IRCLine, ModuleManager, utils
+from . import outs
+
+COMMAND_METHOD = "command-method"
+COMMAND_METHODS = ["PRIVMSG", "NOTICE"]
+
+STR_MORE = " (more...)"
+STR_MORE_LEN = len(STR_MORE.encode("utf8"))
+STR_CONTINUED = "(...continued)"
+WORD_BOUNDARIES = [" "]
+
+NON_ALPHANUMERIC = [char for char in string.printable if not char.isalnum()]
+
+class OutType(enum.Enum):
+ OUT = 1
+ ERR = 2
+
+class BadContextException(Exception):
+ def __init__(self, required_context):
+ self.required_context = required_context
+ Exception.__init__(self)
+
+class CommandEvent(object):
+ def __init__(self, command, args):
+ self.command = command
+ self.args = args
+
+SETTING_COMMANDMETHOD = utils.OptionsSetting(COMMAND_METHODS, COMMAND_METHOD,
+ "Set the method used to respond to commands")
+
+@utils.export("channelset", utils.Setting("command-prefix",
+ "Set the command prefix used in this channel", example="!"))
+@utils.export("serverset", utils.Setting("command-prefix",
+ "Set the command prefix used on this server", example="!"))
+@utils.export("serverset", SETTING_COMMANDMETHOD)
+@utils.export("channelset", SETTING_COMMANDMETHOD)
+@utils.export("botset", SETTING_COMMANDMETHOD)
+@utils.export("channelset", utils.BoolSetting("hide-prefix",
+ "Disable/enable hiding prefix in command reponses"))
+@utils.export("channelset", utils.BoolSetting("commands",
+ "Disable/enable responding to commands in-channel"))
+@utils.export("channelset", utils.BoolSetting("prefixed-commands",
+ "Disable/enable responding to prefixed commands in-channel"))
+class Module(ModuleManager.BaseModule):
+ @utils.hook("new.user")
+ @utils.hook("new.channel")
+ def new(self, event):
+ if "user" in event:
+ target = event["user"]
+ else:
+ target = event["channel"]
+
+ def has_command(self, command):
+ return command.lower() in self.events.on("received").on(
+ "command").get_children()
+ def get_hooks(self, command):
+ return self.events.on("received.command").on(command
+ ).get_hooks()
+
+ def is_highlight(self, server, s):
+ if s and s[-1] in [":", ","]:
+ return server.is_own_nickname(s[:-1])
+
+ def _command_method(self, server, target):
+ return target.get_setting(COMMAND_METHOD,
+ server.get_setting(COMMAND_METHOD,
+ self.bot.get_setting(COMMAND_METHOD, "PRIVMSG"))).upper()
+
+ def _find_command_hook(self, server, target, is_channel, command, args):
+ if not self.has_command(command):
+ command_event = CommandEvent(command, args)
+ self.events.on("get.command").call(command=command_event,
+ server=server, target=target, is_channel=is_channel)
+
+ command = command_event.command
+ args = command_event.args
+
+ hook = None
+ args_split = []
+ channel_skip = False
+ private_skip = False
+ if self.has_command(command):
+ for potential_hook in self.get_hooks(command):
+ alias_of = self._get_alias_of(potential_hook)
+ if alias_of:
+ if self.has_command(alias_of):
+ potential_hook = self.get_hooks(alias_of)[0]
+ else:
+ raise ValueError(
+ "'%s' is an alias of unknown command '%s'"
+ % (command.lower(), alias_of.lower()))
+
+ if not is_channel and potential_hook.get_kwarg("channel_only",
+ False):
+ channel_skip = True
+ continue
+ if is_channel and potential_hook.get_kwarg("private_only",
+ False):
+ private_skip = True
+ continue
+
+ hook = potential_hook
+
+ if args:
+ argparse = hook.get_kwarg("argparse", "plain")
+ if argparse == "shlex":
+ args_split = shlex.split(args)
+ elif argparse == "plain":
+ args_split = args.split(" ")
+
+ break
+
+ if not hook and (private_skip or channel_skip):
+ raise BadContextException("channel" if channel_skip else "private")
+
+ return hook, command, args_split
+
+ def _check(self, context, kwargs, requests=[]):
+ event_hook = self.events.on(context).on("command")
+
+ returns = []
+ if requests:
+ for request, request_args in requests:
+ returns.append(event_hook.on(request).call_for_result_unsafe(
+ **kwargs, request_args=request_args))
+ else:
+ returns = event_hook.call_unsafe(**kwargs)
+
+ hard_fail = False
+ force_success = False
+ error = None
+ for returned in returns:
+ if returned:
+ type, message = returned
+ if type == utils.consts.PERMISSION_HARD_FAIL:
+ error = message
+ hard_fail = True
+ break
+ elif type == utils.consts.PERMISSION_FORCE_SUCCESS:
+ force_success = True
+ break
+ elif type == utils.consts.PERMISSION_ERROR:
+ error = message
+
+ if hard_fail:
+ return False, error
+ elif not force_success and error:
+ return False, error
+ else:
+ return True, None
+
+
+ def _check_assert(self, check_kwargs, user,
+ check: typing.Union[utils.Check, utils.MultiCheck]):
+ checks = check.to_multi() # both Check and MultiCheck has this func
+ is_success, message = self._check("check", check_kwargs,
+ checks.requests())
+ if not is_success:
+ raise utils.EventError("%s: %s" % (user.nickname, message))
+
+ def command(self, server, target, target_str, is_channel, user, command,
+ args_split, line, hook, **kwargs):
+ module_name = (self._get_prefix(hook) or
+ self.bot.modules.from_context(hook.context).title)
+
+ stdout = outs.StdOut(module_name)
+ stderr = outs.StdOut(module_name)
+
+ ret = False
+ has_out = False
+
+ if hook.get_kwarg("remove_empty", True):
+ args_split = list(filter(None, args_split))
+
+ event_kwargs = {"hook": hook, "user": user, "server": server,
+ "target": target, "target_str": target_str,
+ "is_channel": is_channel, "line": line, "args_split": args_split,
+ "command": command, "args": " ".join(args_split), "stdout": stdout,
+ "stderr": stderr, "tags": {}}
+ event_kwargs.update(kwargs)
+
+ check_assert = lambda check: self._check_assert(event_kwargs, user,
+ check)
+ event_kwargs["check_assert"] = check_assert
+
+ eaten = False
+
+ check_success, check_message = self._check("preprocess", event_kwargs)
+ if check_success:
+ new_event = self.events.on(hook.event_name).make_event(**event_kwargs)
+ self.log.trace("calling command '%s': %s", [command, new_event.kwargs])
+
+ try:
+ hook.call(new_event)
+ except utils.EventError as e:
+ stderr.write(str(e))
+ eaten = new_event.eaten
+ else:
+ if check_message:
+ stderr.write("%s: %s" % (user.nickname, check_message))
+
+ self._check("postprocess", event_kwargs)
+ # postprocess - send stdout/stderr and typing tag
+
+ return eaten
+
+ @utils.hook("postprocess.command")
+ @utils.kwarg("priority", EventManager.PRIORITY_LOW)
+ def postprocess(self, event):
+ type = None
+ obj = None
+ if event["stdout"].has_text():
+ type = OutType.OUT
+ obj = event["stdout"]
+ elif event["stderr"].has_text():
+ type = OutType.ERR
+ obj = event["stderr"]
+ else:
+ return
+ self._out(event["server"], event["target"], event["target_str"], obj,
+ type, event["tags"])
+
+ def _out(self, server, target, target_str, obj, type, tags):
+ if type == OutType.OUT:
+ color = utils.consts.GREEN
+ else:
+ color = utils.consts.RED
+
+ line_str = obj.pop()
+ if obj.prefix:
+ line_str = "[%s] %s" % (
+ utils.irc.color(obj.prefix, color), line_str)
+ method = self._command_method(server, target)
+
+ if not method in ["PRIVMSG", "NOTICE"]:
+ raise ValueError("Unknown command-method '%s'" % method)
+
+ line = IRCLine.ParsedLine(method, [target_str, line_str],
+ tags=tags)
+ valid, trunc = line.truncate(server.hostmask(),
+ margin=STR_MORE_LEN)
+
+ if trunc:
+ if not trunc[0] in WORD_BOUNDARIES:
+ for boundary in WORD_BOUNDARIES:
+ left, *right = valid.rsplit(boundary, 1)
+ if right:
+ valid = left
+ trunc = right[0]+trunc
+ obj.insert("%s %s" % (STR_CONTINUED, trunc))
+ valid = valid+STR_MORE
+ line = IRCLine.parse_line(valid)
+ server.send(line)
+
+ @utils.hook("preprocess.command")
+ def _check_min_args(self, event):
+ min_args = event["hook"].get_kwarg("min_args")
+ if min_args and len(event["args_split"]) < min_args:
+ usage = self._get_usage(event["hook"], event["command"],
+ event["command_prefix"])
+ error = None
+ if usage:
+ error = "Not enough arguments, usage: %s" % usage
+ else:
+ error = "Not enough arguments (minimum: %d)" % min_args
+ return utils.consts.PERMISSION_HARD_FAIL, error
+
+ def _command_prefix(self, server, channel):
+ return channel.get_setting("command-prefix",
+ server.get_setting("command-prefix", "!"))
+
+ @utils.hook("received.message.channel", priority=EventManager.PRIORITY_LOW)
+ def channel_message(self, event):
+ commands_enabled = event["channel"].get_setting("commands", True)
+ if not commands_enabled:
+ return
+
+ command_prefix = self._command_prefix(event["server"], event["channel"])
+ command = None
+ args = ""
+ if event["message_split"][0].startswith(command_prefix):
+ if not event["channel"].get_setting("prefixed-commands",True):
+ return
+ command = event["message_split"][0].replace(
+ command_prefix, "", 1).lower()
+ if " " in event["message"]:
+ args = event["message"].split(" ", 1)[1]
+ elif len(event["message_split"]) > 1 and self.is_highlight(
+ event["server"], event["message_split"][0]):
+ command = event["message_split"][1].lower()
+ if event["message"].count(" ") > 1:
+ args = event["message"].split(" ", 2)[2]
+
+ hook = None
+ args_split = []
+ if command:
+ try:
+ hook, command, args_split = self._find_command_hook(
+ event["server"], event["channel"], True, command, args)
+ except BadContextException:
+ event["channel"].send_message(
+ "%s: That command is not valid in a channel" %
+ event["user"].nickname)
+ return
+
+ if hook:
+ if event["action"]:
+ return
+
+ if hook:
+ self.command(event["server"], event["channel"],
+ event["target_str"], True, event["user"], command,
+ args_split, event["line"], hook,
+ command_prefix=command_prefix,
+ buffer_line=event["buffer_line"])
+ else:
+ self.events.on("unknown.command").call(server=event["server"],
+ target=event["channel"], user=event["user"],
+ command=command, command_prefix=command_prefix,
+ is_channel=True)
+ else:
+ regex_hooks = self.events.on("command.regex").get_hooks()
+ for hook in regex_hooks:
+ if event["action"] and hook.get_kwarg("ignore_action", True):
+ continue
+
+ pattern = hook.get_kwarg("pattern", None)
+ if pattern:
+ match = re.search(pattern, event["message"])
+ if match:
+ command = hook.get_kwarg("command", "")
+ res = self.command(event["server"], event["channel"],
+ event["target_str"], True, event["user"], command,
+ "", event["line"], hook, match=match,
+ message=event["message"], command_prefix="",
+ action=event["action"],
+ buffer_line=event["buffer_line"])
+
+ if res:
+ break
+
+ @utils.hook("received.message.private", priority=EventManager.PRIORITY_LOW)
+ def private_message(self, event):
+ if event["message_split"] and not event["action"]:
+ command = event["message_split"][0].lower()
+
+ # this should help catch commands when people try to do prefixed
+ # commands ('!help' rather than 'help') in PM
+ command = command.lstrip("".join(NON_ALPHANUMERIC))
+
+ args = ""
+ if " " in event["message"]:
+ args = event["message"].split(" ", 1)[1]
+
+ try:
+ hook, command, args_split = self._find_command_hook(
+ event["server"], event["user"], False, command, args)
+ except BadContextException:
+ event["user"].send_message(
+ "That command is not valid in a PM")
+ return
+
+ if hook:
+ self.command(event["server"], event["user"],
+ event["user"].nickname, False, event["user"], command,
+ args_split, event["line"], hook, command_prefix="",
+ buffer_line=event["buffer_line"])
+ else:
+ self.events.on("unknown.command").call(server=event["server"],
+ target=event["user"], user=event["user"], command=command,
+ command_prefix="", is_channel=False)
+
+ def _get_usage(self, hook, command, command_prefix=""):
+ command = "%s%s" % (command_prefix, command)
+ usages = hook.get_kwargs("usage")
+
+ if usages:
+ return " | ".join(
+ "%s %s" % (command, usage) for usage in usages)
+ return None
+
+ def _get_prefix(self, hook):
+ return hook.get_kwarg("prefix", None)
+ def _get_alias_of(self, hook):
+ return hook.get_kwarg("alias_of", None)
+
+ @utils.hook("send.stdout")
+ def _stdout(self, event):
+ self._send_out(event, OutType.OUT)
+ @utils.hook("send.stderr")
+ def _stderr(self, event):
+ self._send_out(event, OutType.ERR)
+
+ def _send_out(self, event, type):
+ target = event["target"]
+ stdout = outs.StdOut(event["module_name"])
+ stdout.write(event["message"])
+ if event.get("hide_prefix", False):
+ stdout.prefix = None
+
+ target_str = event.get("target_str", target.name)
+ self._out(event["server"], target, target_str, stdout,
+ type, {})
+
+ @utils.hook("check.command.self")
+ def check_command_self(self, event):
+ if event["server"].irc_lower(event["request_args"][0]
+ ) == event["user"].name:
+ return utils.consts.PERMISSION_FORCE_SUCCESS, None
+ else:
+ return (utils.consts.PERMISSION_ERROR,
+ "You do not have permission to do this")
+
+ @utils.hook("check.command.is-channel")
+ def check_command_is_channel(self, event):
+ if event["is_channel"]:
+ return utils.consts.PERMISSION_FORCE_SUCCESS, None
+ else:
+ return (utils.consts.PERMISSION_ERROR,
+ "This command can only be used in-channel")
diff --git a/src/core_modules/commands/outs.py b/src/core_modules/commands/outs.py
new file mode 100644
index 00000000..e82ceefd
--- /dev/null
+++ b/src/core_modules/commands/outs.py
@@ -0,0 +1,28 @@
+import re
+from src import IRCLine, utils
+
+class StdOut(object):
+ def __init__(self, prefix):
+ self.prefix = prefix
+ self._lines = []
+ self._assured = False
+
+ def assure(self):
+ self._assured = True
+
+ def write(self, text):
+ self.write_lines(
+ text.replace("\r", "").replace("\n\n", "\n").split("\n"))
+ def write_lines(self, lines):
+ self._lines += list(filter(None, lines))
+
+ def get_all(self):
+ return self._lines.copy()
+ def pop(self):
+ return self._lines.pop(0)
+ def insert(self, text):
+ self._lines.insert(0, text)
+
+ def has_text(self):
+ return bool(self._lines)
+