
python - Flink with Python: job execution fails

Reposted. Author: 行者123. Updated: 2023-12-01 08:15:14

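For my first attempt, I wanted to read JSON data from a file and pass it to Flink. I defined a source (which reads JSON strings line by line) and a placeholder filter. See the code: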

```python
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys

class Json_reader(SourceFunction):
    def readjason(self, ctx):
        sys.stdin = open('capture.json', 'r')
        for line in sys.stdin:
            ctx.collect(json.loads(line))


class Dummy_Filter(FilterFunction):
    def filter(self, value):
        return True

#
# The pipeline definition.
#
def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Json_reader()) \
        .filter(Dummy_Filter()) \
        .output()
    env.execute()
```
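Before submitting anything to a cluster, you can check the parsing and filtering logic on its own. The sketch below is plain Python with no Flink dependency. It assumes `capture.json` holds one JSON object per line (JSON Lines), and `read_json_lines` and `dummy_filter` are hypothetical helper names. Unlike the original loop, it skips blank lines, because `json.loads` raises on an empty string.

```python
import io
import json


def read_json_lines(stream):
    """Yield one parsed JSON object per non-empty line (JSON Lines format assumed)."""
    for line in stream:
        line = line.strip()
        if line:  # skip blank lines, e.g. a trailing empty line at EOF
            yield json.loads(line)


def dummy_filter(value):
    """Hypothetical plain-Python counterpart of Dummy_Filter: keeps every record."""
    return True


if __name__ == "__main__":
    # io.StringIO stands in for open('capture.json', 'r') so the sketch runs anywhere
    sample = io.StringIO('{"id": 1}\n{"id": 2}\n\n')
    records = [r for r in read_json_lines(sample) if dummy_filter(r)]
    print(records)  # [{'id': 1}, {'id': 2}]
```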

When I build the job and submit it to the Flink cluster I started, I get the following error message:

```
VirtualBox:/media/sf_Python$ ./flink-1.7.2/bin/pyflink-stream.sh ./json_parser_flink.py
Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "", line 1, in
  File "/tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py", line 25, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 31615948194c951be03d46576929aa23)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.
```

I did not forget to call execute().

Best answer

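I found the problem: Flink requires a SourceFunction to implement a run() method. My source's only method was named readjason(), so there was no run(ctx) for Flink to call.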
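The contract can be illustrated with a plain-Python sketch. `SourceFunction`, `ListContext` and `start_source` below are hypothetical stand-ins written for this example, not Flink's classes. They only mimic the behaviour the answer relies on: the runtime invokes `run(ctx)` to produce elements (and `cancel()` to stop the source), and never calls a method with any other name. The `stream` constructor argument is also an assumption, added so the sketch can be exercised with `io.StringIO`. In the actual job the file would still be opened inside `run()`, as in the question.

```python
import io
import json


class SourceFunction(object):
    """Hypothetical stand-in for Flink's SourceFunction interface (not the real class)."""

    def run(self, ctx):
        raise NotImplementedError("a source must implement run(ctx)")

    def cancel(self):
        raise NotImplementedError("a source must implement cancel()")


class ListContext(object):
    """Hypothetical stand-in for the source context: collect() appends to a list."""

    def __init__(self):
        self.collected = []

    def collect(self, element):
        self.collected.append(element)


class Json_reader(SourceFunction):
    """Fixed source: the reading loop now lives in run(), the method the runtime calls."""

    def __init__(self, stream):
        self.stream = stream  # assumption for testability; the real job opens capture.json
        self.running = True

    def run(self, ctx):
        for line in self.stream:
            if not self.running:  # stop emitting once cancel() has been called
                break
            if line.strip():
                ctx.collect(json.loads(line))

    def cancel(self):
        self.running = False


class OldReader(SourceFunction):
    """The original shape: the loop is in readjason(), which nothing ever calls."""

    def readjason(self, ctx):
        ctx.collect("unreachable")


def start_source(source):
    """Hypothetical runtime stand-in: it only ever invokes run(ctx)."""
    ctx = ListContext()
    source.run(ctx)
    return ctx.collected


if __name__ == "__main__":
    print(start_source(Json_reader(io.StringIO('{"id": 1}\n{"id": 2}\n'))))
```

With the method renamed, `start_source(Json_reader(io.StringIO('{"id": 1}\n')))` returns `[{'id': 1}]`. By contrast, `start_source(OldReader())` raises `NotImplementedError`, because `OldReader` never overrides `run()`.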

Regarding "python - Flink with Python: job execution fails", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55024413/
