1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
|
#--depends-on channel_access
#--depends-on check_mode
#--depends-on commands
#--depends-on shorturl
import itertools, json, re, urllib.parse
from src import ModuleManager, utils
from . import colors, gitea, github, gitlab
# Content-Type of deliveries whose JSON document is wrapped in a form field
# (the "payload" field) instead of being sent as the raw request body.
FORM_ENCODED = "application/x-www-form-urlencoded"

# Event categories given to a hook when it is first added; can be changed
# per hook afterwards with the "events" subcommand.
DEFAULT_EVENT_CATEGORIES = [
    "ping",
    "code",
    "pr",
    "issue",
    "repo",
]
@utils.export("channelset", utils.BoolSetting("git-prevent-highlight",
    "Enable/disable preventing highlights"))
@utils.export("channelset", utils.BoolSetting("git-hide-organisation",
    "Hide/show organisation in repository names"))
@utils.export("channelset", utils.BoolSetting("git-hide-prefix",
    "Hide/show command-like prefix on git webhook outputs"))
@utils.export("channelset", utils.BoolSetting("git-shorten-urls",
    "Whether or not git webhook URLs should be shortened"))
@utils.export("botset", utils.BoolSetting("git-show-private",
    "Whether or not to show git activity for private repositories"))
class Module(ModuleManager.BaseModule):
    # Relays GitHub/Gitea/GitLab webhook deliveries to the channels that have
    # registered a matching hook in their "git-webhooks" setting, and provides
    # the "webhook" command used to manage those hooks.
    #
    # NOTE(review): hook-decorated methods are documented with comments rather
    # than docstrings because the framework appears to parse hook docstrings
    # (see the ":help:"-style fields on github_webhook) - confirm before
    # converting any of them.
    _name = "Webhooks"

    # Subcommands of the "webhook" command that operate on one named hook.
    _HOOK_SUBCOMMANDS = ("add", "remove", "events", "branches")

    def on_load(self):
        # One handler object per supported service; each is handed to
        # _webhook() by the matching API hook below.
        self._github = github.GitHub(self.log)
        self._gitea = gitea.Gitea()
        self._gitlab = gitlab.GitLab()

    # API endpoint for GitHub deliveries.
    @utils.hook("api.post.github")
    def _api_github_webhook(self, event):
        return self._webhook("github", "GitHub", self._github,
            event["data"], event["headers"])

    # API endpoint for Gitea deliveries.
    @utils.hook("api.post.gitea")
    def _api_gitea_webhook(self, event):
        return self._webhook("gitea", "Gitea", self._gitea,
            event["data"], event["headers"])

    # API endpoint for GitLab deliveries.
    @utils.hook("api.post.gitlab")
    def _api_gitlab_webhook(self, event):
        return self._webhook("gitlab", "GitLab", self._gitlab,
            event["data"], event["headers"])

    @staticmethod
    def _decode_payload(payload_str, headers):
        """Turn the raw request body into the parsed JSON document.

        payload_str is the request body as bytes; headers is the mapping of
        request headers. Form-encoded deliveries carry the JSON document in
        their "payload" field; anything else is treated as a raw JSON body.
        """
        payload = payload_str.decode("utf8")

        # Compare only the media type: the header may be missing or may carry
        # parameters such as "; charset=utf-8".
        content_type = headers.get("Content-Type") or ""
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type == FORM_ENCODED:
            # parse_qs already percent-decodes the value. Decoding it a second
            # time would corrupt any literal "%XX" inside the JSON document.
            payload = urllib.parse.parse_qs(payload)["payload"][0]
        return json.loads(payload)

    @staticmethod
    def _find_hook(hooked_repos, candidates):
        """Return the first hook in hooked_repos registered under one of the
        (already lowercased) candidate names, or None when there is none.

        Hook names are compared case-insensitively; empty candidates are
        skipped.
        """
        hooked_repos_lower = {k.lower(): v for k, v in hooked_repos.items()}
        for candidate in candidates:
            if candidate and candidate in hooked_repos_lower:
                return hooked_repos_lower[candidate]
        return None

    def _webhook(self, webhook_type, webhook_name, handler, payload_str,
            headers):
        """Process one webhook delivery and relay it to subscribed channels.

        webhook_type: short service identifier (not used by this method).
        webhook_name: display name, passed as module_name when sending output.
        handler: service object providing is_private, names, branch, event,
            event_categories and webhook.
        payload_str: raw request body as bytes.
        headers: mapping of request headers.

        Returns None when no channel has a hook matching the repository, and
        otherwise a dict of the form {"state": "success", "deliveries": n}.
        """
        data = self._decode_payload(payload_str, headers)

        if handler.is_private(data, headers) and not self.bot.get_setting(
                "git-show-private", False):
            return {"state": "success", "deliveries": 0}

        full_name, repo_username, repo_name, organisation = handler.names(
            data, headers)
        # Lookup order matters: full repository name first, then the owning
        # user, then the organisation.
        candidates = [(name or "").lower()
            for name in (full_name, repo_username, organisation)]

        branch = handler.branch(data, headers)
        current_event, event_action = handler.event(data, headers)

        hooks = self.bot.database.channel_settings.find_by_setting(
            "git-webhooks")
        targets = []
        repo_hooked = False
        for server_id, channel_name, hooked_repos in hooks:
            found_hook = self._find_hook(hooked_repos, candidates)
            if found_hook is None:
                continue
            repo_hooked = True

            server = self.bot.get_server_by_id(server_id)
            if not server or channel_name not in server.channels:
                continue

            # An empty branch list on the hook means "all branches".
            if (branch and found_hook["branches"] and
                    branch not in found_hook["branches"]):
                continue

            # Expand the hook's categories into the concrete event names the
            # handler associates with them.
            events = list(itertools.chain.from_iterable(
                handler.event_categories(hooked_event)
                for hooked_event in found_hook["events"]))

            if (current_event in events or
                    (event_action and event_action in events)):
                channel = server.channels.get(channel_name)
                targets.append((server, channel))

        if not targets:
            if not repo_hooked:
                return None
            return {"state": "success", "deliveries": 0}

        outputs = handler.webhook(full_name, current_event, data, headers)
        if outputs:
            for server, channel in targets:
                source = full_name or organisation
                hide_org = channel.get_setting("git-hide-organisation", False)
                if repo_name and hide_org:
                    source = repo_name

                # Per-channel settings do not change between output lines, so
                # read them once per channel.
                shorten_urls = channel.get_setting("git-shorten-urls", False)
                prevent_highlight = channel.get_setting(
                    "git-prevent-highlight", False)
                hide_prefix = channel.get_setting("git-hide-prefix", False)

                for output, url in outputs:
                    output = "(%s) %s" % (
                        utils.irc.color(source, colors.COLOR_REPO), output)

                    if url:
                        if shorten_urls:
                            # Fall back to the long URL if shortening fails.
                            url = self.exports.get_one("shorturl")(server,
                                url, context=channel) or url
                        output = "%s - %s" % (output, url)

                    if prevent_highlight:
                        output = self._prevent_highlight(server, channel,
                            output)

                    self.events.on("send.stdout").call(target=channel,
                        module_name=webhook_name, server=server,
                        message=output, hide_prefix=hide_prefix)

        return {"state": "success", "deliveries": len(targets)}

    def _prevent_highlight(self, server, channel, s):
        """Return s with a zero-width non-joiner inserted after the first
        character of every channel member's nickname that appears directly
        after a digit or non-word character.

        Matching is case-insensitive. Nicknames shorter than two characters
        are skipped.
        """
        for user in channel.users:
            if len(user.nickname) < 2:
                # A nickname needs a first character plus a remainder to be
                # split; one-letter nicknames would also match far too much
                # ordinary text.
                continue

            regex = re.compile(r"([0-9]|\W)(%s)(%s)" % (
                re.escape(user.nickname[0]), re.escape(user.nickname[1:])),
                re.I)
            s = regex.sub("\\1\\2\u200c\\3", s)

        return s

    @utils.hook("received.command.webhook", min_args=1, channel_only=True)
    def github_webhook(self, event):
        """
        :help: Add/remove/modify a git webhook
        :require_mode: high
        :require_access: git-webhook
        :permission: gitoverride
        :usage: list
        :usage: add <hook>
        :usage: remove <hook>
        :usage: events <hook> [category [category ...]]
        :usage: branches <hook> [branch [branch ...]]
        """
        args = event["args_split"]
        all_hooks = event["target"].get_setting("git-webhooks", {})
        subcommand = args[0].lower()

        # Resolve the requested hook case-insensitively against stored names,
        # keeping the stored spelling so the right key is updated/deleted.
        hook_name = args[1] if len(args) > 1 else None
        existing_hook = None
        if hook_name is not None:
            hook_name_lower = hook_name.lower()
            for existing_hook_name in all_hooks:
                if existing_hook_name.lower() == hook_name_lower:
                    existing_hook = existing_hook_name
                    break

        if subcommand in self._HOOK_SUBCOMMANDS:
            # Without this check "add" would store a hook under a None key.
            if hook_name is None:
                raise utils.EventError("Please provide a hook name")
            if subcommand == "add":
                if existing_hook:
                    raise utils.EventError("There's already a hook for %s" %
                        hook_name)
            elif not existing_hook:
                raise utils.EventError("No hook found for %s" % hook_name)

        success_message = None
        if subcommand == "list":
            event["stdout"].write("Registered webhooks: %s" %
                ", ".join(all_hooks.keys()))
        elif subcommand == "add":
            all_hooks[hook_name] = {
                "events": DEFAULT_EVENT_CATEGORIES.copy(),
                "branches": [],
            }
            success_message = "Added hook for %s" % hook_name
        elif subcommand == "remove":
            del all_hooks[existing_hook]
            success_message = "Removed hook for %s" % hook_name
        elif subcommand == "events":
            if len(args) < 3:
                # No categories given: show the current ones.
                event["stdout"].write("Events for hook %s: %s" %
                    (hook_name, " ".join(all_hooks[existing_hook]["events"])))
            else:
                new_events = [e.lower() for e in args[2:]]
                all_hooks[existing_hook]["events"] = new_events
                success_message = "Updated events for hook %s" % hook_name
        elif subcommand == "branches":
            if len(args) < 3:
                # No branches given: show the current filter.
                branches = ",".join(all_hooks[existing_hook]["branches"])
                event["stdout"].write("Branches shown for hook %s: %s" %
                    (hook_name, branches))
            else:
                all_hooks[existing_hook]["branches"] = args[2:]
                success_message = "Updated branches for hook %s" % hook_name
        else:
            event["stderr"].write("Unknown command '%s'" % args[0])

        # Only persist when something actually changed; drop the setting
        # entirely once the last hook has been removed.
        if success_message is not None:
            if all_hooks:
                event["target"].set_setting("git-webhooks", all_hooks)
            else:
                event["target"].del_setting("git-webhooks")
            event["stdout"].write(success_message)
|