ruby RabbitMQ Streaming Plugin的x-stream-offset的正确日期格式是什么?

0lvr5msh  于 2023-02-18  发布在  Ruby
关注(0)|答案(2)|浏览(115)

Bunny一直给我一个错误,我相信是从RabbitMQ发回的,即以下字符串是无效的日期格式:
无效:

  • ISO8601时间戳字符串
  • 毫秒整数
  • 年-月-日字符串
  • Unix时间或历元时间整数Time.now.to_i

有效期:

  • 整数(获取请求的消息和其后的所有内容)
  • first(字符串)(获取从第一个开始的所有消息)
  • last(字符串)
  • 下一个(字符串)
iso8601 = '2021-08-28T13:40:31-07:00'

opts = {
  exclusive: false,
  manual_ack: true,
  block: true,
  arguments: {
    'x-stream-offset': iso8601
  }
}

queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg

  ch.ack(delivery_info.delivery_tag, false)
end

我收到一个错误,指出它是一个无效的流偏移参数
下面是我正在运行的rake任务的错误消息

Bunny::PreconditionFailed: PRECONDITION_FAILED - invalid arg 'x-stream-offset' for queue 'stream_test' in vhost '/': {invalid_stream_offset_arg,{longstr,<<"2021-08-28T13:40:31-07:00">>}}
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:2014:in `raise_if_channel_close!'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:944:in `basic_consume_with'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/queue.rb:191:in `subscribe'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/lib/stream/lib/read.rb:39:in `read'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/tasks/read.rake:9:in `block in <top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/rake-12.3.3/exe/rake:27:in `<top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `eval'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `<main>'

--
下面是一个连接到rabbitMQ作为流(没有二进制协议)的工作示例-需要streaming plugin

# frozen_string_literal: true

require 'bunny'
require 'json'
require 'pp'

rabbit_user = 'guest'
rabbit_pass = 'guest'
rabbit_host = 'localhost:5672'

conn = Bunny.new(
  "amqp://#{rabbit_user}:#{rabbit_pass}@#{rabbit_host}",
  client_properties: { connection_name: :stream }
)

conn.start

ch = conn.create_channel(nil, 16)

queue = ch.queue(
  'stream_test',
  durable: true,
  auto_delete: false,
  exclusive: false,
  arguments: {
    'x-queue-type': 'stream',
    'x-max-length-bytes': 500_000_000
  }
)

50000.times do |i|
  queue.publish(JSON.dump({ hello: "world #{i + 1}" }), routing_key: 'stream_test')
  puts "published #{i + 1}"
end

ch.basic_qos(25)

opts = {
  exclusive: false,
  manual_ack: true,
  ## block will make it consume the main IO instead of being a seperate thread
  ## it is not recommended in production
  block: true,
  arguments: {
    'x-stream-offset': 'first'
  }
}

queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg

  ch.ack(delivery_info.delivery_tag, false)
end

puts 'done'
sleep 1
conn.close

Java client examples显示它应该能够接受时间戳,但我似乎无法发送时间戳,是否存在可接受日期格式?
流媒体特性的Java客户端文档说明
Timestamp-一个时间戳值,用于指定附加到日志的时间点。它将钳位到最近的偏移量,如果时间戳超出流的范围,它将分别钳位日志的开始或结束。对于AMQP 0.9.1,使用的时间戳是精度为1秒的POSIX时间,即自1970年1月1日00:00:00 UTC以来的秒数。
我找不到POSIX时间戳的示例

xghobddn

xghobddn1#

查看ErlangRabbitMQ源代码here(在任何开发人员文档中均未显示)后,您似乎可以通过传入{number_of_seconds}s以获取秒数或{number_of_minutes}m以获取分钟数来获取时间序列数据
因此,时间序列数据可以格式化为字符串-看起来它确实接受某种时间格式,但同样不清楚如何使其工作,出于我的需要,字符串值是可以接受的

%% Erlang Source Code for RabbitMQ
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
    Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
    case lists:keyfind(Key, 1, KVList) of
        {_, {timestamp, Ts}} ->
            [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
        {_, {utf8, Spec}} ->
            [{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc
        {_, {_, Offset}} when is_integer(Offset) ->
            [{<<"x-stream-offset">>, long, Offset}]; %% integer offset
        _ ->
            []
    end;
source_filters_to_consumer_args(_Source) ->
    [].

Ruby中的答案是执行以下操作:

## float/decimal values are rejected, only accepts whole numbers,
## so you have to round seconds to the nearest whole number
offset = "10s"

opts = {
  exclusive: false,
  manual_ack: true,
  ## block will make it consume the main IO instead of being a seperate thread
  ## it is not recommended in production
  block: true,
  arguments: {
    'x-stream-offset': offset
  }
}

queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg

  ch.ack(delivery_info.delivery_tag, false)
end
kq0g1dla

kq0g1dla2#

我想知道RabbitMQ .NET客户端也是如此,结果发现有一个AmqpTimestamp类型。
在电线上,它最终是类似'T'{(ulong bytes)timestamp}的东西。
这个办法奏效了:

var streamOffset = new AmqpTimestamp((long)Math.Floor((DateTime.Parse("2023-02-17T08:11:00Z", CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal) - DateTime.UnixEpoch).TotalSeconds));

channel.BasicQos(0, 10, false);
channel.BasicConsume(
    queue: stream,
    autoAck: false,
    consumer: consumer,
    arguments: new Dictionary<string, object> { { "x-stream-offset", streamOffset } });

相关问题