複数サービス間での分散トレースは、必要な情報をサービス間で伝搬するContext Propagationによって実現されています。
HTTPでサービスを呼び出している場合、HTTPヘッダーを利用して伝搬させることがほとんどで、最近になってW3C Trace Contextという規格で標準化が進められています。
それ以外の方法、たとえばService Busなどを介している場合、トレースをつなげるためにはContext Propagationをこちらで実装する必要があります。ただ、Service Busなどのツール・サービスによってはツール・サービス側が提供していることもあり、今回はその一例としてAzure Service Busを使った例を試してみました。
Azure Service Busがどのようなしくみを提供しているかについてはドキュメントがあります。
ドキュメントによると、トレースをつなげるのに必要な情報(Context)としてW3C Trace Contextのtraceparentヘッダーを利用しているとあります。具体的には、<version>-<trace id>-<parent id>-<trace flags>
というフォーマットです。例えば 00-182984d0d8c4fa391e63fcf168413636-797cf612be994b1c-00
という値があったとき、versionは00、trace idは182984d0d8c4fa391e63fcf168413636、parent idは797cf612be994b1c、trace flagsは00となります。この値を、Service BusのメッセージのDiagnostic-Id
というプロパティに格納して伝搬させます。送信側が格納する処理を、Azure Service Bus SDKを使っていれば自動で行ってくれるとのことです。
ドキュメントでは「OpenTelemetry を使用した追跡」という部分では具体的な方法が書いていないので、今回試してみました。
まず、メッセージを送るプロデューサー側はASP.NET CoreのWebアプリで、メッセージを受け取るコンシューマー側は.NET Coreのコンソールアプリケーションです。コンシューマー側のWebアプリはOpenTelemetryでの計装がセットアップされていれば、それ以上の手間が必要ありません。送信処理は次のようになりますが、これはドキュメントにあった処理をそのまま使っています。
var clientOptions = new ServiceBusClientOptions() { TransportType = ServiceBusTransportType.AmqpWebSockets }; client = new ServiceBusClient(connectionString, clientOptions); sender = client.CreateSender(queueName); // create a batch using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync(); //デバッグ用に出力(計装そのものには不要) var activity = Activity.Current; Console.WriteLine($"activity id: {activity?.Id}"); Console.WriteLine($"activity Trace id: {activity?.TraceId}"); Console.WriteLine($"activity Span id: {activity?.SpanId}"); for (int i = 1; i <= numOfMessages; i++) { // try adding a message to the batch if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}: {MessageBody}"))) { // if it is too large for the batch throw new Exception($"The message {i} is too large to fit in the batch."); } } try { await sender.SendMessagesAsync(messageBatch); Console.WriteLine($"A batch of {numOfMessages} messages has been published to the queue."); } finally { await sender.DisposeAsync(); await client.DisposeAsync(); }
受信側はいままで紹介していないコンソールアプリでのOpenTelemetryの計装になるのですべてのコードを載せておきます。tracerProvider を作る部分はASP.NET Coreの場合とほぼ同じですが、ActivitySourceを生成し、インスタンスを保持したうえで、トレースの計測を行う部分でActivitySourceからActivityを生成・起動する必要があります。今回のように親SpanのIDを渡したい場合は引数で渡すことができます。
using Azure.Messaging.ServiceBus; using OpenTelemetry.Resources; using OpenTelemetry; using System.Diagnostics; using OpenTelemetry.Trace; using OpenTelemetry.Exporter; using System.Reflection; //Service Busへの接続文字列 var connectionString = "<replace_your_conncetion_string>"; //Service Bus queueの名前 var queueName = "<replace_your_queue_name>"; ServiceBusClient client; ServiceBusProcessor processor; var serviceName = "OpenTelemetryLabs.AzureIntegrations.Console"; var assemblyVersion = Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "unknown"; Action<ResourceBuilder> configureResource = r => r.AddService( serviceName, serviceVersion: assemblyVersion, serviceInstanceId: Environment.MachineName); using var tracerProvider = Sdk.CreateTracerProviderBuilder() .AddSource(serviceName) .ConfigureResource(configureResource) .AddOtlpExporter(opt => { //OTLPのエクスポート先の指定 }) .Build(); var MyActivitySource = new ActivitySource(serviceName); var clientOptions = new ServiceBusClientOptions() { TransportType = ServiceBusTransportType.AmqpWebSockets }; client = new ServiceBusClient(connectionString, clientOptions); processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions()); try { processor.ProcessMessageAsync += ProcessMessageAsync; processor.ProcessErrorAsync += (args) => { Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; }; while (true) { Console.WriteLine("Starting the receiver..."); await processor.StartProcessingAsync(); await Task.Delay(TimeSpan.FromSeconds(1)); Console.WriteLine("Stopping the receiver..."); await processor.StopProcessingAsync(); Console.WriteLine("Stopped receiving messages. Waiting 60 seconds."); await Task.Delay(TimeSpan.FromSeconds(60)); } } finally { await processor.DisposeAsync(); await client.DisposeAsync(); } async Task ProcessMessageAsync(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); //メッセージの本文 Console.WriteLine($"Received: {body}"); if (args.Message.ApplicationProperties.TryGetValue("Diagnostic-Id", out var objectId) && objectId is string diagnosticId) { //デバッグ用にDiagnosticId(traceparentヘッダー)の値を出力 Console.WriteLine($"DiagnosticId: {diagnosticId}"); //traceparentヘッダーを送信したSpanを親のスパンとして子スパンを開始 using var activity = MyActivitySource.StartActivity("ProcessMessageAsync", ActivityKind.Consumer, parentId: diagnosticId); //デバッグ用に子スパンの情報を出力 Console.WriteLine($"activity id: {activity?.Id}"); Console.WriteLine($"activity Trace id: {activity?.TraceId}"); Console.WriteLine($"activity Span id: {activity?.SpanId}"); Console.WriteLine($"activity Parent id: {activity?.ParentId}"); //タグを追加 activity?.SetTag("message", body); await Task.Delay(1000); await args.CompleteMessageAsync(args.Message); } }
さて、実際に実行するとプロデューサー側ではこのようにActivityのTrace IDとSpan IDが出力されます。
activity id: 00-59ed36599d26220d1362c2ac04e66cb3-394930dbfcb529b3-01 activity Trace id: 59ed36599d26220d1362c2ac04e66cb3 activity Span id: 394930dbfcb529b3
すると、コンシューマー側ではこのように出力されます。DiagnosticIdで上と同じ値が渡ってきており、これを親としてSpanを作成するとTrace Idは同じで、Span Idが別のものになっています。また、Parent Idが設定されていることも確認できます。
DiagnosticId: 00-59ed36599d26220d1362c2ac04e66cb3-394930dbfcb529b3-01 activity id: 00-59ed36599d26220d1362c2ac04e66cb3-0066bfbb9bc123a8-01 activity Trace id: 59ed36599d26220d1362c2ac04e66cb3 activity Span id: 0066bfbb9bc123a8 activity Parent id: 00-59ed36599d26220d1362c2ac04e66cb3-394930dbfcb529b3-01
これで目的は達成できましたが、最後にオブザーバビリティバックエンドでどのように見えるかの一例をあげます。上で出力されたTrace Idをもつ1つのトレースとして表示されます。また、プロデューサー側は1回の処理=1つのトレースで3つのメッセージを送信するため、3つのコンシューマー側のスパンが確認できます。
プロデューサー側のSpanのIdも確認でき、
コンシューマー側のParent Idに設定されていることがわかります。
なお、コンシューマー側のSpanが完了するまでがトレースの継続時間となるため、場合によっては非常に長くなる場合があります。サービスによっては1つのトレースでつなげるのではなく、複数のトレースに共通の属性の値としてもたせておき、必要に応じて関連付けを検索できるようにしたほうがいい場合もあります。また、複数の異なるプロデューサーのメッセージを、1つのコンシューマーで処理する場合、複数の親をもった子スパンを現在のOpenTelemetryの分散トレースでは表現できないため、複数のトレースとして表現する必要があります。