Go语言 proto:无法解析无效的wire-format数据(cannot parse invalid wire-format data)

lokaqttq  于 2023-06-19  发布在  Go
关注(0)|答案(1)|浏览(214)

我是protobufs的新手,目前正在编写一个从NATS服务器读取数据的客户端。从NATS服务器发送的数据是protobuf。
我正在写的客户端是Go。这是我写的.proto文件:

syntax = "proto3";

package execution;

option go_package = "./protos/execution";

// OrderStatus is the type of the OrderStatus field on both TradeData and Execution.
// Working is value 0, which proto3 also uses as the default when the field is
// absent from the wire.
enum OrderStatus {
  Working = 0;
  Rejected = 1;
  Cancelled = 2;
  Completed = 3;
}

// OrderType is the type of Execution.OrderType (field 20).
// Limit is value 0, the proto3 default when the field is absent from the wire.
enum OrderType {
  Limit = 0;
  Market = 1;
  StopLimit = 2;
  StopMarket = 3;
}

// OrderSide is the type of Execution.Side (field 7).
// Buy is value 0, the proto3 default when the field is absent from the wire.
enum OrderSide {
  Buy = 0;
  Sell = 1;
}

// RejectReason is the type of Execution.RejectReason (field 25).
// NoRejection is value 0, the proto3 default when the field is absent from the wire.
// NOTE(review): numeric values presumably mirror the publisher's C# RejectReason
// enum, which is not shown here - confirm before changing any number.
enum RejectReason {
  NoRejection = 0;
  InstrumentNotFound = 1;
  OrderNotFound = 2;
  InvalidOrderType = 3;
  InvalidAccount = 4;
  InvalidSide = 5;
  InvalidAmount = 6;
  InvalidLimitPrice = 7;
  InvalidQuoteLimit = 8;
  InvalidActivationPrice = 9;
  InvalidTimeInForce = 10;
  MarketHalted = 11;
  MarketPaused = 12;
  NoCounterOrders = 13;
  MissingExpirationTime = 14;
  IncorrectExpirationTime = 15;
  InternalError = 16;
  IllegalStatusSwitch = 17;
  OrderAlreadyExists = 18;
  InstrumentNotReady = 19;
  ExternalSystemError = 20;
}

// ReportCause is the type of Execution.ReportCause (field 26).
// NONE is value 0, the proto3 default when the field is absent from the wire.
enum ReportCause {
  NONE = 0;
  NewOrder = 1;
  CancelOrder = 2;
  MassCancel = 3;
  Expiration = 4;
  Trigger = 5;
  MarketStatusChange = 6;
}

// TimeInForce is the type of Execution.TimeInForce (field 34).
// GoodTillCancel is value 0, the proto3 default when the field is absent from the wire.
enum TimeInForce {
  GoodTillCancel = 0;
  ImmediateOrCancel = 1;
  FillOrKill = 2;
}

// CancelReason is the type of Execution.CancelReason (field 31).
// NotCancelled is value 0, the proto3 default when the field is absent from the wire.
// The numbering is not contiguous: Liquidation is 100.
enum CancelReason {
  NotCancelled = 0;
  CancelledByTrader = 1;
  CancelledBySystem = 2;
  SelfMatchPrevention = 3;
  OrderTimeInForce = 4;
  Liquidation = 100;
}

// TradeData is the Go-side schema for the publisher's C# TradeData struct; the
// field numbers follow that struct's [ProtoMember(n)] tags. It is the element
// type of Execution.Trades.
//
// NOTE(review): the C# struct declares Amount, ExecutionPrice and RemainingAmount
// as decimal and MatchedOrderExternalId as Guid. protobuf-net writes those as
// nested messages (Decimal / Guid from its bcl.proto), not as text, so `string`
// is likely the wrong type for fields 4, 5, 14 and 17.
message TradeData {
  int64 TradeId = 1;
  string Amount = 4;
  string ExecutionPrice = 5;
  OrderStatus OrderStatus = 7;
  int64 AccountId = 11;
  string MatchedOrderExternalId = 14;
  int64 MatchedOrderId = 16;
  string RemainingAmount = 17;
}

