統計学、機械学習などを使って身近な世界を分析したりするブログです

データで見るM1グランプリ2017 〜データ取得編〜

今回はTwitter APIを用いてデータを取得しています。

なお、大量のツイートを取得しようとするといろんな取得制限がかかって結構エラーが出ます。そこで、こちらのサイトを参考にさせていただきました。素晴らしいコードで、全くエラーが出ません。

ailaby.com

僕のコードは以下の通りです。

# 必要なライブラリをインポート
 
from requests_oauthlib import OAuth1Session
import json
import datetime, time, sys
from abc import ABCMeta, abstractmethod
import pandas as pd
from pandas import Series, DataFrame
from dateutil.parser import parse
 
CK = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'    # Consumer Key
CS = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'    # Consumer Secret
AT = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'    # Access Token
AS = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'    # Access Token Secret

事前に取得したConsumer Key、Consumer Secret、Access Token、Access Token Secretを入力します。

次に、ツイートを取得する関数、ツイート取得制限の残り回数を確認する関数、取得制限が解除されるまで待機する関数を定義します。

class TweetsGetter(object):
    __metaclass__ = ABCMeta
 
    def __init__(self):
        self.session = OAuth1Session(CK, CS, AT, AS)
 
    @abstractmethod
    def specifyUrlAndParams(self, keyword):
        '''
        呼出し先 URL、パラメータを返す
        '''
 
    @abstractmethod
    def pickupTweet(self, res_text, includeRetweet):
        '''
        res_text からツイートを取り出し、配列にセットして返却
        '''
 
    @abstractmethod
    def getLimitContext(self, res_text):
        '''
        回数制限の情報を取得 (起動時)
        '''
 
    def collect(self, total = -1, onlyText = False, includeRetweet = False):
        '''
        ツイート取得を開始する
        '''
 
        #----------------
        # 回数制限を確認
        #----------------
        self.checkLimit()
 
        #----------------
        # URL、パラメータ
        #----------------
        url, params = self.specifyUrlAndParams()
        params['include_rts'] = str(includeRetweet).lower()
        # include_rts は statuses/user_timeline のパラメータ。search/tweets には無効
 
        #----------------
        # ツイート取得
        #----------------
        cnt = 0
        unavailableCnt = 0
        while True:
            res = self.session.get(url, params = params)
            if res.status_code == 503:
                # 503 : Service Unavailable
                if unavailableCnt > 10:
                    raise Exception('Twitter API error %d' % res.status_code)
 
                unavailableCnt += 1
                print ('Service Unavailable 503')
                self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
                continue
 
            unavailableCnt = 0
 
            if res.status_code != 200:
                raise Exception('Twitter API error %d' % res.status_code)
 
            tweets = self.pickupTweet(json.loads(res.text))
            if len(tweets) == 0:
                # len(tweets) != params['count'] としたいが
                # count は最大値らしいので判定に使えない。
                # ⇒  "== 0" にする
                # https://dev.twitter.com/discussions/7513
                break
 
            for tweet in tweets:
                if (('retweeted_status' in tweet) and (includeRetweet is False)):
                    pass
                else:
                    if onlyText is True:
                        yield tweet['text']
                    else:
                        yield tweet
 
                    cnt += 1
                    if cnt % 100 == 0:
                        print ('%d件 ' % cnt)
 
                    if total > 0 and cnt >= total:
                        return
 
            params['max_id'] = tweet['id'] - 1
 
            # ヘッダ確認 (回数制限)
            # X-Rate-Limit-Remaining が入ってないことが稀にあるのでチェック
            if ('X-Rate-Limit-Remaining' in res.headers and 'X-Rate-Limit-Reset' in res.headers):
                if (int(res.headers['X-Rate-Limit-Remaining']) == 0):
                    self.waitUntilReset(int(res.headers['X-Rate-Limit-Reset']))
                    self.checkLimit()
            else:
                print ('not found  -  X-Rate-Limit-Remaining or X-Rate-Limit-Reset')
                self.checkLimit()
 
    def checkLimit(self):
        '''
        回数制限を問合せ、アクセス可能になるまで wait する
        '''
        unavailableCnt = 0
        while True:
            url = "https://api.twitter.com/1.1/application/rate_limit_status.json"
            res = self.session.get(url)
 
            if res.status_code == 503:
                # 503 : Service Unavailable
                if unavailableCnt > 10:
                    raise Exception('Twitter API error %d' % res.status_code)
 
                unavailableCnt += 1
                print ('Service Unavailable 503')
                self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
                continue
 
            unavailableCnt = 0
 
            if res.status_code != 200:
                raise Exception('Twitter API error %d' % res.status_code)
 
            remaining, reset = self.getLimitContext(json.loads(res.text))
            if (remaining == 0):
                self.waitUntilReset(reset)
            else:
                break
 
    def waitUntilReset(self, reset):
        '''
        reset 時刻まで sleep
        '''
        seconds = reset - time.mktime(datetime.datetime.now().timetuple())
        seconds = max(seconds, 0)
        print ('\n     =====================')
        print ('     == waiting %d sec ==' % seconds)
        print ('     =====================')
        sys.stdout.flush()
        time.sleep(seconds + 10)  # 念のため + 10 秒
 
    @staticmethod
    def bySearch(keyword):
        return TweetsGetterBySearch(keyword)
 
    @staticmethod
    def byUser(screen_name):
        return TweetsGetterByUser(screen_name)

