銀の光と碧い空

クラウドなインフラとC#なアプリ開発の狭間にいるエンジニアの日々

.NET アプリから BigQuery に Streaming Insert する方法

このブログは Google Cloud Platform Advent Calendar 2014 の7日目のエントリです。

Google Cloud Platform Advent Calendar 2014 - Qiita

さて、以前テーブルをAPIから生成する方法は紹介したのですが、データをStreaming Insertする方法について書いていなかったので、そこを説明したいと思います。また、すでに本番運用して、WebサーバーからデータをBigQueryに送信しているのですが、どのような点に注意しているかも紹介したいと思います。

C# から Streaming Insertする方法

まずはサンプルコードをのせます。

StreamingInsert.cs

CreateTable メソッドは前回の記事を参考にしてください。LoadRowsメソッドは実際に送信するデータですが、でたらめな値を入れるのであればこんな感じになります。特徴としては、テーブル定義に対応した列名とその値を Dictionary<string,object> で指定することです。

private IList<TableDataInsertAllRequest.RowsData> LoadRows()
{
    return Enumerable.Range(0, 450)
        .Select(_ => new TableDataInsertAllRequest.RowsData
        {
            InsertId = Guid.NewGuid().ToString(), //リクエストの一意なIdなのでGuidで代用
            Json = new Dictionary<string, object>
            {
                { "date", DateTime.Now },
                { "source", "hoge" }
            }
        }).ToList();
}

長くなっていますが、基本的には

var req = new TableDataInsertAllRequest
{
    Rows = LoadRows()
};

var response = await bigquery.Tabledata.InsertAll(req,
                    "testprj", "testdataset", "testtbl").ExecuteAsync();

でInsertできます。面倒なことをしているのは、

  • Insert先のテーブルがないときにテーブルを生成する処理
  • 失敗時にリトライする処理

を入れているからです。

テーブルの自動生成処理を入れるかどうかはInsertの要件次第でしょう。たとえば期間ごとにテーブルを分けて格納する場合*1に、新しいテーブルを作るたタイミングで別のツールから生成するのではなく、このコードの中で生成することを想定しています*2

また、後者の失敗時のリトライですが、これは割と必須の処理です。BigQuery のStreaming InsertはバックエンドのエラーでInsertが失敗することがあり、その場合にはリトライすることが推奨されています。リトライも単純に一定時間おきに繰り返すのではなく、指数関数的な伸び+ランダムに追加した時間だけ待機する Google.Apis.Util.ExponentialBackOff を利用しています。なお、所定の最高試行回数(ExponentialBackOff の初期値だと10)を超えると、Insertをあきらめデータは破棄することになります。

本番運用で考慮していること

というわけで上のような Streaming Insert なコードを本番環境で動かして BigQuery にデータを送信しています*3

さて、そもそもどうやってこのコードを動かしているかですが、 上のコードを SLABのSink として実装し、 Out-of-Process Service として動かしています。送信するイベントを送信するのはIIS上のASP.NETアプリになります。

テーブルの自動生成と失敗時のリトライは上のコードのように考慮していますが、ほかに次のようなことを考慮して動かしています。

  • Insert先のDataSet名、Table名、テーブル定義などは EventSource のメタ情報として定義しています。 DataSet名 は EventKeyword、Table名は EventTask、テーブル定義は Paylowd 経由で取得、という感じです。
  • 1回のリクエストあたり500行というハードリミットがあるので、450行ずつバッファリングして送信
  • SLABのSinkでは BufferedEventPublisher<EventEntry> でイベントをバッファリングしてまとめて送信できるが、バッファが溜まった時に発火するeventPublisher*4の中でリトライを繰り返すと、後続のログもたまり続けるため、1回失敗した場合は、別の BufferedEventPublisher にためていくようにしている
  • Insertしている Out-of-Process Service の健全性は、Windows EventLogおよびパフォーマンスカウンタを通して確認している。*5

といった点です。

というわけで、意外な組み合わせかもしれませんが、.NET アプリからもBigQueryにデータを送信して活用しています、というお話でした。

*1:test_20141201, test_20141202 ... みたいなテーブル名を想定

*2:その場合、テーブル名は日時に応じて動的に判断することになり、テーブル構造を指定する必要もある

*3:動いているコードそのものをお見せしていないのは、テーブル名やテーブル定義の生成部分に弊社固有のロジックが入ってコードの見通しが悪いためです...

*4:CreateAndStartメソッドの第2引数で指定

*5:実際はそれらを AWS CloudWatch で集約してアラートをしかけている