1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
#--depends-on commands
import re
from src import ModuleManager, utils
class Module(ModuleManager.BaseModule):
    """Filter outgoing PRIVMSG/NOTICE lines against stored regex filters.

    A filter is a string in one of two forms:

    * ``m/pattern/`` - drop any outgoing message matching ``pattern``
    * ``s/pattern/replace/`` - rewrite outgoing messages with ``re.sub``

    Filters are stored under the "message-filters" setting of the bot, a
    server, or a message target, and are managed with the ``filter``,
    ``bfilter`` and ``cfilter`` commands.
    """
    _name = "Filter"

    def _split(self, s):
        """Split `s` on every "/" that is not escaped by a backslash.

        Backslashes are kept in the returned pieces. A trailing unescaped
        "/" does not produce a final empty piece, and a string with no
        unescaped "/" at all yields an empty list.
        """
        escaped = False
        boundaries = []
        for index, char in enumerate(s):
            if escaped:
                # this character was escaped by the preceding backslash
                escaped = False
                continue
            if char == "/":
                boundaries.append(index)
            elif char == "\\":
                escaped = True

        # treat the end of the string as a final boundary, unless the string
        # already ends with a separator (or contains no separator at all)
        if boundaries and boundaries[-1] != len(s) - 1:
            boundaries.append(len(s))

        pieces = []
        start = 0
        for boundary in boundaries:
            pieces.append(s[start:boundary])
            start = boundary + 1
        return pieces

    def _parse_filter(self, filter_spec):
        """Parse a filter string into ``(kind, pattern, replacement)``.

        `kind` is "m" or "s"; `replacement` is None for "m" filters. Any
        segments after the ones a filter needs are ignored.

        Raises ValueError if `filter_spec` is not a string, is not of the
        form ``m/pattern/`` or ``s/pattern/replace/``, or if `pattern` is
        not a valid regular expression.
        """
        if not isinstance(filter_spec, str):
            raise ValueError("filter is not a string")

        pieces = self._split(filter_spec)
        if len(pieces) < 2:
            raise ValueError("expected m/pattern/ or s/pattern/replace/")

        kind, pattern, *rest = pieces
        if kind == "m":
            replacement = None
        elif kind == "s":
            if not rest:
                raise ValueError("substitution is missing its replacement")
            replacement = rest[0]
        else:
            raise ValueError("unknown filter type '%s'" % kind)

        try:
            re.compile(pattern)
        except re.error as e:
            raise ValueError("invalid pattern: %s" % e) from e
        return kind, pattern, replacement

    def _get_filters(self, server, target):
        """Collect the filters that apply to `target` on `server`.

        Returns a new list holding the bot-wide, then server, then target
        filters, with duplicates removed and first-seen order preserved so
        that substitutions are applied in a stable order.
        """
        filters = []
        # build a fresh list rather than extending whatever object
        # get_setting() handed back
        for source in (self.bot, server, target):
            filters.extend(source.get_setting("message-filters", []))
        return list(dict.fromkeys(filters))

    @utils.hook("preprocess.send.privmsg")
    @utils.hook("preprocess.send.notice")
    def channel_message(self, event):
        """Apply message filters to an outgoing PRIVMSG/NOTICE line.

        An "m" filter that matches the font-stripped message invalidates
        the line. "s" filters rewrite the message text, which is written
        back to the line only if it actually changed. Filters that cannot
        be parsed or applied are logged and skipped.
        """
        line = event["line"]
        server = event["server"]

        original_message = line.args[1]
        message = original_message
        message_plain = utils.irc.strip_font(original_message)

        # strip off any leading STATUSMSG chars; only the front of the name
        # is a prefix, trailing characters belong to the target name itself
        target_name = line.args[0].lstrip("".join(server.statusmsg))
        if target_name in server.channels:
            target = server.channels.get(target_name)
        else:
            target = server.get_user(target_name)

        for filter_spec in self._get_filters(server, target):
            try:
                kind, pattern, replacement = self._parse_filter(filter_spec)
            except ValueError as e:
                self.log.info("Ignoring invalid message filter %r: %s"
                    % (filter_spec, e))
                continue

            if kind == "m":
                if re.search(pattern, message_plain):
                    self.log.info("Message matched filter, dropping: %s"
                        % line.format())
                    line.invalidate()
                    return
            else:
                try:
                    message = re.sub(pattern, replacement, message)
                except re.error as e:
                    # e.g. a bad escape or group reference in the replacement
                    self.log.info("Ignoring invalid message filter %r: %s"
                        % (filter_spec, e))

        if message != original_message:
            line.args[1] = message

    def _check_index(self, filters, index):
        """Raise utils.EventError unless `index` is a valid index of `filters`.

        Negative indexes that Python would accept are allowed.
        """
        if not -len(filters) <= index < len(filters):
            raise utils.EventError("Filter index %d doesn't exist" % index)

    @utils.hook("received.command.cfilter", channel_only=True)
    @utils.hook("received.command.filter")
    @utils.hook("received.command.bfilter")
    @utils.kwarg("help", "Add a message filter for the current channel")
    @utils.kwarg("permissions", "cfilter")
    @utils.spec("!'list ?<index>int")
    @utils.spec("!'add ?<m/pattern/>string|<s/pattern/replace/>string")
    @utils.spec("!'remove !<index>int")
    def filter(self, event):
        """List, add or remove message filters.

        ``cfilter`` operates on the filters of event["target"], ``bfilter``
        on the bot-wide filters and ``filter`` on the current server's.

        Raises utils.EventError for a nonexistent index, for an empty filter
        list on "remove", and for a missing or invalid filter on "add".
        """
        # mark output as "assured" so it can bypass filtering
        event["stdout"].assure()
        event["stderr"].assure()

        command = event["command"]
        if command == "cfilter":
            target = event["target"]
        elif command == "bfilter":
            target = self.bot
        else:
            target = event["server"]

        # work on a copy; changes are persisted explicitly via set_setting()
        filters = list(target.get_setting("message-filters", []))
        action = event["spec"][0]
        argument = event["spec"][1]

        if action == "list":
            # compare against None so that index 0 is not mistaken for
            # "no index given"
            if argument is not None:
                self._check_index(filters, argument)
                event["stdout"].write("Message filter %d: %s"
                    % (argument, filters[argument]))
            else:
                listing = ", ".join(
                    f"({i}) {item}" for i, item in enumerate(filters))
                event["stdout"].write("Message filters: %s" % listing)
        elif action == "add":
            if not argument:
                raise utils.EventError("Please provide a filter")
            # refuse filters that could never be applied when sending
            try:
                self._parse_filter(argument)
            except ValueError as e:
                raise utils.EventError("Invalid filter: %s" % e)

            filters.append(argument)
            target.set_setting("message-filters", filters)
            event["stdout"].write("Added filter %d" % (len(filters)-1))
        elif action == "remove":
            if not filters:
                raise utils.EventError("No filters")
            self._check_index(filters, argument)

            removed = filters.pop(argument)
            target.set_setting("message-filters", filters)
            event["stdout"].write("Removed filter %d: %s" %
                (argument, removed))
|