- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我在 python spark 应用程序中创建了一个 kafka 流,可以解析通过它的任何文本。
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
我想更改它以便能够解析来自 kafka 主题的 avro 消息。从文件解析 avro 消息时,我会这样做:
reader = DataFileReader(open("customer.avro", "r"), DatumReader())
我是 python 和 spark 的新手,如何更改流以便能够解析 avro 消息?另外,如何指定从 Kafka 读取 Avro 消息时要使用的模式???我以前用 Java 做过所有这些,但 Python 让我感到困惑。
编辑:
我尝试更改以包含 avro 解码器
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))
但我得到以下错误
TypeError: 'DatumReader' object is not callable
最佳答案
我遇到了同样的挑战 - 在 pyspark 中反序列化来自 Kafka 的 avro 消息,并使用 Confluent Schema Registry 模块的 Messageserializer 方法解决了它,因为在我们的例子中,模式存储在 Confluent Schema Registry 中。
您可以在 https://github.com/verisign/python-confluent-schemaregistry 找到该模块
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)
# simple decode to replace Kafka-streaming's built-in decode decoding UTF8 ()
def decoder(s):
decoded_message = serializer.decode_message(s)
return decoded_message
kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)
lines = kvs.map(lambda x: x[1])
lines.pprint()
很明显,如您所见,这段代码使用了没有接收器的新直接方法,因此使用了 createdDirectStream(更多信息请参见 https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html)
关于python - Spark Python Avro Kafka 解串器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30339636/
我正在尝试创建一个程序,其中字符串的前三个字符重复给定次数,如下所示: foo('Chocolate', 3) # => 'ChoChoCho' foo('Abc', 3) # => 'AbcAbcA
我有以下字符串: std::string str = "Mode:AAA:val:101:id:A1"; 我想分离一个位于 "val:" 和 ":id" 之间的子字符串,这是我的方法: std::st
DNA 字符串可以是任意长度,包含 5 个字母(A、T、G、C、N)的任意组合。 压缩包含 5 个字母(A、T、G、C、N)的 DNA 字母串的有效方法是什么?不是考虑每个字母表 3 位,我们可以使用
是否有一种使用 levenstein 距离将一个特定字符串与第二个较长字符串中的任何区域进行匹配的好方法? 例子: str1='aaaaa' str2='bbbbbbaabaabbbb' if str
使用 OAuth 并使用以下函数使用我们称为“foo”(实际上是 OAuth token )的字符串加密 key public function encrypt( $text ) { // a
我是一名优秀的程序员,十分优秀!