- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
能够使用正确的 TCP 防火墙权限在 Google Chrome 中运行 Google Datalab(笔记本)。使用简单的脚本,这将启动最新的 spark 集群(1 个主节点和 3 个使用 Dataproc 的工作节点)。首先我们在spark-submit
中测试下面的代码,然后在启动 DataLab 后,我不确定如何修复以下错误。
第一步:从 Cloud Shell 启动 Dataproc 集群
gcloud dataproc clusters create cluster1021 \
--subnet default --zone us-west1-a \
--master-machine-type n1-standard-2 \
--master-boot-disk-size 30 --num-workers 2 \
--worker-machine-type n1-standard-2 \
--worker-boot-disk-size 30 --image-version 1.3-deb9 \
--project bigdata-228217 \
--initialization-actions 'gs://dataproc-initialization-actions/datalab/datalab.sh','gs://dataproc-initialization-actions/connectors/connectors.sh' \
--metadata 'gcs-connector-version=1.9.11' \
--metadata 'bigquery-connector-version=0.13.11'
成功启动后,我进行了测试,发现 Bigquery 连接器正在使用 spark-submit wordcount.py
来自 Google here. 的示例
第二步:将此代码作为 wordcount.py
包含在主目录中与 touch wordcount.py
,然后粘贴以下来自 nano wordcount.py
的代码并保存。
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters.
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
# Perform word count.
word_counts = (
table_data
.map(lambda record: json.loads(record[1]))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline-delimited JSON in Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'
sql_context = SQLContext(sc)
(word_counts
.toDF(['word', 'word_count'])
.write.format('json').save(output_directory))
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--replace '
'--autodetect '
'{dataset}.{table} {files}'.format(
dataset=output_dataset, table=output_table, files=output_files
).split())
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
output_path, True)
现在,从 shell,spark-submit
的输出以下是结果 -- 表明 BigQuery 连接器有效。
spark-submit wordcount.py
...
(pinnace,3)
(bone,21)
(lug,2)
(vailing,2)
(bombast,3)
(gaping,11)
(hem,5)
('non,1)
(stinks,1)
(forsooth,48)
第 3 步设置防火墙以允许在浏览器中查看 TCP DataLab
为 DataLab 创建防火墙规则
在设置页面上,您将为 DataLab 防火墙规则创建一个名称,并允许使用以下 TCP 端口,并在您的网络 IP 地址后立即添加“/32”——您可以找到 here .
第 4 步:使用 <YOUR IP>:8080
在 Google Chrome 中启动 DataLab您应该会看到 DataLab Notebook
你会看到这个。现在打开一个新笔记本,在第一个单元格中我停止了 spark 上下文并将上面的莎士比亚代码粘贴到第二个单元格中。
这是输出。问题,我需要做什么才能让 Bigquery 连接器在 Datalab 中与 Pyspark 一起工作?
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-4-62761a09a7c5> in <module>()
36 'org.apache.hadoop.io.LongWritable',
37 'com.google.gson.JsonObject',
---> 38 conf=conf)
39
40 # Perform word count.
/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py in newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
735 jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
736 valueClass, keyConverter, valueConverter,
--> 737 jconf, batchSize)
738 return RDD(jrdd, self)
739
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:239)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:313)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:296)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
最佳答案
从this line来看, Datalab init 操作将 BQ 和 GCS 连接器安装到 Docker 容器中。
由于 Dataproc 1.3 默认不附带 BQ 连接器,并且由于您指定了连接器初始化操作,即在集群上安装 BQ 连接器,因此在 DataLab 初始化操作之后,Docker 无法在 Datalab 初始化操作执行期间将 BQ 连接器挂载到 Datalab 容器中。
要解决此问题,您需要更改初始化操作的顺序:
gcloud dataproc clusters create \
. . .
--initialization-actions=gs://dataproc-initialization-actions/datalab/connectors.sh,gs://dataproc-initialization-actions/connectors/datalab.sh
作为一个小改进,您目前不需要指定 GCS 连接器版本 (--metadata 'gcs-connector-version=1.9.11'
),因为最新的 Dataproc 1.3 镜像已经预装了 GCS 连接器 1.9.11。
关于python - 在 DataLab 笔记本中使用 Dataproc 和 Spark BigQuery 连接器时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54155753/
SQLite、Content provider 和 Shared Preference 之间的所有已知区别。 但我想知道什么时候需要根据情况使用 SQLite 或 Content Provider 或
警告:我正在使用一个我无法完全控制的后端,所以我正在努力解决 Backbone 中的一些注意事项,这些注意事项可能在其他地方更好地解决......不幸的是,我别无选择,只能在这里处理它们! 所以,我的
我一整天都在挣扎。我的预输入搜索表达式与远程 json 数据完美配合。但是当我尝试使用相同的 json 数据作为预取数据时,建议为空。点击第一个标志后,我收到预定义消息“无法找到任何内容...”,结果
我正在制作一个模拟 NHL 选秀彩票的程序,其中屏幕右侧应该有一个 JTextField,并且在左侧绘制弹跳的选秀球。我创建了一个名为 Ball 的类,它实现了 Runnable,并在我的主 Draf
这个问题已经有答案了: How can I calculate a time span in Java and format the output? (18 个回答) 已关闭 9 年前。 这是我的代码
我有一个 ASP.NET Web API 应用程序在我的本地 IIS 实例上运行。 Web 应用程序配置有 CORS。我调用的 Web API 方法类似于: [POST("/API/{foo}/{ba
我将用户输入的时间和日期作为: DatePicker dp = (DatePicker) findViewById(R.id.datePicker); TimePicker tp = (TimePic
放宽“邻居”的标准是否足够,或者是否有其他标准行动可以采取? 最佳答案 如果所有相邻解决方案都是 Tabu,则听起来您的 Tabu 列表的大小太长或您的释放策略太严格。一个好的 Tabu 列表长度是
我正在阅读来自 cppreference 的代码示例: #include #include #include #include template void print_queue(T& q)
我快疯了,我试图理解工具提示的行为,但没有成功。 1. 第一个问题是当我尝试通过插件(按钮 1)在点击事件中使用它时 -> 如果您转到 Fiddle,您会在“内容”内看到该函数' 每次点击都会调用该属
我在功能组件中有以下代码: const [ folder, setFolder ] = useState([]); const folderData = useContext(FolderContex
我在使用预签名网址和 AFNetworking 3.0 从 S3 获取图像时遇到问题。我可以使用 NSMutableURLRequest 和 NSURLSession 获取图像,但是当我使用 AFHT
我正在使用 Oracle ojdbc 12 和 Java 8 处理 Oracle UCP 管理器的问题。当 UCP 池启动失败时,我希望关闭它创建的连接。 当池初始化期间遇到 ORA-02391:超过
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 9 年前。 Improve
引用这个plunker: https://plnkr.co/edit/GWsbdDWVvBYNMqyxzlLY?p=preview 我在 styles.css 文件和 src/app.ts 文件中指定
为什么我的条形这么细?我尝试将宽度设置为 1,它们变得非常厚。我不知道还能尝试什么。默认厚度为 0.8,这是应该的样子吗? import matplotlib.pyplot as plt import
当我编写时,查询按预期执行: SELECT id, day2.count - day1.count AS diff FROM day1 NATURAL JOIN day2; 但我真正想要的是右连接。当
我有以下时间数据: 0 08/01/16 13:07:46,335437 1 18/02/16 08:40:40,565575 2 14/01/16 22:2
一些背景知识 -我的 NodeJS 服务器在端口 3001 上运行,我的 React 应用程序在端口 3000 上运行。我在 React 应用程序 package.json 中设置了一个代理来代理对端
我面临着一个愚蠢的问题。我试图在我的 Angular 应用程序中延迟加载我的图像,我已经尝试过这个2: 但是他们都设置了 src attr 而不是 data-src,我在这里遗漏了什么吗?保留 d
我是一名优秀的程序员,十分优秀!