PublishAsync 不工作
示例 Program.cs:
namespace MassTransitKafka
{
    /// <summary>
    /// Console host that configures an in-memory MassTransit bus plus a Kafka rider,
    /// produces a single <c>Enter1</c> message to Kafka and then idles until the
    /// user types "quit" (or standard input is closed).
    /// </summary>
    class Program
    {
        private static ServiceProvider _serviceProvider;

        /// <summary>
        /// Builds the container, starts the bus, runs the background worker and
        /// shuts everything down in order once the user asks to quit.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static async Task Main(string[] args)
        {
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.UsingInMemory((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context);
                });

                x.AddRider(rider =>
                {
                    rider.AddProducer<Enter1>(nameof(Enter1));
                    rider.AddProducer<Enter2>(nameof(Enter2));
                    rider.AddProducer<Enter3>(nameof(Enter3));
                    rider.AddProducer<EnterEnter>(nameof(EnterEnter));

                    rider.AddSagaStateMachine<TestSaga1StateMachine, TestSaga1State>(typeof(TestSaga1StateDefinition))
                        .InMemoryRepository();

                    rider.UsingKafka((context, k) =>
                    {
                        k.Host("localhost:9092");

                        // One topic endpoint per message type; all of them feed the same saga
                        // and share the state machine's name as consumer group id.
                        k.TopicEndpoint<Null, Enter1>(nameof(Enter1), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, Enter2>(nameof(Enter2), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, Enter3>(nameof(Enter3), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                        k.TopicEndpoint<Null, EnterEnter>(nameof(EnterEnter), nameof(TestSaga1StateMachine), c =>
                        {
                            c.AutoOffsetReset = AutoOffsetReset.Earliest;
                            c.ConfigureSaga<TestSaga1State>(context);
                        });
                    });
                });
            });

            _serviceProvider = services.BuildServiceProvider();
            var busControl = _serviceProvider.GetRequiredService<IBusControl>();
            var observer = new ReceiveObserver();
            busControl.ConnectReceiveObserver(observer);

            await busControl.StartAsync();
            try
            {
                using (var tokenSource = new CancellationTokenSource())
                {
                    // Run the worker as a task so that no pool thread is blocked on it and
                    // any failure is observed when the task is awaited below.
                    var worker = Task.Run(() => RunWorker(busControl, tokenSource.Token));

                    WaitForQuit();
                    tokenSource.Cancel();

                    await worker;
                }
            }
            finally
            {
                // Always stop the bus and release the container, even if the worker failed.
                await busControl.StopAsync();
                _serviceProvider.Dispose();
            }
        }

