aboutsummaryrefslogtreecommitdiff
path: root/modules
diff options
context:
space:
mode:
authorGravatar jesopo2016-03-29 12:56:58 +0100
committerGravatar jesopo2016-03-29 12:56:58 +0100
commitf943d63098a50746f4e470e403a991a4d9713030 (patch)
treedeeb98058917d0155227211d72576f0cbab28d3f /modules
parentInitial commit (diff)
first commit.
Diffstat (limited to 'modules')
-rw-r--r--modules/accept_invite.py9
-rw-r--r--modules/channel_save.py18
-rw-r--r--modules/commands.py142
-rw-r--r--modules/ctcp.py26
-rw-r--r--modules/define.py27
-rw-r--r--modules/dns.py21
-rw-r--r--modules/geoip.py30
-rw-r--r--modules/google.py28
-rw-r--r--modules/imdb.py26
-rw-r--r--modules/in.py79
-rw-r--r--modules/isbn.py35
-rw-r--r--modules/join.py8
-rw-r--r--modules/karma.py41
-rw-r--r--modules/lastfm.py56
-rw-r--r--modules/nickserv.py13
-rw-r--r--modules/perform.py16
-rw-r--r--modules/pong.py9
-rw-r--r--modules/sed.py64
-rw-r--r--modules/seen.py25
-rw-r--r--modules/set.py54
-rw-r--r--modules/signals.py48
-rw-r--r--modules/thesaurus.py43
-rw-r--r--modules/title.py33
-rw-r--r--modules/to.py26
-rw-r--r--modules/trakt.py58
-rw-r--r--modules/translate.py51
-rw-r--r--modules/twitter.py69
-rw-r--r--modules/urbandictionary.py34
-rw-r--r--modules/weather.py35
-rw-r--r--modules/wolframalpha.py42
-rw-r--r--modules/youtube.py105
31 files changed, 1271 insertions, 0 deletions
diff --git a/modules/accept_invite.py b/modules/accept_invite.py
new file mode 100644
index 00000000..8c0368b2
--- /dev/null
+++ b/modules/accept_invite.py
@@ -0,0 +1,9 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("invite").hook(
+ self.on_invite)
+
+ def on_invite(self, event):
+ event["server"].send_join(event["target_channel"])
diff --git a/modules/channel_save.py b/modules/channel_save.py
new file mode 100644
index 00000000..cb950ed2
--- /dev/null
+++ b/modules/channel_save.py
@@ -0,0 +1,18 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("self").on("join").hook(self.on_join)
+ bot.events.on("received").on("numeric").on("366").hook(
+ self.on_connect)
+
+ def on_join(self, event):
+ channels = set(event["server"].get_setting("autojoin", []))
+ channels.add(event["channel"].name)
+ event["server"].set_setting("autojoin", list(channels))
+
+ def on_connect(self, event):
+ if event["line_split"][3].lower() == "#bitbot":
+ channels = event["server"].get_setting("autojoin", [])
+ for channel in channels:
+ event["server"].send_join(channel)
diff --git a/modules/commands.py b/modules/commands.py
new file mode 100644
index 00000000..d31c1e3d
--- /dev/null
+++ b/modules/commands.py
@@ -0,0 +1,142 @@
+
+STR_MORE = " (more...)"
+STR_CONTINUED = "(...continued) "
+
+OUT_CUTOFF = 400
+
+class ChannelOut(object):
+ def __init__(self, module_name, channel):
+ self.module_name = module_name
+ self.channel = channel
+ self._text = ""
+ self.written = False
+ def write(self, text):
+ self._text += text
+ self.written = True
+ return self
+ def send(self):
+ if self.has_text():
+ text = "[%s] %s" % (self.prefix(), self._text)
+ text_encoded = text.encode("utf8")
+ if len(text_encoded) > OUT_CUTOFF:
+ text = "%s%s" % (text_encoded[:OUT_CUTOFF].decode("utf8"
+ ).rstrip(), STR_MORE)
+ self._text = "%s%s" % (STR_CONTINUED, text_encoded[OUT_CUTOFF:
+ ].decode("utf8").lstrip())
+ self.channel.send_message(text)
+ def set_prefix(self, prefix):
+ self.module_name = prefix
+ def has_text(self):
+ return bool(self._text)
+
+class ChannelStdOut(ChannelOut):
+ def prefix(self):
+ return self.module_name
+class ChannelStdErr(ChannelOut):
+ def prefix(self):
+ return "!%s" % self.module_name
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("message").on("channel").hook(
+ self.channel_message)
+ bot.events.on("received").on("message").on("private").hook(
+ self.private_message)
+ bot.events.on("received").on("command").on("help").hook(self.help)
+ bot.events.on("new").on("user", "channel").hook(self.new)
+ bot.events.on("received").on("command").on("more").hook(self.more)
+ bot.events.on("send").on("stdout").hook(self.send_stdout)
+ bot.events.on("send").on("stderr").hook(self.send_stderr)
+
+ def new(self, event):
+ if "user" in event:
+ target = event["user"]
+ else:
+ target = event["channel"]
+ target.last_stdout = None
+ target.last_stderr = None
+
+ def has_command(self, command):
+ return command.lower() in self.bot.events.on("received").on(
+ "command").get_children()
+ def get_hook(self, command):
+ return self.bot.events.on("received").on("command").on(command
+ ).get_hooks()[0]
+
+ def channel_message(self, event):
+ command_prefix = event["channel"].get_setting("command_prefix",
+ event["server"].get_setting("command_prefix", "!"))
+ if event["message_split"][0].startswith(command_prefix):
+ command = event["message_split"][0].replace(
+ command_prefix, "", 1).lower()
+ if self.has_command(command):
+ hook = self.get_hook(command)
+ module_name = hook.function.__self__._name
+ stdout = ChannelStdOut(module_name, event["channel"])
+ stderr = ChannelStdErr(module_name, event["channel"])
+ returns = self.bot.events.on("preprocess").on("command"
+ ).call(hook=hook, user=event["user"], server=event[
+ "server"])
+ for returned in returns:
+ if returned:
+ event["stderr"].write(returned).send()
+ return
+ min_args = hook.kwargs.get("min_args")
+ args_split = event["message_split"][1:]
+ if min_args and len(args_split) < min_args:
+ ChannelStdErr("Error", event["channel"]
+ ).write("Not enough arguments ("
+ "minimum: %d)" % min_args).send()
+ else:
+ args = " ".join(args_split)
+ self.bot.events.on("received").on("command").on(
+ command).call(1, user=event["user"],
+ channel=event["channel"], args=args,
+ args_split=args_split, server=event["server"
+ ], stdout=stdout, stderr=stderr, command=command,
+ log=event["channel"].log, target=event["channel"])
+ stdout.send()
+ stderr.send()
+ if stdout.has_text():
+ event["channel"].last_stdout = stdout
+ if stderr.has_text():
+ event["channel"].last_stderr = stderr
+ event["channel"].log.skip_next()
+ def private_message(self, event):
+ pass
+
+ def help(self, event):
+ if event["args"]:
+ command = event["args_split"][0].lower()
+ if command in self.bot.events.on("received").on(
+ "command").get_children():
+ hooks = self.bot.events.on("received").on("command").on(command).get_hooks()
+ if hooks and "help" in hooks[0].kwargs:
+ event["stdout"].write("%s: %s" % (command, hooks[0].kwargs["help"]))
+ else:
+ help_available = []
+ for child in self.bot.events.on("received").on("command").get_children():
+ hooks = self.bot.events.on("received").on("command").on(child).get_hooks()
+ if hooks and "help" in hooks[0].kwargs:
+ help_available.append(child)
+ help_available = sorted(help_available)
+ event["stdout"].write("Commands: %s" % ", ".join(help_available))
+
+ def more(self, event):
+ if event["target"].last_stdout:
+ event["target"].last_stdout.send()
+
+ def send_stdout(self, event):
+ if event["target"].name[0] in event["server"].channel_types:
+ stdout = ChannelStdOut(event["module_name"], event["target"])
+ stdout.write(event["message"]).send()
+ if stdout.has_text():
+ event["target"].last_stdout = stdout
+ def send_stderr(self, event):
+ if event["target"].name[0] in event["server"].channel_types:
+ stderr = ChannelStdErr(event["module_name"], event["target"])
+ stderr.write(event["message"]).send()
+ if stderr.has_text():
+ event["target"].last_stderr = stderr
+
diff --git a/modules/ctcp.py b/modules/ctcp.py
new file mode 100644
index 00000000..f646cedf
--- /dev/null
+++ b/modules/ctcp.py
@@ -0,0 +1,26 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("message").on("private").hook(
+ self.private_message)
+ self.bot = bot
+
+ def private_message(self, event):
+ if event["message"][0] == "\x01" and event["message"][-1] == "\x01":
+ ctcp_command = event["message_split"][0][1:].upper()
+ if ctcp_command.endswith("\x01"):
+ ctcp_command = ctcp_command[:-1]
+ ctcp_args = " ".join(event["message_split"][1:])[:-1]
+ ctcp_args_split = ctcp_args.split(" ")
+
+ ctcp_response = None
+ if ctcp_command == "VERSION":
+ ctcp_response = self.bot.config.get("ctcp-version",
+ "BitBot")
+ elif ctcp_command == "PING":
+ ctcp_response = " ".join(ctcp_args_split)
+
+ if ctcp_response:
+ event["user"].send_ctcp_response(ctcp_command,
+ ctcp_response)
diff --git a/modules/define.py b/modules/define.py
new file mode 100644
index 00000000..f6b36e3e
--- /dev/null
+++ b/modules/define.py
@@ -0,0 +1,27 @@
+import Utils
+
+URL_WORDNIK = "http://api.wordnik.com:80/v4/word.json/%s/definitions"
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("command").on("define").hook(
+ self.define, help="Define a provided term")
+
+ def define(self, event):
+ if event["args"]:
+ word = event["args"]
+ else:
+ word = event["log"].get(from_self=False)
+ page = Utils.get_url(URL_WORDNIK % event["args"], get_params={
+ "useCanonical": "true", "limit": 1,
+ "sourceDictionaries": "wiktionary", "api_key": self.bot.config[
+ "wordnik-api-key"]}, json=True)
+ if page:
+ if len(page):
+ event["stdout"].write("%s: %s" % (page[0]["word"],
+ page[0]["text"]))
+ else:
+ event["stderr"].write("No definitions found")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/dns.py b/modules/dns.py
new file mode 100644
index 00000000..95c1b80c
--- /dev/null
+++ b/modules/dns.py
@@ -0,0 +1,21 @@
+import socket
+
+class Module(object):
+ _name = "DNS"
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("dns").hook(
+ self.dns, min_args=1,
+ help="Get all addresses for a given hostname (IPv4/IPv6)")
+
+ def dns(self, event):
+ hostname = event["args_split"][0]
+ try:
+ address_info = socket.getaddrinfo(hostname, 1, 0,
+ socket.SOCK_DGRAM)
+ except socket.gaierror:
+ event["stderr"].write("Failed to find hostname")
+ return
+ ips = []
+ for _, _, _, _, address in address_info:
+ ips.append(address[0])
+ event["stdout"].write("%s: %s" % (hostname, ", ".join(ips)))
diff --git a/modules/geoip.py b/modules/geoip.py
new file mode 100644
index 00000000..a3c5db2d
--- /dev/null
+++ b/modules/geoip.py
@@ -0,0 +1,30 @@
+import Utils
+
+URL_GEOIP = "http://ip-api.com/json/%s"
+
+class Module(object):
+ _name = "GeoIP"
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("geoip").hook(
+ self.geoip, min_args=1,
+ help="Get geoip data on a given IPv4/IPv6 address")
+
+ def geoip(self, event):
+ page = Utils.get_url(URL_GEOIP % event["args_split"][0],
+ json=True)
+ if page:
+ if page["status"] == "success":
+ data = page["query"]
+ data += " | Organisation: %s" % page["org"]
+ data += " | City: %s" % page["city"]
+ data += " | Region: %s (%s)" % (page["regionName"],
+ page["countryCode"])
+ data += " | ISP: %s" % page["isp"]
+ data += " | Lon/Lat: %s/%s" % (page["lon"],
+ page["lat"])
+ data += " | Timezone: %s" % page["timezone"]
+ event["stdout"].write(data)
+ else:
+ event["stderr"].write("No geoip data found")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/google.py b/modules/google.py
new file mode 100644
index 00000000..8d00945f
--- /dev/null
+++ b/modules/google.py
@@ -0,0 +1,28 @@
+import Utils
+
+URL_GOOGLESEARCH = "https://www.googleapis.com/customsearch/v1"
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("command").on("google",
+ "g").hook(self.google, help="Google feeling lucky")
+
+ def google(self, event):
+ phrase = event["args"] or event["log"].get()
+ if phrase:
+ page = Utils.get_url(URL_GOOGLESEARCH, get_params={
+ "q": phrase, "key": self.bot.config[
+ "google-api-key"], "cx": self.bot.config[
+ "google-search-id"], "prettyPrint": "true",
+ "num": 1, "gl": "us"}, json=True)
+ if page:
+ if "items" in page and len(page["items"]):
+ event["stdout"].write(page["items"][0][
+ "link"])
+ else:
+ event["stderr"].write("No results found")
+ else:
+ event["stderr"].write("Failed to load results")
+ else:
+ event["stderr"].write("No phrase provided")
diff --git a/modules/imdb.py b/modules/imdb.py
new file mode 100644
index 00000000..62492bc7
--- /dev/null
+++ b/modules/imdb.py
@@ -0,0 +1,26 @@
+import json
+import Utils
+
+URL_OMDB = "http://www.omdbapi.com/"
+URL_IMDBTITLE = "http://imdb.com/title/%s"
+
+class Module(object):
+ _name = "IMDb"
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("imdb").hook(
+ self.imdb, min_args=1,
+ help="Search for a given title on IMDb")
+
+ def imdb(self, event):
+ page = Utils.get_url(URL_OMDB, get_params={"t": event["args"]},
+ json=True)
+ if page:
+ if "Title" in page:
+ event["stdout"].write("%s, %s (%s) %s (%s/10.0) %s" % (
+ page["Title"], page["Year"], page["Runtime"],
+ page["Plot"], page["imdbRating"],
+ URL_IMDBTITLE % page["imdbID"]))
+ else:
+ event["stderr"].write("Title not found")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/in.py b/modules/in.py
new file mode 100644
index 00000000..7c543124
--- /dev/null
+++ b/modules/in.py
@@ -0,0 +1,79 @@
+import time
+import Utils
+
+SECONDS_MAX = Utils.SECONDS_WEEKS*8
+SECONDS_MAX_DESCRIPTION = "8 weeks"
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("command").on("in").hook(
+ self.in_command, min_args=2)
+ bot.events.on("received").on("numeric").on("001").hook(
+ self.on_connect)
+
+ def on_connect(self, event):
+ self.load_reminders(event["server"])
+
+ def remove_timer(self, target, due_time, server_id, nickname, message):
+ setting = "in-%s" % nickname
+ reminders = self.bot.database.get_server_setting(server_id, setting, [])
+ try:
+ reminders.remove([target, due_time, server_id, nickname, message])
+ except:
+ print("failed to remove a reminder. huh.")
+ if reminders:
+ self.bot.database.set_server_setting(server_id,
+ setting, reminders)
+ else:
+ self.bot.database.del_server_setting(server_id,
+ setting)
+
+ def load_reminders(self, server):
+ reminders = server.find_settings("in-%")
+ for user_reminders in reminders:
+ for target, due_time, server_id, nickname, message in user_reminders[1]:
+ time_left = due_time-time.time()
+ if time_left > 0:
+ self.bot.add_timer(self.timer_due, time_left, target=target,
+ due_time=due_time, server_id=server_id, nickname=nickname,
+ message=message)
+ else:
+ self.remove_timer(target, due_time, server_id, nickname,
+ message)
+
+ def in_command(self, event):
+ seconds = Utils.from_pretty_time(event["args_split"][0])
+ message = " ".join(event["args_split"][1:])
+ if seconds:
+ if seconds <= SECONDS_MAX:
+ due_time = int(time.time())+seconds
+
+ setting = "in-%s" % event["user"].nickname
+ reminders = event["server"].get_setting(setting, [])
+ reminders.append([event["target"].name, due_time,
+ event["server"].id, event["user"].nickname, message])
+ event["server"].set_setting(setting, reminders)
+
+ self.bot.add_timer(self.timer_due, seconds,
+ target=event["target"].name, due_time=due_time,
+ server_id=event["server"].id, nickname=event["user"].nickname,
+ message=message)
+ else:
+ event["stderr"].write(
+ "The given time is above the max (%s)" % (
+ SECONDS_MAX_DESCRIPTION))
+ else:
+ event["stderr"].write(
+ "Please provided a valid time above 0 seconds")
+
+ def timer_due(self, timer, **kwargs):
+ for server in self.bot.servers.values():
+ if kwargs["server_id"] == server.id:
+ server.send_message(kwargs["target"],
+ "%s, this is your reminder: %s" % (
+ kwargs["nickname"], kwargs["message"]))
+ break
+ setting = "in-%s" % kwargs["nickname"]
+ self.remove_timer(kwargs["target"], kwargs["due_time"],
+ kwargs["server_id"], kwargs["nickname"], kwargs["message"])
diff --git a/modules/isbn.py b/modules/isbn.py
new file mode 100644
index 00000000..67870520
--- /dev/null
+++ b/modules/isbn.py
@@ -0,0 +1,35 @@
+import json
+import Utils
+
+URL_GOOGLEBOOKS = "https://www.googleapis.com/books/v1/volumes"
+
+class Module(object):
+ _name = "ISBN"
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("command").on("isbn").hook(
+ self.isbn, help="Get book information from a provided ISBN",
+ min_args=1)
+
+ def isbn(self, event):
+ isbn = event["args_split"][0]
+ if len(isbn) == 10:
+ isbn = "978%s" % isbn
+ isbn = isbn.replace("-", "")
+ page = Utils.get_url(URL_GOOGLEBOOKS, get_params={
+ "q": "isbn:%s" % isbn, "country": "us"}, json=True)
+ if page:
+ if page["totalItems"] > 0:
+ book = page["items"][0]["volumeInfo"]
+ title = book["title"]
+ sub_title = book["subtitle"]
+ authors = ", ".join(book["authors"])
+ date = book["publishedDate"]
+ rating = book["averageRating"]
+ #language = book["language"]
+ event["stdout"].write("%s - %s (%s), %s (%s/5.0)" % (
+ title, authors, date, sub_title, rating))
+ else:
+ event["stderr"].write("Unable to find book")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/join.py b/modules/join.py
new file mode 100644
index 00000000..3a95d1c7
--- /dev/null
+++ b/modules/join.py
@@ -0,0 +1,8 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("numeric").on("001").hook(self.do_join)
+
+ def do_join(self, event):
+ event["server"].send_join("#bitbot")
diff --git a/modules/karma.py b/modules/karma.py
new file mode 100644
index 00000000..99533e37
--- /dev/null
+++ b/modules/karma.py
@@ -0,0 +1,41 @@
+import re, time
+
+REGEX_KARMA = re.compile("(.*)(\+{2,}|\-{2,})")
+KARMA_DELAY_SECONDS = 3
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("new").on("user").hook(self.new_user)
+ bot.events.on("received").on("message").on("channel").hook(
+ self.channel_message)
+ bot.events.on("received").on("command").on("karma").hook(
+ self.karma)
+
+ def new_user(self, event):
+ event["user"].last_karma = None
+
+ def channel_message(self, event):
+ match = re.match(REGEX_KARMA, event["message"])
+ if match:
+ if not event["user"].last_karma or (time.time()-event["user"
+ ].last_karma) >= KARMA_DELAY_SECONDS:
+ positive = match.group(2)[0] == "+"
+ setting = "karma-%s" % match.group(1).strip()
+ karma = event["server"].get_setting(setting, 0)
+ if positive:
+ karma += 1
+ else:
+ karma -= 1
+ if karma:
+ event["server"].set_setting(setting, karma)
+ else:
+ event["server"].del_setting(setting)
+ event["user"].last_karma = time.time()
+
+ def karma(self, event):
+ if event["args"]:
+ target = event["args"]
+ else:
+ target = event["user"].nickname
+ karma = event["server"].get_setting("karma-%s" % target, 0)
+ event["stdout"].write("%s has %s karma" % (target, karma))
diff --git a/modules/lastfm.py b/modules/lastfm.py
new file mode 100644
index 00000000..cb985f11
--- /dev/null
+++ b/modules/lastfm.py
@@ -0,0 +1,56 @@
+import Utils
+
+URL_SCROBBLER = "http://ws.audioscrobbler.com/2.0/"
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("boot").on("done").hook(self.boot_done)
+ bot.events.on("received").on("command").on("np",
+ "listening", "nowplaying").hook(self.np,
+ help="Get the last listen to track from a user")
+
+ def boot_done(self, event):
+ self.bot.events.on("postboot").on("configure").on(
+ "set").call(setting="lastfm",
+ help="Set username on last.fm")
+
+ def np(self, event):
+ if event["args_split"]:
+ username = event["args_split"][0]
+ else:
+ username = event["user"].get_setting("lastfm",
+ event["user"].nickname)
+ page = Utils.get_url(URL_SCROBBLER, get_params={
+ "method": "user.getrecenttracks", "user": username,
+ "api_key": self.bot.config["lastfm-api-key"],
+ "format": "json", "limit": "1"}, json=True)
+ if page:
+ if "recenttracks" in page and len(page["recenttracks"
+ ]["track"]):
+ now_playing = page["recenttracks"]["track"][0]
+ track_name = now_playing["name"]
+ artist = now_playing["artist"]["#text"]
+
+ tags_page = Utils.get_url(URL_SCROBBLER, get_params={
+ "method": "track.getTags", "artist": artist,
+ "track": track_name, "autocorrect": "1",
+ "api_key": self.bot.config["lastfm-api-key"],
+ "user": username, "format": "json"}, json=True)
+ tags = []
+ if tags_page.get("tags", {}).get("tag"):
+ for tag in tags_page["tags"]["tag"]:
+ tags.append(tag["name"])
+ if tags:
+ tags = " (%s)" % ", ".join(tags)
+ else:
+ tags = ""
+
+ event["stdout"].write("%s is now playing: %s - %s%s" % (
+ username, artist, track_name, tags))
+ else:
+ event["stderr"].write(
+ "The user '%s' has never scrobbled before" % (
+ username))
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/nickserv.py b/modules/nickserv.py
new file mode 100644
index 00000000..bd7bccdc
--- /dev/null
+++ b/modules/nickserv.py
@@ -0,0 +1,13 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("numeric").on("001"
+ ).hook(self.on_connect)
+
+ def on_connect(self, event):
+ nickserv_password = event["server"].get_setting(
+ "nickserv_password")
+ if nickserv_password:
+ event["server"].send_message("nickserv",
+ "identify %s" % nickserv_password)
diff --git a/modules/perform.py b/modules/perform.py
new file mode 100644
index 00000000..c22b8b52
--- /dev/null
+++ b/modules/perform.py
@@ -0,0 +1,16 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("numeric").on("001").hook(
+ self.on_connect)
+
+ def on_connect(self, event):
+ commands = event["server"].get_setting("perform", [])
+ for i, command in enumerate(commands):
+ command = command.split("%%")
+ for j, part in enumerate(command[:]):
+ command[j] = part.replace("%nick%", event["server"
+ ].nickname)
+ command = "%".join(command)
+ event["server"].send(command)
diff --git a/modules/pong.py b/modules/pong.py
new file mode 100644
index 00000000..10a3b782
--- /dev/null
+++ b/modules/pong.py
@@ -0,0 +1,9 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("ping"
+ ).hook(self.pong, help="Ping pong!")
+
+ def pong(self, event):
+ event["stdout"].write("Pong!")
diff --git a/modules/sed.py b/modules/sed.py
new file mode 100644
index 00000000..6bdd1cb0
--- /dev/null
+++ b/modules/sed.py
@@ -0,0 +1,64 @@
+import re, traceback
+
+REGEX_SPLIT = re.compile("(?<!\\\\)/")
+REGEX_SED = re.compile("^s/")
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("boot").on("done").hook(self.boot_done)
+ bot.events.on("received").on("message").on("channel").hook(
+ self.channel_message)
+
+ def validate_setchannel(self, s):
+ return s.lower() == "true"
+ def boot_done(self, event):
+ self.bot.events.on("postboot").on("configure").on(
+ "channelset").call(setting="sed",
+ help="Disable/Enable sed in a channel",
+ validate=self.validate_setchannel)
+
+ def channel_message(self, event):
+ if event["action"] or not event["channel"].get_setting("sed", True):
+ return
+ sed_split = re.split(REGEX_SPLIT, event["message"], 3)
+ if event["message"].startswith("s/") and len(sed_split) > 2:
+ regex_flags = 0
+ flags = (sed_split[3:] or [""])[0]
+ count = None
+
+ last_flag = ""
+ for flag in flags:
+ if flag.isdigit():
+ if last_flag.isdigit():
+ count = int(str(count) + flag)
+ elif not count:
+ count = int(flag)
+ elif flag == "i":
+ regex_flags |= re.I
+ elif flag == "g":
+ count = 0
+ last_flag = flag
+ if count == None:
+ count = 1
+
+ try:
+ pattern = re.compile(sed_split[1], regex_flags)
+ except:
+ traceback.print_exc()
+ self.bot.events.on("send").on("stderr").call(target=event[
+ "channel"], module_name="Sed", server=event["server"],
+ message="Invalid regex in pattern")
+ return
+ replace = sed_split[2].replace("\\/", "/")
+
+ line = event["channel"].log.find(pattern, from_self=False, not_pattern=REGEX_SED)
+ if line:
+ new_message = re.sub(pattern, replace, line.message, count)
+ if line.action:
+ prefix = "* %s" % line.sender
+ else:
+ prefix = "<%s>" % line.sender
+ self.bot.events.on("send").on("stdout").call(target=event[
+ "channel"], module_name="Sed", server=event["server"],
+ message="%s %s" % (prefix, new_message))
diff --git a/modules/seen.py b/modules/seen.py
new file mode 100644
index 00000000..dc2d7ae1
--- /dev/null
+++ b/modules/seen.py
@@ -0,0 +1,25 @@
+import time
+import Utils
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("message").on("channel"
+ ).hook(self.channel_message)
+ bot.events.on("received").on("command").on("seen").hook(
+ self.seen, min_args=1,
+ help="Find out when a user was last seen")
+
+ def channel_message(self, event):
+ seen_seconds = time.time()
+ event["user"].set_setting("seen", seen_seconds)
+
+ def seen(self, event):
+ seen_seconds = event["server"].get_user(event["args_split"][0]
+ ).get_setting("seen")
+ if seen_seconds:
+ since, unit = Utils.time_unit(time.time()-seen_seconds)
+ event["stdout"].write("%s was last seen %s %s ago" % (
+ event["args_split"][0], since, unit))
+ else:
+ event["stderr"].write("I have never seen %s before." % (
+ event["args_split"][0]))
diff --git a/modules/set.py b/modules/set.py
new file mode 100644
index 00000000..36592f7a
--- /dev/null
+++ b/modules/set.py
@@ -0,0 +1,54 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ self.settings = {}
+ self.channel_settings = {}
+ bot.events.on("postboot").on("configure").on("set").hook(
+ self.postboot_set)
+ bot.events.on("postboot").on("configure").on("channelset"
+ ).hook(self.postboot_channelset)
+ bot.events.on("received").on("command").on("set").hook(
+ self.set, help="Set a specified user setting")
+ bot.events.on("received").on("command").on("channelset"
+ ).hook(self.channel_set, channel_only=True)
+
+ def _postboot_set(self, settings, event):
+ settings[event["setting"]] = {}
+ settings[event["setting"]]["validate"] = event.get(
+ "validate", lambda s: s)
+ settings[event["setting"]]["help"] = event.get("help",
+ "")
+ def postboot_set(self, event):
+ self._postboot_set(self.settings, event)
+ def postboot_channelset(self, event):
+ self._postboot_set(self.channel_settings, event)
+
+ def _set(self, settings, target, event):
+ if len(event["args_split"]) > 1:
+ setting = event["args_split"][0].lower()
+ if setting in settings:
+ value = " ".join(event["args_split"][1:])
+ value = settings[setting]["validate"](value)
+ if not value == None:
+ target.set_setting(setting, value)
+ event["stdout"].write("Saved setting")
+ else:
+ event["stderr"].write("Invalid value")
+ else:
+ event["stderr"].write("Unknown setting")
+ elif len(event["args_split"]) == 1:
+ event["stderr"].write("Please provide a value")
+ else:
+ event["stdout"].write("Available settings: %s" % (
+ ", ".join(settings.keys())))
+ def set(self, event):
+ self._set(self.settings, event["user"], event)
+
+ def channel_set(self, event):
+ if event["channel"].mode_or_above(event["user"].nickname,
+ "o"):
+ self._set(self.channel_settings, event["channel"], event)
+ else:
+ event["stderr"].write("You do not have the modes required")
diff --git a/modules/signals.py b/modules/signals.py
new file mode 100644
index 00000000..482f11c0
--- /dev/null
+++ b/modules/signals.py
@@ -0,0 +1,48 @@
+import signal
+from random import randint
+
+QUOTES = {
+ "You can build a throne with bayonets, but it's difficult to sit on it." : "Boris Yeltsin",
+ "We don't appreciate what we have until it's gone. Freedom is like that. It's like air. When you have it, you don't notice it." : "Boris Yeltsin",
+ "Storm clouds of terror and dictatorship are gathering over the whole country... They must not be allowed to bring eternal night." : "Boris Yeltsin",
+ "They accused us of suppressing freedom of expression. This was a lie and we could not let them publish it." : "Nelba Blandon, as director of censorship, Nicaragua",
+ "Death solves all problems - no man, no problem." : "Joseph Stalin",
+ "Make the lie big, make it simple, keep saying it, and eventually they will believe it." : "Adolf Hitler",
+ "If we don't end war, war will end us." : "H.G. Wells",
+ "All that is necessary for evil to succeed is for good men to do nothing." : "Edmund Burke",
+ "Live well. It is the greatest revenge." : "The Talmud",
+ "To improve is to change, so to be perfect is to have changed often." : "Winston Churchill",
+ "I believe it is peace for our time." : "Neville Chamberlain",
+ "Orbiting Earth in the spaceship, I saw how beautiful our planet is. People, let us preserve and increase this beauty, not destroy it!" : "Yuri Gagarin",
+ "Science is not everything, but science is very beautiful." : "Robert Oppenheimer",
+ "We choose to go to the Moon! We choose to go to the Moon in this decade and do the other things, not because they are easy, but because they are hard." : "",
+ "You teach a child to read, and he or her will be able to pass a literacy test." : "George W Bush",
+ "I know the human being and fish can coexist peacefully." : "George W Bush",
+ "They misunderestimated me." : "George W Bush",
+ "I'm an Internet expert too." : "Kim Jong Il",
+ "So long, and thanks for all the fish" : "",
+ "It is a lie that I made the people starve." : "Nicolae Ceaușescu",
+ "As far as I know - effective immediately, without delay." : "Günter Schabowski"
+}
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ signal.signal(signal.SIGINT, self.SIGINT)
+ signal.signal(signal.SIGUSR1, self.SIGUSR1)
+
+ def SIGINT(self, signum, frame):
+ print()
+ for server in self.bot.servers.values():
+ server.send_quit(self.random_quote())
+ self.bot.register_write(server)
+ self.bot.running = False
+
+ def SIGUSR1(self, signum, frame):
+ print("Reloading config file")
+ self.bot.config_object.load_config()
+
+ def random_quote(self):
+ quote = list(QUOTES)[randint(0, len(QUOTES)-1)]
+ return (" - " if QUOTES[quote] else "").join([quote,
+ QUOTES[quote]])
diff --git a/modules/thesaurus.py b/modules/thesaurus.py
new file mode 100644
index 00000000..923b1901
--- /dev/null
+++ b/modules/thesaurus.py
@@ -0,0 +1,43 @@
+import Utils
+
+URL_THESAURUS = "http://words.bighugelabs.com/api/2/%s/%s/json"
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("command").on("synonym",
+ "antonym").hook(self.thesaurus, min_args=1,
+ help="Get synonyms/antonyms for a provided phrase")
+
+ def thesaurus(self, event):
+ phrase = event["args_split"][0]
+ page = Utils.get_url(URL_THESAURUS % (self.bot.config[
+ "bighugethesaurus-api-key"], phrase), json=True)
+ syn_ant = event["command"][:3]
+ if page:
+ if not len(event["args_split"]) > 1:
+ word_types = []
+ for word_type in page.keys():
+ if syn_ant in page[word_type]:
+ word_types.append(word_type)
+ if word_types:
+ word_types = sorted(word_types)
+ event["stdout"].write(
+ "Available categories for %s: %s" % (
+ phrase, ", ".join(word_types)))
+ else:
+ event["stderr"].write("No categories available")
+ else:
+ category = event["args_split"][1].lower()
+ if category in page:
+ if syn_ant in page[category]:
+ event["stdout"].write("%ss for %s: %s" % (
+ event["command"].title(), phrase, ", ".join(
+ page[category][syn_ant])))
+ else:
+ event["stderr"].write("No %ss for %s" % (
+ event["command"], phrase))
+ else:
+ event["stderr"].write("Category not found")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/title.py b/modules/title.py
new file mode 100644
index 00000000..22795091
--- /dev/null
+++ b/modules/title.py
@@ -0,0 +1,33 @@
+import re
+import Utils
+
+REGEX_URL = re.compile("https?://\S+", re.I)
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("title", "t").hook(
+ self.title, help="Get the title of the provided or most "
+ "recent URL.")
+
+ def title(self, event):
+ url = None
+ if len(event["args"]) > 0:
+ url = event["args_split"][0]
+ else:
+ url = event["channel"].log.find(REGEX_URL)
+ if url:
+ url = re.search(REGEX_URL, url.message).group(0)
+ if not url:
+ event["stderr"].write("No URL provided/found.")
+ return
+ soup = Utils.get_url(url, soup=True)
+ if not soup:
+ event["stderr"].write("Failed to get URL.")
+ return
+ title = soup.title
+ if title:
+ title = title.text.replace("\n", " ").replace("\r", ""
+ ).replace(" ", " ").strip()
+ event["stdout"].write(title)
+ else:
+ event["stderr"].write("No title found.")
diff --git a/modules/to.py b/modules/to.py
new file mode 100644
index 00000000..033cb0e4
--- /dev/null
+++ b/modules/to.py
@@ -0,0 +1,26 @@
+
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("message").on("channel"
+ ).hook(self.channel_message)
+ bot.events.on("received").on("command").on("to").hook(
+ self.to, min_args=2, help=("Relay a message to a "
+ "user the next time they talk in a channel"),
+ channel_only=True)
+
+ def channel_message(self, event):
+ setting = "to-%s" % event["user"].nickname
+ messages = event["channel"].get_setting(setting, [])
+ for nickname, message in messages:
+ event["channel"].send_message("%s: <%s> %s" % (
+ event["user"].nickname, nickname, message))
+ event["channel"].del_setting(setting)
+
+ def to(self, event):
+ setting = "to-%s" % event["args_split"][0]
+ messages = event["channel"].get_setting(setting, [])
+ messages.append([event["user"].nickname,
+ " ".join(event["args_split"][1:])])
+ event["channel"].set_setting(setting, messages)
+ event["stdout"].write("Message saved")
diff --git a/modules/trakt.py b/modules/trakt.py
new file mode 100644
index 00000000..8e23cfca
--- /dev/null
+++ b/modules/trakt.py
@@ -0,0 +1,58 @@
+import Utils
+
+URL_TRAKT = "https://api-v2launch.trakt.tv/users/%s/watching"
+URL_TRAKTSLUG = "https://trakt.tv/%s/%s"
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("boot").on("done").hook(self.boot_done)
+ bot.events.on("received").on("command").on("nowwatching",
+ "nw").hook(self.now_watching)
+
+ def boot_done(self, event):
+ self.bot.events.on("postboot").on("configure").on("set"
+ ).call(setting="trakt", help="Set username on trakt.tv")
+
+ def now_watching(self, event):
+ if event["args"]:
+ username = event["args_split"][0]
+ else:
+ username = event["user"].get_setting("trakt",
+ event["user"].nickname)
+ page = Utils.get_url(URL_TRAKT % username, headers={
+ "Content-Type": "application/json",
+ "trakt-api-version": "2", "trakt-api-key":
+ self.bot.config["trakt-api-key"]}, json=True,
+ code=True)
+ if page:
+ code, page = page
+ if code == 204:
+ event["stderr"].write(
+ "%s is not watching anything" % username)
+ else:
+ type = page["type"]
+ if type == "movie":
+ title = page["movie"]["title"]
+ year = page["movie"]["year"]
+ slug = page["movie"]["ids"]["slug"]
+ event["stdout"].write(
+ "%s is now watching %s (%s) %s" % (
+ username, title, year,
+ URL_TRAKTSLUG % ("movie", slug)))
+ elif type == "episode":
+ season = page["episode"]["season"]
+ episode_number = page["episode"]["number"]
+ episode_title = page["episode"]["title"]
+ show_title = page["show"]["title"]
+ show_year = page["show"]["year"]
+ slug = page["show"]["ids"]["slug"]
+ event["stdout"].write(
+ "%s is now watching %s s%se%s - %s %s" % (
+ username, show_title, str(season).zfill(2),
+ str(episode_number).zfill(2), episode_title,
+ URL_TRAKTSLUG % ("shows", slug)))
+ else:
+ print("ack! unknown trakt media type!")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/translate.py b/modules/translate.py
new file mode 100644
index 00000000..5e0cc15a
--- /dev/null
+++ b/modules/translate.py
@@ -0,0 +1,51 @@
+import re
+import Utils
+
+URL_TRANSLATE = "https://translate.google.com/"
+REGEX_LANGUAGES = re.compile("(\w{2})?:(\w{2})? ")
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("translate", "tr").hook(
+ self.translate, help="Translate the provided phrase or the "
+ "last line seen.")
+
+ def translate(self, event):
+ phrase = event["args"]
+ if not phrase:
+ phrase = event["channel"].log.get()
+ if phrase:
+ phrase = phrase.message
+ if not phrase:
+ event["stderr"].write("No phrase provided.")
+ return
+ source_language = "auto"
+ target_language = "en"
+
+ language_match = re.match(REGEX_LANGUAGES, phrase)
+ if language_match:
+ if language_match.group(1):
+ source_language = language_match.group(1)
+ if language_match.group(2):
+ target_language = language_match.group(2)
+ phrase = phrase.split(" ", 1)[1]
+
+ soup = Utils.get_url(URL_TRANSLATE, post_params={
+ "sl": source_language, "tl": target_language, "js": "n",
+ "prev": "_t", "hl": "en", "ie": "UTF-8", "text": phrase,
+ "file": "", "edit-text": ""}, method="POST", soup=True)
+ if soup:
+ languages_element = soup.find(id="gt-otf-switch")
+ translated_element = soup.find(id="result_box").find("span")
+ if languages_element and translated_element:
+ source_language, target_language = languages_element.attrs[
+ "href"].split("&sl=", 1)[1].split("&tl=", 1)
+ target_language = target_language.split("&", 1)[0]
+ translated = translated_element.text
+ event["stdout"].write("(%s > %s) %s" % (source_language,
+ target_language, translated))
+ return
+ event["stderr"].write("Failed to translate, try checking "
+ "source/target languages (https://cloud.google.com/translate/"
+ "v2/using_rest#language-params")
+
diff --git a/modules/twitter.py b/modules/twitter.py
new file mode 100644
index 00000000..7d732713
--- /dev/null
+++ b/modules/twitter.py
@@ -0,0 +1,69 @@
+import datetime, re, time, traceback
+import twitter
+import Utils
+
+REGEX_TWITTERURL = re.compile(
+ "https?://(?:www\.)?twitter.com/[^/]+/status/(\d+)", re.I)
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("command").on("twitter", "tw"
+ ).hook(self.twitter)
+
+ def make_timestamp(self, s):
+ seconds_since = time.time()-datetime.datetime.strptime(s,
+ "%a %b %d %H:%M:%S %z %Y").timestamp()
+ since, unit = Utils.time_unit(seconds_since)
+ return "%s %s ago" % (since, unit)
+
+ def twitter(self, event):
+ api_key = self.bot.config["twitter-api-key"]
+ api_secret = self.bot.config["twitter-api-secret"]
+ access_token = self.bot.config["twitter-access-token"]
+ access_secret = self.bot.config["twitter-access-secret"]
+
+ if event["args"]:
+ target = event["args"]
+ else:
+ target = event["log"].find(REGEX_TWITTERURL)
+ if target:
+ target = target.message
+ if target:
+ twitter_object = twitter.Twitter(auth=twitter.OAuth(
+ access_token, access_secret, api_key, api_secret))
+ url_match = re.search(REGEX_TWITTERURL, target)
+ if url_match or target.isdigit():
+ tweet_id = url_match.group(1) if url_match else target
+ try:
+ tweet = twitter_object.statuses.show(id=tweet_id)
+ except:
+ traceback.print_exc()
+ tweet = None
+ elif target.startswith("@"):
+ try:
+ tweet = twitter_object.statuses.user_timeline(
+ screen_name=target[1:], count=1)[0]
+ except:
+ traceback.print_exc()
+ tweet = None
+ if tweet:
+ username = "@%s" % tweet["user"]["screen_name"]
+ if "retweeted_status" in tweet:
+ original_username = "@%s" % tweet["retweeted_status"
+ ]["user"]["screen_name"]
+ original_text = tweet["retweeted_status"]["text"]
+ retweet_timestamp = self.make_timestamp(tweet[
+ "created_at"])
+ original_timestamp = self.make_timestamp(tweet[
+ "retweeted_status"]["created_at"])
+ event["stdout"].write("(%s (%s) retweeted %s (%s)) %s" % (
+ username, retweet_timestamp,
+ original_username, original_timestamp, original_text))
+ else:
+ event["stdout"].write("(%s, %s) %s" % (username,
+ self.make_timestamp(tweet["created_at"]), tweet["text"]))
+ else:
+ event["stderr"].write("Invalid tweet identifiers provided")
+ else:
+ event["stderr"].write("No tweet provided to get information about")
diff --git a/modules/urbandictionary.py b/modules/urbandictionary.py
new file mode 100644
index 00000000..a9815617
--- /dev/null
+++ b/modules/urbandictionary.py
@@ -0,0 +1,34 @@
+import json, re
+import Utils
+
+URL_URBANDICTIONARY = "http://api.urbandictionary.com/v0/define"
+REGEX_DEFNUMBER = re.compile("-n(\d+) \S+")
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("urbandictionary", "ud"
+ ).hook(self.ud, min_args=1,
+ help="Get the definition of a provided term")
+
+ def ud(self, event):
+ term = event["args"]
+ number = 1
+ match = re.match(REGEX_DEFNUMBER, term)
+ if match:
+ number = int(match.group(1))
+ term = term.split(" ", 1)[1]
+ page = Utils.get_url(URL_URBANDICTIONARY, get_params={"term": term},
+ json=True)
+ if page:
+ if len(page["list"]):
+ if number > 0 and len(page["list"]) > number-1:
+ definition = page["list"][number-1]
+ event["stdout"].write("%s: %s" % (definition["word"],
+ definition["definition"].replace("\n", " ").replace(
+ "\r", "").replace(" ", " ")))
+ else:
+ event["stderr"].write("Definition number does not exist")
+ else:
+ event["stderr"].write("No results found")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/weather.py b/modules/weather.py
new file mode 100644
index 00000000..868f9fca
--- /dev/null
+++ b/modules/weather.py
@@ -0,0 +1,35 @@
+import Utils
+
+URL_WEATHER = "http://api.openweathermap.org/data/2.5/weather"
+
+class Module(object):
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("weather").hook(
+ self.weather, min_args=1,
+ help="Get current weather data for a provided location")
+ self.bot = bot
+
+ def weather(self, event):
+ api_key = self.bot.config["openweathermap-api-key"]
+ page = Utils.get_url(URL_WEATHER, get_params={
+ "q": event["args"], "units": "metric",
+ "APPID": api_key},
+ json=True)
+ if page:
+ if "weather" in page:
+ location = "%s, %s" % (page["name"], page["sys"][
+ "country"])
+ celsius = "%dC" % page["main"]["temp"]
+ fahrenheit = "%dF" % ((page["main"]["temp"]*(9/5))+32)
+ description = page["weather"][0]["description"].title()
+ humidity = "%s%%" % page["main"]["humidity"]
+ wind_speed = "%sKM/h" % page["wind"]["speed"]
+
+ event["stdout"].write(
+ "(%s) %s/%s | %s | Humidity: %s | Wind: %s" % (
+ location, celsius, fahrenheit, description, humidity,
+ wind_speed))
+ else:
+ event["stderr"].write("No weather information for this location")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/wolframalpha.py b/modules/wolframalpha.py
new file mode 100644
index 00000000..534187dc
--- /dev/null
+++ b/modules/wolframalpha.py
@@ -0,0 +1,42 @@
+import re
+import Utils
+
+URL_WA = "http://api.wolframalpha.com/v2/query"
+REGEX_CHARHEX = re.compile("\\\\:(\S{4})")
+
+class Module(object):
+ _name = "Wolfram|Alpha"
+ def __init__(self, bot):
+ bot.events.on("received").on("command").on("wolframalpha", "wa"
+ ).hook(self.wa, min_args=1, help=
+ "Evauate a given string on Wolfram|Alpha")
+ self.bot = bot
+
+ def wa(self, event):
+ soup = Utils.get_url(URL_WA, get_params={"input": event["args"],
+ "appid": self.bot.config["wolframalpha-api-key"],
+ "format": "plaintext", "reinterpret": "true"}, soup=True)
+
+ if soup:
+ if int(soup.find("queryresult").get("numpods")) > 0:
+ input = soup.find(id="Input").find("subpod").find("plaintext"
+ ).text
+ for pod in soup.find_all("pod"):
+ if pod.get("primary") == "true":
+ answer = pod.find("subpod").find("plaintext")
+ text = "(%s) %s" % (input.replace(" | ", ": "),
+ answer.text.strip().replace(" | ", ": "
+ ).replace("\n", " | ").replace("\r", ""))
+ while True:
+ match = re.search(REGEX_CHARHEX, text)
+ if match:
+ text = re.sub(REGEX_CHARHEX, chr(int(
+ match.group(1), 16)), text)
+ else:
+ break
+ event["stdout"].write(text)
+ break
+ else:
+ event["stderr"].write("No results found")
+ else:
+ event["stderr"].write("Failed to load results")
diff --git a/modules/youtube.py b/modules/youtube.py
new file mode 100644
index 00000000..b5066e6f
--- /dev/null
+++ b/modules/youtube.py
@@ -0,0 +1,105 @@
+import re
+import Utils
+
+REGEX_YOUTUBE = re.compile(
+ "https?://(?:www.)?(?:youtu.be/|youtube.com/watch\?[^\S]*v=)(\w{11})",
+ re.I)
+REGEX_ISO8601 = re.compile("PT(\d+H)?(\d+M)?(\d+S)?", re.I)
+
+URL_YOUTUBESEARCH = "https://www.googleapis.com/youtube/v3/search"
+URL_YOUTUBEVIDEO = "https://www.googleapis.com/youtube/v3/videos"
+
+URL_YOUTUBESHORT = "https://youtu.be/%s"
+
+ARROW_UP = "▲"
+ARROW_DOWN = "▼"
+
+class Module(object):
+ def __init__(self, bot):
+ self.bot = bot
+ bot.events.on("received").on("command").on("yt", "youtube"
+ ).hook(self.yt,
+ help="Find a video on youtube")
+ bot.events.on("received").on("message").on("channel").hook(
+ self.channel_message)
+ bot.events.on("boot").on("done").hook(self.boot_done)
+
+ def validate_setchannel(self, s):
+ return s.lower() == "true"
+ def boot_done(self, event):
+ self.bot.events.on("postboot").on("configure").on(
+ "channelset").call(setting="autoyoutube",
+ help="Disable/Enable automatically getting info from youtube URLs",
+ validate=self.validate_setchannel)
+
+ def get_video_page(self, video_id, part):
+ return Utils.get_url(URL_YOUTUBEVIDEO, get_params={"part": part,
+ "id": video_id, "key": self.bot.config["google-api-key"]},
+ json=True)
+ def video_details(self, video_id):
+ snippet = self.get_video_page(video_id, "snippet")
+ if snippet["items"]:
+ snippet = snippet["items"][0]["snippet"]
+ statistics = self.get_video_page(video_id, "statistics")[
+ "items"][0]["statistics"]
+ content = self.get_video_page(video_id, "contentDetails")[
+ "items"][0]["contentDetails"]
+ video_uploader = snippet["channelTitle"]
+ video_title = snippet["title"]
+ video_views = statistics["viewCount"]
+ video_likes = statistics["likeCount"]
+ video_dislikes = statistics["dislikeCount"]
+ video_duration = content["duration"]
+
+ match = re.match(REGEX_ISO8601, video_duration)
+ video_duration = ""
+ video_duration += "%s:" % match.group(1)[:-1].zfill(2
+ ) if match.group(1) else ""
+ video_duration += "%s:" % match.group(2)[:-1].zfill(2
+ ) if match.group(2) else ""
+ video_duration += "%s" % match.group(3)[:-1].zfill(2
+ ) if match.group(3) else ""
+ return "%s (%s) uploaded by %s, %s views (%s%s%s%s) %s" % (
+ video_title, video_duration, video_uploader, "{:,}".format(
+ int(video_views)), video_likes, ARROW_UP, ARROW_DOWN, video_dislikes,
+ URL_YOUTUBESHORT % video_id)
+
+ def yt(self, event):
+ video_id = None
+ search = None
+ if event["args"]:
+ search = event["args"]
+ else:
+ last_youtube = event["channel"].log.find(REGEX_YOUTUBE)
+ if last_youtube:
+ video_id = re.search(REGEX_YOUTUBE, last_youtube.message).group(1)
+ if search or video_id:
+ if not video_id:
+ search_page = Utils.get_url(URL_YOUTUBESEARCH,
+ get_params={"q": search, "part": "snippet",
+ "maxResults": "1", "type": "video",
+ "key": self.bot.config["google-api-key"]},
+ json=True)
+ if search_page:
+ if search_page["pageInfo"]["totalResults"] > 0:
+ video_id = search_page["items"][0]["id"]["videoId"]
+ else:
+ event["stderr"].write("No videos found")
+ else:
+ event["stderr"].write("Failed to load results")
+ if video_id:
+ event["stdout"].write(self.video_details(video_id))
+ else:
+ event["stderr"].write("No search phrase provided")
+ else:
+ event["stderr"].write("No search phrase provided")
+
+ def channel_message(self, event):
+ match = re.search(REGEX_YOUTUBE, event["message"])
+ if match and event["channel"].get_setting("autoyoutube", False):
+ youtube_id = match.group(1)
+ video_details = self.video_details(youtube_id)
+ if video_details:
+ self.bot.events.on("send").on("stdout").call(target=event[
+ "channel"], message=video_details, module_name="Youtube",
+ server=event["server"])