// Execution is the Go-side schema for the publisher's C# ExecutionReport
// contract; the field numbers follow that class's [ProtoMember(n)] tags.
//
// NOTE(review): the C# class declares RequestedPrice, RequestedAmount and
// RemainingAmount as decimal, and InstructionId and ExternalOrderId as Guid.
// protobuf-net writes those as nested messages (Decimal / Guid from its
// bcl.proto), not as text, so `string` is likely the wrong type for fields
// 8, 9, 10, 27 and 28.
message Execution {
  string Origin = 4;
  OrderSide Side = 7;
  string RequestedPrice = 8;
  string RequestedAmount = 9;
  string RemainingAmount = 10;
  int64 ExecutedAt = 13;
  OrderStatus OrderStatus = 14;
  repeated TradeData Trades = 16;
  OrderType OrderType = 20;
  int64 Version = 22;
  int64 AccountId = 23;
  RejectReason RejectReason = 25;
  ReportCause ReportCause = 26;
  string InstructionId = 27;
  string ExternalOrderId = 28;
  int32 ExecutionEngineMarketId = 29;
  int64 OrderId = 30;
  CancelReason CancelReason = 31;
  int64 TxId = 32;
  TimeInForce TimeInForce = 34;
  string CancelledBy = 35;
}

发布服务器是用C#编写的,其proto消息的代码如下:

/// <summary>
/// protobuf-net contract for an execution report. Each [ProtoMember(n)] value is the
/// protobuf field number a consumer must use for that member.
/// </summary>
/// <remarks>
/// The decimal and Guid members use protobuf-net's own encodings (the Decimal and Guid
/// messages in bcl.proto), so a non-.NET consumer has to declare them as those messages.
/// </remarks>
[ProtoContract]
    public class ExecutionReport : IMarketResponse, IInstructionMessage, IOrderMatcherResponse
    {
        // Explicit IFeedMessage.Type implementation. [ProtoIgnore] keeps it out of the
        // protobuf payload; the Serialize extension writes it as a separate leading byte.
        [ProtoIgnore]
        FeedMessageType IFeedMessage.Type => FeedMessageType.ExecutionReport;

        // ReSharper disable FieldCanBeMadeReadOnly.Global
        [ProtoMember(4)] public string Origin;
        [ProtoMember(7)] public OrderSide Side;
        // decimal members (8, 9, 10) are written as nested bcl.Decimal messages.
        [ProtoMember(8)] public decimal RequestedPrice;
        [ProtoMember(9)] public decimal RequestedAmount;
        [ProtoMember(10)] public decimal RemainingAmount;
        [ProtoMember(13)] public long ExecutedAt;
        [ProtoMember(14)] public OrderStatus OrderStatus;
        // Repeated field; initialised to an empty list so it is never null.
        [ProtoMember(16)] public List<TradeData> Trades = new List<TradeData>();
        [ProtoMember(20)] public OrderType OrderType;
        [ProtoMember(22)] public long Version { get; set; }
        [ProtoMember(23)] public long AccountId;
        [ProtoMember(25)] public RejectReason RejectReason;
        [ProtoMember(26)] public ReportCause ReportCause;
        // Guid members (27, 28) are written as nested bcl.Guid messages.
        [ProtoMember(27)] public Guid InstructionId { get; set; }
        [ProtoMember(28)] public Guid ExternalOrderId;
        [ProtoMember(29)] public int ExecutionEngineMarketId { get; set; }
        [ProtoMember(30)] public long OrderId;
        [ProtoMember(31)] public CancelReason CancelReason;
        [ProtoMember(32)] public long TxId;
        [ProtoMember(34)] public TimeInForce TimeInForce;
        [ProtoMember(35)] public string CancelledBy;
    }

/// <summary>
/// protobuf-net contract for a single trade; the element type of ExecutionReport.Trades.
/// Each [ProtoMember(n)] value is the protobuf field number for that member.
/// </summary>
/// <remarks>
/// The decimal and Guid members use protobuf-net's own encodings (the Decimal and Guid
/// messages in bcl.proto).
/// </remarks>
[ProtoContract]
    [StructLayout(LayoutKind.Sequential)]
    public struct TradeData
    {
        [ProtoMember(1)] public long TradeId;
        // decimal members (4, 5, 17) are written as nested bcl.Decimal messages.
        [ProtoMember(4)] public decimal Amount;
        [ProtoMember(5)] public decimal ExecutionPrice;
        [ProtoMember(7)] public OrderStatus OrderStatus;
        [ProtoMember(11)] public long AccountId;
        // Guid member is written as a nested bcl.Guid message.
        [ProtoMember(14)] public Guid MatchedOrderExternalId;
        [ProtoMember(16)] public long MatchedOrderId;
        [ProtoMember(17)] public decimal RemainingAmount;
    }

