我一直使用OpenTracing来实现API的分布式跟踪。但最近,我需要将跟踪传播到一个.Net项目,该项目是RabbitMQ队列的消费者,因此我以这篇文章(this article)作为基础。
为了获得这个跟踪,包括从RabbitMQ队列中获取的消息,我需要使用OpenTelemetry。
为了能够跟踪端到端请求的完整周期(RabbitMQ发布者API -> RabbitMQ消费者API),我使用以下流程来配置此通信:
配置OpenTelemetry的类代码:
using framework.companyCSharp.Services.Queue.Folder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using System;
namespace company.project.CSharp.Extensions
{
/// <summary>
/// Dependency-injection helpers that register OpenTelemetry tracing with a Jaeger exporter.
/// </summary>
public static class DistributedTracing
{
    /// <summary>
    /// Registers OpenTelemetry tracing for the application: listens to the activity source named
    /// <see cref="OpenTelemetryExtensions.ServiceName"/>, adds ASP.NET Core instrumentation and
    /// exports spans through the Jaeger exporter.
    /// </summary>
    /// <param name="services">Service collection the tracer provider is added to.</param>
    /// <param name="configuration">
    /// Configuration read for <c>Jaeger:JAEGER_SERVICE_NAME</c>, <c>Jaeger:JAEGER_AGENT_HOST</c>
    /// and <c>Jaeger:JAEGER_AGENT_PORT</c>.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="services"/> or <paramref name="configuration"/> is null.
    /// </exception>
    /// <exception cref="FormatException">
    /// <c>Jaeger:JAEGER_AGENT_PORT</c> is present but is not an integer (raised when the exporter
    /// options are built).
    /// </exception>
    public static void ConfigureOpenTracing(this IServiceCollection services, IConfiguration configuration)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }

        if (configuration == null)
        {
            throw new ArgumentNullException(nameof(configuration));
        }

        services.AddOpenTelemetryTracing(traceProvider =>
        {
            traceProvider
                .AddSource(OpenTelemetryExtensions.ServiceName)
                .SetResourceBuilder(
                    ResourceBuilder.CreateDefault()
                        .AddService(
                            serviceName: configuration["Jaeger:JAEGER_SERVICE_NAME"],
                            serviceVersion: OpenTelemetryExtensions.ServiceVersion))
                .AddAspNetCoreInstrumentation()
                .AddJaegerExporter(exporter =>
                {
                    // Only override the exporter's own defaults when a value is actually configured;
                    // previously a missing host assigned null and a missing port became 0.
                    string agentHost = configuration["Jaeger:JAEGER_AGENT_HOST"];
                    if (!string.IsNullOrWhiteSpace(agentHost))
                    {
                        exporter.AgentHost = agentHost;
                    }

                    string agentPort = configuration["Jaeger:JAEGER_AGENT_PORT"];
                    if (!string.IsNullOrWhiteSpace(agentPort))
                    {
                        // Configuration values are machine-readable: parse culture-invariantly.
                        if (!int.TryParse(
                                agentPort,
                                System.Globalization.NumberStyles.Integer,
                                System.Globalization.CultureInfo.InvariantCulture,
                                out int port))
                        {
                            throw new FormatException(
                                $"Jaeger:JAEGER_AGENT_PORT must be an integer, but was '{agentPort}'.");
                        }

                        exporter.AgentPort = port;
                    }
                });
        });
    }
}
}
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
namespace framework.companyCSharp.Services.Queue.Folder
{
/// <summary>
/// Process-wide tracing metadata (host, OS, runtime, service name/version) and the
/// <see cref="ActivitySource"/> used to start messaging activities.
/// </summary>
public static class OpenTelemetryExtensions
{
    // Single source for the whole process. An ActivitySource is IDisposable and the previous
    // implementation allocated a new, never-disposed one on every call.
    private static readonly ActivitySource SharedActivitySource;

    /// <summary>Machine name, taken from <see cref="Environment.MachineName"/>.</summary>
    public static string Local { get; }

    /// <summary>Operating system version string, taken from <see cref="Environment.OSVersion"/>.</summary>
    public static string Kernel { get; }

    /// <summary>Runtime description, taken from <see cref="RuntimeInformation.FrameworkDescription"/>.</summary>
    public static string Framework { get; }

    /// <summary>Name of the assembly declaring this class, suffixed with "RabbitMQ".</summary>
    public static string ServiceName { get; }

