aboutsummaryrefslogtreecommitdiff
path: root/modules/git_webhooks/__init__.py
diff options
context:
space:
mode:
authorGravatar jesopo2019-06-24 15:51:55 +0100
committerGravatar jesopo2019-06-24 15:51:55 +0100
commit9ae97627c60d2753798c25a28be3f4b2a6a56bee (patch)
treef96c22941dbe9d5d9a8ac186209ebc0e55e9b223 /modules/git_webhooks/__init__.py
parent'github-hooks' -> 'webhooks'. this is a breaking change! (diff)
signature
'webhooks' -> 'git-webhooks'. move all 'webhooks' settings to 'git-webhooks'
Diffstat (limited to 'modules/git_webhooks/__init__.py')
-rw-r--r--modules/git_webhooks/__init__.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/modules/git_webhooks/__init__.py b/modules/git_webhooks/__init__.py
new file mode 100644
index 00000000..d861e5bb
--- /dev/null
+++ b/modules/git_webhooks/__init__.py
@@ -0,0 +1,196 @@
+import itertools, json, urllib.parse
+from src import ModuleManager, utils
+from . import colors, github
+
+FORM_ENCODED = "application/x-www-form-urlencoded"
+
+@utils.export("channelset", {"setting": "git-prevent-highlight",
+ "help": "Enable/disable preventing highlights",
+ "validate": utils.bool_or_none, "example": "on"})
+@utils.export("channelset", {"setting": "git-hide-organisation",
+ "help": "Hide/show organisation in repository names",
+ "validate": utils.bool_or_none, "example": "on"})
+@utils.export("channelset", {"setting": "git-hide-prefix",
+ "help": "Hide/show command-like prefix on git webhook outputs",
+ "validate": utils.bool_or_none, "example": "on"})
+class Module(ModuleManager.BaseModule):
+ def on_load(self):
+ self._github = github.GitHub()
+
+ @utils.hook("api.post.github")
+ def _api_github_webhook(self, event):
+ return self._webhook("github", "GitHub", self._github,
+ event["data"], event["headers"])
+
+ def _webhook(self, webhook_type, webhook_name, handler, payload_str,
+ headers):
+ payload = payload_str.decode("utf8")
+ if headers["Content-Type"] == FORM_ENCODED:
+ payload = urllib.parse.unquote(urllib.parse.parse_qs(payload)[
+ "payload"][0])
+ data = json.loads(payload)
+
+ full_name, repo_username, repo_name, organisation = handler.names(
+ data, headers)
+ branch = handler.branch(data, headers)
+ current_event, event_action = handler.event(data, headers)
+
+ hooks = self.bot.database.channel_settings.find_by_setting(
+ "git-webhooks")
+
+ targets = []
+ repo_hooked = False
+
+ for server_id, channel_name, hooked_repos in hooks:
+ found_hook = None
+ if full_name and full_name in hooked_repos:
+ found_hook = hooked_repos[full_name]
+ elif repo_username and repo_username in hooked_repos:
+ found_hook = hooked_repos[repo_username]
+ elif organisation and organisation in hooked_repos:
+ found_hook = hooked_repos[organisation]
+ else:
+ continue
+
+ repo_hooked = True
+ server = self.bot.get_server_by_id(server_id)
+ if server and channel_name in server.channels:
+ if (branch and
+ found_hook["branches"] and
+ not branch in found_hook["branches"]):
+ continue
+
+ events = []
+ for hooked_event in found_hook["events"]:
+ events.append(handler.event_categories(hooked_event))
+ events = list(itertools.chain(*events))
+
+ channel = server.channels.get(channel_name)
+ if (current_event in events or
+ (event_action and event_action in events)):
+ targets.append([server, channel])
+
+ if not targets:
+ if not repo_hooked:
+ return None
+ else:
+ return {"state": "success", "deliveries": 0}
+
+ outputs = handler.webhook(full_name, current_event, data, headers)
+
+ if outputs:
+ for server, channel in targets:
+ source = full_name or organisation
+ hide_org = channel.get_setting("git-hide-organisation", False)
+ if repo_name and hide_org:
+ source = repo_name
+
+ for output in outputs:
+ output = "(%s) %s" % (
+ utils.irc.color(source, colors.COLOR_REPO), output)
+
+ if channel.get_setting("git-prevent-highlight", False):
+ output = self._prevent_highlight(server, channel,
+ output)
+
+ hide_prefix = channel.get_setting("git-hide-prefix", False)
+ self.events.on("send.stdout").call(target=channel,
+ module_name=webhook_name, server=server, message=output,
+ hide_prefix=hide_prefix)
+
+ return {"state": "success", "deliveries": len(targets)}
+
+ def _prevent_highlight(self, server, channel, s):
+ for user in channel.users:
+ if len(user.nickname) == 1:
+ # if we don't ignore 1-letter nicknames, the below while loop
+ # will fire indefininitely.
+ continue
+
+ regex = re.compile(r"(.)\b(%s)(%s)" % (
+ re.escape(user.nickname[0]), re.escape(user.nickname[1:])),
+ re.I)
+ s = regex.sub("\\1\\2\u200c\\3", s)
+
+ return s
+
+ @utils.hook("received.command.webhook", min_args=1, channel_only=True)
+ def github_webhook(self, event):
+ """
+ :help: Add/remove/modify a git webhook
+ :require_mode: high
+ :require_access: git-webhook
+ :permission: gitoverride
+ :usage: list
+ :usage: add <hook>
+ :usage: remove <hook>
+ :usage: events <hook> [category [category ...]]
+ :usage: branches <hook> [branch [branch ...]]
+ """
+ all_hooks = event["target"].get_setting("git-webhooks", {})
+ hook_name = None
+ existing_hook = None
+ if len(event["args_split"]) > 1:
+ hook_name = event["args_split"][1]
+ for existing_hook_name in all_hooks.keys():
+ if existing_hook_name.lower() == hook_name.lower():
+ existing_hook = existing_hook_name
+ break
+
+ success_message = None
+
+ subcommand = event["args_split"][0].lower()
+ if subcommand == "list":
+ event["stdout"].write("Registered web hooks: %s" %
+ ", ".join(all_hooks.keys()))
+ elif subcommand == "add":
+ if existing_hook:
+ raise utils.EventError("There's already a hook for %s" %
+ hook_name)
+
+ all_hooks[hook_name] = {
+ "events": DEFAULT_EVENT_CATEGORIES.copy(),
+ "branches": [],
+ }
+ success_message = "Added hook for %s" % hook_name
+
+ elif subcommand == "remove":
+ if not existing_hook:
+ raise utils.EventError("No hook found for %s" % hook_name)
+
+ del all_hooks[existing_hook]
+ success_message = "Removed hook for %s" % hook_name
+
+ elif subcommand == "events":
+ if not existing_hook:
+ raise utils.EventError("No hook found for %s" % hook_name)
+
+ if len(event["args_split"]) < 3:
+ event["stdout"].write("Events for hook %s: %s" %
+ (hook_name, " ".join(all_hooks[existing_hook]["events"])))
+ else:
+ new_events = [e.lower() for e in event["args_split"][2:]]
+ all_hooks[existing_hook]["events"] = new_events
+ sucess_message = "Updated events for hook %s" % hook_name
+ elif subcommand == "branches":
+ if not existing_hook:
+ raise utils.EventError("No hook found for %s" % hook_name)
+
+ if len(event["args_split"]) < 3:
+ branches = ",".join(all_hooks[existing_hook]["branches"])
+ event["stdout"].write("Branches shown for hook %s: %s" %
+ (hook_name, branches))
+ else:
+ all_hooks[existing_hook]["branches"] = event["args_split"][2:]
+ success_message = "Updated branches for hook %s" % hook_name
+ else:
+ event["stderr"].write("Unknown command '%s'" %
+ event["args_split"][0])
+
+ if not success_message == None:
+ if all_hooks:
+ event["target"].set_setting("git-webhooks", all_hooks)
+ else:
+ event["target"].del_setting("git-webhooks")
+
+ event["stdout"].write(success_message)