
java - Zeppelin: How to create DataFrame from within custom interpreter?


I am developing a custom interpreter for a domain-specific language. Following the example given in the Apache Zeppelin documentation (https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html), the interpreter works fine. Now I would like to store some results in a new DataFrame.

I found the code for creating a DataFrame (http://spark.apache.org/docs/latest/sql-programming-guide.html), but I cannot use it in my interpreter because I basically see no way to get hold of a valid runtime SparkContext (commonly referred to as "sc") from within my custom interpreter.
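For reference, the pattern from the SQL programming guide boils down to roughly the following (a minimal sketch against the Spark 1.x Java API; the class and method names are only illustrative, and the SQLContext has to come from somewhere — which is exactly the missing piece):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class DataFrameSketch {

    // Builds a one-column DataFrame and registers it as a temp table,
    // following the Spark 1.x SQL programming guide. The open question is
    // where a valid SQLContext comes from inside a custom interpreter.
    static void registerResult(SQLContext sqlContext) {
        StructType schema = new StructType().add("value", DataTypes.IntegerType);
        List<Row> rows = Arrays.asList(
            RowFactory.create(1), RowFactory.create(2), RowFactory.create(3));
        DataFrame df = sqlContext.createDataFrame(rows, schema);
        df.registerTempTable("result");
    }
}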

I tried the (static) SparkContext.getOrCreate(), but that only led to a ClassNotFoundException. I then added the whole zeppelin-spark-dependencies...jar to my interpreter folder, which solved the class-loading problem, but now I get a SparkException ("A master URL must be set...").

Any idea how to access the notebook's SparkContext from within a custom interpreter? Many thanks!

Update

Thanks to Kangrok Lee's comment below, my code now looks as shown further down. It runs and seems to create a DataFrame (at least it no longer throws any exceptions). But I cannot use the created DataFrame in a subsequent SQL paragraph (the first paragraph uses my "%opl" interpreter, as shown below, and is supposed to create the "result" DataFrame):

%opl
1 2 3
> 1
> 2
> 3

%sql
select * from result
> Table not found: result; line 1 pos 14

So something is probably still wrong with the way I handle the SparkContext. Any ideas? Many thanks!

package opl;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OplInterpreter2 extends Interpreter {

    static {
        Interpreter.register("opl", "opl", OplInterpreter2.class.getName(),
            new InterpreterPropertyBuilder()
                .add("spark.master", "local[4]", "spark.master")
                .add("spark.app.name", "Opl Interpreter", "spark.app.name")
                .add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer")
                .build());
    }

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

    private void log(Object o) {
        if (logger != null)
            logger.warn("OplInterpreter2 " + o);
    }

    public OplInterpreter2(Properties properties) {
        super(properties);
        log("CONSTRUCTOR");
    }

    @Override
    public void open() {
        log("open()");
    }

    @Override
    public void cancel(InterpreterContext arg0) {
        log("cancel()");
    }

    @Override
    public void close() {
        log("close()");
    }

    @Override
    public List<String> completion(String arg0, int arg1) {
        log("completion()");
        return new ArrayList<String>();
    }

    @Override
    public FormType getFormType() {
        log("getFormType()");
        return FormType.SIMPLE;
    }

    @Override
    public int getProgress(InterpreterContext arg0) {
        log("getProgress()");
        return 100;
    }

    @Override
    public InterpreterResult interpret(String string, InterpreterContext context) {
        log("interpret() " + string);
        // Capture everything written to System.out while the paragraph runs
        // and return it as the interpreter result.
        PrintStream oldSys = System.out;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            System.setOut(ps);
            execute(string);
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.SUCCESS,
                InterpreterResult.Type.TEXT,
                baos.toString());
        } catch (Exception ex) {
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.ERROR,
                InterpreterResult.Type.TEXT,
                ex.toString());
        }
    }

    private void execute(String code) throws Exception {
        SparkContext sc = SparkContext.getOrCreate();
        SQLContext sqlc = SQLContext.getOrCreate(sc);
        StructType structType = new StructType().add("value", DataTypes.IntegerType);
        ArrayList<Row> list = new ArrayList<Row>();
        for (String s : code.trim().split("\\s+")) {
            int value = Integer.parseInt(s);
            System.out.println(value);
            list.add(RowFactory.create(value));
        }
        DataFrame df = sqlc.createDataFrame(list, structType);
        df.registerTempTable("result");
    }
}

Best answer

Although I don't think it is a very elegant solution, I finally found one. In the code below, I use the getSparkInterpreter() function that I found in org.apache.zeppelin.spark.PySparkInterpreter.java.

This requires me to put my packaged code (jar) into the Spark interpreter folder instead of into its own interpreter folder, which I think would be the preferred way (according to https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html). Also, my interpreter no longer shows up as an interpreter of its own on Zeppelin's interpreter configuration page. But it can still be used within a Zeppelin paragraph.

And: within that code I can create a DataFrame that is also usable outside of my paragraph, which is exactly what I wanted to achieve.

package opl;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OplInterpreter2 extends Interpreter {

    static {
        // Register in the "spark" interpreter group (instead of its own "opl" group).
        Interpreter.register(
            "opl",
            "spark", // "opl",
            OplInterpreter2.class.getName(),
            new InterpreterPropertyBuilder()
                .add("sth", "defaultSth", "some thing")
                .build());
    }

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

    private void log(Object o) {
        if (logger != null)
            logger.warn("OplInterpreter2 " + o);
    }

    public OplInterpreter2(Properties properties) {
        super(properties);
        log("CONSTRUCTOR");
    }

    @Override
    public void open() {
        log("open()");
    }

    @Override
    public void cancel(InterpreterContext arg0) {
        log("cancel()");
    }

    @Override
    public void close() {
        log("close()");
    }

    @Override
    public List<String> completion(String arg0, int arg1) {
        log("completion()");
        return new ArrayList<String>();
    }

    @Override
    public FormType getFormType() {
        log("getFormType()");
        return FormType.SIMPLE;
    }

    @Override
    public int getProgress(InterpreterContext arg0) {
        log("getProgress()");
        return 100;
    }

    @Override
    public InterpreterResult interpret(String string, InterpreterContext context) {
        log("interpret() " + string);
        // Capture everything written to System.out while the paragraph runs
        // and return it as the interpreter result.
        PrintStream oldSys = System.out;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            System.setOut(ps);
            execute(string);
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.SUCCESS,
                InterpreterResult.Type.TEXT,
                baos.toString());
        } catch (Exception ex) {
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.ERROR,
                InterpreterResult.Type.TEXT,
                ex.toString());
        }
    }

    private void execute(String code) throws Exception {
        // Use the SQLContext of the notebook's SparkInterpreter so that the
        // registered temp table is visible to %sql paragraphs.
        SparkInterpreter sintp = getSparkInterpreter();
        SQLContext sqlc = sintp.getSQLContext();
        StructType structType = new StructType().add("value", DataTypes.IntegerType);
        ArrayList<Row> list = new ArrayList<Row>();
        for (String s : code.trim().split("\\s+")) {
            int value = Integer.parseInt(s);
            System.out.println(value);
            list.add(RowFactory.create(value));
        }
        DataFrame df = sqlc.createDataFrame(list, structType);
        df.registerTempTable("result");
    }

    // Adapted from org.apache.zeppelin.spark.PySparkInterpreter: unwrap the
    // (possibly lazy) SparkInterpreter running in the same session.
    private SparkInterpreter getSparkInterpreter() {
        LazyOpenInterpreter lazy = null;
        SparkInterpreter spark = null;
        Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
        while (p instanceof WrappedInterpreter) {
            if (p instanceof LazyOpenInterpreter) {
                lazy = (LazyOpenInterpreter) p;
            }
            p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        spark = (SparkInterpreter) p;
        if (lazy != null) {
            lazy.open();
        }
        return spark;
    }
}
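With the jar deployed into the Spark interpreter folder as described above, the paragraphs from the question should work as intended: the %sql paragraph should return the three rows of "result" instead of the "Table not found" error (illustrative, not re-verified against a particular Zeppelin version):

%opl
1 2 3
> 1
> 2
> 3

%sql
select * from result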

Regarding "java - Zeppelin: How to create DataFrame from within custom interpreter?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37099590/