    /// <summary>Version of the assembly declaring this class, or an empty string when it has none.</summary>
    public static string ServiceVersion { get; }

    static OpenTelemetryExtensions()
    {
        var assemblyName = typeof(OpenTelemetryExtensions).Assembly.GetName();

        Local = Environment.MachineName;
        Kernel = Environment.OSVersion.VersionString;
        Framework = RuntimeInformation.FrameworkDescription;
        ServiceName = assemblyName.Name + "RabbitMQ";
        // AssemblyName.Version may be null; a static constructor must not throw because of it.
        ServiceVersion = assemblyName.Version?.ToString() ?? string.Empty;
        SharedActivitySource = new ActivitySource(ServiceName, ServiceVersion);
    }

    /// <summary>
    /// Returns the activity source named <see cref="ServiceName"/> with version
    /// <see cref="ServiceVersion"/>. The same instance is returned on every call, so callers
    /// must not dispose it.
    /// </summary>
    /// <returns>The shared <see cref="ActivitySource"/>.</returns>
    public static ActivitySource CreateActivitySource() => SharedActivitySource;
}
}
向RabbitMQ队列发送消息的类代码:
using framework.companyCSharp.Interfaces;
using framework.companyCSharp.Services.Queue.Folder;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry;
using RabbitMQ.Client;
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using System.Collections.Generic;
using framework.companyCSharp.Extensions;
namespace framework.companyCSharp.Services.Queue
{
/// <summary>
/// Publishes JSON-serialized messages to the configured RabbitMQ queue and injects the current
/// trace context into the message headers so that a consumer can continue the trace.
/// </summary>
public class PublisherQueueService : QueueService, IPublisherQueueService
{
    /// <summary>Creates the publisher; connection setup is performed by the base constructor.</summary>
    /// <param name="queueConfig">Queue settings forwarded to <see cref="QueueService"/>.</param>
    public PublisherQueueService(IOptions<QueueConfig> queueConfig) : base(queueConfig)
    {
    }

    /// <summary>
    /// Serializes <paramref name="Mensagem"/> to JSON and publishes it as a persistent message
    /// to the default exchange, routed by the configured queue name.
    /// </summary>
    /// <param name="Mensagem">Payload to publish; must not be null.</param>
    /// <exception cref="Exception">
    /// <paramref name="Mensagem"/> is null. Failures from serialization, propagation or the
    /// RabbitMQ client propagate unchanged, with their original stack trace.
    /// </exception>
    public void SendMessage(object Mensagem)
    {
        if (Mensagem == null)
        {
            throw new Exception("PublisherQueueService: Mensagem nula");
        }

        var activityName = $"{this.NomeDaFila} send";

        // The activity must be disposed so it is stopped and Activity.Current is restored;
        // StartActivity may return null, which a using statement accepts.
        using (var activity = OpenTelemetryExtensions.CreateActivitySource()
            .StartActivity(activityName, ActivityKind.Producer))
        {
            IBasicProperties properties = Canal.CreateBasicProperties();
            properties.Persistent = true;

            // Prefer the producer activity; otherwise fall back to the ambient activity, if any.
            ActivityContext contextToInject = default;
            if (activity != null)
            {
                contextToInject = activity.Context;
            }
            else if (Activity.Current != null)
            {
                contextToInject = Activity.Current.Context;
            }

            Propagator.Inject(
                new PropagationContext(contextToInject, Baggage.Current),
                properties,
                InjectTraceContextIntoBasicProperties);

            string mensagem = JsonConvert.SerializeObject(Mensagem);
            // UTF-8 explicitly: the consumer decodes the body with Encoding.UTF8, whereas
            // Encoding.Default is platform-dependent.
            byte[] body = Encoding.UTF8.GetBytes(mensagem);

            Canal.BasicPublish(
                exchange: String.Empty,
                routingKey: QueueConfig.QueueName,
                basicProperties: properties,
                body: body);
        }
    }

