aboutsummaryrefslogtreecommitdiff
path: root/modules/git_webhooks/__init__.py
blob: c847866ebbe42cd6087ce1cc3aa12158d33b7dc2 (about) (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#--depends-on channel_access
#--depends-on check_mode
#--depends-on commands
#--depends-on shorturl

import itertools, json, re, urllib.parse
from src import ModuleManager, utils
from . import colors, gitea, github, gitlab

# Content-Type value that marks a webhook body as a URL-encoded form (with the
# JSON carried in a "payload" field) instead of a raw JSON document.
FORM_ENCODED = "application/x-www-form-urlencoded"

# Event categories assigned to a hook when it is first added.
DEFAULT_EVENT_CATEGORIES = ["ping", "code", "pr", "issue", "repo"]

@utils.export("channelset", utils.BoolSetting("git-prevent-highlight",
    "Enable/disable preventing highlights"))
@utils.export("channelset", utils.BoolSetting("git-hide-organisation",
    "Hide/show organisation in repository names"))
@utils.export("channelset", utils.BoolSetting("git-hide-prefix",
    "Hide/show command-like prefix on git webhook outputs"))
@utils.export("channelset", utils.BoolSetting("git-shorten-urls",
    "Weather or not git webhook URLs should be shortened"))
@utils.export("botset", utils.BoolSetting("git-show-private",
    "Whether or not to show git activity for private repositories"))
class Module(ModuleManager.BaseModule):
    _name = "Webhooks"

    def on_load(self):
        # Build one handler object per supported git service; these are the
        # objects later passed to _webhook() as `handler`. Only the GitHub
        # handler is given this module's logger.
        self._github = github.GitHub(self.log)
        self._gitea = gitea.Gitea()
        self._gitlab = gitlab.GitLab()

    @utils.hook("api.post.github")
    def _api_github_webhook(self, event):
        # Entry point for the "api.post.github" hook: hands the request body,
        # headers and parameters to _webhook() together with the GitHub
        # handler, and returns whatever _webhook() returns.
        body, headers, params = (
            event["data"], event["headers"], event["params"])
        return self._webhook(
            "github", "GitHub", self._github, body, headers, params)

    @utils.hook("api.post.gitea")
    def _api_gitea_webhook(self, event):
        # Entry point for the "api.post.gitea" hook: hands the request body,
        # headers and parameters to _webhook() together with the Gitea
        # handler, and returns whatever _webhook() returns.
        body, headers, params = (
            event["data"], event["headers"], event["params"])
        return self._webhook(
            "gitea", "Gitea", self._gitea, body, headers, params)

    @utils.hook("api.post.gitlab")
    def _api_gitlab_webhook(self, event):
        # Entry point for the "api.post.gitlab" hook: hands the request body,
        # headers and parameters to _webhook() together with the GitLab
        # handler, and returns whatever _webhook() returns.
        body, headers, params = (
            event["data"], event["headers"], event["params"])
        return self._webhook(
            "gitlab", "GitLab", self._gitlab, body, headers, params)

    def _webhook(self, webhook_type, webhook_name, handler, payload_str,
            headers, params):
        """
        Decode one webhook delivery and relay its formatted output to every
        channel hooked to the originating repository.

        webhook_type -- short service identifier (not used by this method).
        webhook_name -- passed as module_name on each "send.stdout" call.
        handler -- service helper providing is_private(), names(), branch(),
            event(), event_categories() and webhook().
        payload_str -- raw request body as UTF-8 encoded bytes.
        headers -- request headers; "Content-Type" is read unconditionally.
        params -- request parameters; an optional "channels" entry holds a
            comma-separated list of "server_alias:channel_name" targets that
            restricts delivery to those channels.

        Returns None when no channel hooks the repository at all, otherwise a
        dict with "state" and "deliveries" (the number of channels targeted).
        """
        payload = payload_str.decode("utf8")
        if headers["Content-Type"] == FORM_ENCODED:
            # parse_qs() already percent-decodes the value; unquoting it a
            # second time would mangle any literal "%XX" inside the JSON.
            payload = urllib.parse.parse_qs(payload)["payload"][0]
        data = json.loads(payload)

        if handler.is_private(data, headers) and not self.bot.get_setting(
                "git-show-private", False):
            return {"state": "success", "deliveries": 0}

        full_name, repo_username, repo_name, organisation = handler.names(
            data, headers)

        full_name_lower = (full_name or "").lower()
        repo_username_lower = (repo_username or "").lower()
        organisation_lower = (organisation or "").lower()

        branch = handler.branch(data, headers)
        current_events = handler.event(data, headers)

        unfiltered_targets = []
        if "channels" in params:
            # Delivery explicitly restricted to the listed channels.
            for target in params["channels"].split(","):
                server_alias, _, channel_name = target.partition(":")
                if not (server_alias and channel_name):
                    continue

                server = self.bot.get_server_by_alias(server_alias)
                if not server or channel_name not in server.channels:
                    continue

                channel = server.channels.get(channel_name)
                hooks = channel.get_setting("git-webhooks", {})
                if not hooks:
                    continue

                found_hook = self._find_hook(full_name_lower,
                    repo_username_lower, organisation_lower, hooks)
                if found_hook:
                    unfiltered_targets.append([server, channel, found_hook])
        else:
            unfiltered_targets = self._find_targets(full_name_lower,
                repo_username_lower, organisation_lower)

        # Remember whether any channel hooks this repository before branch
        # and event filtering, so "hooked but filtered out" can be told apart
        # from "not hooked at all".
        repo_hooked = bool(unfiltered_targets)
        current_event_set = set(current_events)

        targets = []
        for server, channel, hook in unfiltered_targets:
            # An empty branch list on the hook means "all branches".
            if (branch and
                    hook["branches"] and
                    branch not in hook["branches"]):
                continue

            hooked_events = set(itertools.chain.from_iterable(
                handler.event_categories(hooked_event)
                for hooked_event in hook["events"]))

            if current_event_set & hooked_events:
                targets.append([server, channel])

        if not targets:
            if not repo_hooked:
                return None
            return {"state": "success", "deliveries": 0}

        # The original passed the undefined name `current_event` here, which
        # raised NameError on every delivery that had at least one target.
        # NOTE(review): assumes handler.event() returns an indexable sequence
        # whose first item is the primary event name -- confirm against the
        # handler implementations.
        outputs = handler.webhook(full_name, current_events[0], data, headers)

        if outputs:
            for server, channel in targets:
                source = full_name or organisation
                hide_org = channel.get_setting("git-hide-organisation", False)
                if repo_name and hide_org:
                    source = repo_name

                # Per-channel values that do not change between outputs.
                prefix = "(%s)" % utils.irc.color(source, colors.COLOR_REPO)
                shorten_urls = channel.get_setting("git-shorten-urls", False)
                prevent_highlight = channel.get_setting(
                    "git-prevent-highlight", False)
                hide_prefix = channel.get_setting("git-hide-prefix", False)

                for output, url in outputs:
                    message = "%s %s" % (prefix, output)

                    if url:
                        if shorten_urls:
                            # Fall back to the full URL if shortening yields
                            # nothing.
                            url = self.exports.get_one("shorturl")(server, url,
                                context=channel) or url
                        message = "%s - %s" % (message, url)

                    if prevent_highlight:
                        message = self._prevent_highlight(server, channel,
                            message)

                    self.events.on("send.stdout").call(target=channel,
                        module_name=webhook_name, server=server,
                        message=message, hide_prefix=hide_prefix)

        return {"state": "success", "deliveries": len(targets)}

    def _prevent_highlight(self, server, channel, s):
        """
        Return `s` with U+200C (zero-width non-joiner) inserted after the
        first character of each occurrence of a channel member's nickname.

        Only occurrences that directly follow a digit or a non-word character
        are rewritten, and matching is case-insensitive. `server` is accepted
        but not used.
        """
        for user in channel.users:
            nickname = user.nickname
            if len(nickname) < 2:
                # An empty nickname has no first character to index, and a
                # one-character nickname would match that character wherever
                # it follows a digit or non-word character; skip both.
                continue

            regex = re.compile(r"([0-9]|\W)(%s)(%s)" % (
                re.escape(nickname[0]), re.escape(nickname[1:])),
                re.I)
            s = regex.sub("\\1\\2\u200c\\3", s)

        return s

    def _find_targets(self, full_name_lower, repo_username_lower,
            organisation_lower):
        """
        Collect every channel whose stored "git-webhooks" setting has a hook
        matching the given (already lowercased) repository names.

        Returns a list of [server, channel, hook] entries. Rows are skipped
        when their server cannot be resolved or the channel is not present in
        that server's channels.
        """
        matches = []
        rows = self.bot.database.channel_settings.find_by_setting(
            "git-webhooks")

        for server_id, channel_name, hooked_repos in rows:
            hook = self._find_hook(full_name_lower, repo_username_lower,
                organisation_lower, hooked_repos)
            server = self.bot.get_server_by_id(server_id)

            if not (hook and server):
                continue
            if channel_name not in server.channels:
                continue

            matches.append([server, server.channels.get(channel_name), hook])

        return matches

    def _find_hook(self, full_name_lower, repo_username_lower,
            organisation_lower, hooks):
        """
        Look up the hook configured for a repository in `hooks`.

        Hook names are compared case-insensitively. The full repository name
        is tried first, then the repository owner's username, then the
        organisation; empty candidates are ignored. Returns the matching
        hook value, or None when nothing matches.
        """
        by_lower_name = {name.lower(): hook for name, hook in hooks.items()}

        candidates = (full_name_lower, repo_username_lower, organisation_lower)
        for candidate in candidates:
            if candidate and candidate in by_lower_name:
                return by_lower_name[candidate]

        return None


    @utils.hook("received.command.webhook", min_args=1, channel_only=True)
    def github_webhook(self, event):
        """
        :help: Add/remove/modify a git webhook
        :require_mode: high
        :require_access: git-webhook
        :permission: gitoverride
        :usage: list
        :usage: add <hook>
        :usage: remove <hook>
        :usage: events <hook> [category [category ...]]
        :usage: branches <hook> [branch [branch ...]]
        """
        # NOTE(review): the docstring above is kept byte-for-byte; its
        # ":key: value" lines look like command metadata read by the
        # framework -- confirm before editing it.
        #
        # Manages the "git-webhooks" setting of the channel the command was
        # issued in. Raises utils.EventError for a missing hook name, a
        # duplicate "add", or an unknown hook; an unknown subcommand is
        # reported on stderr instead.
        args = event["args_split"]
        subcommand = args[0].lower()
        all_hooks = event["target"].get_setting("git-webhooks", {})

        if subcommand == "list":
            event["stdout"].write("Registered webhooks: %s" %
                ", ".join(all_hooks.keys()))
            return

        if subcommand not in ("add", "remove", "events", "branches"):
            event["stderr"].write("Unknown command '%s'" % args[0])
            return

        # Every remaining subcommand operates on a named hook. Without this
        # check "add" would store a hook under the key None and the other
        # subcommands would report "No hook found for None".
        if len(args) < 2:
            raise utils.EventError("Please provide a hook name")

        hook_name = args[1]
        hook_name_lower = hook_name.lower()
        # Hook names are matched case-insensitively, but the stored spelling
        # is what must be used as the dictionary key.
        existing_hook = next(
            (name for name in all_hooks if name.lower() == hook_name_lower),
            None)
        extra_args = args[2:]

        # Stays None for read-only queries so nothing is persisted for them.
        success_message = None

        if subcommand == "add":
            if existing_hook:
                raise utils.EventError("There's already a hook for %s" %
                    hook_name)

            all_hooks[hook_name] = {
                "events": DEFAULT_EVENT_CATEGORIES.copy(),
                "branches": [],
            }
            success_message = "Added hook for %s" % hook_name
        else:
            if not existing_hook:
                raise utils.EventError("No hook found for %s" % hook_name)

            hook = all_hooks[existing_hook]
            if subcommand == "remove":
                del all_hooks[existing_hook]
                success_message = "Removed hook for %s" % hook_name
            elif subcommand == "events":
                if extra_args:
                    hook["events"] = [e.lower() for e in extra_args]
                    success_message = "Updated events for hook %s" % hook_name
                else:
                    event["stdout"].write("Events for hook %s: %s" %
                        (hook_name, " ".join(hook["events"])))
            else:
                # subcommand == "branches"
                if extra_args:
                    hook["branches"] = extra_args
                    success_message = (
                        "Updated branches for hook %s" % hook_name)
                else:
                    branches = ",".join(hook["branches"])
                    event["stdout"].write("Branches shown for hook %s: %s" %
                        (hook_name, branches))

        if success_message is not None:
            # Drop the setting entirely once the last hook is removed.
            if all_hooks:
                event["target"].set_setting("git-webhooks", all_hooks)
            else:
                event["target"].del_setting("git-webhooks")

            event["stdout"].write(success_message)