- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在我的应用程序中将 Spark Streaming 用于多个自定义接收器(2 个接收器用于不同的 UDP 数据套接字,1 个用于 HTTP 数据)。接收者的转换没有任何共同的资源。
当输入数据的数量增加时,我发现这3个接收器不是并行工作,而是一个接一个。
例如,如果我将批处理间隔设置为 20 秒,则每个接收器处理数据大约需要 5 秒,但是如果所有 3 个接收器一起启用,它们的汇总处理时间 = 3 * 5 秒(大约),而不是 5 秒。
所以我创建了这个测试,并看到了同样的情况。Environment: Core i5, 4 cores, 16 GB of memory.
4 个内核的 2 个 UDP 接收器(因此足以接收和处理)。 dstreams 的转换很奇怪并且没有缓存(持久化),但仅用于测试目的
问题:出了什么问题,如何启用并行处理?
Spark web ui 图片显示,接收者的信息一一处理。
@Slf4j
public class SparkApp {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setMaster("local[*]")
.setAppName("ParallelReceiver");
// no changes in processing
conf.set("spark.cores.max", "4");
// undocumented, has some effect for parallel processing (spark web ui),
// but not for the whole processing time
conf.set("spark.streaming.concurrentJobs", "10");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
RunCalc runCalc1 = new RunCalc(jssc, 5216, 2000, "1");
runCalc1.service();
RunCalc runCalc2 = new RunCalc(jssc, 5217, 2000, "2");
runCalc2.service();
jssc.start();
jssc.awaitTermination();
}
}
@Data
@Slf4j
public class RunCalc {
private final JavaStreamingContext jssc;
private final int port;
private final Integer defaultBitrate;
private final String suff;
public void service() {
// get stream nginx log data from UDP
JavaReceiverInputDStream<NginxRaw> records = jssc.receiverStream(new UdpReceiver(port, defaultBitrate));
records.print();
calc(records, suff);
records.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, suff));
}
private void calc(JavaReceiverInputDStream<NginxRaw> records, String suff) {
// first operation
JavaDStream<Integer> reduce = records.filter(r -> r.getChannelName() != null)
.map(NginxRaw::getBytesSent)
.reduce((r1, r2) -> r1 + r2);
reduce.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "reduce" + "-" + suff));
// second operation
JavaPairDStream<String, NginxRaw> uidRawPairs = records.mapToPair(r -> new Tuple2<>(r.getMac()
.toUpperCase(), r))
.window(Durations.minutes(1), Durations.minutes(1));
JavaPairDStream<String, Iterable<NginxRaw>> groups = uidRawPairs.groupByKey();
JavaPairDStream<String, Long> uidSizePairs = groups.mapValues(v -> v.spliterator()
.getExactSizeIfKnown());
uidSizePairs.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "uidSizeWindowCalc" + "-" + suff));
}
}
@Slf4j
public class UdpReceiver extends Receiver<NginxRaw> {
private final int port;
private final int defaultBitrate;
private DatagramSocket socket;
public UdpReceiver(int port, int defaultBitrate) {
super(StorageLevel.MEMORY_AND_DISK());
this.port = port;
this.defaultBitrate = defaultBitrate;
}
@Override
public void onStart() {
new Thread(this::receive).start();
}
@Override
public void onStop() {
}
private void receive() {
try {
log.debug("receive");
log.debug("thread: {}", Thread.currentThread());
String row;
initSocket();
byte[] receiveData = new byte[5000];
// Until stopped or connection broken continue reading
while (!isStopped()) {
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
socket.receive(receivePacket);
byte[] data = receivePacket.getData();
row = new String(data, 0, receivePacket.getLength());
NginxRaw rawLine = new NginxRaw(row, defaultBitrate);
filterAndSave(rawLine);
}
socket.close();
// Restart in an attempt to connect again when server is active again
log.debug("Trying to connect again");
restart("Trying to connect again");
} catch (ConnectException e) {
// restart if could not connect to server
log.error("Could not connect", e);
reportError("Could not connect: ", e);
restart("Could not connect", e);
} catch (Throwable e) {
// restart if there is any other error
log.error("Error receiving data", e);
reportError("Error receiving data: ", e);
restart("Error receiving data", e);
}
}
/**
* connect to the server
*/
private void initSocket() {
if (socket == null) {
try {
socket = new DatagramSocket(null);
socket.setReuseAddress(true);
socket.setBroadcast(true);
socket.bind(new InetSocketAddress(port));
} catch (SocketException e) {
log.debug("Error = {}", e);
e.printStackTrace();
}
}
}
private void filterAndSave(NginxRaw rawLine) {
if (!rawLine.getMac()
.equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
&&
!rawLine.getChannelName()
.equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
&& !rawLine.getChannelName()
.equals("vod")
&& !rawLine.getIp()
.equals("127.0.0.1")) {
store(rawLine);
}
}
}
最佳答案
我有一个类似的问题:同一个队列有多个接收器,但数据是串行处理的。
修复方法非常简单:我将所有流合并为一个流!
在我有这个之前:
sizeStream.foreachRDD(rdd -> {
...
});
for (JavaPairDStream<String, Long> dstream : streams) {
dstream.foreachRDD(rdd -> {
...
});
}
JavaPairDStream<String, Long> countStream = streamingContext.union(streams.get(0), streams.subList(1,streams.size()));
JavaPairDStream<String, Tuple2<Long, Long>> joinStream = sizeStream.join(countStream);
joinStream.foreachRDD(rdd -> {
...
});
关于parallel-processing - 多个接收器的 Spark 流并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48024424/
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 5年前关闭。 Improve t
我是一名设计老师,试图帮助学生应对编程挑战,所以我编码是为了好玩,但我不是专家。 她需要找到 mode (最常见的值)在使用耦合到 Arduino 的传感器的数据构建的数据集中,然后根据结果激活一些功
我正在开发一个应用程序,该应用程序提供 CPU 使用率最高的 5 个应用程序名称。目前,我通过以下代码获得了排名前 5 的应用程序: var _ = require('lodash');
互联网上很少有例子涉及这个问题的所有三个问题——即 set-process-sentinel ; set-process-filter ;和 start-process . 我尝试了几种不同的方法来微
如 this post 中所述,在 C# 中有两种调用另一个进程的方法。 Process.Start("hello"); 和 Process p = new Process(); p.StartInf
我试图让我的桨从白色变为渐变(线性),并使球具有径向渐变。感谢您的帮助!您可以在 void drawPaddle 中找到桨的代码。 这是我的目标: 这是我的代码: //球 int ballX = 50
考虑:流程(a)根据我的文字: A process is first entered at the time of simulation, at which time it is executed u
我真的希望 Processing 有用于处理数组的 push 和 pop 方法,但由于它没有,我不得不试图找出删除数组中特定位置的对象的最佳方法。我相信这对很多人来说都是基本的,但我可以使用一些帮助,
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 10 年前。 Improve thi
以编程方式,我如何确定 Windows 10 中的 3 个类别 应用 后台进程 Windows 服务 就像任务管理器一样? 即我需要一些 C# 代码,我可以确定应用程序列表与后台进程列表。检查 Win
当我导入 node:process它工作正常。但是,当我尝试要求相同时,它会出错。 这工作正常: import process from 'node:process'; 但是当我尝试要求相同时,它会引
我正在上一门使用处理的类(class)。 我在理解 map() 函数时遇到问题。 根据它的文档( http://www.processing.org/reference/map_.html ): Re
我试图执行: composer.phar update 并收到: Fatal error: Allowed memory size of 94371840 bytes exhausted (tried
给定一堆二维图像,如何使用 Processing/Processing.js 产生体积渲染效果? 目前我的想法是使用 java(类似于 imageJ)进行体积渲染 -> 获取体积渲染图像的面作为单独的
这是代码示例 var startInfo = new ProcessStartInfo { Arguments = commandStr, FileName = @"C:\Window
当我在 Processing(草图 > 导入库 > 添加库)中添加库时,它安装在哪里? 最佳答案 它们安装在您的 中速写本位置 . 您可以通过转到"file">“首选项”来查看和更改您的速写本位置。草
无聊的好奇... 我正在查看当前进程的一些属性: using(Process p = Process.GetCurrentProcess()) { // Inspect properties
我正在尝试在同一页面上运行多个草图。 初始化脚本指定: /* * This code searches for all the * in your page and loads each scrip
Process.Kill 后是否需要使用 Process.WaitForExit? 如果调用进程在调用 Process.Kill 后立即退出怎么办? 这会导致 Process.Kill 失败吗? 编辑
我尝试使用处理从麦克风获取频率。我混合了文档中的两个示例,但“最高”并不是真正的赫兹(a 是 440 赫兹)。 你知道如何拥有比这更好的东西吗? import ddf.minim.*; import
我是一名优秀的程序员,十分优秀!