    /// <summary>
    /// Propagator setter callback: stores one trace-context entry in the message headers,
    /// creating the header dictionary when the properties have none.
    /// </summary>
    /// <param name="props">Message properties receiving the header.</param>
    /// <param name="key">Header name supplied by the propagator.</param>
    /// <param name="value">Header value supplied by the propagator.</param>
    /// <exception cref="Exception">Writing the header failed; the cause is kept as inner exception.</exception>
    private void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, string value)
    {
        try
        {
            if (props.Headers == null)
            {
                props.Headers = new Dictionary<string, object>();
            }

            props.Headers[key] = value;
        }
        catch (Exception ex)
        {
            // Keep the original exception as InnerException so its stack trace is not lost.
            throw new Exception("Failed to inject trace context." + ex.GetAllInnerExceptionMessage(), ex);
        }
    }
}
}
using framework.companyCSharp.Interfaces;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using OpenTelemetry.Context.Propagation;
using RabbitMQ.Client;
namespace framework.companyCSharp.Services.Queue
{
/// <summary>
/// Base class for RabbitMQ publishers and consumers: validates the queue configuration, opens
/// the connection and channel, and declares the durable queue.
/// </summary>
public class QueueService : IQueueService
{
    /// <summary>Propagator used by subclasses to inject/extract trace context in message headers.</summary>
    protected readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

    private string Servidor { get; set; }
    private string Usuario { get; set; }
    private string Senha { get; set; }

    /// <summary>Name of the declared queue, copied from <see cref="QueueConfig"/>.</summary>
    protected string NomeDaFila { get; set; }

    /// <summary>Channel created by <see cref="InicializaServidorDeFilas"/>.</summary>
    protected IModel Canal { get; private set; }

    /// <summary>Connection that owns <see cref="Canal"/>; exposed so subclasses can close it.</summary>
    protected IConnection Conexao { get; private set; }

    /// <summary>Queue settings this service was created with.</summary>
    public QueueConfig QueueConfig { get; private set; }

    /// <summary>Stores and validates the configuration, then connects to the queue server.</summary>
    /// <param name="queueConfig">Options wrapper holding the queue settings.</param>
    /// <exception cref="System.Exception">
    /// The options or their value are null, or a required setting is missing.
    /// </exception>
    public QueueService(IOptions<QueueConfig> queueConfig)
    {
        if (queueConfig == null || queueConfig.Value == null)
        {
            throw new System.Exception("QueueService: Falha ao injetar queueConfig valor nulo");
        }

        this.QueueConfig = queueConfig.Value;

        if (IsValidConfigs())
        {
            InicializaServidorDeFilas();
        }
    }

    /// <summary>
    /// Checks that server, user name, password and queue name are all set.
    /// </summary>
    /// <returns>Always true; every failed check throws instead of returning false.</returns>
    /// <exception cref="System.Exception">A required setting is null or empty.</exception>
    private bool IsValidConfigs()
    {
        if (string.IsNullOrEmpty(QueueConfig.Server))
        {
            throw new System.Exception("O Nome do servidor de fila deve ser informado em appsettings");
        }

        if (string.IsNullOrEmpty(QueueConfig.UserName))
        {
            throw new System.Exception("UserName do servidor de fila deve ser informado em appsettings");
        }

        if (string.IsNullOrEmpty(QueueConfig.Password))
        {
            throw new System.Exception("Password do servidor de fila deve ser informado em appsettings");
        }

        if (string.IsNullOrEmpty(QueueConfig.QueueName))
        {
            throw new System.Exception("QueueName do servidor de fila deve ser informado em appsettings");
        }

        return true;
    }

    /// <summary>
    /// Opens the connection and channel and declares the durable, non-exclusive queue.
    /// Does nothing when an open channel already exists.
    /// </summary>
    public void InicializaServidorDeFilas()
    {
        if (Canal != null && Canal.IsOpen)
        {
            return;
        }

        this.Servidor = this.QueueConfig.Server;
        this.Usuario = this.QueueConfig.UserName;
        this.Senha = this.QueueConfig.Password;
        this.NomeDaFila = this.QueueConfig.QueueName;

        var factory = new ConnectionFactory()
        {
            HostName = this.Servidor,
            UserName = this.Usuario,
            Password = this.Senha
        };

        // Keep the connection in Conexao (it used to live only in a local, so nothing could
        // ever close it) and reuse it when only the channel has gone away.
        if (Conexao == null || !Conexao.IsOpen)
        {
            Conexao = factory.CreateConnection();
        }

        Canal = Conexao.CreateModel();
        Canal.QueueDeclare(queue: this.NomeDaFila, durable: true, exclusive: false, autoDelete: false, arguments: null);
    }
}
}
获取和处理RabbitMQ队列消息的类代码:
using api.companyEFD.Domain.NotasFiscais.InterfaceServicos;
using api.companyEFD.Shared.ViewModels;
using framework.companyCSharp.Services.Queue;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using framework.companyCSharp.Extensions;
using framework.companyCSharp.Services;
using api.companyEFD.Shared.Resources;
using OpenTelemetry;
using System.Diagnostics;
using framework.companyCSharp.Services.Queue.Folder;
namespace api.companyEFD.Services
{
/// <summary>
/// Hosted service that consumes the configured RabbitMQ queue, continues the trace carried in
/// the message headers and persists NFe messages through <see cref="ISalvarNotaFiscalNoBanco"/>.
/// </summary>
public class ConsumerQueueService : QueueService, IHostedService, IDisposable
{
    private readonly IServiceProvider _serviceProvider;