今回はユーザー指定ではなく、キーワード検索でツイートを取得します。以下の通り、キーワード検索によるツイート取得関数を定義します。

class TweetsGetterBySearch(TweetsGetter):
    '''
    キーワードでツイートを検索
    '''
    def __init__(self, keyword):
        super(TweetsGetterBySearch, self).__init__()
        self.keyword = keyword
        
    def specifyUrlAndParams(self):
        '''
        呼出し先 URL、パラメータを返す
        '''
        url = 'https://api.twitter.com/1.1/search/tweets.json?'
        params = {'q':self.keyword, 'count':100}
        return url, params
 
    def pickupTweet(self, res_text):
        '''
        res_text からツイートを取り出し、配列にセットして返却
        '''
        results = []
        for tweet in res_text['statuses']:
            results.append(tweet)
 
        return results
 
    def getLimitContext(self, res_text):
        '''
        回数制限の情報を取得 (起動時)
        '''
        remaining = res_text['resources']['search']['/search/tweets']['remaining']
        reset     = res_text['resources']['search']['/search/tweets']['reset']
 
        return int(remaining), int(reset)

検索するキーワードを入れて、走らせます。今回は、RTとリプライは除外し、M1が放送された12/3の、優勝者が決定した22:05以前のツイートを取得したいので、以下のようにキーワードを入力します。

created_at = []
text = []

# キーワードで取得
getter = TweetsGetter.bySearch(u'マジカルラブリー AND -filter:retweets AND -filter:replies AND until:2017-12-3_22:05:00_JST')

# ユーザーを指定して取得 (screen_name)
#getter = TweetsGetter.byUser('AbeShinzo')

cnt = 0
for tweet in getter.collect(total = 1000000):
    #cnt += 1
    #print ('------ %d' % cnt)
    #print ('{} {} {}'.format(tweet['id'], tweet['created_at'], '@'+tweet['user']['screen_name']))
    #print (tweet['text'])
    created_at.append(tweet['created_at'])
    text.append(tweet['text'])

ツイートの日時とテキストが入ったリストをデータフレーム化し、csvファイルに保存します。

created_at = Series(created_at)
text = Series(text)

#各シリーズをデータフレーム化
m1_df = pd.concat([created_at, text],axis=1)

#カラム名
m1_df.columns=['created_at','text']

#csvファイルとして保存
m1_df.to_csv('m1_madicallovely1.csv', sep = '\t',encoding='utf-16')

この操作を、決勝戦に出場した10コンビにしてあげれば、データ取得完了です。

なお、Twitter APIを使ったデータ取得では、現時点では1週間以上前のツイートを取得することはできないので注意しましょう。