当我试图解组数据时,我得到了这个错误

proto: cannot parse invalid wire-format data

这就是我解析数据的方式:

// Subscribe to the "EXEC" subject and try to decode every delivery as an Execution.
_, err = sc.Subscribe("EXEC", func(m *stan.Msg) {
    // A fresh message per delivery, so fields from an earlier message cannot leak
    // into the next one.
    varr := &protos.Execution{}
    // Keep the decode error local to the callback: the handler runs separately from
    // the code that checks the error returned by Subscribe, so writing to the shared
    // outer err from here could clobber that result.
    if unmarshalErr := proto.Unmarshal(m.Data, varr); unmarshalErr != nil {
        fmt.Printf("Err unmarshalling!: %v\n\n", unmarshalErr)
        return
    }
    fmt.Printf("Received a message: %+v\n", varr)
})

我从服务器接收到的一个字节数据示例:

[5 85 0 0 0 56 1 66 3 8 144 78 74 2 8 1 82 2 8 1 104 197 192 132 194 159 143 219 237 8 176 1 25 184 1 11 208 1 1 218 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 226 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 232 1 1 240 1 25 128 2 25]

添加更多详细信息:
这就是C#发送数据的方式:

/// <summary>
/// Serializes <paramref name="msg"/> and publishes it on the subject derived from it.
/// Only the publish call is covered by the FeedSendLatency timer; serialization
/// happens before the timer starts.
/// </summary>
/// <param name="msg">The feed message to publish.</param>
public async Task SendAsync(IFeedMessage msg)
{
    var topic = FeedSubject.ForMessage(msg);
    var payload = msg.SerializeToArray();

    // The timer is tagged with the subject so latency can be broken down per subject.
    using (_metrics.FeedSendLatency.Start(new MetricTags("subject", topic.Value)))
    {
        await _connection.PublishAsync(topic, payload);
    }
}

这是IFeedMessage接口的定义(ExecutionReport也间接实现了IFeedMessage)

/// <summary>
/// Contract for messages sent on the feed. SendAsync accepts any IFeedMessage, and the
/// Serialize extension writes <see cref="Type"/> as the first byte of the frame.
/// </summary>
public interface IFeedMessage
{
    /// <summary>Message type tag; Serialize casts it to a byte and writes it ahead of the payload.</summary>
    FeedMessageType Type { get; }
    /// <summary>NOTE(review): presumably returns a copy of this message; implementations are not shown here.</summary>
    IFeedMessage Clone();
    /// <summary>NOTE(review): presumably clears the message for reuse; implementations are not shown here.</summary>
    void Reset();
}

下面是SerializeToArray()的工作原理:

/// <summary>
/// Serializes <paramref name="message"/> into a newly created MemoryStream and returns
/// the written bytes as a segment over that stream's buffer.
/// </summary>
/// <param name="message">The feed message to serialize.</param>
/// <returns>The framed bytes (type byte, length prefix, protobuf payload).</returns>
public static ArraySegment<byte> SerializeToArray(this IFeedMessage message)
{
    var buffer = new MemoryStream();
    return message.SerializeToMemory(buffer);
}

/// <summary>
/// Appends the serialized <paramref name="message"/> to <paramref name="stream"/> at its
/// current position and returns a segment covering exactly the bytes written.
/// </summary>
/// <param name="message">The feed message to serialize.</param>
/// <param name="stream">Destination stream; its position advances past the written frame.</param>
/// <returns>
/// A segment over the stream's underlying buffer (no copy is made), starting where the
/// stream was positioned on entry.
/// </returns>
public static ArraySegment<byte> SerializeToMemory(this IFeedMessage message, MemoryStream stream)
{
    var offset = stream.Position;
    message.Serialize(stream);
    var written = stream.Position - offset;

    // GetBuffer exposes the stream's backing array, which can be longer than the data
    // written, hence the explicit offset and count.
    return new ArraySegment<byte>(stream.GetBuffer(), (int)offset, (int)written);
}