    // Static gate with a single permit: messages are processed one at a time across instances.
    static readonly SemaphoreSlim semaphoreSlim = new(1, 1);

    private readonly IConfiguration _config;

    /// <summary>Creates the consumer; connection setup is performed by the base constructor.</summary>
    /// <param name="queueConfig">Queue settings forwarded to <see cref="QueueService"/>.</param>
    /// <param name="serviceProvider">Root provider used to create one scope per NFe message.</param>
    /// <param name="config">Configuration read for the <c>SQLServer:Usuario</c>/<c>SQLServer:Senha</c> values.</param>
    public ConsumerQueueService(
        IOptions<QueueConfig> queueConfig,
        IServiceProvider serviceProvider,
        IConfiguration config
    ) : base(queueConfig)
    {
        _serviceProvider = serviceProvider;
        _config = config;
    }

    /// <summary>Closes the channel and the connection, when they exist.</summary>
    public void Dispose()
    {
        // Null-conditional on both: Dispose must not fail when nothing was opened.
        Canal?.Close();
        Conexao?.Close();
    }

    /// <summary>
    /// Registers an event-based consumer on the queue with manual acknowledgement.
    /// Any exception raised while starting is logged and the method still completes.
    /// </summary>
    /// <param name="cancellationToken">Checked once before the consumer is registered.</param>
    /// <returns>A completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        try
        {
            cancellationToken.ThrowIfCancellationRequested();

            var consumer = new EventingBasicConsumer(Canal);
            // The handler is effectively async void; OnRecebeMensagem catches every exception
            // itself, so nothing escapes to the consumer's dispatch thread.
            consumer.Received += async (canal, evento) => await OnRecebeMensagem(canal, evento);
            consumer.Shutdown += OnConsumerShutdown;
            consumer.Registered += OnConsumerRegistered;
            consumer.Unregistered += OnConsumerUnregistered;
            consumer.ConsumerCancelled += OnConsumerConsumerCancelled;

            // autoAck = false: deliveries are acknowledged explicitly after processing.
            Canal.BasicConsume(NomeDaFila, false, consumer);
            return Task.CompletedTask;
        }
        catch (Exception ex)
        {
            TratamentoDeErro(ex);
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Logs an error entry containing a fresh GUID, the current tracing id, the aggregated
    /// exception message and the stack trace.
    /// </summary>
    /// <param name="ex">Exception to log.</param>
    private void TratamentoDeErro(Exception ex)
    {
        var TraceId = Guid.NewGuid().ToString();
        var OpenTracingId = GetOpenTracingId();
        var erro = new
        {
            TraceId,
            OpenTracingId,
            Message = ex.GetAllInnerExceptionMessage(),
            ex?.StackTrace
        };
        LogService.Log(erro.ToString());
    }

    /// <summary>
    /// Handles one delivery: extracts the parent trace context from the headers, starts a
    /// consumer activity, deserializes the body, fills in the database credentials from
    /// configuration, saves NFe payloads and acknowledges the delivery.
    /// </summary>
    /// <param name="canal">Event sender (unused).</param>
    /// <param name="evento">Delivery data: headers, routing key, body and delivery tag.</param>
    private async Task OnRecebeMensagem(object canal, BasicDeliverEventArgs evento)
    {
        try
        {
            var parentContext = Propagator.Extract(default, evento.BasicProperties, this.ExtractTraceContextFromBasicProperties);
            Baggage.Current = parentContext.Baggage;

            var activityName = $"{evento.RoutingKey} receive";
            using var activity = OpenTelemetryExtensions.CreateActivitySource()
                .StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);

            await semaphoreSlim.WaitAsync();
            try
            {
                var content = Encoding.UTF8.GetString(evento.Body.ToArray());
                QueueVM<JObject> mensagem = JsonConvert.DeserializeObject<QueueVM<JObject>>(content);

                string Usuario = _config.GetValue<string>("SQLServer:Usuario");
                string Senha = _config.GetValue<string>("SQLServer:Senha");
                if (string.IsNullOrEmpty(Usuario) || string.IsNullOrEmpty(Senha))
                {
                    throw new Exception(EfdResources.ClienteBdLoginNaoConfigurado);
                }

                // Deserialization can yield null (or a message without ConfigBd); fail with a
                // clear error instead of a NullReferenceException on the assignments below.
                if (mensagem?.ConfigBd is null)
                {
                    throw new InvalidOperationException("Mensagem da fila inválida: conteúdo ou ConfigBd nulo.");
                }

                mensagem.ConfigBd.usuario = Usuario;
                mensagem.ConfigBd.senha = Senha;

                if (mensagem.Tipo == Shared.ENums.ETipoInformacao.NFe)
                {
                    // Resolve the scoped service into a local so it never outlives its scope.
                    using IServiceScope scope = _serviceProvider.CreateScope();
                    var salvarNotaFiscal = scope.ServiceProvider.GetRequiredService<ISalvarNotaFiscalNoBanco>();
                    await salvarNotaFiscal.SalvarNoBanco(mensagem.GetFirstData().ToObject<NotaFiscalVM>(), mensagem.ConfigBd);
                }

                Canal.BasicAck(evento.DeliveryTag, false);
            }
            finally
            {
                semaphoreSlim.Release();
            }
        }
        catch (Exception ex)
        {
            // NOTE(review): on failure the delivery is only logged; it is neither acked nor
            // nacked here. Confirm that leaving it unacknowledged is the intended behavior.
            TratamentoDeErro(new Exception("Erro ao processar mensagem do RabbitMQ." + "\n" + ex.GetAllInnerExceptionMessage(), ex));
        }
    }

