RabbitMQ: Jaeger UI does not show nested request propagation when using OpenTelemetry

Posted by gg58donl on 2023-04-20 in RabbitMQ

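I have been using OpenTracing to get distributed traces for my APIs. Recently, however, I needed to propagate the trace into a .NET project that acts as the consumer of a RabbitMQ queue, so I used this article as a starting point.
To get a trace that also covers the messages read from the RabbitMQ queue, I need to use OpenTelemetry.
To trace the full end-to-end request cycle (RabbitMQ publisher API -> RabbitMQ consumer API), I configured the communication as follows: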

Code of the classes that configure OpenTelemetry:

using framework.companyCSharp.Services.Queue.Folder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using System;

namespace company.project.CSharp.Extensions
{
    public static class DistributedTracing
    {
        public static void ConfigureOpenTracing(this IServiceCollection services, IConfiguration configuration)
        {
            services.AddOpenTelemetryTracing(traceProvider =>
            {
                traceProvider
                    .AddSource(OpenTelemetryExtensions.ServiceName)
                    .SetResourceBuilder(
                        ResourceBuilder.CreateDefault()
                            .AddService(serviceName: configuration["Jaeger:JAEGER_SERVICE_NAME"],
                                serviceVersion: OpenTelemetryExtensions.ServiceVersion))
                    .AddAspNetCoreInstrumentation()

                    .AddJaegerExporter(exporter =>
                    {
                        exporter.AgentHost = configuration["Jaeger:JAEGER_AGENT_HOST"];
                        exporter.AgentPort = Convert.ToInt32(configuration["Jaeger:JAEGER_AGENT_PORT"]);
                    });

            });
            
        }
    }
}

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;

namespace framework.companyCSharp.Services.Queue.Folder
{
    public static class OpenTelemetryExtensions
    {
        public static string Local { get; }
        public static string Kernel { get; }
        public static string Framework { get; }
        public static string ServiceName { get; }
        public static string ServiceVersion { get; }

        static OpenTelemetryExtensions()
        {
            Local = Environment.MachineName;
            Kernel = Environment.OSVersion.VersionString;
            Framework = RuntimeInformation.FrameworkDescription;
            ServiceName = typeof(OpenTelemetryExtensions).Assembly.GetName().Name + "RabbitMQ";
            ServiceVersion = typeof(OpenTelemetryExtensions).Assembly.GetName().Version.ToString();
        }

        public static ActivitySource CreateActivitySource() =>
            new ActivitySource(ServiceName, ServiceVersion);
    }
}
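
A side note on the helper above: CreateActivitySource() builds a new ActivitySource on every call. The sketch below, which assumes only System.Diagnostics, keeps one shared instance instead and shows that StartActivity returns null until something listens to that source name. The names Telemetry and "Demo.Messaging" are hypothetical, and the ActivityListener is only a stand-in for the TracerProvider that AddSource(...) wires up in the real application.

```csharp
using System;
using System.Diagnostics;

public static class Telemetry
{
    // Hypothetical source name; it has to match the name given to AddSource(...).
    public const string SourceName = "Demo.Messaging";

    // One shared instance for the whole process instead of one per call.
    public static readonly ActivitySource Source = new ActivitySource(SourceName, "1.0.0");
}

public static class SharedSourceDemo
{
    // Returns whether an activity was created before and after a listener was registered.
    public static (bool Before, bool After) Run()
    {
        // No listener yet: StartActivity returns null and nothing is recorded.
        using var before = Telemetry.Source.StartActivity("no listener");
        bool createdBefore = before != null;

        // Stand-in for the OpenTelemetry TracerProvider: sample everything from our source.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Telemetry.SourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded
        };
        ActivitySource.AddActivityListener(listener);

        using var after = Telemetry.Source.StartActivity("with listener");
        bool createdAfter = after != null;

        return (createdBefore, createdAfter);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"before listener: {result.Before}, after listener: {result.After}");
    }
}
```

This is why the code in the question has to null-check the activity: if the name passed to AddSource does not match the name of the source that starts the span, the span is silently dropped.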

Code of the classes that send messages to the RabbitMQ queue:

using framework.companyCSharp.Interfaces;
using framework.companyCSharp.Services.Queue.Folder;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry;
using RabbitMQ.Client;
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using System.Collections.Generic;
using framework.companyCSharp.Extensions;

namespace framework.companyCSharp.Services.Queue
{
    public class PublisherQueueService : QueueService, IPublisherQueueService
    {
        public PublisherQueueService(IOptions<QueueConfig> queueConfig) : base(queueConfig)
        {}

        public void SendMessage(object Mensagem)
        {
            try
            { 
                if (Mensagem == null)
                {
                    throw new Exception("PublisherQueueService: Mensagem nula");
                }                

                var activityName = $"{this.NomeDaFila} send";

                var activity = OpenTelemetryExtensions.CreateActivitySource()
                .StartActivity(activityName, ActivityKind.Producer);

                IBasicProperties properties = Canal.CreateBasicProperties();
                properties.Persistent = true;

                ActivityContext contextToInject = default;
                if (activity != null)
                {
                    contextToInject = activity.Context;
                }
                else if (Activity.Current != null)
                {
                    contextToInject = Activity.Current.Context;
                }

                Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), properties,
                    InjectTraceContextIntoBasicProperties);

                string mensagem = JsonConvert.SerializeObject(Mensagem);

                // Encode as UTF-8 explicitly, since the consumer decodes the body with Encoding.UTF8.
                byte[] body = Encoding.UTF8.GetBytes(mensagem);

                Canal.BasicPublish(exchange: String.Empty, routingKey: QueueConfig.QueueName, basicProperties: properties, body: body);

            }
            catch (Exception ex)
            {
                throw; // rethrow without resetting the stack trace
            }
        }

        private void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, string value)
        {
            try
            {
                if (props.Headers == null)
                {
                    props.Headers = new Dictionary<string, object>();
                }

                props.Headers[key] = value;
            }
            catch (Exception ex)
            {
                throw new Exception("Failed to inject trace context." + ex.GetAllInnerExceptionMessage());
            }
        }
    }
}
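
One thing that may be worth checking in SendMessage above: the producer activity is started but never stopped or disposed. In System.Diagnostics, listeners are told about a finished activity only when it is stopped, so a span that is never stopped may never be exported. If the producer span is missing from Jaeger, the consumer span would point at a parent that does not exist there, which could plausibly give a flat view instead of a nested one. This is an unverified guess about the setup in the question, not a confirmed diagnosis. The sketch below uses only System.Diagnostics; the source name "Demo.Publisher" is hypothetical and the listener stands in for the exporter pipeline.

```csharp
using System;
using System.Diagnostics;

public static class SpanLifetimeDemo
{
    // Returns the number of stop notifications seen after each of the three steps.
    public static int[] Run()
    {
        using var source = new ActivitySource("Demo.Publisher"); // hypothetical name
        int stoppedCount = 0;

        using var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == "Demo.Publisher",
            Sample = (ref ActivityCreationOptions<ActivityContext> o) =>
                ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = _ => stoppedCount++
        };
        ActivitySource.AddActivityListener(listener);

        // 1) Scoped with "using": the activity is disposed at the end of the block.
        using (source.StartActivity("demo-queue send", ActivityKind.Producer))
        {
            // the message would be published here
        }
        int afterScoped = stoppedCount;

        // 2) Started without "using", as in SendMessage: no stop notification yet.
        Activity pending = source.StartActivity("demo-queue send", ActivityKind.Producer);
        int whilePending = stoppedCount;

        // 3) Only an explicit Stop/Dispose ends the span and notifies the listener.
        pending?.Dispose();
        int afterDispose = stoppedCount;

        return new[] { afterScoped, whilePending, afterDispose };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

If this turns out to be the cause, declaring the activity with "using var activity = ..." in SendMessage would end the producer span as soon as the method returns.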

using framework.companyCSharp.Interfaces;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using OpenTelemetry.Context.Propagation;
using RabbitMQ.Client;

namespace framework.companyCSharp.Services.Queue
{
    public class QueueService : IQueueService
    {
        protected readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

        private string Servidor { get; set; }

        private string Usuario { get; set; }

        private string Senha { get; set; }

        protected string NomeDaFila { get; set; }

        protected IModel Canal { get; private set; }

        protected IConnection Conexao { get; private set; }

        public QueueConfig QueueConfig { get; private set; }

        public QueueService(IOptions<QueueConfig> queueConfig)
        {
            if (queueConfig == null || queueConfig.Value == null)
                throw new System.Exception("QueueService: Falha ao injetar queueConfig valor nulo");

            this.QueueConfig = queueConfig.Value;

            if (IsValidConfigs())
            {
                InicializaServidorDeFilas();
            }
        }

        private bool IsValidConfigs()
        {
            bool resposta = true;

            if (string.IsNullOrEmpty(QueueConfig.Server))
                throw new System.Exception("O Nome do servidor de fila deve ser informado em appsettings");

            if (string.IsNullOrEmpty(QueueConfig.UserName))
                throw new System.Exception("UserName do servidor de fila deve ser informado em appsettings");

            if (string.IsNullOrEmpty(QueueConfig.Password))
                throw new System.Exception("Password do servidor de fila deve ser informado em appsettings");

            if (string.IsNullOrEmpty(QueueConfig.QueueName))
                throw new System.Exception("QueueName do servidor de fila deve ser informado em appsettings");

            return resposta;
        }

        public void InicializaServidorDeFilas()
        {
            if (Canal != null && Canal.IsOpen) return;

            this.Servidor = this.QueueConfig.Server;
            this.Usuario = this.QueueConfig.UserName;
            this.Senha = this.QueueConfig.Password;
            this.NomeDaFila = this.QueueConfig.QueueName;

            var factory = new ConnectionFactory()
            {
                HostName = this.Servidor,
                UserName = this.Usuario,
                Password = this.Senha
            };

            // Keep the connection in the Conexao property so that Dispose() can close it.
            Conexao = factory.CreateConnection();

            Canal = Conexao.CreateModel();

            Canal.QueueDeclare(queue: this.NomeDaFila, durable: true, exclusive: false, autoDelete: false, arguments: null);
        }
    }
}

Code of the class that receives and processes messages from the RabbitMQ queue:

using api.companyEFD.Domain.NotasFiscais.InterfaceServicos;
using api.companyEFD.Shared.ViewModels;
using framework.companyCSharp.Services.Queue;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using framework.companyCSharp.Extensions;
using framework.companyCSharp.Services;
using api.companyEFD.Shared.Resources;
using OpenTelemetry;
using System.Diagnostics;
using framework.companyCSharp.Services.Queue.Folder;

namespace api.companyEFD.Services
{
    public class ConsumerQueueService : QueueService, IHostedService, IDisposable
    {
        private ISalvarNotaFiscalNoBanco _salvarNotaFiscalNoBanco;
        private readonly IServiceProvider _serviceProvider;
        static readonly SemaphoreSlim semaphoreSlim = new(1, 1);
        private readonly IConfiguration _config;

        public ConsumerQueueService(
            IOptions<QueueConfig> queueConfig,
            IServiceProvider serviceProvider,
            IConfiguration config
            ) : base(queueConfig)
        {
            _serviceProvider = serviceProvider;
            _config = config;
        }

        public void Dispose()
        {
            Canal?.Close();
            Conexao?.Close();
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                cancellationToken.ThrowIfCancellationRequested();

                var consumer = new EventingBasicConsumer(Canal);

                consumer.Received += new EventHandler<BasicDeliverEventArgs>(
                async delegate (Object canal, BasicDeliverEventArgs evento)
                {
                    await OnRecebeMensagem(canal, evento);
                });

                consumer.Shutdown += OnConsumerShutdown;
                consumer.Registered += OnConsumerRegistered;
                consumer.Unregistered += OnConsumerUnregistered;
                consumer.ConsumerCancelled += OnConsumerConsumerCancelled;

                Canal.BasicConsume(NomeDaFila, false, consumer);

                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                TratamentoDeErro(ex);
                return Task.CompletedTask;
            }
        }

        private void TratamentoDeErro(Exception ex)
        {
            var TraceId = Guid.NewGuid().ToString();
            var OpenTracingId = GetOpenTracingId();

            var erro = new
            {
                TraceId,
                OpenTracingId,
                Message = ex.GetAllInnerExceptionMessage(),
                ex?.StackTrace
            };

            LogService.Log(erro.ToString());
        }

        private async Task OnRecebeMensagem(object canal, BasicDeliverEventArgs evento)
        {
            try
            {
                var parentContext = Propagator.Extract(default, evento.BasicProperties, this.ExtractTraceContextFromBasicProperties);
                Baggage.Current = parentContext.Baggage;

                var activityName = $"{evento.RoutingKey} receive";

                using var activity = OpenTelemetryExtensions.CreateActivitySource()
                    .StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);

                await semaphoreSlim.WaitAsync();
                try
                {

                    var content = Encoding.UTF8.GetString(evento.Body.ToArray());

                    QueueVM<JObject> mensagem = JsonConvert.DeserializeObject<QueueVM<JObject>>(content);

                    string Usuario = _config.GetValue<string>("SQLServer:Usuario");
                    string Senha = _config.GetValue<string>("SQLServer:Senha");

                    if (string.IsNullOrEmpty(Usuario) || string.IsNullOrEmpty(Senha))
                    {
                        throw new Exception(EfdResources.ClienteBdLoginNaoConfigurado);
                    }
                    else
                    {
                        mensagem.ConfigBd.usuario = Usuario;
                        mensagem.ConfigBd.senha = Senha;
                    }

                    if (mensagem?.Tipo == Shared.ENums.ETipoInformacao.NFe)
                    {
                        using IServiceScope scope = _serviceProvider.CreateScope();
                        _salvarNotaFiscalNoBanco = scope.ServiceProvider.GetRequiredService<ISalvarNotaFiscalNoBanco>();

                        await _salvarNotaFiscalNoBanco.SalvarNoBanco(mensagem.GetFirstData().ToObject<NotaFiscalVM>(), mensagem.ConfigBd);
                    }

                    Canal.BasicAck(evento.DeliveryTag, false);

                }
                finally
                {
                    semaphoreSlim.Release();
                }
            }
            catch (Exception ex)
            {
                TratamentoDeErro(new Exception("Erro ao processar mensagem do RabbitMQ." + "\n" + ex.GetAllInnerExceptionMessage()));
            }
        }

        private IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperties props, string key)
        {
            try
            {
                if (props.Headers == null) return Enumerable.Empty<string>();

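                // The RabbitMQ client delivers string header values as raw bytes;
                // anything else is ignored instead of causing a null dereference.
                if (props.Headers.TryGetValue(key, out var value) && value is byte[] bytes)
                {
                    return new[] { Encoding.UTF8.GetString(bytes) };
                }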
            }
            catch (Exception ex)
            {
                throw new Exception("Falha durante a extração do trace context: " + ex.GetAllInnerExceptionMessage());
            }

            return Enumerable.Empty<string>();
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }

        #region Eventos
        private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }

        private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }

        private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }

        private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }

        private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
        #endregion
    }
}
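
The extract-and-start-child step in OnRecebeMensagem can be reproduced in isolation to confirm that the parent/child link itself is formed correctly. The sketch below assumes only System.Diagnostics and the W3C traceparent header; it does not use RabbitMQ or the OpenTelemetry propagator. The header dictionary, the source name "Demo.Queue" and the activity names are hypothetical, and the header value is stored as bytes to mimic what the consumer in the question receives.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

public static class PropagationDemo
{
    private static readonly ActivitySource Source = new ActivitySource("Demo.Queue"); // hypothetical

    // Returns true when the consumer span is a child of the producer span.
    public static bool Run()
    {
        Activity.DefaultIdFormat = ActivityIdFormat.W3C; // already the default on .NET 5+

        using var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == "Demo.Queue",
            Sample = (ref ActivityCreationOptions<ActivityContext> o) =>
                ActivitySamplingResult.AllDataAndRecorded
        };
        ActivitySource.AddActivityListener(listener);

        var headers = new Dictionary<string, object>();

        // Producer side: with the W3C format, Activity.Id is the traceparent value.
        using (Activity send = Source.StartActivity("demo-queue send", ActivityKind.Producer))
        {
            if (send != null)
            {
                headers["traceparent"] = Encoding.UTF8.GetBytes(send.Id);
            }
        }

        // Consumer side: decode the header bytes and parse them into a parent context.
        ActivityContext parent = default;
        if (headers.TryGetValue("traceparent", out object raw) && raw is byte[] bytes)
        {
            ActivityContext.TryParse(Encoding.UTF8.GetString(bytes), null, out parent);
        }

        using Activity receive =
            Source.StartActivity("demo-queue receive", ActivityKind.Consumer, parent);

        return receive != null
            && receive.TraceId == parent.TraceId
            && receive.ParentSpanId == parent.SpanId;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

When this check passes for the real headers as well, the consumer span carries the right parent ID, and a missing hierarchy in the Jaeger UI would then point to the parent span not being exported rather than to the extraction code.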

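After switching from OpenTracing to OpenTelemetry, the end-to-end trace in the Jaeger UI works almost satisfactorily, except that the hierarchy levels are no longer shown: the processes appear stacked one on top of another instead of nested. As a result, I can no longer verify the order in which each communication actually took place.
With OpenTracing: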

<PackageReference Include="Jaeger" Version="1.0.3" />
<PackageReference Include="OpenTracing.Contrib.NetCore" Version="0.8.0" />

With OpenTelemetry:

<PackageReference Include="OpenTelemetry.Exporter.Jaeger" Version="1.3.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc9.4" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.0.0-rc9.4" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.0.0-rc9.4" />

Any help would be appreciated.

Answer #1, by t3psigkw

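Try Odigos. It automatically instruments your application, generates distributed traces (with context propagation), and sends them to Jaeger (or any other destination you choose). Installation takes only 5 minutes. https://github.com/keyval-dev/odigos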