/// <summary>
/// Writes one framed message to <paramref name="stream"/>: a single byte holding the
/// message type, followed by the protobuf-net payload with a Fixed32 length prefix.
/// </summary>
/// <remarks>
/// A consumer therefore has to skip 5 bytes (1 type byte + 4 length bytes) before
/// handing the remainder to a protobuf decoder.
/// </remarks>
/// <param name="message">The feed message to serialize.</param>
/// <param name="stream">Destination stream.</param>
public static void Serialize(this IFeedMessage message, Stream stream)
{
    var typeTag = (byte)message.Type;
    stream.WriteByte(typeTag);

    RuntimeTypeModel.Default.SerializeWithLengthPrefix(
        stream,
        message,
        message.GetType(),
        PrefixStyle.Fixed32,
        0);
}

我不知道确切的原因是什么,但我写的.proto文件似乎有问题。我读过一些遇到同样错误的帖子,但其中大多数解决的并不是同一个问题。如果需要其他细节,请告诉我。
请帮帮我。

xa9qqrwz

xa9qqrwz1#

根据评论中的讨论,我已经设法解组了数据。

**备注**:
  • 数据带有5字节的前缀(这完全没有必要):
    • 消息类型占1个字节
    • 数据长度占4个字节
  • C#实现使用了C#特有的decimal和Guid数据类型。(正如bcl.proto中的注释所说,跨平台代码通常应该完全避免使用它们。)

下面是文件夹结构:

├── bcl.proto
├── execution.proto
├── go.mod
├── go.sum
├── main.go
└── protos
    ├── bcl.pb.go
    └── execution.pb.go
**bcl.proto**:

此文件是从github.com/protobuf-net/protobuf-net复制的。它是必需的,因为.NET实现使用了此proto文件中的Decimal和Guid。

// The types in here indicate how protobuf-net represents certain types when using protobuf-net specific
// library features. Note that it is not *required* to use any of these types, and cross-platform code
// should usually avoid them completely (ideally starting from a .proto schema)

// Some of these are ugly, sorry. The TimeSpan / DateTime dates here pre-date the introduction of Timestamp
// and Duration, and the "well known" types should be preferred when possible. Guids are particularly
// awkward - it turns out that there are multiple guid representations, and I accidentally used one that
// I can only call... "crazy-endian". Just make sure you check the order!

// It should not be necessary to use bcl.proto from code that uses protobuf-net

syntax = "proto3";

option csharp_namespace = "ProtoBuf.Bcl";
option go_package = "./protos";

package bcl;

// TimeSpan is a duration expressed as a signed count of units (`value`) together
// with the unit (`scale`).
message TimeSpan {
  sint64 value = 1; // the size of the timespan (in units of the selected scale)
  TimeSpanScale scale = 2; // the scale of the timespan [default = DAYS]
  // Unit in which `value` is expressed.
  enum TimeSpanScale {
    DAYS = 0;
    HOURS = 1;
    MINUTES = 2;
    SECONDS = 3;
    MILLISECONDS = 4;
    TICKS = 5;

    MINMAX = 15; // dubious
  }
}

// DateTime is a point in time expressed as a signed offset from 1970/01/01
// (`value`, in units of `scale`) plus the kind of time being represented (`kind`).
message DateTime {
  sint64 value = 1; // the offset (in units of the selected scale) from 1970/01/01
  TimeSpanScale scale = 2; // the scale of the timespan [default = DAYS]
  DateTimeKind kind = 3; // the kind of date/time being represented [default = UNSPECIFIED]
  // Unit in which `value` is expressed; declared separately from
  // TimeSpan.TimeSpanScale but with the same names and numbers.
  enum TimeSpanScale {
    DAYS = 0;
    HOURS = 1;
    MINUTES = 2;
    SECONDS = 3;
    MILLISECONDS = 4;
    TICKS = 5;

    MINMAX = 15; // dubious
  }
  enum DateTimeKind
  {
     // The time represented is not specified as either local time or Coordinated Universal Time (UTC).
     UNSPECIFIED = 0;
     // The time represented is UTC.
     UTC = 1;
     // The time represented is local time.
     LOCAL = 2;
   }
}

