- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 PySpark 和 Structured Streaming (Spark 2.3) 在两个 Kafka Stream 之间进行左外连接。
import os
import time
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode, get_json_object
from ast import literal_eval
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'
spark = SparkSession \
.builder \
.appName("Spark Kafka Structured Streaming") \
.getOrCreate()
schema_impressions = StructType() \
.add("id_req", StringType()) \
.add("ts_imp_request", TimestampType()) \
.add("country", StringType()) \
.add("TS_IMPRESSION", TimestampType())
schema_requests = StructType() \
.add("id_req", StringType()) \
.add("page", StringType()) \
.add("conntype", StringType()) \
.add("TS_REQUEST", TimestampType())
impressions = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
.option("subscribe", "ssp.datascience_impressions") \
.load()
requests = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
.option("subscribe", "ssp.datascience_requests") \
.option("startingOffsets", "latest") \
.load()
query_requests = requests \
.select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_requests).alias("parsed")) \
.select(col("timestamp").alias("timestamp_req"), "parsed.id_req", "parsed.page", "parsed.conntype", "parsed.TS_REQUEST") \
.withWatermark("timestamp_req", "120 seconds")
query_impressions = impressions \
.select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_impressions).alias("parsed")) \
.select(col("timestamp").alias("timestamp_imp"), col("parsed.id_req").alias("id_imp"), "parsed.ts_imp_request", "parsed.country", "parsed.TS_IMPRESSION") \
.withWatermark("timestamp_imp", "120 seconds")
query_requests.printSchema()
query_impressions.printSchema()
> root
|-- timestamp_req: timestamp (nullable = true)
|-- id_req: string (nullable = true)
|-- page: string (nullable = true)
|-- conntype: string (nullable = true)
|-- TS_REQUEST: timestamp (nullable = true)
>
> root |-- timestamp_imp: timestamp (nullable = true)
|-- id_imp: string (nullable = true)
|-- ts_imp_request: timestamp (nullable = true)
|-- country: string (nullable = true)
|-- TS_IMPRESSION: timestamp (nullable = true)
rawQuery = query_requests.join(query_impressions, expr("""
(id_req = id_imp AND
timestamp_imp >= timestamp_req AND
timestamp_imp <= timestamp_req + interval 5 minutes)
"""),
"leftOuter")
rawQuery = rawQuery \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "/home/jovyan/streaming/applicationHistory") \
.option("path", "/home/jovyan/streaming").start()
print(rawQuery.status)
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True} ERROR:root:Exception while sending command. Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command raise Py4JNetworkError("Answer from Java side is empty") py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command response = connection.send_command(command) File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command "Error while receiving", e, proto.ERROR_ON_RECEIVE) py4j.protocol.Py4JNetworkError: Error while receiving ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33968) Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2910, in run_code exec(code_obj, self.user_global_ns, self.user_ns) File "", line 3, in print(rawQuery.status) File "/opt/conda/lib/python3.6/site-packages/pyspark/sql/streaming.py", line 114, in status return json.loads(self._jsq.status().json()) File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1160, in call answer, self.gateway_client, self.target_id, self.name) File "/opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/opt/conda/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name)) py4j.protocol.Py4JError: An error occurred while calling o92.statusDuring handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 1828, in showtraceback stb = value._render_traceback_() AttributeError: 'Py4JError' object has no attribute '_render_traceback_'
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 852, in _get_connection connection = self.deque.pop() IndexError: pop from an empty deque
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 15g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
ERROR:root:Exception while sending command. Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command raise Py4JNetworkError("Answer from Java side is empty") py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command response = connection.send_command(command) File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command "Error while receiving", e, proto.ERROR_ON_RECEIVE) py4j.protocol.Py4JNetworkError: Error while receiving
最佳答案
我解决了问题!基本上,该问题出于某种原因与 Jupyter Notebook 有关。我删除了前面代码的下一行:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'
> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 spark_structured.py
关于apache-spark - 结构化流错误 py4j.protocol.Py4JNetworkError : Answer from Java side is empty,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50171431/
我可以看到有状态的协议(protocol)可以减少像 cookie 这样的“模拟状态”。 但是测试变得更加难以确保您的实现正确并重新连接,并且 session 继续可能很难处理。 始终使用无状态协议(
我正在尝试为我的下一个分布式应用程序找到合适的协议(protocol)中间件。在过去的几天里,我找到了几个规范,想知道我是否错过了一个重要的规范?它应该是二进制协议(protocol),支持 RPC,
我正在做一个研究生院软件工程项目,我正在寻找管理 ATM 和银行网络之间通信的协议(protocol)。 我已经在谷歌上搜索了很长一段时间,虽然我找到了各种有关 ATM 的有趣信息,但我惊讶地发现似乎
我正在开发一个 ECG 模块,它以字节为单位给出数据。有一个关于它的协议(protocol)文档解释了如何构建从模块中出来的数据包。我想解码该数据。我很困惑 Protocol Buffer 是否会对此
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 3年前关闭。 Improve this qu
我需要在我的程序中包含基本的文件发送和文件接收例程,并且需要通过 ZMODEM 协议(protocol)。问题是我无法理解规范。 供引用,here is the specification . 规范没
我最近听到这个术语来描述 Google 的新环聊协议(protocol)和 Whisper System 的新 encrypted texting app . The new TextSecure p
如何检查某个对象是否符合协议(protocol)? 我试过这种方式,但出现错误: if lCell.conformsToProtocol(ContentProtocol) { } 最佳
在应用程序中,我们有两种类型的贴纸,字符串和位图。每个贴纸包都可以包含两种类型。这就是我声明模型的方式: // Mark: - Models protocol Sticker: Codable { }
这个问题在这里已经有了答案: Why can't a get-only property requirement in a protocol be satisfied by a property w
我有以下快速代码: protocol Animal { var name: String { get } } struct Bird: Animal { var name: String
我在遵循继承树的几个类中分配协议(protocol)。像这样: 头等舱 @protocol LevelOne - (void) functionA @end @interface BaseClass
我们之前使用的是 fix,但客户说使用 OUCH 进行交易,因为这样速度更快。我在互联网上查了一下,消息看起来很相似。它如何获得速度优势。请给我一些示例消息 最佳答案 基本上,FIX 消息以文本格式传
在我的 swift 项目中,我有一个使用协议(protocol)继承的案例,如下所示 protocol A : class{ } protocol B : A{ } 接下来我要实现的目标是声明另一个具
我想根据这两种协议(protocol)的一般特征(例如开销(数据包)、安全性、信息建模和可靠性)来比较 OPC UA 和 MQTT。我在哪里可以找到每个协议(protocol)的开销和其他特性的一些示
本质上,我的最终目标是拥有一个协议(protocol) Log,它强制所有符合它的对象都有一个符合另一个协议(protocol) [LogEvent] 的对象数组. 但是,符合Log的类需要有特定类型
我正在尝试为基于左操作数和右操作数标识的协议(protocol)实现 Equatable 协议(protocol)。换句话说:我如何为一个协议(protocol)实现 Equatable 协议(pro
问题不在于编程。 我正在使用一台旧机器,微软停止了这些机器的补丁。 有没有人针对攻击者已知的使用端口 445 的 SMB 协议(protocol)漏洞的解决方案? 任何棘手的解决方案? 换句话说,我想
在我们的业务中,我们需要记录到达我们服务器的每个请求/响应。 目前,我们使用 xml 作为标准实现。 如果我们需要调试/跟踪某些错误,则使用日志文件。 如果我们切换到 Protocol Buffer
你推荐什么协议(protocol)定义? 我评估了 Google 的 Protocol Buffer ,但它不允许我控制正在构建的数据包中字段的位置。我认为 Thrift 也是如此。我的要求是: 指定
我是一名优秀的程序员,十分优秀!