大众运输Kafka如何在消息处理过程中从saga状态机生成消息

xpszyzbs  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(489)

publishasync不工作
示例program.cs:

namespace MassTransitKafka
{
    class Program
    {
        private static ServiceProvider _serviceProvider;

        static async Task Main(string[] args)
        {
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.UsingInMemory((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context);
                });

                x.AddRider(rider =>
                {
                    rider.AddProducer<Enter1>(nameof(Enter1));
                    rider.AddProducer<Enter2>(nameof(Enter2));
                    rider.AddProducer<Enter3>(nameof(Enter3));
                    rider.AddProducer<EnterEnter>(nameof(EnterEnter));
                    rider.AddSagaStateMachine<TestSaga1StateMachine, TestSaga1State>(typeof(TestSaga1StateDefinition))
                        .InMemoryRepository();

                    rider.UsingKafka((context, k) =>
                    {
                        k.Host("localhost:9092");

                        k.TopicEndpoint<Null, Enter1>(nameof(Enter1), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, Enter2>(nameof(Enter2), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, Enter3>(nameof(Enter3), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, EnterEnter>(nameof(EnterEnter), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                    });
                });
            });
            _serviceProvider = services.BuildServiceProvider();
            var busControl = _serviceProvider.GetRequiredService<IBusControl>();
            var observer = new ReceiveObserver();
            busControl.ConnectReceiveObserver(observer);

            await busControl.StartAsync();
            var tokenSource = new CancellationTokenSource();
            ThreadPool.QueueUserWorkItem(s =>
            {
                Work(busControl, tokenSource.Token).GetAwaiter().GetResult();
            });

            while (true)
            {
                var quit = Console.ReadLine();
                if (quit == "quit")
                {
                    tokenSource.Cancel();
                    break;
                }
            }
        }

        private static async Task Work(IPublishEndpoint publisher, CancellationToken token)
        {
            var correlationId = Guid.NewGuid();
            var enter1Producer = _serviceProvider.GetRequiredService<ITopicProducer<Enter1>>();

            await enter1Producer.Produce(new {CorrelationId = correlationId, EnteredText = "1"}, token);

            while (token.IsCancellationRequested == false)
            {
                var cancelled = token.WaitHandle.WaitOne(5000);
                if (cancelled)
                    break;
            }
        }

        private static Dictionary<string, string> Configuration
        {
            get
            {
                return new Dictionary<string, string>
                {
                    { "bootstrap.servers", "localhost:9092" },
                    { "group.id", "saga.group.id" }
                };
            }
        }
    }
}

示例testsaga1statemachine.cs

public class TestSaga1StateMachine : MassTransitStateMachine<TestSaga1State>
    {
        public TestSaga1StateMachine()
        {
            InstanceState(_ => _.CurrentState);
            Event(() => Enter1Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
            Event(() => Enter2Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
            Event(() => Enter3Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
            Event(() => EnterEnterEvent, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));

            Initially(
                When(Enter1Event)
                    .Then(context => context.Instance.SaveEnter1(context.Data))
// Messages are not sent here
                    .PublishAsync(context => context.Init<Enter2>(new {EnteredText = "2"}))
                    .TransitionTo(Entered1)
                );
            During(Entered1,
                When(Enter2Event)
                    .Then(context => context.Instance.SaveEnter2(context.Data))
// Messages are not sent here
                    .PublishAsync(context => context.Init<Enter3>(new {EnteredText = "3"}))
                    .TransitionTo(Entered2)
                );
            During(Entered2,
                When(Enter3Event)
                    .Then(context => context.Instance.SaveEnter3(context.Data))
// Messages are not sent here
                    .PublishAsync(context => context.Init<EnterEnter>(new {EnteredText = "Enter"}))
                    .TransitionTo(Entered3)
            );
            During(Entered3,
                When(EnterEnterEvent)
                    .Then(context => context.Instance.Print())
                    .TransitionTo(EnteredEnter)
                    .Finalize());

            SetCompletedWhenFinalized();
        }

        public State Entered1 { get; set; }
        public State Entered2 { get; set; }
        public State Entered3 { get; set; }
        public State EnteredEnter { get; set; }

        public Event<Enter1> Enter1Event { get; set; }
        public Event<Enter2> Enter2Event { get; set; }
        public Event<Enter3> Enter3Event { get; set; }
        public Event<EnterEnter> EnterEnterEvent { get; set; }
    }

这个项目只是为了我的学习。我不明白如何在上面生成消息总线配置与文档中的相同。第一条enter1消息发布成功,传奇也收到了,但是如何从传奇中向Kafka发送消息还不清楚

ijxebb2r

ijxebb2r1#

您需要创建一个自定义状态机活动,该活动依赖于producer接口(配置kafka时设置),以便生成到kafka主题的消息。我最近做了一个视频作为第二季的一部分。
您可以在单元测试中看到producer设置的示例

services.AddMassTransit(x =>
{
    x.AddRider(rider =>
    {
        rider.AddProducer<KafkaMessage>(Topic);

        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

然后,在您的自定义状态机活动中,您将在 ITopicProducer<KafkaMessage> 并用它来产生信息。它可能看起来像这样:

public class ProduceEnter2Activity :
    Activity<TestSaga1State>
{
    readonly ITopicProducer<Enter2> _producer;

    public ProduceEnter2Activity(ITopicProducer<Enter2> producer)
    {
        _producer = producer;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("notifyMember");
    }

    public void Accept(StateMachineVisitor visitor)
    {
        visitor.Visit(this);
    }

    public async Task Execute(BehaviorContext<TestSaga1State> context, Behavior<TestSaga1State> next)
    {
        await Execute(context);

        await next.Execute(context);
    }

    public async Task Execute<T>(BehaviorContext<TestSaga1State, T> context, Behavior<TestSaga1State, T> next)
    {
        await Execute(context);

        await next.Execute(context);
    }

    public Task Faulted<TException>(BehaviorExceptionContext<TestSaga1State, TException> context, Behavior<TestSaga1State> next)
        where TException : Exception
    {
        return next.Faulted(context);
    }

    public Task Faulted<T, TException>(BehaviorExceptionContext<TestSaga1State, T, TException> context, Behavior<TestSaga1State, T> next)
        where TException : Exception
    {
        return next.Faulted(context);
    }

    async Task Execute(BehaviorContext<TestSaga1State> context)
    {
        await _producer.Produce(new Enter2(...));
    }
}

然后,在状态机中,使用:

.Activity(x => x.OfInstanceType<ProduceEnter2Activity>())

相关问题