- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在编写一个代码,我在其中尝试使用 kafka 和 spark 来消费消息。但是我的代码不起作用。这是我的代码:
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util._
object Smack_Kafka_Spark extends App {
def main(args: Array[String]) {
val kafkaBrokers = "localhost:2181"
val kafkaOpTopic = "test"
/*val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")*/
val props = new Properties()
props.put("bootstrap.servers", "localhost:2181")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
var spark: SparkSession = null
val textFile: RDD[String] = spark.sparkContext.textFile("dataset.txt")
textFile.foreach(record => {
val data = record.toString
val message = new ProducerRecord[String, String](kafkaOpTopic, null, data)
producer.send(message)
})
producer.close()
}
}
这是我得到的错误:
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NullPointerException
at Smack_Kafka_Spark$.main(Smack_Kafka_Spark.scala:25)
at Smack_Kafka_Spark.main(Smack_Kafka_Spark.scala)
如有任何帮助,我将不胜感激!
最佳答案
您将收到 NullPointerException
,因为 SparkSession
为空。像下面这样创建它。
val spark : SparkSession = SparkSession.builder()
.appName("Smack_Kafka_Spark")
.master("local[*]")
.getOrCreate()
现在阅读您的文本文件,如下所示。
val textFile: Dataset[String] = spark.read.textFile("dataset.txt")
您在运行程序时可能会遇到的另一个问题是
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
KafkaProducer
不可序列化。您需要将您的 KafkaProducer 实例创建移动到 foreachPartition 中。请查看 SO 帖子 spark kafka producer serializable
关于scala - 找不到记录器的附加程序 (org.apache.kafka.clients.producer.ProducerConfig),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41655654/
有没有办法配置 python 记录器来调用自定义函数并在它记录时将记录消息传递给它? 谢谢! 最佳答案 子类 logging.Handler 并实现 emit方法: import logging cl
我有一个这样的记录器设置: import logging from logging.handlers import RotatingFileHandler import sys # root logg
已关闭。此问题旨在寻求有关书籍、工具、软件库等的建议。不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以
我一直在使用 Netbeans 作为 Java IDE 进行开发。当 Netbeans 在 try-catch block 中包含一条语句时,它在捕获区域中使用类似 Logger.getLogger(
我正在记下远程日志记录的功能从头开始构建时可能需要库。我查了这个:http://www.aggsoft.com/serial-data-logger.htm 我想知道a之间有什么区别远程日志记录库和远
我需要跟踪包含数千个 JAR 和 .CLASS 文件的已编译 Java 应用程序,您知道有什么合适的工具可以附加到 JVM 来跟踪函数调用(无需源代码)吗? 最佳答案 是的。 Jprofiler无需源
我想使用记录器并设置其属性,但不想在每个对象类中创建记录器,我的目标是:在Spring中创建一个bean,创建属性文件我如何使用注释,可以做到吗? 最佳答案 是的,你可以做到, 在你的
首先,我已经阅读了real python article on the subject . 了解到记录器具有层次结构后,我想在这样的层次结构中创建一个名为 MyProjectLogger 的新记录器:
我设置了一个记录器。像这样: def initLogger(self): self.logger = logging.getLogger('MyApp') if not self.lo
是否允许为您的日志创建一个静态类? public final class Log { public static final Logger LOGGER = Logger.getLogger(
我对 java.util.logging 有一些无法解释的行为。让我们看一下这两个示例: 首先: boolean var = false; log.log( Level.WARNING, "Cant
我正在尝试开始在 python 中使用日志记录并阅读了几篇博客。一个让我感到困惑的问题是是按功能还是按模块创建记录器。在这个Blog: Good logging practice in Python建
我正在使用 https://pub.dartlang.org/packages/logging ,但它不会在我的日志中显示任何内容。这是我的代码: class Test { final Logge
public static Logger getLogger() { final Throwable t = new Throwable(); final StackTraceElem
我是 Poco 的新手,我在 Poco 在线帮助中看到了以下示例: int main(int argc, char** argv) { AutoPtr pChannel(new SimpleF
我有一个带有函数、文档字符串和文档测试的 Julia 模块文件。我加载它并在 Julia 帮助中显示文档字符串,但 Documenter.jl 找不到文档字符串。 一个示例模块文件,src/my_mo
我正在尝试在我的 Django 项目( django 1.11 、 Python 3.6 )中实现日志记录。我正在使用默认的 django 记录器。 获取 username在日志中,我使用了 djan
是否可以有多个 serilog 记录器?目前,在我的 WebApi 中,我可以调用 Log.Information 来记录信息事件,但是有没有一种方法可以改为制作不同的日志并从我的 Controlle
我有一个 python 应用程序,其文件结构类似于以下内容: /Project file1.py file2.py file3.py ... 该应用程序使用 Pytho
如何访问 python Bokeh 记录器? 我尝试使用 basicConfig包装器,但它似乎不起作用。 from bokeh.util import logconfig logconfig.bas
我是一名优秀的程序员,十分优秀!