从一个文件向RabbitMQ发布多条消息

snvhrwxg  于 2023-04-30  发布在  RabbitMQ
关注(0)|答案(8)|浏览(242)

将单个消息发布到RabbitMQ队列可以通过UI轻松完成,只需将消息放入UI中并单击“PublishMessage”按钮即可。
如何发布一批消息?
我有一个文件,其中包含要发送到RabbitMQ的消息。每行有一条消息。
如何将文件中的所有消息发布到RabbitMQ服务器?
有没有一种方法可以从命令行中完成?

zfycwa2u

zfycwa2u1#

使用rabbitmqadmin

while read -r line; do 
  echo $line | rabbitmqadmin publish exchange=amq.default routing_key=my_queue ; 
done < messages

不为rabbitmqadmin publish指定payload参数意味着它从stdin读取payload。

laawzig2

laawzig22#

可以使用curlrabbitmq API

curl -u login:pass -i -H "content-type:application/json" -X POST http://localhost:15672/api/exchanges/%2Fvhost/exchange/publish -d'{"properties":{},"routing_key":"","payload":"you message","payload_encoding":"string"}'

这是一个消息的例子,通过类推,你可以写一个脚本

z3yyvxxp

z3yyvxxp3#

你需要使用rabbitmqadmin cli工具:
https://www.rabbitmq.com/management-cli.html
rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello, world"

h22fl7wq

h22fl7wq4#

作为looseend答案的变体,您也可以使用GNU Parallel
如果您有一个大文件,这将产生更好的性能。

cat messages | parallel -j 100 \
  ./rabbitmqadmin -H $RABBITMQ_HOST \
                  -u $RABBITMQ_USERNAME \
                  -p $RABBITMQ_PASSWORD  \
                  publish exchange=amq.default \
                  routing_key=myqueue \
                  payload="{}"

这将运行100个工作。如果不需要,请省略主机和凭据。

unhi4e5o

unhi4e5o5#

我已经更新了rabbitmqadmin文件,以支持文件内容发布。尝试查找包含EXTRA_VERBS = {def invoke_publish(self):的行,并更新它们的相关代码,如下所示

EXTRA_VERBS = {
    'publish': {'mandatory': ['routing_key'],
                'optional':  {'payload': None,
                              'pfile': None,
                              'properties': {},
                              'exchange': 'amq.default',
                              'payload_encoding': 'string'},
                'json':      ['properties'],
                'uri':       '/exchanges/{vhost}/{exchange}/publish'},
    'get':     {'mandatory': ['queue'],
                'optional':  {'count': '1', 'requeue': 'true',
                              'payload_file': None, 'encoding': 'auto'},
                'uri':       '/queues/{vhost}/{queue}/get'}
}

def invoke_publish(self):
    (uri, upload) = self.parse_args(self.args, EXTRA_VERBS['publish'])
    if not 'payload' and 'pfile' in upload:
        data = sys.stdin.read()
        upload['payload'] = b64(data)
        upload['payload_encoding'] = 'base64'
    elif not 'payload' in upload:
        with open('populate/' + upload['pfile']) as f: data = f.read()
        upload['payload'] = b64(data)
        upload['payload_encoding'] = 'base64'
    resp = json.loads(self.post(uri, json.dumps(upload)))
    if resp['routed']:
        self.verbose("Message published")
    else:
        self.verbose("Message published but NOT routed")

如果您想使用绝对路径提供文件,请从下面一行中删除'populate/' +

with open('populate/' + upload['pfile']) as f: data = f.read()

没有要打开的更新(。..)下面的命令对我来说工作得很好,假设文件规则。json被放置在相对目录“populate”中

python rabbitmqadmin.py publish exchange=feed-mgmt-in routing_key='#' properties='{"type":"domain-collections/rules"}' pfile="rules.json"
wz3gfoph

wz3gfoph6#

https://github.com/selency/amqp-publish可用于将RabbitMQ消息发布到标准5672端口,即使管理端口15672未启用。

# cd /tmp
# curl -vL https://github.com/selency/amqp-publish/releases/download/v1.0.0/amqp-publish.linux-amd64 -o amqp-publish
# chmod +x ./amqp-publish

# ./amqp-publish --uri="amqp://admin:password@localhost:5672/" --exchange="foo" --routing-key="awesome-routing-key" --body="hello, world!"
qkf9rpyu

qkf9rpyu7#

https://github.com/alanxz/rabbitmq-c中的amqp-publish可用于将RabbitMQ消息发布到标准5672端口,即使管理端口15672未启用。
手动安装在k8s pod内:

# cd /tmp
# curl -LO http://mirror.centos.org/centos/7/os/x86_64/Packages/librabbitmq-0.8.0-3.el7.x86_64.rpm
# curl -LO http://mirror.centos.org/centos/7/os/x86_64/Packages/librabbitmq-examples-0.8.0-3.el7.x86_64.rpm
# yum install librabbitmq-tools-0.11.0-5.el9.x86_64.rpm
# yum install librabbitmq-examples-0.8.0-3.el7.x86_64.rpm

对我有用的消息发布命令示例:

# amqp-publish --url="amqp://usr:pwd@host:5672" -e exchange-name -r routing-key <message-body.json

consume响应队列的示例命令:

# amqp-consume --url="amqp://usr:pwd@host:5672" -q queue-name cat
wdebmtf2

wdebmtf28#

使用amqp-tool

npm install amqp-tool -g

amqp-tool --host rabbitmq.local -u <user> -p <password> -q <queue name> --import file.json

相关问题