def invoke_publish(self):
(uri, upload) = self.parse_args(self.args, EXTRA_VERBS['publish'])
if not 'payload' and 'pfile' in upload:
data = sys.stdin.read()
upload['payload'] = b64(data)
upload['payload_encoding'] = 'base64'
elif not 'payload' in upload:
with open('populate/' + upload['pfile']) as f: data = f.read()
upload['payload'] = b64(data)
upload['payload_encoding'] = 'base64'
resp = json.loads(self.post(uri, json.dumps(upload)))
if resp['routed']:
self.verbose("Message published")
else:
self.verbose("Message published but NOT routed")
如果您想使用绝对路径提供文件,请从下面一行中删除'populate/' +。
with open('populate/' + upload['pfile']) as f: data = f.read()
8条答案
按热度按时间zfycwa2u1#
使用rabbitmqadmin
不为
rabbitmqadmin publish
指定payload参数意味着它从stdin读取payload。laawzig22#
可以使用curl和rabbitmq API:
这是一个消息的例子,通过类推,你可以写一个脚本
z3yyvxxp3#
你需要使用
rabbitmqadmin
cli工具:https://www.rabbitmq.com/management-cli.html
rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello, world"
h22fl7wq4#
作为looseend答案的变体,您也可以使用GNU Parallel
如果您有一个大文件,这将产生更好的性能。
这将运行100个工作。如果不需要,请省略主机和凭据。
unhi4e5o5#
我已经更新了rabbitmqadmin文件,以支持文件内容发布。尝试查找包含
EXTRA_VERBS = {
和def invoke_publish(self):
的行,并更新它们的相关代码,如下所示和
如果您想使用绝对路径提供文件,请从下面一行中删除
'populate/' +
。没有要打开的更新(。..)下面的命令对我来说工作得很好,假设文件规则。json被放置在相对目录“populate”中
wz3gfoph6#
https://github.com/selency/amqp-publish可用于将RabbitMQ消息发布到标准5672端口,即使管理端口15672未启用。
qkf9rpyu7#
https://github.com/alanxz/rabbitmq-c中的amqp-publish可用于将RabbitMQ消息发布到标准5672端口,即使管理端口15672未启用。
手动安装在k8s pod内:
对我有用的消息发布命令示例:
consume响应队列的示例命令:
wdebmtf28#
使用amqp-tool: