
Apache Beam portable runner to run a Golang-based job on Flink

Reposted · Author: bug小助手 · Updated: 2023-10-27 20:35:15



I am trying to learn Apache Beam and am creating a sample project to learn stream processing. For now, I want to read from a Kafka topic "word" and print the data to the console.
I have deployed Flink (port 8081) as a standalone cluster and Kafka (port 9091) locally using Docker.

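For reference, the local setup described above could look roughly like the following Compose sketch. This is an assumption-filled illustration, not taken from the question: the image tags, the `kafka-1` service name, and the KRaft environment variables are all guesses, and the exact Kafka listener configuration will vary with the image and version used.

```yaml
# Hypothetical docker-compose sketch: Flink standalone cluster (UI on 8081)
# plus a single Kafka broker advertised as kafka-1:9091. All specifics here
# (images, versions, env vars) are assumptions for illustration only.
services:
  jobmanager:
    image: flink:1.16
    command: jobmanager
    ports:
      - "8081:8081"          # Flink web UI / REST endpoint
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.16
    command: taskmanager
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  kafka-1:
    image: bitnami/kafka:3.5
    ports:
      - "9091:9091"          # broker listener used by the Beam job
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-1:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9091,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9091
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
```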


Since there is no proper documentation with a clear example of how to do this, I tried it on my own; you can find the code below.



package main

import (
	"context"
	"flag"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
	"google.golang.org/appengine/log"
)

var (
	// expansionAddr = flag.String("expansion_addr", "",
	// 	"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
	// bootstrapServers = flag.String("bootstrap_servers", "",
	// 	"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
	// topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
)

func init() {
	register.DoFn2x0[context.Context, []byte](&LogFn{})
}

// LogFn is a DoFn that logs each record it receives.
type LogFn struct{}

// ProcessElement logs each element it receives.
func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
	log.Infof(ctx, "Word info: %v", string(elm))
}

// FinishBundle waits a bit so the job server finishes receiving logs.
func (fn *LogFn) FinishBundle() {
	time.Sleep(2 * time.Second)
}

func main() {
	flag.Parse()
	// Beam initialization.
	beam.Init()
	ctx := context.Background()

	// Create the pipeline object and its root scope.
	pipeline := beam.NewPipeline()
	scope := pipeline.Root()

	// Reading from Kafka IO. This is not native Beam Go support as of now;
	// it uses a cross-language transform from Java to achieve the Kafka connector.

	// Kafka connection details.
	brokerAddr := ""
	bootstrapServer := "bootstrap-server:kafka-1:9091"
	topic := "word"

	// The input reader is our consumer, which reads from the input topic; this is defined as per the kafkaio docs.
	inputReader := kafkaio.Read(scope, brokerAddr, bootstrapServer, []string{topic})
	vals := beam.DropKey(scope, inputReader)
	beam.ParDo0(scope, &LogFn{}, vals)

	if _, err := flink.Execute(ctx, pipeline); err != nil {
		log.Errorf(ctx, "Failed to execute job: %v", err)
	}
}


I am getting this error while executing the code.



2023/09/10 01:06:49 Downloaded: C:\tmp\artifacts\beam-sdks-java-io-expansion-service-2.49.0-m4yWpU_pIZFwgP3wHCijYZg7hfO6Eg5-Dx3eXCSRTb0.jar (sha256: 9b8c96a54fe921917080fdf01c28a361983b85f3ba120e7e0f1dde5c24914dbd, size: 59370861)
panic: not an App Engine context


I went over the documentation but couldn't find much on this.
I also tried changing the context passed to flink.Execute() to an App Engine base context, but that doesn't seem to work.

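The `panic: not an App Engine context` message points at the `google.golang.org/appengine/log` import in the code above: that package's `Infof` expects a context created by the App Engine runtime and panics on a plain `context.Background()`, which is why swapping the context passed to `flink.Execute()` didn't help. The fix is to log with a package that accepts any context; inside a Beam pipeline, the SDK's own `github.com/apache/beam/sdks/v2/go/pkg/beam/log` package is the natural choice. The sketch below uses the standard library `log` instead so it runs anywhere without Beam; the `formatWord` helper is a name introduced here for illustration, not part of the original code.

```go
package main

import (
	"context"
	"fmt"
	"log"
)

// LogFn mirrors the DoFn from the question, but logs with the standard
// library instead of google.golang.org/appengine/log, whose Infof panics
// unless the context was created by the App Engine runtime.
type LogFn struct{}

// formatWord builds the log line; split out as a helper so it is easy to test.
func formatWord(elm []byte) string {
	return fmt.Sprintf("Word info: %v", string(elm))
}

// ProcessElement logs each element it receives, with any context.
func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
	log.Print(formatWord(elm))
}

func main() {
	// A plain background context is fine here, unlike with appengine/log.
	fn := &LogFn{}
	fn.ProcessElement(context.Background(), []byte("hello"))
}
```

In the original pipeline, the same change amounts to replacing the `google.golang.org/appengine/log` import with `"github.com/apache/beam/sdks/v2/go/pkg/beam/log"`; the `log.Infof(ctx, ...)` and `log.Errorf(ctx, ...)` call sites can then stay as they are.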

