今回はTwitter APIを用いてデータを取得しています。
なお、大量のツイートを取得しようとするといろんな取得制限がかかって結構エラーが出ます。そこで、こちらのサイトを参考にさせていただきました。素晴らしいコードで、全くエラーが出ません。
僕のコードは以下の通りです。
# 必要なライブラリをインポート from requests_oauthlib import OAuth1Session import json import datetime, time, sys from abc import ABCMeta, abstractmethod import pandas as pd from pandas import Series, DataFrame from dateutil.parser import parse CK = 'xxxxxxxxxxxxxxxxxxxxxxxxxx' # Consumer Key CS = 'xxxxxxxxxxxxxxxxxxxxxxxxxx' # Consumer Secret AT = 'xxxxxxxxxxxxxxxxxxxxxxxxxx' # Access Token AS = 'xxxxxxxxxxxxxxxxxxxxxxxxxx' # Access Token Secret
事前に取得したConsumer Key、Consumer Secret、Access Token、Access Token Secretを入力します。
次に、ツイートを取得する関数、ツイート取得制限の残り回数を確認する関数、取得制限が解除されるまで待機する関数を定義します。
class TweetsGetter(object): __metaclass__ = ABCMeta def __init__(self): self.session = OAuth1Session(CK, CS, AT, AS) @abstractmethod def specifyUrlAndParams(self, keyword): ''' 呼出し先 URL、パラメータを返す ''' @abstractmethod def pickupTweet(self, res_text, includeRetweet): ''' res_text からツイートを取り出し、配列にセットして返却 ''' @abstractmethod def getLimitContext(self, res_text): ''' 回数制限の情報を取得 (起動時) ''' def collect(self, total = -1, onlyText = False, includeRetweet = False): ''' ツイート取得を開始する ''' #---------------- # 回数制限を確認 #---------------- self.checkLimit() #---------------- # URL、パラメータ #---------------- url, params = self.specifyUrlAndParams() params['include_rts'] = str(includeRetweet).lower() # include_rts は statuses/user_timeline のパラメータ。search/tweets には無効 #---------------- # ツイート取得 #---------------- cnt = 0 unavailableCnt = 0 while True: res = self.session.get(url, params = params) if res.status_code == 503: # 503 : Service Unavailable if unavailableCnt > 10: raise Exception('Twitter API error %d' % res.status_code) unavailableCnt += 1 print ('Service Unavailable 503') self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30) continue unavailableCnt = 0 if res.status_code != 200: raise Exception('Twitter API error %d' % res.status_code) tweets = self.pickupTweet(json.loads(res.text)) if len(tweets) == 0: # len(tweets) != params['count'] としたいが # count は最大値らしいので判定に使えない。 # ⇒ "== 0" にする # https://dev.twitter.com/discussions/7513 break for tweet in tweets: if (('retweeted_status' in tweet) and (includeRetweet is False)): pass else: if onlyText is True: yield tweet['text'] else: yield tweet cnt += 1 if cnt % 100 == 0: print ('%d件 ' % cnt) if total > 0 and cnt >= total: return params['max_id'] = tweet['id'] - 1 # ヘッダ確認 (回数制限) # X-Rate-Limit-Remaining が入ってないことが稀にあるのでチェック if ('X-Rate-Limit-Remaining' in res.headers and 'X-Rate-Limit-Reset' in res.headers): if (int(res.headers['X-Rate-Limit-Remaining']) == 0): self.waitUntilReset(int(res.headers['X-Rate-Limit-Reset'])) self.checkLimit() else: print ('not found - X-Rate-Limit-Remaining or X-Rate-Limit-Reset') self.checkLimit() def checkLimit(self): ''' 回数制限を問合せ、アクセス可能になるまで wait する ''' unavailableCnt = 0 while True: url = "https://api.twitter.com/1.1/application/rate_limit_status.json" res = self.session.get(url) if res.status_code == 503: # 503 : Service Unavailable if unavailableCnt > 10: raise Exception('Twitter API error %d' % res.status_code) unavailableCnt += 1 print ('Service Unavailable 503') self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30) continue unavailableCnt = 0 if res.status_code != 200: raise Exception('Twitter API error %d' % res.status_code) remaining, reset = self.getLimitContext(json.loads(res.text)) if (remaining == 0): self.waitUntilReset(reset) else: break def waitUntilReset(self, reset): ''' reset 時刻まで sleep ''' seconds = reset - time.mktime(datetime.datetime.now().timetuple()) seconds = max(seconds, 0) print ('\n =====================') print (' == waiting %d sec ==' % seconds) print (' =====================') sys.stdout.flush() time.sleep(seconds + 10) # 念のため + 10 秒 @staticmethod def bySearch(keyword): return TweetsGetterBySearch(keyword) @staticmethod def byUser(screen_name): return TweetsGetterByUser(screen_name)
今回はユーザー指定ではなく、キーワード検索でツイートを取得します。以下の通り、キーワード検索によるツイート取得関数を定義します。
class TweetsGetterBySearch(TweetsGetter): ''' キーワードでツイートを検索 ''' def __init__(self, keyword): super(TweetsGetterBySearch, self).__init__() self.keyword = keyword def specifyUrlAndParams(self): ''' 呼出し先 URL、パラメータを返す ''' url = 'https://api.twitter.com/1.1/search/tweets.json?' params = {'q':self.keyword, 'count':100} return url, params def pickupTweet(self, res_text): ''' res_text からツイートを取り出し、配列にセットして返却 ''' results = [] for tweet in res_text['statuses']: results.append(tweet) return results def getLimitContext(self, res_text): ''' 回数制限の情報を取得 (起動時) ''' remaining = res_text['resources']['search']['/search/tweets']['remaining'] reset = res_text['resources']['search']['/search/tweets']['reset'] return int(remaining), int(reset)
検索するキーワードを入れて、走らせます。今回は、RTとリプライは除外し、M1が放送された12/3の、優勝者が決定した22:05以前のツイートを取得したいので、以下のようにキーワードを入力します。
created_at = [] text = [] # キーワードで取得 getter = TweetsGetter.bySearch(u'マジカルラブリー AND -filter:retweets AND -filter:replies AND until:2017-12-3_22:05:00_JST') # ユーザーを指定して取得 (screen_name) #getter = TweetsGetter.byUser('AbeShinzo') cnt = 0 for tweet in getter.collect(total = 1000000): #cnt += 1 #print ('------ %d' % cnt) #print ('{} {} {}'.format(tweet['id'], tweet['created_at'], '@'+tweet['user']['screen_name'])) #print (tweet['text']) created_at.append(tweet['created_at']) text.append(tweet['text'])
ツイートの日時とテキストが入ったリストをデータフレーム化し、csvファイルに保存します。
created_at = Series(created_at) text = Series(text) #各シリーズをデータフレーム化 m1_df = pd.concat([created_at, text],axis=1) #カラム名 m1_df.columns=['created_at','text'] #csvファイルとして保存 m1_df.to_csv('m1_madicallovely1.csv', sep = '\t',encoding='utf-16')
この操作を、決勝戦に出場した10コンビにしてあげれば、データ取得完了です。
なお、Twitter APIを使ったデータ取得では、現時点では1週間以上前のツイートを取得することはできないので注意しましょう。