Bunny
一直给我一个错误,我相信是从RabbitMQ发回的,即以下字符串是无效的日期格式:
无效:
- ISO8601时间戳字符串
- 毫秒整数
- 年-月-日字符串
- Unix时间或历元时间整数
Time.now.to_i
有效期:
- 整数(获取请求的消息和其后的所有内容)
- first(字符串)(获取从第一个开始的所有消息)
- last(字符串)
- 下一个(字符串)
iso8601 = '2021-08-28T13:40:31-07:00'
opts = {
exclusive: false,
manual_ack: true,
block: true,
arguments: {
'x-stream-offset': iso8601
}
}
queue.subscribe(opts) do |delivery_info, _properties, payload|
msg = JSON.parse(payload)
puts msg
ch.ack(delivery_info.delivery_tag, false)
end
我收到一个错误,指出它是一个无效的流偏移参数
下面是我正在运行的rake任务的错误消息
Bunny::PreconditionFailed: PRECONDITION_FAILED - invalid arg 'x-stream-offset' for queue 'stream_test' in vhost '/': {invalid_stream_offset_arg,{longstr,<<"2021-08-28T13:40:31-07:00">>}}
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:2014:in `raise_if_channel_close!'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:944:in `basic_consume_with'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/queue.rb:191:in `subscribe'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/lib/stream/lib/read.rb:39:in `read'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/tasks/read.rake:9:in `block in <top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/rake-12.3.3/exe/rake:27:in `<top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `eval'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `<main>'
--
下面是一个连接到rabbitMQ作为流(没有二进制协议)的工作示例-需要streaming plugin
# frozen_string_literal: true
require 'bunny'
require 'json'
require 'pp'
rabbit_user = 'guest'
rabbit_pass = 'guest'
rabbit_host = 'localhost:5672'
conn = Bunny.new(
"amqp://#{rabbit_user}:#{rabbit_pass}@#{rabbit_host}",
client_properties: { connection_name: :stream }
)
conn.start
ch = conn.create_channel(nil, 16)
queue = ch.queue(
'stream_test',
durable: true,
auto_delete: false,
exclusive: false,
arguments: {
'x-queue-type': 'stream',
'x-max-length-bytes': 500_000_000
}
)
50000.times do |i|
queue.publish(JSON.dump({ hello: "world #{i + 1}" }), routing_key: 'stream_test')
puts "published #{i + 1}"
end
ch.basic_qos(25)
opts = {
exclusive: false,
manual_ack: true,
## block will make it consume the main IO instead of being a seperate thread
## it is not recommended in production
block: true,
arguments: {
'x-stream-offset': 'first'
}
}
queue.subscribe(opts) do |delivery_info, _properties, payload|
msg = JSON.parse(payload)
puts msg
ch.ack(delivery_info.delivery_tag, false)
end
puts 'done'
sleep 1
conn.close
Java client examples显示它应该能够接受时间戳,但我似乎无法发送时间戳,是否存在可接受日期格式?
流媒体特性的Java客户端文档说明
Timestamp-一个时间戳值,用于指定附加到日志的时间点。它将钳位到最近的偏移量,如果时间戳超出流的范围,它将分别钳位日志的开始或结束。对于AMQP 0.9.1,使用的时间戳是精度为1秒的POSIX时间,即自1970年1月1日00:00:00 UTC以来的秒数。
我找不到POSIX时间戳的示例
2条答案
按热度按时间xghobddn1#
查看ErlangRabbitMQ源代码here(在任何开发人员文档中均未显示)后,您似乎可以通过传入
{number_of_seconds}s
以获取秒数或{number_of_minutes}m
以获取分钟数来获取时间序列数据因此,时间序列数据可以格式化为字符串-看起来它确实接受某种时间格式,但同样不清楚如何使其工作,出于我的需要,字符串值是可以接受的
Ruby中的答案是执行以下操作:
kq0g1dla2#
我想知道RabbitMQ .NET客户端也是如此,结果发现有一个
AmqpTimestamp
类型。在电线上,它最终是类似
'T'{(ulong bytes)timestamp}
的东西。这个办法奏效了: