nがひとつ多い。

えぬなおの技術的なことを書いていくとこ。

【Rust】Twitterのstream apiの現状とRustでの実装について

とある嫁案件が事の発端

嫁からTwitterにてとあるキーワードを検索したいが、いちいち検索するの面倒だからリアルタイムでslackとかに投げてほしいとの依頼を受けた。はっきり言ってこの開発はなんとなく相当面倒な仕様になるのが予想できたし、仕事なら開発から導入までで数十万くらいならやるかなといったところだが、勉強がてらやってみることにした。 

要件と成果物

要件

- リアルタイムでTwitterの検索して、新しいTweetがきたら通知してほしい。

- 実装はえぬなおの自由

成果物

- 勉強がてらピュアRustでスレッドセーフなコード。

- 通知は家族内slackに投げることにした。

- 主なライブラリは https://github.com/tesaguri/twitter-stream-rs(学生さんが作ったものみたい……衝撃)

- Twitter APIからリクエスト失敗した時のエラーハンドリングはドキュメント通りに自前実装

- 新たに金を払いたくないから自宅Kubernetesにデプロイ

とりあえず出来上がった。動きも悪くはない。

asciinema.org

 

このコンソールに映ってる奴が加工されてslackに飛ぶ。

 

f:id:nnao45:20190513004305p:plain

 

github.com

しかしこの開発、俺の嗅覚は間違っていなくて色々面倒だったから苦労話でも以下に書く。

Twitterまわり

TwitterのStreamって廃止されたんじゃないの?

User Streamはな。

mobilelaby.com

 

まずはじめに俺は「そういえばUser Streamって廃止されたよな………?」と思った。しかしよく調べるとどうやらUser StreamってのはTwitter Stream APIの1つであるらしい。

 

Twitterでは以下のStreaming APIが用意されています。

● Public streams
Twitter上で流れる公開データのストリームのこと。特定のユーザやトピックをフォローしたり、データマイニングに適しています。

● User streams
ユーザ1人のストリームのこと。Twitterのシングルユーザビューで参照できるほぼ全てのデータを網羅する。

● Sites streams
ユーザストリームのマルチユーザ版。サイトストリームは、多くのユーザに代わってTwitterに接続しなければならないサーバを対象としている。

 

Twitter、「User Streams API」を8月17日に廃止。サードアプリはタイムライン取得が制限へ

 

別に今回は俺や嫁のアカウントじゃなくてもいいキーワードであったため、Public Streamを使うのでこの件とは何も関係なかった。知らなかった……。

Twitter APIを叩くのに開発者用のアカウントを作らなきゃいけない。

色々なサイトを見るに、Twitter APIを叩くにはユーザートークンを発行する必要があるらしい。developer.twitter.comで作れる。

 

以下が詳しいが、

qiita.com

 

これが結構大変で、300文字以上の開発者用の機能を使う目的や、アプリを使うためのドメインとか(localhostとかアドレスじゃだめ)、初心者だとこれだけでも心が折れる。幸いにも俺は調子が良かったので頑張って登録したが、Rustで非同期プログラミングの実装とかやっている途中だったら危なかったかもしれない。 

Rustまわり

使ったライブラリについて

今回は以下のライブラリを使ったと前述した、

 

github.com

 

基本的には以下のようなコードでツイートをストリーミングとして受け取ることができる。

 

gist.github.com

https://github.com/tesaguri/twitter-stream-rs/blob/master/examples/echo_bot.rs

変数TRACKに検索したいキーワードを代入して、かかったのを引っ掛けてツイートを受け取れる。

関数としては変数streamが非同期メッセージでjsonシリアライズなバイトデータを取ってくるので、それをmoveクロージャに代入して処理をつくり、それら処理をTokioライブラリによって実装する形となる。

github.com

 

Rust有識者なら分かると思うが、受け取ったデータに対して標準出力するなりどっかに渡すなりする動作は変数bot内のクロージャに閉じ込められているので相当に扱いが面倒なのは覚悟しておいたほうがいい。

 

まぁメッセージングってのは得てしてそんなもんだ。

 

型としてはtweetustという有名なrustでTwitter APIを扱えるようにしたライブラリのtweet型で返ってくる。

github.com

 

