aboutsummaryrefslogtreecommitdiff
path: root/modules/git_webhooks/__init__.py
blob: f2dd1f822d0010777cb3991f7bee5cd1dc9b9fb1 (about) (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
#--depends-on channel_access
#--depends-on check_mode
#--depends-on commands
#--depends-on shorturl

import itertools, json, re, urllib.parse
from src import ModuleManager, utils
from . import colors, gitea, github

# Media type of form-encoded request bodies; Module._webhook() compares the
# request's Content-Type header against it to decide whether the JSON document
# has to be pulled out of a "payload" form field first.
FORM_ENCODED = "application/x-www-form-urlencoded"

# Event categories assigned to a hook when it is created through the "add"
# subcommand of the webhook command (copied, so each hook gets its own list).
DEFAULT_EVENT_CATEGORIES = [
    "ping", "code", "pr", "issue", "repo"
]

# Settings exported by this module: three per-channel toggles ("channelset")
# read via channel.get_setting() and one bot-wide toggle ("botset") read via
# self.bot.get_setting(); all of them are consulted in Module._webhook().
@utils.export("channelset", utils.BoolSetting("git-prevent-highlight",
    "Enable/disable preventing highlights"))
@utils.export("channelset", utils.BoolSetting("git-hide-organisation",
    "Hide/show organisation in repository names"))
@utils.export("channelset", utils.BoolSetting("git-hide-prefix",
    "Hide/show command-like prefix on git webhook outputs"))
@utils.export("botset", utils.BoolSetting("git-show-private",
    "Whether or not to show git activity for private repositories"))
class Module(ModuleManager.BaseModule):
    # Receives GitHub and Gitea webhook deliveries through the "api.post.*"
    # hooks, relays the formatted events to subscribed channels, and provides
    # the "webhook" command for managing per-channel subscriptions.
    _name = "Webhooks"

    def on_load(self):
        """Create the GitHub and Gitea handlers used by the webhook hooks."""
        github_handler = github.GitHub(self.log)
        self._github = github_handler

        gitea_handler = gitea.Gitea()
        self._gitea = gitea_handler

    @utils.hook("api.post.github")
    def _api_github_webhook(self, event):
        # Entry point for GitHub deliveries: forwards the request body and
        # headers from the event to the shared _webhook() implementation and
        # returns its result unchanged.
        return self._webhook(
            webhook_type="github",
            webhook_name="GitHub",
            handler=self._github,
            payload_str=event["data"],
            headers=event["headers"])

    @utils.hook("api.post.gitea")
    def _api_gitea_webhook(self, event):
        # Entry point for Gitea deliveries: forwards the request body and
        # headers from the event to the shared _webhook() implementation and
        # returns its result unchanged.
        return self._webhook(
            webhook_type="gitea",
            webhook_name="Gitea",
            handler=self._gitea,
            payload_str=event["data"],
            headers=event["headers"])

    def _webhook(self, webhook_type, webhook_name, handler, payload_str,
            headers):
        """
        Decode one webhook delivery and relay it to every subscribed channel.

        Arguments:
            webhook_type -- short service identifier ("github" or "gitea" in
                this file); not read by this method.
            webhook_name -- display name passed as module_name when sending.
            handler -- service handler; must provide is_private(), names(),
                branch(), event(), event_categories() and webhook().
            payload_str -- raw request body as UTF-8 encoded bytes: either a
                JSON document or a form-encoded body whose "payload" field
                holds the JSON document.
            headers -- request headers; "Content-Type" must be present.

        Returns None when no channel has a hook matching the repository.
        Otherwise returns a dict with "state" and "deliveries", the number of
        channels selected for the event (0 when everything was filtered out
        or the repository is private and private activity is hidden).
        """
        payload = payload_str.decode("utf8")

        # The header may carry parameters (e.g. "; charset=utf-8") and media
        # types are case-insensitive, so compare only the bare media type.
        content_type = headers["Content-Type"].split(";", 1)[0].strip().lower()
        if content_type == FORM_ENCODED:
            # parse_qs() already percent-decodes field values; unquoting a
            # second time would corrupt literal "%xx" text inside the JSON.
            payload = urllib.parse.parse_qs(payload)["payload"][0]
        data = json.loads(payload)

        if handler.is_private(data, headers) and not self.bot.get_setting(
                "git-show-private", False):
            return {"state": "success", "deliveries": 0}

        full_name, repo_username, repo_name, organisation = handler.names(
            data, headers)

        # Hook names are matched case-insensitively, most specific first:
        # full repository name, then repository owner, then organisation.
        lookup_keys = [name.lower() for name in
            (full_name, repo_username, organisation) if name]

        branch = handler.branch(data, headers)
        current_event, event_action = handler.event(data, headers)

        hooks = self.bot.database.channel_settings.find_by_setting(
            "git-webhooks")

        targets = []
        repo_hooked = False

        for server_id, channel_name, hooked_repos in hooks:
            hooked_repos_lower = {k.lower(): v for k, v in hooked_repos.items()}
            for key in lookup_keys:
                if key in hooked_repos_lower:
                    found_hook = hooked_repos_lower[key]
                    break
            else:
                # This channel has no hook for the repository.
                continue

            # Remember that a hook exists even if the event ends up being
            # filtered out below; this changes the return value.
            repo_hooked = True
            server = self.bot.get_server_by_id(server_id)
            if not server or channel_name not in server.channels:
                continue

            # An empty branch list on the hook means "all branches".
            if (branch and
                    found_hook["branches"] and
                    branch not in found_hook["branches"]):
                continue

            # Expand the hook's configured categories into concrete events.
            events = list(itertools.chain.from_iterable(
                handler.event_categories(hooked_event)
                for hooked_event in found_hook["events"]))

            if (current_event in events or
                    (event_action and event_action in events)):
                targets.append([server, server.channels.get(channel_name)])

        if not targets:
            if not repo_hooked:
                # NOTE(review): presumably lets the API layer report an
                # unknown repository -- confirm against the caller.
                return None
            return {"state": "success", "deliveries": 0}

        outputs = handler.webhook(full_name, current_event, data, headers)

        if outputs:
            for server, channel in targets:
                source = full_name or organisation
                hide_org = channel.get_setting("git-hide-organisation", False)
                if repo_name and hide_org:
                    source = repo_name

                # These values do not change between output lines, so look
                # them up once per channel.
                coloured_source = utils.irc.color(source, colors.COLOR_REPO)
                prevent_highlight = channel.get_setting(
                    "git-prevent-highlight", False)
                hide_prefix = channel.get_setting("git-hide-prefix", False)

                for output in outputs:
                    line = "(%s) %s" % (coloured_source, output)

                    if prevent_highlight:
                        line = self._prevent_highlight(server, channel, line)

                    self.events.on("send.stdout").call(target=channel,
                        module_name=webhook_name, server=server, message=line,
                        hide_prefix=hide_prefix)

        return {"state": "success", "deliveries": len(targets)}

    def _prevent_highlight(self, server, channel, s):
        for user in channel.users:
            if len(user.nickname) == 1:
                # if we don't ignore 1-letter nicknames, the below while loop
                # will fire indefininitely.
                continue

            regex = re.compile(r"([0-9]|\W)(%s)(%s)" % (
                re.escape(user.nickname[0]), re.escape(user.nickname[1:])),
                re.I)
            s = regex.sub("\\1\\2\u200c\\3", s)

        return s

    @utils.hook("received.command.webhook", min_args=1, channel_only=True)
    def github_webhook(self, event):
        """
        :help: Add/remove/modify a git webhook
        :require_mode: high
        :require_access: git-webhook
        :permission: gitoverride
        :usage: list
        :usage: add <hook>
        :usage: remove <hook>
        :usage: events <hook> [category [category ...]]
        :usage: branches <hook> [branch [branch ...]]
        """
        # NOTE(review): the docstring above looks like it is consumed by the
        # command framework (help/usage/permission directives), so it is kept
        # verbatim and implementation notes live in comments instead.
        #
        # Manages the "git-webhooks" setting of the channel in event["target"]:
        # a dict mapping hook name -> {"events": [...], "branches": [...]}.
        # Hook names are looked up case-insensitively but stored as first
        # entered. Raises utils.EventError for a missing hook name, a
        # duplicate "add", or an operation on a hook that does not exist.
        args = event["args_split"]
        subcommand = args[0].lower()
        all_hooks = event["target"].get_setting("git-webhooks", {})

        hook_name = args[1] if len(args) > 1 else None
        existing_hook = None
        if hook_name is not None:
            hook_name_lower = hook_name.lower()
            existing_hook = next((name for name in all_hooks
                if name.lower() == hook_name_lower), None)

        # Every subcommand except "list" operates on a named hook; without
        # this check "add" would register a hook under the key None.
        if (hook_name is None and
                subcommand in ("add", "remove", "events", "branches")):
            raise utils.EventError("Please provide a hook name")

        # Set only by subcommands that modified all_hooks; it doubles as the
        # "settings need saving" flag below.
        success_message = None

        if subcommand == "list":
            event["stdout"].write("Registered webhooks: %s" %
                ", ".join(all_hooks))
        elif subcommand == "add":
            if existing_hook:
                raise utils.EventError("There's already a hook for %s" %
                    hook_name)

            all_hooks[hook_name] = {
                "events": DEFAULT_EVENT_CATEGORIES.copy(),
                "branches": [],
            }
            success_message = "Added hook for %s" % hook_name

        elif subcommand == "remove":
            if not existing_hook:
                raise utils.EventError("No hook found for %s" % hook_name)

            del all_hooks[existing_hook]
            success_message = "Removed hook for %s" % hook_name

        elif subcommand == "events":
            if not existing_hook:
                raise utils.EventError("No hook found for %s" % hook_name)

            if len(args) < 3:
                # No categories given: show the current ones.
                event["stdout"].write("Events for hook %s: %s" %
                    (hook_name, " ".join(all_hooks[existing_hook]["events"])))
            else:
                new_events = [e.lower() for e in args[2:]]
                all_hooks[existing_hook]["events"] = new_events
                success_message = "Updated events for hook %s" % hook_name
        elif subcommand == "branches":
            if not existing_hook:
                raise utils.EventError("No hook found for %s" % hook_name)

            if len(args) < 3:
                # No branches given: show the current filter.
                branches = ",".join(all_hooks[existing_hook]["branches"])
                event["stdout"].write("Branches shown for hook %s: %s" %
                    (hook_name, branches))
            else:
                all_hooks[existing_hook]["branches"] = args[2:]
                success_message = "Updated branches for hook %s" % hook_name
        else:
            event["stderr"].write("Unknown command '%s'" % args[0])

        if success_message is not None:
            # Remove the setting entirely once the last hook is gone rather
            # than storing an empty dict.
            if all_hooks:
                event["target"].set_setting("git-webhooks", all_hooks)
            else:
                event["target"].del_setting("git-webhooks")

            event["stdout"].write(success_message)