    /// <summary>
    /// Propagator getter callback: reads one header and returns it as text. Byte-array values
    /// are decoded as UTF-8 and string values are returned as-is.
    /// </summary>
    /// <param name="props">Properties of the received message.</param>
    /// <param name="key">Header name requested by the propagator.</param>
    /// <returns>A single-element sequence with the header value, or an empty sequence.</returns>
    /// <exception cref="Exception">Reading the header failed; the cause is kept as inner exception.</exception>
    private IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperties props, string key)
    {
        try
        {
            if (props.Headers != null && props.Headers.TryGetValue(key, out var value))
            {
                switch (value)
                {
                    case byte[] bytes:
                        return new[] { Encoding.UTF8.GetString(bytes) };
                    case string text:
                        return new[] { text };
                }
            }
        }
        catch (Exception ex)
        {
            throw new Exception("Falha durante a extração do trace context: " + ex.GetAllInnerExceptionMessage(), ex);
        }

        return Enumerable.Empty<string>();
    }

    /// <summary>No-op; the channel and connection are closed in <see cref="Dispose"/>.</summary>
    /// <param name="cancellationToken">Unused.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    #region Eventos
    // Placeholder handlers: subscribed (where applicable) but intentionally empty.
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    #endregion
}
}
从OpenTracing切换到OpenTelemetry后,在JaegerUI中查看跟踪时,我可以看到端到端跟踪基本令人满意地工作,只是现在不再显示层次级别(父子关系),各个span只是彼此堆叠显示在同一层。这样就无法再验证每次通信发生的正确顺序。
使用OpenTracing:
<PackageReference Include="Jaeger" Version="1.0.3" />
<PackageReference Include="OpenTracing.Contrib.NetCore" Version="0.8.0" />
使用OpenTelemetry:
<PackageReference Include="OpenTelemetry.Exporter.Jaeger" Version="1.3.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc9.4" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.0.0-rc9.4" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.0.0-rc9.4" />
任何帮助都将受到欢迎。
1条答案
按热度按时间t3psigkw1#
尝试使用Odigos。它将自动检测您的应用程序并生成分布式跟踪(使用上下文传播),并将它们发送到Jaeger(或您希望的任何其他目的地)。安装只需5分钟。https://github.com/keyval-dev/odigos