詳しく書くと、まるごと必要な情報はストリーミングからfor_eachメソッドで取り出した jsonをサンプルコードの通りだと Ok(StreamMessage::Tweet(tweet)) = json::from_str(&json) のようにtwitter-stream-rsと同リポジトリtwitter-stream-messageライブラリを使用する事で受け取る事ができる。 逆に言えばこのライブラリを使わないとjsonからデシリアライズされた生文字列から値を頑張って取り出す必要があり、使わない手はないと思われる。

420コード

さてとそれでそれならばとサンプル通り普通に使ったところ、少しほっておくといつの間に終了するアプリが出来てしまった。そんな甘くないらしい。

エラーコードを見てみると、420っていうエラーコードが返っているではないか……知らね……。

公式ドキュメントを見て見る限りは、

Response Codes — Twitter Developers

Returned when an app is being rate limited for making too many requests.

は………w

俺はこの時点ではこのapiを2スレッドで叩いていただけなんだが、それでもう制限にぶち当たるらしい。きっつー。

[追記:2019年 5月15日 水曜日 10時44分02秒 JST]

どうやらtwitterのstream apiは2本クライアントたててやると速度制限に引っかかるらしい・・・1本でやれば早々止まらない。

また、検索したい言葉をAND検索やOR検索したときは、ANDは「えぬなお Rust」、ORなら「えぬなお, Rust」のようにtrackに文字列を渡せば上手くいく。これを駆使してtwitter1アカウントにつき1ストリームを意識するといい。

westplain.sakuraweb.com

 

420コードへの対処

ほぼこのエラーコードなので、どうにかする事に。

幸いにも日本語情報を見つけた。

qiita.com

twitterのドキュメントを調べてみると420が返ってきた場合(と他のエラーの場合)の再接続のベストプラクティスが示されているのでそれに従う。またstreamも一度にひとつだけ開くようにする。
Back off exponentially for HTTP 420 errors. Start with a 1 minute wait and double each attempt. Note that every HTTP 420 received increases the time you must wait until rate limiting will no longer will be in effect for your account.
参考: https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/connecting
HTTP 420 errorの場合インターバルを置き再々接続までの時間を最大値まで指数関数的に増やすようにと指示されている。(例:60秒,120秒,240秒,…3600秒)
node-twitterでは実装の例などは無いので自分で実装する必要があるが、issue/159にコードの例があり参考にできる。 https://github.com/desmondmorris/node-twitter/issues/159

 ちゅーことで、fluentdのretry_waitみたいな感じで指数関数的にretryするような仕組みを作る必要があるわけだ。

420コードへの対策

グローバルにstatic変数をおくとライフタイムや所有権管理が面倒すぎるし、まぁ鉄板なとこでクロージャ内にカウンタを埋め込んで管理する事にする。

gist.github.com

ライブラリの方のサンプルコードにはないloop構文だが、実際これを置いておかないと、stream apiからエラーが帰ってきてstreamから離脱させられた時にそのままプロセスごと落ちてしまう事となる。

速度制限のエラーコード程度でプロセスを再起動してたら夜も眠れないので、前述したベストプラクティスのように連続してエラーで帰ってきたら60s、120sと再度stream apiに聞きに行くまでのインターバルを伸ばしながら立ち上げるようにした。

以下が参考になったというかそのまま使用させて頂いた。

saba1024.hateblo.jp

クロージャをこういう感じでグローバル変数代わりに使うときはBoxとmoveクロージャを使って、クロージャに所有権を渡して、加えてクロージャ経由以外からカウンタに当たる変数が更新できないよう封じ込めると上手くいく。

Dockerfile

今回はコンテナでデプロイした。

gist.github.com

マルチステージビルドにしたが、動かす用のコンテナをalpineにしたら動かなくてよくわからなかったので、rust-slimイメージと同じdebian-slimを採用した。イメージは40MBくらいになった・・・軽すぎて草。

別に大したことしてないので、これについては特に説明することもない。

[追記:2019年 5月15日 水曜日 10時44分02秒 JST]

cargo build長すぎたので少し変えた。

以下が詳しい。

RustのCargoプロジェクトで素直に書いたDockerfileをdocker buildするとソースが書き換わるたびにフルビルドが走って滅茶苦茶遅いことはcargoのファイルだけコピーしてビルドすることで解決します - ncaq

終わりに

Rustの非同期処理、async/awaitはよstableになってくれい。