aboutsummaryrefslogtreecommitdiff
path: root/modules/commands/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/commands/__init__.py')
-rw-r--r--modules/commands/__init__.py424
1 files changed, 0 insertions, 424 deletions
diff --git a/modules/commands/__init__.py b/modules/commands/__init__.py
deleted file mode 100644
index 5a750a9b..00000000
--- a/modules/commands/__init__.py
+++ /dev/null
@@ -1,424 +0,0 @@
-#--depends-on config
-#--depends-on permissions
-
-import enum, re, shlex, string, traceback, typing
-from src import EventManager, IRCLine, ModuleManager, utils
-from . import outs
-
-COMMAND_METHOD = "command-method"
-COMMAND_METHODS = ["PRIVMSG", "NOTICE"]
-
-STR_MORE = " (more...)"
-STR_MORE_LEN = len(STR_MORE.encode("utf8"))
-STR_CONTINUED = "(...continued)"
-WORD_BOUNDARIES = [" "]
-
-NON_ALPHANUMERIC = [char for char in string.printable if not char.isalnum()]
-
-class OutType(enum.Enum):
- OUT = 1
- ERR = 2
-
-class BadContextException(Exception):
- def __init__(self, required_context):
- self.required_context = required_context
- Exception.__init__(self)
-
-class CommandEvent(object):
- def __init__(self, command, args):
- self.command = command
- self.args = args
-
-SETTING_COMMANDMETHOD = utils.OptionsSetting(COMMAND_METHODS, COMMAND_METHOD,
- "Set the method used to respond to commands")
-
-@utils.export("channelset", utils.Setting("command-prefix",
- "Set the command prefix used in this channel", example="!"))
-@utils.export("serverset", utils.Setting("command-prefix",
- "Set the command prefix used on this server", example="!"))
-@utils.export("serverset", SETTING_COMMANDMETHOD)
-@utils.export("channelset", SETTING_COMMANDMETHOD)
-@utils.export("botset", SETTING_COMMANDMETHOD)
-@utils.export("channelset", utils.BoolSetting("hide-prefix",
- "Disable/enable hiding prefix in command reponses"))
-@utils.export("channelset", utils.BoolSetting("commands",
- "Disable/enable responding to commands in-channel"))
-@utils.export("channelset", utils.BoolSetting("prefixed-commands",
- "Disable/enable responding to prefixed commands in-channel"))
-class Module(ModuleManager.BaseModule):
- @utils.hook("new.user")
- @utils.hook("new.channel")
- def new(self, event):
- if "user" in event:
- target = event["user"]
- else:
- target = event["channel"]
-
- def has_command(self, command):
- return command.lower() in self.events.on("received").on(
- "command").get_children()
- def get_hooks(self, command):
- return self.events.on("received.command").on(command
- ).get_hooks()
-
- def is_highlight(self, server, s):
- if s and s[-1] in [":", ","]:
- return server.is_own_nickname(s[:-1])
-
- def _command_method(self, server, target):
- return target.get_setting(COMMAND_METHOD,
- server.get_setting(COMMAND_METHOD,
- self.bot.get_setting(COMMAND_METHOD, "PRIVMSG"))).upper()
-
- def _find_command_hook(self, server, target, is_channel, command, args):
- if not self.has_command(command):
- command_event = CommandEvent(command, args)
- self.events.on("get.command").call(command=command_event,
- server=server, target=target, is_channel=is_channel)
-
- command = command_event.command
- args = command_event.args
-
- hook = None
- args_split = []
- channel_skip = False
- private_skip = False
- if self.has_command(command):
- for potential_hook in self.get_hooks(command):
- alias_of = self._get_alias_of(potential_hook)
- if alias_of:
- if self.has_command(alias_of):
- potential_hook = self.get_hooks(alias_of)[0]
- else:
- raise ValueError(
- "'%s' is an alias of unknown command '%s'"
- % (command.lower(), alias_of.lower()))
-
- if not is_channel and potential_hook.get_kwarg("channel_only",
- False):
- channel_skip = True
- continue
- if is_channel and potential_hook.get_kwarg("private_only",
- False):
- private_skip = True
- continue
-
- hook = potential_hook
-
- if args:
- argparse = hook.get_kwarg("argparse", "plain")
- if argparse == "shlex":
- args_split = shlex.split(args)
- elif argparse == "plain":
- args_split = args.split(" ")
-
- break
-
- if not hook and (private_skip or channel_skip):
- raise BadContextException("channel" if channel_skip else "private")
-
- return hook, command, args_split
-
- def _check(self, context, kwargs, requests=[]):
- event_hook = self.events.on(context).on("command")
-
- returns = []
- if requests:
- for request, request_args in requests:
- returns.append(event_hook.on(request).call_for_result_unsafe(
- **kwargs, request_args=request_args))
- else:
- returns = event_hook.call_unsafe(**kwargs)
-
- hard_fail = False
- force_success = False
- error = None
- for returned in returns:
- if returned:
- type, message = returned
- if type == utils.consts.PERMISSION_HARD_FAIL:
- error = message
- hard_fail = True
- break
- elif type == utils.consts.PERMISSION_FORCE_SUCCESS:
- force_success = True
- break
- elif type == utils.consts.PERMISSION_ERROR:
- error = message
-
- if hard_fail:
- return False, error
- elif not force_success and error:
- return False, error
- else:
- return True, None
-
-
- def _check_assert(self, check_kwargs, user,
- check: typing.Union[utils.Check, utils.MultiCheck]):
- checks = check.to_multi() # both Check and MultiCheck has this func
- is_success, message = self._check("check", check_kwargs,
- checks.requests())
- if not is_success:
- raise utils.EventError("%s: %s" % (user.nickname, message))
-
- def command(self, server, target, target_str, is_channel, user, command,
- args_split, line, hook, **kwargs):
- module_name = (self._get_prefix(hook) or
- self.bot.modules.from_context(hook.context).title)
-
- stdout = outs.StdOut(module_name)
- stderr = outs.StdOut(module_name)
-
- ret = False
- has_out = False
-
- if hook.get_kwarg("remove_empty", True):
- args_split = list(filter(None, args_split))
-
- event_kwargs = {"hook": hook, "user": user, "server": server,
- "target": target, "target_str": target_str,
- "is_channel": is_channel, "line": line, "args_split": args_split,
- "command": command, "args": " ".join(args_split), "stdout": stdout,
- "stderr": stderr, "tags": {}}
- event_kwargs.update(kwargs)
-
- check_assert = lambda check: self._check_assert(event_kwargs, user,
- check)
- event_kwargs["check_assert"] = check_assert
-
- eaten = False
-
- check_success, check_message = self._check("preprocess", event_kwargs)
- if check_success:
- new_event = self.events.on(hook.event_name).make_event(**event_kwargs)
- self.log.trace("calling command '%s': %s", [command, new_event.kwargs])
-
- try:
- hook.call(new_event)
- except utils.EventError as e:
- stderr.write(str(e))
- eaten = new_event.eaten
- else:
- if check_message:
- stderr.write("%s: %s" % (user.nickname, check_message))
-
- self._check("postprocess", event_kwargs)
- # postprocess - send stdout/stderr and typing tag
-
- return eaten
-
- @utils.hook("postprocess.command")
- @utils.kwarg("priority", EventManager.PRIORITY_LOW)
- def postprocess(self, event):
- type = None
- obj = None
- if event["stdout"].has_text():
- type = OutType.OUT
- obj = event["stdout"]
- elif event["stderr"].has_text():
- type = OutType.ERR
- obj = event["stderr"]
- else:
- return
- self._out(event["server"], event["target"], event["target_str"], obj,
- type, event["tags"])
-
- def _out(self, server, target, target_str, obj, type, tags):
- if type == OutType.OUT:
- color = utils.consts.GREEN
- else:
- color = utils.consts.RED
-
- line_str = obj.pop()
- if obj.prefix:
- line_str = "[%s] %s" % (
- utils.irc.color(obj.prefix, color), line_str)
- method = self._command_method(server, target)
-
- if not method in ["PRIVMSG", "NOTICE"]:
- raise ValueError("Unknown command-method '%s'" % method)
-
- line = IRCLine.ParsedLine(method, [target_str, line_str],
- tags=tags)
- valid, trunc = line.truncate(server.hostmask(),
- margin=STR_MORE_LEN)
-
- if trunc:
- if not trunc[0] in WORD_BOUNDARIES:
- for boundary in WORD_BOUNDARIES:
- left, *right = valid.rsplit(boundary, 1)
- if right:
- valid = left
- trunc = right[0]+trunc
- obj.insert("%s %s" % (STR_CONTINUED, trunc))
- valid = valid+STR_MORE
- line = IRCLine.parse_line(valid)
- server.send(line)
-
- @utils.hook("preprocess.command")
- def _check_min_args(self, event):
- min_args = event["hook"].get_kwarg("min_args")
- if min_args and len(event["args_split"]) < min_args:
- usage = self._get_usage(event["hook"], event["command"],
- event["command_prefix"])
- error = None
- if usage:
- error = "Not enough arguments, usage: %s" % usage
- else:
- error = "Not enough arguments (minimum: %d)" % min_args
- return utils.consts.PERMISSION_HARD_FAIL, error
-
- def _command_prefix(self, server, channel):
- return channel.get_setting("command-prefix",
- server.get_setting("command-prefix", "!"))
-
- @utils.hook("received.message.channel", priority=EventManager.PRIORITY_LOW)
- def channel_message(self, event):
- commands_enabled = event["channel"].get_setting("commands", True)
- if not commands_enabled:
- return
-
- command_prefix = self._command_prefix(event["server"], event["channel"])
- command = None
- args = ""
- if event["message_split"][0].startswith(command_prefix):
- if not event["channel"].get_setting("prefixed-commands",True):
- return
- command = event["message_split"][0].replace(
- command_prefix, "", 1).lower()
- if " " in event["message"]:
- args = event["message"].split(" ", 1)[1]
- elif len(event["message_split"]) > 1 and self.is_highlight(
- event["server"], event["message_split"][0]):
- command = event["message_split"][1].lower()
- if event["message"].count(" ") > 1:
- args = event["message"].split(" ", 2)[2]
-
- hook = None
- args_split = []
- if command:
- try:
- hook, command, args_split = self._find_command_hook(
- event["server"], event["channel"], True, command, args)
- except BadContextException:
- event["channel"].send_message(
- "%s: That command is not valid in a channel" %
- event["user"].nickname)
- return
-
- if hook:
- if event["action"]:
- return
-
- if hook:
- self.command(event["server"], event["channel"],
- event["target_str"], True, event["user"], command,
- args_split, event["line"], hook,
- command_prefix=command_prefix,
- buffer_line=event["buffer_line"])
- else:
- self.events.on("unknown.command").call(server=event["server"],
- target=event["channel"], user=event["user"],
- command=command, command_prefix=command_prefix,
- is_channel=True)
- else:
- regex_hooks = self.events.on("command.regex").get_hooks()
- for hook in regex_hooks:
- if event["action"] and hook.get_kwarg("ignore_action", True):
- continue
-
- pattern = hook.get_kwarg("pattern", None)
- if pattern:
- match = re.search(pattern, event["message"])
- if match:
- command = hook.get_kwarg("command", "")
- res = self.command(event["server"], event["channel"],
- event["target_str"], True, event["user"], command,
- "", event["line"], hook, match=match,
- message=event["message"], command_prefix="",
- action=event["action"],
- buffer_line=event["buffer_line"])
-
- if res:
- break
-
- @utils.hook("received.message.private", priority=EventManager.PRIORITY_LOW)
- def private_message(self, event):
- if event["message_split"] and not event["action"]:
- command = event["message_split"][0].lower()
-
- # this should help catch commands when people try to do prefixed
- # commands ('!help' rather than 'help') in PM
- command = command.lstrip("".join(NON_ALPHANUMERIC))
-
- args = ""
- if " " in event["message"]:
- args = event["message"].split(" ", 1)[1]
-
- try:
- hook, command, args_split = self._find_command_hook(
- event["server"], event["user"], False, command, args)
- except BadContextException:
- event["user"].send_message(
- "That command is not valid in a PM")
- return
-
- if hook:
- self.command(event["server"], event["user"],
- event["user"].nickname, False, event["user"], command,
- args_split, event["line"], hook, command_prefix="",
- buffer_line=event["buffer_line"])
- else:
- self.events.on("unknown.command").call(server=event["server"],
- target=event["user"], user=event["user"], command=command,
- command_prefix="", is_channel=False)
-
- def _get_usage(self, hook, command, command_prefix=""):
- command = "%s%s" % (command_prefix, command)
- usages = hook.get_kwargs("usage")
-
- if usages:
- return " | ".join(
- "%s %s" % (command, usage) for usage in usages)
- return None
-
- def _get_prefix(self, hook):
- return hook.get_kwarg("prefix", None)
- def _get_alias_of(self, hook):
- return hook.get_kwarg("alias_of", None)
-
- @utils.hook("send.stdout")
- def _stdout(self, event):
- self._send_out(event, OutType.OUT)
- @utils.hook("send.stderr")
- def _stderr(self, event):
- self._send_out(event, OutType.ERR)
-
- def _send_out(self, event, type):
- target = event["target"]
- stdout = outs.StdOut(event["module_name"])
- stdout.write(event["message"])
- if event.get("hide_prefix", False):
- stdout.prefix = None
-
- target_str = event.get("target_str", target.name)
- self._out(event["server"], target, target_str, stdout,
- type, {})
-
- @utils.hook("check.command.self")
- def check_command_self(self, event):
- if event["server"].irc_lower(event["request_args"][0]
- ) == event["user"].name:
- return utils.consts.PERMISSION_FORCE_SUCCESS, None
- else:
- return (utils.consts.PERMISSION_ERROR,
- "You do not have permission to do this")
-
- @utils.hook("check.command.is-channel")
- def check_command_is_channel(self, event):
- if event["is_channel"]:
- return utils.consts.PERMISSION_FORCE_SUCCESS, None
- else:
- return (utils.consts.PERMISSION_ERROR,
- "This command can only be used in-channel")