
rabbitmq - Airflow scheduler and webserver hang when queuing tasks to run on RabbitMQ

Reposted. Author: 行者123. Updated: 2023-12-02 18:36:50

I am trying to get Airflow workers to run tasks. I start the services:

airflow worker --debug
airflow webserver
airflow scheduler
airflow flower #to check celery queues in UI at localhost:5555

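These processes start fine, but as soon as the scheduler adds a task to the queue, or I trigger a task from the Airflow UI, the scheduler and the webserver hang: they keep loading and never continue. What I see when a task is added to the queue:

  • Scheduler hanging
  • Airflow UI webserver hanging
  • Workers are not receiving tasks from the RabbitMQ queue
  • Flower server is hanging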

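I think the problem lies in the communication between the scheduler/webserver and the queue. The broker-related setting in my airflow.cfg file is: broker_url = amqp://guest:***@ksaprice_rabbitmq:15672// . I have also tried: broker_url = pyamqp://guest:***@ksaprice_rabbitmq:15672// . The RabbitMQ server is running fine, and I have verified the login and password credentials.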

The versions I am using are:

  • Airflow ==1.8.1
  • celery =4.1
  • RabbitMQ server 3.6

I am new to Airflow and RabbitMQ.

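Update: my queuing problem was solved by @Jean-Sébastien Pédron's answer, but my workers still do not execute tasks, and Flower does not show any workers, even though the airflow worker service is running on port 8793.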

RabbitMQ report:

Status of node ksaprice_rabbitmq@4eed789778c0
[{pid,233},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.6.10"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"},
{amqp_client,"RabbitMQ AMQP Client","3.6.10"},
{cowboy,"Small, fast, modular HTTP server.","1.0.4"},
{cowlib,"Support library for manipulating Web protocols.","1.0.2"},
{inets,"INETS CXC 138 49","6.3.4"},
{rabbit,"RabbitMQ","3.6.10"},
{mnesia,"MNESIA CXC 138 12","4.14.2"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.6.10"},
{compiler,"ERTS CXC 138 10","7.0.3"},
{os_mon,"CPO CXC 138 46","2.4.1"},
{ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
{ssl,"Erlang/OTP SSL application","8.1"},
{public_key,"Public key infrastructure","1.3"},
{crypto,"CRYPTO","3.7.2"},
{xmerl,"XML parser","1.3.12"},
{asn1,"The Erlang ASN1 compiler version 4.0.4","4.0.4"},
{syntax_tools,"Syntax tools","2.1.1"},
{sasl,"SASL CXC 138 11","3.0.2"},
{stdlib,"ERTS CXC 138 10","3.2"},
{kernel,"ERTS CXC 138 10","5.1.1"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 19 [erts-8.2.1] [source] [64-bit] [smp:2:2] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,
[{total,77018832},
{connection_readers,334888},
{connection_writers,14640},
{connection_channels,132040},
{connection_other,477152},
{queue_procs,65480},
{queue_slave_procs,0},
{plugins,2287080},
{other_proc,19854000},
{mnesia,77272},
{metrics,239992},
{mgmt_db,852688},
{msg_index,44208},
{other_ets,2577600},
{binary,3923976},
{code,24680786},
{atom,1033401},
{other_system,20660789}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{http,15672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,830581964},
{disk_free_limit,50000000},
{disk_free,56083853312},
{file_descriptors,
[{total_limit,1048476},
{total_used,13},
{sockets_limit,943626},
{sockets_used,10}]},
{processes,[{limit,1048576},{used,420}]},
{run_queue,0},
{uptime,45431},
{kernel,{net_ticktime,60}}]

Cluster status of node ksaprice_rabbitmq@4eed789778c0
[{nodes,[{disc,[ksaprice_rabbitmq@4eed789778c0]}]},
{running_nodes,[ksaprice_rabbitmq@4eed789778c0]},
{cluster_name,<<"ksaprice_rabbitmq@4eed789778c0">>},
{partitions,[]},
{alarms,[{ksaprice_rabbitmq@4eed789778c0,[]}]}]

Application environment of node ksaprice_rabbitmq@4eed789778c0
[{amqp_client,[{prefer_ipv6,false},{ssl_options,[]}]},
{asn1,[]},
{compiler,[]},
{cowboy,[]},
{cowlib,[]},
{crypto,[]},
{inets,[]},
{kernel,
[{error_logger,tty},
{inet_default_connect_options,[{nodelay,true}]},
{inet_dist_listen_max,25672},
{inet_dist_listen_min,25672}]},
{mnesia,[{dir,"/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq"}]},
{os_mon,
[{start_cpu_sup,false},
{start_disksup,false},
{start_memsup,false},
{start_os_sup,false}]},
{public_key,[]},
{rabbit,
[{auth_backends,[rabbit_auth_backend_internal]},
{auth_mechanisms,['PLAIN','AMQPLAIN']},
{background_gc_enabled,false},
{background_gc_target_interval,60000},
{backing_queue_module,rabbit_priority_queue},
{channel_max,0},
{channel_operation_timeout,15000},
{cluster_keepalive_interval,10000},
{cluster_nodes,{[],disc}},
{cluster_partition_handling,ignore},
{collect_statistics,fine},
{collect_statistics_interval,5000},
{config_entry_decoder,
[{cipher,aes_cbc256},
{hash,sha512},
{iterations,1000},
{passphrase,undefined}]},
{credit_flow_default_credit,{400,200}},
{default_permissions,[<<".*">>,<<".*">>,<<".*">>]},
{default_user,<<"guest">>},
{default_user_tags,[administrator]},
{default_vhost,<<"/">>},
{delegate_count,16},
{disk_free_limit,50000000},
{disk_monitor_failure_retries,10},
{disk_monitor_failure_retry_interval,120000},
{enabled_plugins_file,"/etc/rabbitmq/enabled_plugins"},
{error_logger,tty},
{fhc_read_buffering,false},
{fhc_write_buffering,true},
{frame_max,131072},
{halt_on_upgrade_failure,true},
{handshake_timeout,10000},
{heartbeat,60},
{hipe_compile,false},
{hipe_modules,
[rabbit_reader,rabbit_channel,gen_server2,rabbit_exchange,
rabbit_command_assembler,rabbit_framing_amqp_0_9_1,rabbit_basic,
rabbit_event,lists,queue,priority_queue,rabbit_router,rabbit_trace,
rabbit_misc,rabbit_binary_parser,rabbit_exchange_type_direct,
rabbit_guid,rabbit_net,rabbit_amqqueue_process,
rabbit_variable_queue,rabbit_binary_generator,rabbit_writer,
delegate,gb_sets,lqueue,sets,orddict,rabbit_amqqueue,
rabbit_limiter,gb_trees,rabbit_queue_index,
rabbit_exchange_decorator,gen,dict,ordsets,file_handle_cache,
rabbit_msg_store,array,rabbit_msg_store_ets_index,rabbit_msg_file,
rabbit_exchange_type_fanout,rabbit_exchange_type_topic,mnesia,
mnesia_lib,rpc,mnesia_tm,qlc,sofs,proplists,credit_flow,pmon,
ssl_connection,tls_connection,ssl_record,tls_record,gen_fsm,ssl]},
{lazy_queue_explicit_gc_run_operation_threshold,1000},
{log_levels,[{connection,info}]},
{loopback_users,[]},
{memory_monitor_interval,2500},
{mirroring_flow_control,true},
{mirroring_sync_batch_size,4096},
{mnesia_table_loading_retry_limit,10},
{mnesia_table_loading_retry_timeout,30000},
{msg_store_credit_disc_bound,{4000,800}},
{msg_store_file_size_limit,16777216},
{msg_store_index_module,rabbit_msg_store_ets_index},
{msg_store_io_batch_size,4096},
{num_ssl_acceptors,1},
{num_tcp_acceptors,10},
{password_hashing_module,rabbit_password_hashing_sha256},
{plugins_dir,
"/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/plugins"},
{plugins_expand_dir,
"/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq-plugins-expand"},
{queue_explicit_gc_run_operation_threshold,1000},
{queue_index_embed_msgs_below,4096},
{queue_index_max_journal_entries,32768},
{reverse_dns_lookups,false},
{sasl_error_logger,tty},
{server_properties,[]},
{ssl_allow_poodle_attack,false},
{ssl_apps,[asn1,crypto,public_key,ssl]},
{ssl_cert_login_from,distinguished_name},
{ssl_handshake_timeout,5000},
{ssl_listeners,[]},
{ssl_options,[]},
{tcp_listen_options,
[{backlog,128},
{nodelay,true},
{linger,{true,0}},
{exit_on_close,false}]},
{tcp_listeners,[5672]},
{trace_vhosts,[]},
{vm_memory_high_watermark,0.4},
{vm_memory_high_watermark_paging_ratio,0.5}]},
{rabbit_common,[]},
{rabbitmq_management,
[{cors_allow_origins,[]},
{cors_max_age,1800},
{http_log_dir,none},
{listener,[{port,15672}]},
{load_definitions,none},
{management_db_cache_multiplier,5},
{process_stats_gc_timeout,300000},
{stats_event_max_backlog,250}]},
{rabbitmq_management_agent,
[{rates_mode,basic},
{sample_retention_policies,
[{global,[{605,5},{3660,60},{29400,600},{86400,1800}]},
{basic,[{605,5},{3600,60}]},
{detailed,[{605,5}]}]}]},
{rabbitmq_web_dispatch,[]},
{ranch,[]},
{sasl,[{errlog_type,error},{sasl_error_logger,tty}]},
{ssl,[]},
{stdlib,[]},
{syntax_tools,[]},
{xmerl,[]}]

Connections:
pid name port peer_port host peer_host ssl peer_cert_subject peer_cert_issuer peer_cert_validity auth_mechanismssl_protocol ssl_key_exchange ssl_cipher ssl_hash protocol user vhost timeout frame_max channel_max client_properties connected_at recv_oct recv_cnt send_oct send_cnt send_pend state channels reductions garbage_collection
<ksaprice_rabbitmq@4eed789778c0.1.7700.0> 172.25.0.2:47982 -> 172.25.0.4:5672 5672 47982 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749265595 1897 10 606 7 0 running 1 235055 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,170}]
<ksaprice_rabbitmq@4eed789778c0.1.7755.0> 172.25.0.2:48764 -> 172.25.0.4:5672 5672 48764 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346461 289 5 554 4 0 running 1 226409 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]
<ksaprice_rabbitmq@4eed789778c0.1.7764.0> 172.25.0.2:48766 -> 172.25.0.4:5672 5672 48766 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346494 1647 21 1030 20 0 running 1 228859 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<ksaprice_rabbitmq@4eed789778c0.1.7767.0> 172.25.0.2:48768 -> 172.25.0.4:5672 5672 48768 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346494 569 9 662 8 0 running 1 226947 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,164}]
<ksaprice_rabbitmq@4eed789778c0.1.7770.0> 172.25.0.2:48770 -> 172.25.0.4:5672 5672 48770 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346495 1647 20 1030 20 0 running 1 228798 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,167}]
<ksaprice_rabbitmq@4eed789778c0.1.7787.0> 172.25.0.2:48772 -> 172.25.0.4:5672 5672 48772 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346511 85953 485 1042 21 0 running 1 280680 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,110}]
<ksaprice_rabbitmq@4eed789778c0.1.7806.0> 172.25.0.2:48774 -> 172.25.0.4:5672 5672 48774 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346548 665 7 566 5 0 running 1 226665 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,168}]
<ksaprice_rabbitmq@4eed789778c0.1.7815.0> 172.25.0.2:48776 -> 172.25.0.4:5672 5672 48776 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346551 1647 21 1030 20 0 running 1 228859 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<ksaprice_rabbitmq@4eed789778c0.1.7839.0> 172.25.0.2:48780 -> 172.25.0.4:5672 5672 48780 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346576 1691 9 566 5 0 running 1 226936 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,169}]
<ksaprice_rabbitmq@4eed789778c0.1.7842.0> 172.25.0.2:48778 -> 172.25.0.4:5672 5672 48778 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346576 1496 9 566 5 0 running 1 226885 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]

Channels:
pid name connection number user vhost reductions transactional confirm consumer_count messages_unacknowledged messages_unconfirmed messages_uncommitted acks_uncommitted prefetch_count global_prefetch_count state garbage_collection
<ksaprice_rabbitmq@4eed789778c0.1.7706.0> 172.25.0.2:47982 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7700.0> 1 admin ksaprice_rabbitmq_vh 4140 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<ksaprice_rabbitmq@4eed789778c0.1.7761.0> 172.25.0.2:48764 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7755.0> 1 admin ksaprice_rabbitmq_vh 1706 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<ksaprice_rabbitmq@4eed789778c0.1.7776.0> 172.25.0.2:48768 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7767.0> 1 admin ksaprice_rabbitmq_vh 4737 false false 1 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]
<ksaprice_rabbitmq@4eed789778c0.1.7788.0> 172.25.0.2:48770 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7770.0> 1 admin ksaprice_rabbitmq_vh 8608 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<ksaprice_rabbitmq@4eed789778c0.1.7793.0> 172.25.0.2:48766 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7764.0> 1 admin ksaprice_rabbitmq_vh 7977 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<ksaprice_rabbitmq@4eed789778c0.1.7812.0> 172.25.0.2:48772 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7787.0> 1 admin ksaprice_rabbitmq_vh 116017 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,1}]
<ksaprice_rabbitmq@4eed789778c0.1.7827.0> 172.25.0.2:48776 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7815.0> 1 admin ksaprice_rabbitmq_vh 7977 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<ksaprice_rabbitmq@4eed789778c0.1.7835.0> 172.25.0.2:48774 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7806.0> 1 admin ksaprice_rabbitmq_vh 3048 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<ksaprice_rabbitmq@4eed789778c0.1.7854.0> 172.25.0.2:48778 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7842.0> 1 admin ksaprice_rabbitmq_vh 2854 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<ksaprice_rabbitmq@4eed789778c0.1.7855.0> 172.25.0.2:48780 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7839.0> 1 admin ksaprice_rabbitmq_vh 3245 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]

Queues on ksaprice_rabbitmq_vh:
pid name durable auto_delete arguments owner_pid exclusive messages_ready messages_unacknowledged messages reductions policy exclusive_consumer_pid exclusive_consumer_tag consumers consumer_utilisation memory slave_pids synchronised_slave_pids recoverable_slaves state garbage_collection messages_ram messages_ready_ram messages_unacknowledged_ram messages_persistent message_bytes message_bytes_ready message_bytes_unacknowledged message_bytes_ram message_bytes_persistent head_message_timestamp disk_reads disk_writes backing_queue_status messages_paged_out message_bytes_paged_out
<ksaprice_rabbitmq@4eed789778c0.1.7670.0> default true false [] false 6 0 6 88075 0 89344 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}] 6 6 0 6 1231 1231 0 1231 1231 0 6 [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,6}, {len,6}, {target_ram_count,infinity}, {next_seq_id,6}, {avg_ingress_rate,9.303060867567184e-92}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}] 0 0
<ksaprice_rabbitmq@4eed789778c0.1.7861.0> celeryev.b957bbf3-8b97-4633-897f-a887b49e617b false true [{"x-message-ttl",5000},{"x-expires",60000}] false 0 0 0 4739 1 1.0 22160 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}] 0 0 0 0 0 0 0 0 0 0 0 [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {target_ram_count,infinity}, {next_seq_id,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}] 0 0

Queues on ksaprice_rabbitmq:

Queues on /:

Exchanges on ksaprice_rabbitmq_vh:
name type durable auto_delete internal arguments policy
direct true false false []
amq.direct direct true false false []
amq.fanout fanout true false false []
amq.headers headers true false false []
amq.match headers true false false []
amq.rabbitmq.trace topic true false true []
amq.topic topic true false false []
celery.pidbox fanout false false false []
celeryev topic true false false []
default direct true false false []
reply.celery.pidbox direct false false false []

Exchanges on ksaprice_rabbitmq:
name type durable auto_delete internal arguments policy
direct true false false []
amq.direct direct true false false []
amq.fanout fanout true false false []
amq.headers headers true false false []
amq.match headers true false false []
amq.rabbitmq.trace topic true false true []
amq.topic topic true false false []

Exchanges on /:
name type durable auto_delete internal arguments policy
direct true false false []
amq.direct direct true false false []
amq.fanout fanout true false false []
amq.headers headers true false false []
amq.match headers true false false []
amq.rabbitmq.log topic true false true []
amq.rabbitmq.trace topic true false true []
amq.topic topic true false false []

Bindings on ksaprice_rabbitmq_vh:
source_name source_kind destination_name destination_kind routing_key arguments vhost
exchange celeryev.b957bbf3-8b97-4633-897f-a887b49e617b queue celeryev.b957bbf3-8b97-4633-897f-a887b49e617b [] ksaprice_rabbitmq_vh
exchange default queue default [] ksaprice_rabbitmq_vh
celeryev exchange celeryev.b957bbf3-8b97-4633-897f-a887b49e617b queue # [] ksaprice_rabbitmq_vh
default exchange default queue default [] ksaprice_rabbitmq_vh

Bindings on ksaprice_rabbitmq:

Bindings on /:

Consumers on ksaprice_rabbitmq_vh:
queue_name channel_pid consumer_tag ack_required prefetch_count arguments
celeryev.b957bbf3-8b97-4633-897f-a887b49e617b <ksaprice_rabbitmq@4eed789778c0.1.7776.0> None4 false 0 []

Consumers on ksaprice_rabbitmq:

Consumers on /:

Permissions on ksaprice_rabbitmq_vh:
user configure write read
admin .* .* .*

Permissions on ksaprice_rabbitmq:

Permissions on /:
user configure write read
guest .* .* .*

Policies on ksaprice_rabbitmq_vh:

Policies on ksaprice_rabbitmq:

Policies on /:

Parameters on ksaprice_rabbitmq_vh:

Parameters on ksaprice_rabbitmq:

Parameters on /:

Best answer

About Airflow hanging

I don't know Airflow, but I believe the URL you are using to reach RabbitMQ is incorrect:

broker_url = amqp://guest:***@ksaprice_rabbitmq:15672//

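RabbitMQ uses TCP port 15672 for its management web UI, so what listens on that port is an HTTP server, not an AMQP listener. The listeners entry in the report confirms this: amqp is on 5672 and http is on 15672.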

The AMQP port is 5672 (the standard port). So I would try the following URL:

broker_url = amqp://guest:***@ksaprice_rabbitmq//

That is, with no port, because the client should default to the standard port.
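As a rough illustration of the point above, the following Python sketch checks whether a broker URL points at the management port instead of the AMQP port. The function name check_broker_url and the constants are hypothetical helpers made up for this example and are not part of Airflow or Celery. The two port numbers are the ones discussed in this answer.

```python
from urllib.parse import urlsplit

# Ports as discussed in the answer; the names are illustrative only.
AMQP_PORT = 5672         # standard AMQP port
MANAGEMENT_PORT = 15672  # RabbitMQ management web UI (HTTP)


def check_broker_url(url):
    """Return a warning string if the URL targets the management port, else None.

    Hypothetical helper for illustration; not an Airflow or Celery API.
    """
    parts = urlsplit(url)
    # When the URL carries no port, assume the client falls back to 5672.
    port = parts.port if parts.port is not None else AMQP_PORT
    if port == MANAGEMENT_PORT:
        return (
            f"port {port} is the RabbitMQ management HTTP port; "
            f"AMQP clients should use {AMQP_PORT}"
        )
    return None


if __name__ == "__main__":
    for candidate in (
        "amqp://guest:***@ksaprice_rabbitmq:15672//",
        "amqp://guest:***@ksaprice_rabbitmq//",
    ):
        print(candidate, "->", check_broker_url(candidate))
```

Running a check like this against the value in airflow.cfg before starting the services would have flagged the original broker_url.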

About tasks not being executed by workers

In the rabbitmqctl report output, we can see:

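  • There is a queue named default with 6 messages waiting in it, but no consumer is subscribed to it.
  • There is a queue named celeryev.b957bbf3-8b97-4633-897f-a887b49e617b with no messages and one consumer waiting for messages.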

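So some configuration must be wrong or missing on one side (the scheduler or the worker), because they are currently not talking to each other: tasks arrive in one queue, but the worker is watching another.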
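The same diagnosis can be sketched in Python: given one line per queue with its name, message count and consumer count, list the queues that hold messages but have no consumer. This assumes tab-separated input in that column order, such as what `rabbitmqctl list_queues -p ksaprice_rabbitmq_vh name messages consumers` is expected to print. The function name stranded_queues is hypothetical, and the sample values are taken from the report above.

```python
def stranded_queues(listing):
    """Return names of queues that have messages waiting but no consumers.

    `listing` is assumed to be tab-separated text with three columns:
    name, messages, consumers. Lines that do not match (for example a
    "Listing queues" banner) are skipped.
    """
    stranded = []
    for line in listing.splitlines():
        fields = line.split("\t")
        if len(fields) != 3:
            continue
        name, messages, consumers = fields
        if not (messages.isdigit() and consumers.isdigit()):
            continue
        if int(messages) > 0 and int(consumers) == 0:
            stranded.append(name)
    return stranded


if __name__ == "__main__":
    # Values copied from the report: default has 6 messages and 0 consumers,
    # the celeryev queue has 0 messages and 1 consumer.
    sample = (
        "default\t6\t0\n"
        "celeryev.b957bbf3-8b97-4633-897f-a887b49e617b\t0\t1\n"
    )
    print(stranded_queues(sample))
```

For the report in the question this flags the default queue, which matches the observation that tasks are being published but nothing is consuming them.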

Regarding rabbitmq - Airflow scheduler and webserver hang when queuing tasks to run on RabbitMQ, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45478431/