// NetObjectProxy carries the keys protobuf-net uses for tracked objects and
// dynamic typing, as described per field below. Field numbers are not
// contiguous (1-4, 8, 10).
message NetObjectProxy {
  int32 existingObjectKey = 1; // for a tracked object, the key of the **first** time this object was seen
  int32 newObjectKey = 2; // for a tracked object, a **new** key, the first time this object is seen
  int32 existingTypeKey = 3; // for dynamic typing, the key of the **first** time this type was seen
  int32 newTypeKey = 4; // for dynamic typing, a **new** key, the first time this type is seen
  string typeName = 8; // for dynamic typing, the name of the type (only present along with newTypeKey)
  bytes payload = 10; // the new string/value (only present along with newObjectKey)
}

// Guid is how protobuf-net writes a .NET Guid: two fixed 64-bit halves.
// The byte order inside each half is unusual (see the "crazy-endian" warning in
// this file's header), so check it before rebuilding a textual GUID from lo/hi.
message Guid {
  fixed64 lo = 1; // the first 8 bytes of the guid (note:crazy-endian)
  fixed64 hi = 2; // the second 8 bytes of the guid (note:crazy-endian)
}

// Decimal is how protobuf-net writes a .NET decimal: a 96-bit integer value split
// across lo (64 bits) and hi (32 bits), plus a combined sign/scale field.
message Decimal {
  uint64 lo = 1; // the first 64 bits of the underlying value
  uint32 hi = 2; // the last 32 bits of the underlying value
  uint32 signScale = 3; // the number of decimal digits (bits 1-16), and the sign (bit 0)
}
**execution.proto**:
syntax = "proto3";

package execution;

option go_package = "./protos";

import "bcl.proto";

// OrderStatus is the type of the OrderStatus field on both TradeData and Execution.
// Working is value 0, which proto3 also uses as the default when the field is
// absent from the wire.
enum OrderStatus {
  Working = 0;
  Rejected = 1;
  Cancelled = 2;
  Completed = 3;
}

// OrderType is the type of Execution.OrderType (field 20).
// Limit is value 0, the proto3 default when the field is absent from the wire.
enum OrderType {
  Limit = 0;
  Market = 1;
  StopLimit = 2;
  StopMarket = 3;
}

// OrderSide is the type of Execution.Side (field 7).
// Buy is value 0, the proto3 default when the field is absent from the wire.
enum OrderSide {
  Buy = 0;
  Sell = 1;
}

// RejectReason is the type of Execution.RejectReason (field 25).
// NoRejection is value 0, the proto3 default when the field is absent from the wire.
// NOTE(review): numeric values presumably mirror the publisher's C# RejectReason
// enum, which is not shown here - confirm before changing any number.
enum RejectReason {
  NoRejection = 0;
  InstrumentNotFound = 1;
  OrderNotFound = 2;
  InvalidOrderType = 3;
  InvalidAccount = 4;
  InvalidSide = 5;
  InvalidAmount = 6;
  InvalidLimitPrice = 7;
  InvalidQuoteLimit = 8;
  InvalidActivationPrice = 9;
  InvalidTimeInForce = 10;
  MarketHalted = 11;
  MarketPaused = 12;
  NoCounterOrders = 13;
  MissingExpirationTime = 14;
  IncorrectExpirationTime = 15;
  InternalError = 16;
  IllegalStatusSwitch = 17;
  OrderAlreadyExists = 18;
  InstrumentNotReady = 19;
  ExternalSystemError = 20;
}

// ReportCause is the type of Execution.ReportCause (field 26).
// NONE is value 0, the proto3 default when the field is absent from the wire.
enum ReportCause {
  NONE = 0;
  NewOrder = 1;
  CancelOrder = 2;
  MassCancel = 3;
  Expiration = 4;
  Trigger = 5;
  MarketStatusChange = 6;
}

// TimeInForce is the type of Execution.TimeInForce (field 34).
// GoodTillCancel is value 0, the proto3 default when the field is absent from the wire.
enum TimeInForce {
  GoodTillCancel = 0;
  ImmediateOrCancel = 1;
  FillOrKill = 2;
}

// CancelReason is the type of Execution.CancelReason (field 31).
// NotCancelled is value 0, the proto3 default when the field is absent from the wire.
// The numbering is not contiguous: Liquidation is 100.
enum CancelReason {
  NotCancelled = 0;
  CancelledByTrader = 1;
  CancelledBySystem = 2;
  SelfMatchPrevention = 3;
  OrderTimeInForce = 4;
  Liquidation = 100;
}