        /// <summary>
        /// Blocks until the user enters "quit" or standard input reaches end-of-stream.
        /// </summary>
        private static void WaitForQuit()
        {
            string line;
            // Console.ReadLine returns null once input is closed; treating that as "quit"
            // avoids spinning forever on a closed stream.
            while ((line = Console.ReadLine()) != null)
            {
                if (line == "quit")
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Runs <see cref="Work"/> and reports failures: cancellation requested through
        /// <paramref name="token"/> is treated as a normal stop, any other exception is
        /// written to standard error and rethrown to the awaiting caller.
        /// </summary>
        private static async Task RunWorker(IPublishEndpoint publisher, CancellationToken token)
        {
            try
            {
                await Work(publisher, token);
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                // Expected when shutdown is requested while producing.
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine(ex);
                throw;
            }
        }

        /// <summary>
        /// Produces the initial <c>Enter1</c> message with a fresh correlation id and then
        /// waits, without blocking a thread, until <paramref name="token"/> is cancelled.
        /// </summary>
        /// <param name="publisher">Publish endpoint supplied by the caller; not used by this method.</param>
        /// <param name="token">Signals that the worker should stop.</param>
        private static async Task Work(IPublishEndpoint publisher, CancellationToken token)
        {
            var correlationId = Guid.NewGuid();
            var enter1Producer = _serviceProvider.GetRequiredService<ITopicProducer<Enter1>>();
            await enter1Producer.Produce(new { CorrelationId = correlationId, EnteredText = "1" }, token);

            try
            {
                // Asynchronous equivalent of polling the token's wait handle until it is signalled.
                await Task.Delay(Timeout.Infinite, token);
            }
            catch (OperationCanceledException)
            {
                // Cancellation is the normal way out of the idle wait.
            }
        }

        /// <summary>
        /// Raw Kafka client settings (bootstrap servers and consumer group id).
        /// A new dictionary is created on every access. Not referenced elsewhere in this class.
        /// </summary>
        private static Dictionary<string, string> Configuration
        {
            get
            {
                return new Dictionary<string, string>
                {
                    { "bootstrap.servers", "localhost:9092" },
                    { "group.id", "saga.group.id" }
                };
            }
        }
    }
}
示例 TestSaga1StateMachine.cs:
/// <summary>
/// Saga state machine that walks through Enter1 -> Enter2 -> Enter3 -> EnterEnter.
/// Each handled event stores its data on the saga instance, publishes the message for
/// the next step and moves to the next state; the final event prints the instance and
/// finalizes the saga.
/// </summary>
public class TestSaga1StateMachine : MassTransitStateMachine<TestSaga1State>
{
    public TestSaga1StateMachine()
    {
        InstanceState(_ => _.CurrentState);

        // Every event is matched to a saga instance by the CorrelationId carried in the message.
        Event(() => Enter1Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
        Event(() => Enter2Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
        Event(() => Enter3Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
        Event(() => EnterEnterEvent, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));

        Initially(
            When(Enter1Event)
                .Then(context => context.Instance.SaveEnter1(context.Data))
                // The follow-up message carries the saga's CorrelationId so that the next
                // event can be correlated back to this same instance.
                // NOTE(review): messages are reported as not being sent here. PublishAsync
                // appears to go through the bus (in-memory transport in this sample) rather
                // than the Kafka rider; producing to a Kafka topic likely needs a custom
                // activity that uses ITopicProducer<T> - confirm against the MassTransit docs.
                .PublishAsync(context => context.Init<Enter2>(new
                {
                    CorrelationId = context.Instance.CorrelationId,
                    EnteredText = "2"
                }))
                .TransitionTo(Entered1)
        );

        During(Entered1,
            When(Enter2Event)
                .Then(context => context.Instance.SaveEnter2(context.Data))
                // Same CorrelationId propagation and same review note as the Enter1 step above.
                .PublishAsync(context => context.Init<Enter3>(new
                {
                    CorrelationId = context.Instance.CorrelationId,
                    EnteredText = "3"
                }))
                .TransitionTo(Entered2)
        );

        During(Entered2,
            When(Enter3Event)
                .Then(context => context.Instance.SaveEnter3(context.Data))
                // Same CorrelationId propagation and same review note as the Enter1 step above.
                .PublishAsync(context => context.Init<EnterEnter>(new
                {
                    CorrelationId = context.Instance.CorrelationId,
                    EnteredText = "Enter"
                }))
                .TransitionTo(Entered3)
        );

        During(Entered3,
            When(EnterEnterEvent)
                .Then(context => context.Instance.Print())
                .TransitionTo(EnteredEnter)
                .Finalize());

        SetCompletedWhenFinalized();
    }

    // States the saga passes through after each corresponding event has been handled.
    public State Entered1 { get; set; }
    public State Entered2 { get; set; }
    public State Entered3 { get; set; }
    public State EnteredEnter { get; set; }

    // Events that drive the transitions, one per message type.
    public Event<Enter1> Enter1Event { get; set; }
    public Event<Enter2> Enter2Event { get; set; }
    public Event<Enter3> Enter3Event { get; set; }
    public Event<EnterEnter> EnterEnterEvent { get; set; }
}
这个项目只是我用来学习的。我不明白如何按照文档中的方式编写上面的消息总线配置。第一条 Enter1 消息发布成功,saga 也收到了,但我不清楚如何从 saga 内部向 Kafka 发送消息。
1条答案
按热度按时间ijxebb2r1#
您需要创建一个自定义状态机活动(activity),该活动依赖于生产者接口(在配置 Kafka 时注册),以便向 Kafka 主题生成消息。我最近录制了一个视频来讲解这一点,它是第二季的一部分。
您可以在单元测试中看到producer设置的示例
然后,在您的自定义状态机活动中,通过构造函数注入
ITopicProducer<KafkaMessage>
并用它来生成消息。它可能看起来像这样:
然后,在状态机中,使用: