1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
from src import EventManager, ModuleManager, utils
# describing command arg specifications, to centralise parsing and validating.
#
# format: <!|?><name>
# ! = required
# ? = optional
#
# if "name" contains "~", it will be marked as an "important" spec
# this means that, e.g. "!r~channel" will be:
# - marked as important
# - name split to everything after ~
# - the name, and it's value, will be offered to other preprocessors.
#
# this means, in practice, that "!r~channel" is a:
# - "revelant" channel (current if in channel, explicit arg otherwise)
# - will be used to check if a user has permissions
#
# spec types:
# - "time" - +1w2d3h4m5s format time
# - "rchannel" - relevant channel. current channel if we're in channel,
# otherwise an explicit channel name argument
# - "channel" - an argument of a channel name
# - "cuser" - a nickname but only if they are in the current channel
# - "ruser" - revlevant user. either current user if no arguments, otherwise
# take nickname for user from given args
# - "user" - an argument of a user's nickname
# - "ouser" - an argument of a potentially offline user's nickname
# - "word" - one word from arguments
# - "string" - collect all remaining args in to a string
class Module(ModuleManager.BaseModule):
def _spec_value(self, server, channel, user, argument_types, args):
options = []
first_error = None
for argument_type in argument_types:
value = None
n = 0
error = None
simple_value, simple_count = argument_type.simple(args)
if not simple_count == -1:
value = simple_value
n = simple_count
error = argument_type.error()
elif argument_type.type == "rchannel":
if channel:
value = channel
elif args:
n = 1
if args[0] in server.channels:
value = server.channels.get(args[0])
error = "No such channel"
else:
error = "No channel provided"
elif argument_type.type == "channel" and args:
if args[0] in server.channels:
value = server.channels.get(args[0])
n = 1
error = "No such channel"
elif argument_type.type == "cuser" and args:
tuser = server.get_user(args[0], create=False)
if tuser and channel.has_user(tuser):
value = tuser
n = 1
error = "That user is not in this channel"
elif argument_type.type == "ruser":
if args:
value = server.get_user(args[0], create=False)
n = 1
else:
value = user
error = "No such user"
elif argument_type.type == "user":
if args:
value = server.get_user(args[0], create=False)
n = 1
error = "No such user"
else:
error = "No user provided"
elif argument_type.type == "ouser":
if args:
if server.has_user_id(args[0]):
value = server.get_user(args[0], create=True)
error = "Unknown nickname"
n = 1
elif argument_type.type == "nuser":
if args:
value = server.get_user(args[0], create=True)
n = 1
elif argument_type.type == "channelonly":
if channel:
value = True
n = 0
error = "Command not valid in PM"
elif argument_type.type == "privateonly":
if not channel:
value = True
n = 0
error = "Command not valid in-channel"
options.append([argument_type, value, n, error])
return options
def _argument_types(self, options, args):
current_error = None
for argument_type, value, n, error in options:
if not value == None:
return [argument_type, n, value]
elif error:
current_error = error
elif n > len(args):
current_error = "Not enough arguments"
return [None, -1, current_error or "Invalid arguments"]
@utils.hook("preprocess.command")
@utils.kwarg("priority", EventManager.PRIORITY_HIGH)
def preprocess(self, event):
specs = event["hook"].get_kwargs("spec")
if specs:
server = event["server"]
channel = event["target"] if event["is_channel"] else None
user = event["user"]
overall_error = None
best_count = 0
for spec_arguments in specs:
out = {}
args = event["args_split"].copy()
kwargs = {"channel": channel}
failed = False
current_error = None
count = 0
for i, spec_argument in enumerate(spec_arguments):
argument_type_multi = len(set(
t.type for t in spec_argument.types)) > 1
options = self._spec_value(server, kwargs["channel"], user,
spec_argument.types, args)
argument_type, n, value = self._argument_types(options, args)
if n > -1:
args = args[n:]
if argument_type.exported:
kwargs[argument_type.exported] = value
if argument_type_multi:
value = [argument_type.type, value]
elif not spec_argument.optional:
failed = True
current_error = value
break
else:
value = None
if not argument_type == None and spec_argument.consume:
out[i] = value
argument_type_name = argument_type.name()
if argument_type_name:
out[argument_type_name] = value
if not failed:
kwargs["spec"] = out
event["kwargs"].update(kwargs)
return
else:
if count >= best_count:
overall_error = current_error
return utils.consts.PERMISSION_HARD_FAIL, overall_error
|