// TradeData mirrors the publisher's C# TradeData struct; the field numbers follow
// that struct's [ProtoMember(n)] tags. It is the element type of Execution.Trades.
// C# decimal members are declared as bcl.Decimal and the Guid member as bcl.Guid,
// matching how protobuf-net writes those types.
message TradeData {
  int64 TradeId = 1;
  bcl.Decimal Amount = 4;
  bcl.Decimal ExecutionPrice = 5;
  OrderStatus OrderStatus = 7;
  int64 AccountId = 11;
  bcl.Guid MatchedOrderExternalId = 14;
  int64 MatchedOrderId = 16;
  bcl.Decimal RemainingAmount = 17;
}

// Execution mirrors the publisher's C# ExecutionReport contract; the field numbers
// follow that class's [ProtoMember(n)] tags. C# decimal members are declared as
// bcl.Decimal and Guid members as bcl.Guid, matching how protobuf-net writes them.
message Execution {
  // NOTE(review): the C# side declares Origin as string. bytes and string share the
  // length-delimited wire type, so either decodes - confirm whether bytes is intended.
  bytes Origin = 4;
  OrderSide Side = 7;
  bcl.Decimal RequestedPrice = 8;
  bcl.Decimal RequestedAmount = 9;
  bcl.Decimal RemainingAmount = 10;
  int64 ExecutedAt = 13;
  OrderStatus OrderStatus = 14;
  repeated TradeData Trades = 16;
  OrderType OrderType = 20;
  int64 Version = 22;
  int64 AccountId = 23;
  RejectReason RejectReason = 25;
  ReportCause ReportCause = 26;
  bcl.Guid InstructionId = 27;
  bcl.Guid ExternalOrderId = 28;
  int32 ExecutionEngineMarketId = 29;
  int64 OrderId = 30;
  CancelReason CancelReason = 31;
  int64 TxId = 32;
  TimeInForce TimeInForce = 34;
  string CancelledBy = 35;
}
**protos/**:

此文件夹中的文件是使用以下命令从proto文件生成的:

protoc --go_out=protos --go_opt=paths=source_relative bcl.proto execution.proto
**go.mod**:
module mymodule.local

go 1.20

require google.golang.org/protobuf v1.30.0
**main.go**:
package main

import (
    "encoding/binary"
    "log"

    "google.golang.org/protobuf/proto"

    "mymodule.local/protos"
)

// main decodes the sample frame from the question. The frame layout is one
// message-type byte, a 4-byte little-endian payload length, then the protobuf
// payload itself; only the payload is handed to proto.Unmarshal.
func main() {
    frame := []byte{5, 85, 0, 0, 0, 56, 1, 66, 3, 8, 144, 78, 74, 2, 8, 1, 82, 2, 8, 1, 104, 197, 192, 132, 194, 159, 143, 219, 237, 8, 176, 1, 25, 184, 1, 11, 208, 1, 1, 218, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 226, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 232, 1, 1, 240, 1, 25, 128, 2, 25}

    // Size of the prefix: 1 type byte + 4 length bytes.
    const headerLen = 5
    if len(frame) < headerLen {
        log.Fatal("data should contain at least 5 bytes")
    }

    kind := frame[0]
    declared := binary.LittleEndian.Uint32(frame[1:headerLen])
    payload := frame[headerLen:]

    // The declared length must match what actually follows the header.
    if uint32(len(payload)) != declared {
        log.Fatalf("invalid data length: %d", declared)
    }

    report := new(protos.Execution)
    if err := proto.Unmarshal(payload, report); err != nil {
        log.Fatalf("Err unmarshalling!: %v", err)
    }

    log.Printf("message type: %d, message: %+v", kind, report)
}

问题中提供的数据的输出:

2023/06/15 17:50:58 message type: 5, message: Side:Sell  RequestedPrice:{lo:10000}  RequestedAmount:{lo:1}  RemainingAmount:{lo:1}  ExecutedAt:638223043314917445  Version:25  AccountId:11  ReportCause:NewOrder  InstructionId:{lo:5574686611683820165  hi:10500929413443338416}  ExternalOrderId:{lo:5574686611683820165  hi:10500929413443338416}  ExecutionEngineMarketId:1  OrderId:25  TxId:25

相关问题