From fa279aab93c80b50c24d700c8d9d959399897371 Mon Sep 17 00:00:00 2001 From: jesopo Date: Wed, 14 Aug 2019 14:38:47 +0100 Subject: refactor/rewrite channel_op.py, split highlight spam protection out --- modules/channel_op.py | 381 +++++++++++++--------------------------------- modules/highlight_spam.py | 31 ++++ 2 files changed, 141 insertions(+), 271 deletions(-) create mode 100644 modules/highlight_spam.py (limited to 'modules') diff --git a/modules/channel_op.py b/modules/channel_op.py index 151dc12a..719b0f50 100644 --- a/modules/channel_op.py +++ b/modules/channel_op.py @@ -5,17 +5,8 @@ from src import ModuleManager, utils -class UserNotFoundException(Exception): - pass -class InvalidTimeoutException(Exception): - pass +KICK_REASON = "your behavior is not conducive to the desired environment" -@utils.export("channelset", utils.IntSetting("highlight-spam-threshold", - "Set the number of nicknames in a message that qualifies as spam")) -@utils.export("channelset", utils.BoolSetting("highlight-spam-protection", - "Enable/Disable highlight spam protection")) -@utils.export("channelset", utils.BoolSetting("highlight-spam-ban", - "Enable/Disable banning highlight spammers instead of just kicking")) @utils.export("channelset", utils.Setting("ban-format", "Set ban format ($n = nick, $u = username, $h = hostname)", example="*!$u@$h")) @@ -23,293 +14,141 @@ class InvalidTimeoutException(Exception): ["qmode", "insp", "unreal", "none"], "Set this server's method of muting users")) class Module(ModuleManager.BaseModule): - _name = "ChanOp" - - @utils.hook("timer.unban") - def _timer_unban(self, event): - server = self.bot.get_server_by_id(event["server_id"]) - if server and event["channel_name"] in server.channels: - channel = server.channels.get(event["channel_name"]) - channel.send_unban(event["hostmask"]) - - def _kick(self, server, channel, nickname, reason): - target_user = server.get_user(nickname) - if channel.has_user(target_user): - channel.send_kick(nickname, reason) + def _parse_time(self, args, min_args): + if args[0][0] == "+": + if len(args[1:]) < min_args: + raise utils.EventError("Not enough arguments") + time = utils.from_pretty_time(args[0][1:]) + if time == None: + raise utils.EventError("Invalid timeframe") + return time, args[1:] + return None, args + + def _kick(self, server, channel, target_nickname, reason): + target_user = server.get_user(target_nickname, create=False) + if target_user and channel.has_user(target_user): + reason = " ".join(reason) or KICK_REASON + channel.send_kick(target_user.nickname, reason) else: - raise UserNotFoundException("That user is not in this channel") - def _kick_command(self, event, channel, args_split): - target = args_split[0] - reason = " ".join(args_split[1:]) or None - - try: - self._kick(event["server"], channel, target, reason) - except UserNotFoundException as e: - event["stderr"].write(str(e)) - - @utils.hook("received.command.kick", private_only=True, min_args=2) - def private_kick(self, event): - """ - :help: Kick a user from the current channel - :usage: [reason] - :prefix: Kick - """ - channel = event["server"].channels.get(event["args_split"][0]) - - event["check_assert"](utils.Check("channel-access", channel, "kick")) - - self._kick_command(event, channel, event["args_split"][1:]) + raise utils.EventError("No such user") - @utils.hook("received.command.k", alias_of="kick") - @utils.hook("received.command.kick", channel_only=True, min_args=1) + @utils.hook("received.command.kick") + @utils.hook("received.command.k", alias_of="k") + @utils.kwarg("min_args", 1) + @utils.kwarg("require_mode", "o") + @utils.kwarg("require_access", "kick") + @utils.kwarg("usage", " [reason]") def kick(self, event): - """ - :help: Kick a user from the current channel - :usage: [reason] - :require_mode: o - :require_access: kick - :prefix: Kick - """ - self._kick_command(event, event["target"], event["args_split"]) + self._kick(event["server"], event["target"], event["args_split"][0], + event["args_split"][1:]) - def _ban_format(self, user, s): + def _format_hostmask(self, user, s): return s.replace("$n", user.nickname).replace("$u", user.username ).replace("$h", user.hostname) - def _ban_user(self, channel, ban, user): - if channel.has_user(user): - format = channel.get_setting("ban-format", "*!$u@$h") - hostmask_split = format.split("$$") - hostmask_split = [self._ban_format(user, s) for s in hostmask_split] - hostmask = "".join(hostmask_split) - if ban: - channel.send_ban(hostmask) - else: - channel.send_unban(hostmask) - return hostmask + def _get_hostmask(self, channel, user): + format = channel.get_setting("ban-format", "*!$u@$h") + hostmask_split = [ + self._format_hostmask(user, s) for s in format.split("$$")] + return "$".join(hostmask_split) + + def _ban(self, server, channel, target, allow_hostmask, time, add): + hostmask = None + target_user = server.get_user(target, create=False) + if target_user and channel.has_user(target_user): + hostmask = self._get_hostmask(channel, target_user) else: - raise UserNotFoundException("That user is not in this channel") - - def _ban(self, server, channel, ban, target): - target_user = server.get_user(target) - if channel.has_user(target_user): - return self._ban_user(channel, ban, target_user) + if not allow_hostmask: + raise utils.EventError("No such user") + hostmask = target + if not add: + channel.send_unban(hostmask) else: - if ban: - channel.send_ban(target) - else: - channel.send_unban(target) - return target + channel.send_ban(hostmask) - @utils.hook("received.command.ban", private_only=True, min_args=2) - def private_ban(self, event): - """ - :help: Ban a user/hostmask from the current channel - :usage: - """ - channel = event["server"].channels.get(event["args_split"][0]) + if not time == None: + self.timers.add_persistent("unban", time, server_id=server.id, + channel_name=channel.name, hostmask=hostmask) - event["check_assert"](utils.Check("channel-access", channel, "ban")) + @utils.hook("timer.unban") + def _timer_unban(self, event): + server = self.bot.get_server_by_id(event["server_id"]) + if server and event["channel_name"] in server.channels: + channel = server.channels.get(event["channel_name"]) + channel.send_unban(event["hostmask"]) - self._ban(event["server"], channel, True, event["args_split"][1]) - @utils.hook("received.command.ban", channel_only=True, min_args=1) + @utils.hook("received.command.ban") + @utils.kwarg("min_args", 1) + @utils.kwarg("require_mode", "o") + @utils.kwarg("require_access", "ban") + @utils.kwarg("usage", "[+time] ") def ban(self, event): - """ - :help: Ban a user/hostmask from the current channel - :usage: - :require_mode: o - :require_access: ban - """ - self._ban(event["server"], event["target"], True, - event["args_split"][0]) - - def _temp_ban(self, event, accept_hostmask): - timeout = utils.from_pretty_time(event["args_split"][1]) - if not timeout: - raise InvalidTimeoutException( - "Please provided a valid time above 0 seconds") - - if accept_hostmask: - hostmask = self._ban(event["server"], event["target"], True, - event["args_split"][0]) - else: - hostmask = self._ban_user(event["target"], True, - event["server"].get_user(event["args_split"][0])) - - self.timers.add_persistent("unban", timeout, - server_id=event["server"].id, - channel_name=event["target"].name, hostmask=hostmask) - - @utils.hook("received.command.tb", alias_of="tempban") - @utils.hook("received.command.tempban", channel_only=True, min_args=2) - def temp_ban(self, event): - """ - :help: Temporarily ban someone from the current channel - :usage: