gpt4 book ai didi

java - 如何从 Java 等非 python 语言调用 celery 任务延迟函数?

转载 作者:太空狗 更新时间:2023-10-29 22:30:40 26 4
gpt4 key购买 nike

我在 3 集群机器上设置了 celery + rabbitmq。我还创建了一个任务,它根据文件中的数据生成一个正则表达式,并使用该信息来解析文本。

from celery import Celery

celery = Celery('tasks', broker='amqp://localhost//')
import re

@celery.task
def add(x, y):
return x + y


def get_regular_expression():
with open("text") as fp:
data = fp.readlines()
str_re = "|".join([x.split()[2] for x in data ])
return str_re



@celery.task
def analyse_json(tw):
str_re = get_regular_expression()
re.match(str_re,tw.text)

我可以使用以下 python 代码非常轻松地调用此任务:-

from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x)

但是,现在我想从 Java 而不是 python 进行相同的调用。我不确定做同样的事情最简单的方法是什么。

我编写了这段用于向 AMQP 代理发送消息的代码。代码运行正常,但任务没有执行。我不确定如何指定应该执行的任务的名称。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

class try1 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "celery", "celery");
String messageBody = "{\"text\":\"i am good\"}" ;
byte[] msgBytes = messageBody.getBytes("ASCII") ;
channel.basicPublish(queueName, queueName,
new AMQP.BasicProperties
("application/json", null, null, null,
null, null, null, null,
null, null, null, "guest",
null, null),messageBody.getBytes("ASCII")) ;
connection.close();

}

这是 rabbitMq 错误日志中的输出:-

connection <0.14627.0>, channel 1 - error:
{amqp_error,not_found,
"no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'",
'basic.publish'}

我们将不胜感激。

谢谢,阿米特

最佳答案

有几个问题。

1) String queueName = channel.queueDeclare().getQueue() 命令返回了错误的队列名称。我将队列名称更改为“celery”并且工作正常。2) json 的格式必须是这种类型:- {“id”:“4cc7438e-afd4-4f8f-a2f3-f46567e7ca77”, "task": "celery.task.PingTask", “参数”:[], “kwargs”:{}, “重试”:0, “eta”:“2009-11-17T12:30:56.527191”

http://docs.celeryproject.org/en/latest/internals/protocol.html

这两个更改后它工作正常。

-阿米特

关于java - 如何从 Java 等非 python 语言调用 celery 任务延迟函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20843236/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com