aboutsummaryrefslogtreecommitdiff
path: root/modules/tweets/__init__.py
blob: 2788ca80b9062b5e0bc6df477968f7ab27589b3e (about) (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#--depends-on commands
#--depends-on permissions
#--require-config twitter-api-key
#--require-config twitter-api-secret
#--require-config twitter-access-token
#--require-config twitter-access-secret

import json, re
from src import ModuleManager, utils
from . import format
import tweepy

# Handles to the running bot, its event tree and its exports. They start out
# as None and are populated by Module.on_load so that module-level code (the
# stream listener and _get_follows) can reach them.
_bot = None
_events = None
_exports = None

# Matches a link to a single tweet; group 1 captures the numeric status ID.
# Written as a raw string so the regex escapes are not treated as (invalid)
# string escapes, and with the dot in "twitter.com" escaped so that it only
# matches a literal dot.
REGEX_TWITTERURL = re.compile(
    r"https?://(?:www\.)?twitter\.com/[^/]+/status/(\d+)", re.I)

def _get_follows():
    """Return every stored "twitter-follow" channel setting.

    The result is whatever the bot database's channel-settings lookup
    returns; callers in this file unpack each entry as
    (server_id, channel_name, value).
    """
    channel_settings = _bot.database.channel_settings
    return channel_settings.find_by_setting("twitter-follow")

class BitBotStreamListener(tweepy.StreamListener):
    """Stream listener that relays incoming statuses to following channels."""

    def on_status(self, status):
        """Hand an incoming status over to the bot for processing."""
        # The actual work is deferred through _bot.trigger rather than done
        # inline -- presumably so it runs on the bot's own loop instead of
        # the stream thread; confirm against the bot implementation.
        _bot.trigger(lambda: self._on_status(status))

    def _on_status(self, status):
        """Send the formatted status to every channel following its author."""
        author = status.user.screen_name.lower()

        # Collect one (server, channel) pair per stored follow entry whose
        # username matches the author, ignoring case.
        destinations = []
        for server_id, channel_name, followed_names in _get_follows():
            for followed in followed_names:
                if followed.lower() != author:
                    continue
                server = _bot.get_server_by_id(server_id)
                if server and channel_name in server.channels:
                    channel = server.channels.get(channel_name)
                    destinations.append((server, channel))

        # Format once, then deliver the same text to each destination.
        message = format._tweet(_exports, status)
        for server, channel in destinations:
            _events.on("send.stdout").call(target=channel,
                module_name="Tweets", server=server, message=message)

@utils.export("channelset", {"setting": "auto-tweet",
    "help": "Enable/disable automatically getting tweet info",
    "validate": utils.bool_or_none, "example": "on"})
class Module(ModuleManager.BaseModule):
    # Tweet lookup commands plus a tweepy stream that relays tweets from
    # followed accounts into the channels that follow them.

    # The active tweepy.Stream, or None when no stream is running.
    _stream = None

    def on_load(self):
        """Publish the bot handles as module globals and start the stream."""
        global _bot
        global _events
        global _exports
        _bot = self.bot
        _events = self.events
        _exports = self.exports
        self._start_stream()

    def unload(self):
        """Disconnect the stream when the module is unloaded."""
        self._dispose_stream()

    def _dispose_stream(self):
        """Disconnect the current stream, if there is one, and forget it."""
        if self._stream is not None:
            self._stream.disconnect()
            # Drop the reference so a disconnected stream is never reused or
            # disconnected a second time.
            self._stream = None

    def _get_auth(self):
        """Build a tweepy OAuth handler from the bot's twitter-* config."""
        auth = tweepy.OAuthHandler(self.bot.config["twitter-api-key"],
            self.bot.config["twitter-api-secret"])
        auth.set_access_token(self.bot.config["twitter-access-token"],
            self.bot.config["twitter-access-secret"])
        return auth

    def _get_api(self, auth):
        """Return a tweepy API client that uses the given auth handler."""
        return tweepy.API(auth)

    def _from_id(self, tweet_id):
        """Fetch the status with the given ID."""
        return self._get_api(self._get_auth()).get_status(tweet_id)

    def _from_username(self, username):
        """Fetch the most recent status on a user's timeline.

        Returns None when the timeline is empty, so callers can report that
        nothing was found instead of hitting an IndexError.
        """
        timeline = self._get_api(self._get_auth()).user_timeline(
            screen_name=username, count=1)
        return timeline[0] if timeline else None

    def _start_stream(self):
        """(Re)start the stream for every followed username.

        Any existing stream is disposed of first. Returns False when no
        channel follows anyone (no stream is started), True once the stream
        has been started.
        """
        self._dispose_stream()

        # Every username followed by any channel, without exact duplicates.
        usernames = {username
            for _server_id, _channel_name, value in _get_follows()
            for username in value}
        if not usernames:
            return False

        auth = self._get_auth()
        api = self._get_api(auth)

        # The stream filter is given user IDs (as strings), so each username
        # is resolved through the API first.
        user_ids = [str(api.get_user(screen_name=username).id)
            for username in usernames]

        self._stream = tweepy.Stream(auth=auth, listener=BitBotStreamListener())
        self._stream.filter(follow=user_ids, is_async=True)
        return True

    @utils.hook("received.command.tfollow", min_args=2, channel_only=True)
    def tfollow(self, event):
        """
        :help: Stream tweets from a given account to the current channel
        :usage: add|remove @<username>
        :permission: twitter-follow
        """
        username = event["args_split"][1]
        if username.startswith("@"):
            username = username[1:]

        subcommand = event["args_split"][0].lower()
        follows = event["target"].get_setting("twitter-follow", [])

        # The stream listener matches usernames case-insensitively, so do the
        # same here; otherwise "Foo" and "foo" could both be stored and every
        # tweet from that account would be relayed twice.
        lowered = username.lower()
        matches = [name for name in follows if name.lower() == lowered]

        if subcommand == "add":
            action = "followed"
            if matches:
                raise utils.EventError("Already following %s" % username)
            follows.append(username)
        elif subcommand == "remove":
            action = "unfollowed"
            if not matches:
                raise utils.EventError("Not following %s" % username)
            follows = [name for name in follows if name.lower() != lowered]
        else:
            raise utils.EventError("Unknown subcommand")

        event["target"].set_setting("twitter-follow", follows)
        # Restart the stream so the changed follow list takes effect.
        self._start_stream()
        event["stdout"].write("%s @%s" % (action.title(), username))

    @utils.hook("received.command.tw", alias_of="tweet")
    @utils.hook("received.command.tweet")
    def tweet(self, event):
        """
        :help: Get/find a tweet
        :usage: [@username/URL/ID]
        """

        if event["args"]:
            target = event["args"]
        else:
            # No argument given: fall back to the most recent tweet URL found
            # in the target's buffer.
            target = event["target"].buffer.find(REGEX_TWITTERURL)
            if target:
                target = target.message

        if not target:
            event["stderr"].write("No tweet provided to get information about")
            return

        url_match = REGEX_TWITTERURL.search(target)
        if url_match or target.isdigit():
            # A tweet URL or a bare numeric ID identifies a single status.
            tweet_id = url_match.group(1) if url_match else target
            tweet = self._from_id(tweet_id)
        else:
            # Anything else is treated as a username (optionally @-prefixed).
            if target.startswith("@"):
                target = target[1:]
            tweet = self._from_username(target)

        if tweet:
            tweet_str = format._tweet(self.exports, tweet)
            event["stdout"].write(tweet_str)
        else:
            event["stderr"].write("Invalid tweet identifiers provided")

    @utils.hook("command.regex", pattern=REGEX_TWITTERURL)
    def regex(self, event):
        """
        :command: tweet
        """
        # Only react to tweet URLs in channels that enabled "auto-tweet".
        if event["target"].get_setting("auto-tweet", False):
            event.eat()
            tweet_id = event["match"].group(1)
            tweet = self._from_id(tweet_id)
            if tweet:
                tweet_str = format._tweet(self.exports, tweet)
                event["stdout"].write(tweet_str)