【超訳】multidplyr パッケージの使い方 #rstatsj
注: この記事は2017年02月25日に最終更新したQiita投稿記事を、こちらのブログに移植したものです。
この記事は R の multidplyr パッケージ(Version: 0.0.0.9000)の [Vignette] (https://github.com/hadley/multidplyr/blob/master/vignettes/multidplyr.md) を超訳したものです。multidplyr のライセンスが GPL-3 であることに基づき、本記事も GPL-3 ライセンスとします。
英語力に自信がないので、正確な情報は上記の原文をご覧ください。 その際、誤訳などをご指摘くださると大変助かります! よろしくお願いします。
multidplyr の使い方
Hadley Wickham
multidplyr は、 dplyr のための新しいバックエンドです。 あなたがよく知っている dplyr の 「verb1」 を使いながら、単一コア上で計算する代わりに、複数のコアに分散させて計算することができます。 自分のコンピュータ上にローカルクラスタを効率的に作成でき、その後 multidplyr は各ノードに何をすべきかを伝えてくれます。
基本
multidplyr は、データの取り扱いというのは高価な操作なのでなるべくしないようにするという主義のもと、設計されています。 基本的な操作の順序は以下の通りです。
partition()
を呼び出し、複数のコアにデータセットを分割します。これにより、パーティション化されたデータフレーム(partitioned data frame、略して party df)が作成されます。party df に適用される各 dplyr 「verb」は、それぞれのコアで独立して操作を実行します。それぞれのコアでそれぞれ実行結果が残り、また別の party df を返します。
各コアで実行する必要がある高価な操作が完了したら、
collect()
を呼んで各コア上のデータを取得し、ローカルコンピュータに戻します。
それでは、 nycflights13::flights
データを使って、簡単な例を見てみましょう。このデータセットは、2013年にニューヨークから出発したおよそ30万件のフライト情報が含まれています。
まず、必要なパッケージを読み込みます:
library(dplyr) library(multidplyr) library(nycflights13)
次に、フライト番号でデータフレームを分割し、各フライトの平均遅延時間を計算し、結果を集めます(collect()
):
flights1 <- partition(flights, flight) #> Initialising 7 core cluster. flights2 <- summarise(flights1, dep_delay = mean(dep_delay, na.rm = TRUE)) flights3 <- collect(flights2)
これは通常の dplyr のコードと同じように見えますが、裏側はかなり異なっています。 flights1
と flights2
は、「party df」なのです。これらは、通常のデータフレームのように見えますが、「shard の数」という追加の属性を持っています。この例では、flights2
は7つのノードに分散しており、各ノードの行数が503行から609行までの範囲で異なっていることが Shards からわかります。 partition()
は常に、1つのグループを1つのノードで保持するようにしてくれるのです。
flights2 #> Source: party_df [3,844 x 2] #> Shards: 7 [482--603 rows] #> #> # S3: party_df #> flight dep_delay #> {0} {1} #> 1 1 5.2932761 #> 2 16 -0.2500000 #> 3 19 10.0500000 #> 4 28 13.6000000 #> 5 32 11.7884615 #> 6 40 12.5166667 #> 7 46 0.0000000 #> 8 49 -0.4827586 #> 9 59 3.6527197 #> 10 65 7.6979167 #> # ... with 3,834 more rows
パフォーマンス
このデータサイズでは、ローカルクラスタを使用すると、実際のパフォーマンスは非常に遅くなってしまいます!
system.time({ flights %>% partition() %>% summarise(mean(dep_delay, na.rm = TRUE)) %>% collect() }) #> user system elapsed #> 0.434 0.057 0.967 system.time({ flights %>% group_by() %>% summarise(mean(dep_delay, na.rm = TRUE)) }) #> user system elapsed #> 0.005 0.000 0.005
これは、各ノードにデータを送信し、最後に結果を取得することによるオーバーヘッドが原因です。
基本的な dplyr「verb」の場合、10万〜100万ほどのデータサイズでない限り、multidplyr が大幅な高速化をもたらすことはまずありません。しかし、もしあなたが do()
を使ってもっと複雑なことをしているのであれば、高速化するかもしれません。どのように機能するか見てみましょう。
まず、50 件以上のフライトがある目的地をレコードに含むフライトデータに絞り、その日付が一年の始まりから何日目なのかを計算します。
common_dest <- flights %>% count(dest) %>% filter(n >= 365) %>% semi_join(flights, .) %>% mutate(yday = lubridate::yday(ISOdate(year, month, day))) #> Joining, by = "dest" dim(common_dest) #> [1] 332942 20
およそ 332,000 件のレコードが残りました。
今度は、multidplyrにローカルクラスタを作成させるのではなく、自分で作成してみます。create_cluster()
関数は、 parallel :: makepsockcluster()
周りの便利なラッパーを提供します。この2つの関数の主な違いとしては、 create_cluster()
は、どのオブジェクトも参照しないときに、自動的にクローズするようにクラスターを設定することです。
cluster <- create_cluster(2) #> Initialising 2 core cluster. cluster #> socket cluster with 2 nodes on host 'localhost'
set_default_cluster()
を使って partition()
がデフォルトでこのクラスタを使うように設定することもできます:
set_default_cluster(cluster)
抽出したフライトデータを、このクラスターに分割してみましょう:
by_dest <- common_dest %>% partition(dest, cluster = cluster) by_dest #> Source: party_df [332,942 x 20] #> Groups: dest #> Shards: 2 [154,965--177,977 rows] #> #> # S3: party_df #> year month day dep_time sched_dep_time dep_delay arr_time #> <int> <int> <int> <int> <int> <dbl> <int> #> 1 2013 1 1 554 600 -6 812 #> 2 2013 1 1 600 600 0 837 #> 3 2013 1 1 606 610 -4 837 #> 4 2013 1 1 615 615 0 833 #> 5 2013 1 1 658 700 -2 944 #> 6 2013 1 1 754 759 -5 1039 #> 7 2013 1 1 807 810 -3 1043 #> 8 2013 1 1 814 810 4 1047 #> 9 2013 1 1 830 835 -5 1052 #> 10 2013 1 1 855 859 -4 1143 #> # ... with 332,932 more rows, and 13 more variables: sched_arr_time <int>, #> # arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>, #> # origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>, #> # minute <dbl>, time_hour <dttm>, yday <dbl>
パーティションが均一かどうか、常にチェックすると良いでしょう。 最大の恩恵を受けることができるのは、行数が各ノードで概ね均一であるときなのです。
それでは、各目的地に対して平滑化一般化加法モデルを適用し、一年の間に、もしくは一日の間にどのくらい遅延時間が変わるか推定してみましょう。
mgcv パッケージを各ノードで読み込むために、 cluster_library()
を用いる必要があることに注意してください。
この実行には 3.7 秒かかります。
cluster_library(by_dest, "mgcv") system.time({ models <- by_dest %>% do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .)) }) #> user system elapsed #> 0.002 0.000 3.448
ローカルで実行して 5.6 秒かかった結果と比べてみてください。
system.time({ models <- common_dest %>% group_by(dest) %>% do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .)) }) #> user system elapsed #> 4.472 0.629 5.122
これはそれほど高速化したわけではありませんが、一般的には、数秒しかかからないことを並列化するかどうか気にしたりはしません。 ノードにメッセージを送信するコストは概ね固定されているため、並列化するタスクが長くなればなるほど、得られる速度も線形に近づいていきます。
ノードの数を増やすことでも高速化できますが、残念ながら Vignette では最大で 2 つのノードしか使えないため、ここでお見せすることはできません。
制限
最適な高速化のためには、各ノードでほぼ同じ量の作業を行う必要があります。ということは、データを多くの部分に分割するような変数でグループ化することで、各ノードがほぼ同じ量のデータを持つことができるようになります。 もし、あなたのデータに対して multidplyr のデフォルトの戦略が適さない場合は、「1 〜 ノードの数の値」をとるグルーピング変数を自分で作る必要があるかもしれません。
現状は、1 つのインスタンスですべてのデータをメモリに読み込んでから、それぞれのノードに分割します。もしそれを避けたい場合は、データを自分で分割し、「手動で」読み込む必要があります。
```r cluster <- create_cluster(4) cluster_assign_each(cluster, "filename", list("a.csv", "b.csv", "c.csv", "d.csv") ) cluster_assign_expr(cluster, "my_data", quote(readr::read_csv(filename)))
my_data <- src_cluster(cluster) %>% tbl("my_data") ```
現状は、 parallel パッケージによって作られたクラスターのみを使うことができます。 これらのクラスタを複数のマシンにセットアップすることは可能ですが、少しトリッキーです。 distributed R 用の標準 API がもうすぐ提供されればいいのですが、もしそうなれば、 multidplyr はもっと多くの種類のクラスターで動作できるようになります。
訳注
-
ここではdplyrの関数群を指しています。@yutannihilation さんが翻訳してくださった dplyrのVinetteの翻訳記事 によると、『Hadleyは、dplyrの各概念を「language of data manipulation」の文法になぞらえて説明します。あまり訳しすぎるとよく分からなくなりそうなので、ここは「動詞」と訳すのではなく、「verb」のまま残してあります。』とのことですので、本記事でもそれに従います。↩