Webサービスやアプリを開発して運営するのが趣味です。



BigQueryにTwitterのタイムラインを入れる [Ruby]

2014年07月03日 10:20

Twitterのタイムラインを保存しておくとなにかと便利なので、色々と保存形式を変えながら4年くらい記録し続けている。ツイートの保存が便利すぎるので、ツイセーブというサービス化までした。かつてはテキストで、MongoDBやMySQLとかGroongaとかいろいろやってきた。どれも問題ないんだけど、増え続けるログデータを保存する場所として考えるとBigQueryが現代にマッチしてるようなのでそちらに移行した。

BigQuery

BigQueryにTLを保存するとできること

TLの全てのデータをフルスキャンできる。これはかなり便利で、今回このブログ記事を書くにあたっても 'BigQuery' をTLから検索すれば、信頼できるフォローイングの人々の声を見ることができた。これにより「某CA社では5000台のMongoDBクラスタでBigQueryに対抗している」という不確かな情報まで獲得することができました。また、BigQueryにはJSONを展開して処理する機能もあるため、TwitterのようなJSONの生データを入れておけなんとでもできるようになる。例えば screen_name の変更を追跡したり、特定アカウントの Following / Follwer 増減グラフも容易に作成することができる。

Google Data Center

BigQuery安すぎて怖い

「hadoop - Googleの虎の子「BigQuery」をFluentdユーザーが使わない理由がなくなった理由 #gcpja - Qiita」 前に話題になったこの記事を読めばBigQueryのヤバさが伝わると思う。「120億行を5秒でフルスキャン可能」という文字列のインパクトはちょっとデータベース知ってる人なら変な汗かきまくるはず。BigQueryとは、どれだけデータ量が増えても数秒で処理できるキチガイじみたサービスです。僕のように個人がちょっとTL遡って検索したいという用途だけでも、Googleのデータセンターにある途方も無い数のサーバーを動かすことができると思うとテンション上がらずにはいられない。

ストレージ代も格安で、1TB あたり月額 $26 程度で保管してくれる。完全に異常な価格設定だと思う。ただしクエリごとに課金される料金体制なので、1TB処理に使用するごとに $5 程度料金が発生する。試しに100万件程度のTLのデータから正規表現でフルスキャンかけてみたところ、Bytes Processed: 94.5 MB となり完全に無料枠に収まった。100万件程度ならまあgrepでいいかなとは思うけど、このままいろいろなデータ突っ込み続けてTBレベルになった場合、BigQuery同等のHadoopクラスタ構築したら何百万円積むはめになるのかわからない。
※ストレージ代間違っていたので訂正しました

下のコードについて

TLをBigQueryに流すだけのRubyコードを以下に書いておきました。実際の運用では favorite や delete の監視も行うため、実際のコードはもう少し多くなります。BigQueryの多くのメソッドに対応しているデファクトスタンダードなRubyGemsはいまのところ見受けられないように感じた。BigBroda gemよさそう。まあ Google API Client gem に軽くラップするだけといえばそうなので各自思うように書けばいいと思う。Google API 叩いたら JSON Schema で仕様返ってきて便利だった。

require 'google/api_client'
require 'twitter'
require 'pp'

class BigQuery
  APP_NAME = 'https://twitter.com/9m'
  TABLE_ID = 'timeline'
  SCHEMA_FIELDS = [
    {
      name: 'status_id',
      type: 'INTEGER'
    },
    {
      name: 'screen_name',
      type: 'STRING'
    },
    {
      name: 'text',
      type: 'STRING'
    },
    {
      name: 'created_at',
      type: 'TIMESTAMP'
    },
    {
      name: 'json',
      type: 'STRING'
    }
  ]

  def initialize(opts = {})
    @client = Google::APIClient.new(
      application_name: APP_NAME,
      application_version: '0.1'
    )

    key = Google::APIClient::PKCS12.load_key(File.open(
      opts[:key], mode: 'rb'),
      "notasecret"
    )

    @asserter = Google::APIClient::JWTAsserter.new(
      opts[:service_email],
      "https://www.googleapis.com/auth/bigquery",
      key
    )

    @client.authorization = @asserter.authorize

    @bq = @client.discovered_api("bigquery", "v2")

    @project_id = opts[:project_id]
    @dataset    = opts[:dataset]
    @table_id   = TABLE_ID
  end

  def create_table
    response = @client.execute({
      :api_method => @bq.tables.insert,
      :parameters => {
        'projectId' => @project_id,
        'datasetId' => @dataset
      },
      :body_object => {
        'tableReference' => {
          'projectId' => @project_id,
          'datasetId' => @dataset,
          'tableId'   => TABLE_ID
        },
        'schema' => {
          'fields' => SCHEMA_FIELDS
        }
      }
    })

    JSON.parse(response.body)
  end

  def insert(rows)
    rows = rows.map do |row|
      row[:created_at] = format_timestamp(row[:created_at])
      {
        'json' => row
      }
    end

    response = @client.execute({
      :api_method => @bq.tabledata.insert_all,
      :parameters => {
        'projectId' => @project_id,
        'datasetId' => @dataset,
        'tableId' => @table_id,
      },
      :body_object => {
        "rows" => rows
      }
    })

    JSON.parse(response.body)
  end

  private

  def format_timestamp(time)
    time.strftime("%Y-%m-%d %H:%M:%S")
  end
end

class Timeline
  def initialize(bq)
    @bq = bq
    @rows = []
  end

  def client
    @client ||= Twitter::Streaming::Client.new do |config|
      config.consumer_key        = "YOUR_CONSUMER_KEY"
      config.consumer_secret     = "YOUR_CONSUMER_SECRET"
      config.access_token        = "YOUR_ACCESS_TOKEN"
      config.access_token_secret = "YOUR_ACCESS_SECRET"
    end
  end

  def row_object(object)
    {
      status_id:  object.id,
      screen_name: object.user.screen_name,
      text: fulltext_object(object),
      created_at: object.created_at,
      json: object.to_h.to_json
    }
  end

  def fulltext_object(object)
    ''.tap do |text|
      text << object.text
      if object.entities?
        object.uris.each do |url|
          text = text.gsub(url.url.to_s, url.expanded_url.to_s)
        end
        object.media.each do |media|
          text = text.gsub(media.url.to_s, media.expanded_url.to_s)
        end
      end
    end
  end

  def run
    client.user do |object|
      next unless object.is_a? Twitter::Tweet

      @rows << row_object(object)

      if @rows.size > 1000
        pp @bq.insert(@rows)
        @rows = []
      end
    end
  rescue => e
    @bq.insert(@rows)
    raise e
  end
end

bq = BigQuery.new({
  client_id: '',
  service_email: '',
  key: '',
  project_id: '',
  dataset: '',
})

# スキーマ作成(作成済みならエラーメッセージが返るだけ)
pp bq.create_table

Timeline.new(bq).run

大切なデータをGoogleに預けられないというコンプライアンスの問題なのかわからないけど、日本の利用者が少ないという問題があると思う。実際使ってみるにあたって、日本語の情報が少なすぎるというように感じた。公式のドキュメントだけ見れば分かるでしょうという話はあるけど、ちょっとした解説とソースコード見れば手っ取り早く試せるというのも大きくあると思う。