
parallel-processing - Spark Streaming parallel processing with multiple receivers

Reposted. Author: 行者123. Updated: 2023-12-03 19:36:41

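In my application I use Spark Streaming with several custom receivers (2 receivers for different UDP data sockets, 1 for HTTP data). The receivers' transformations do not share any resources.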

When the amount of input data grows, I see that these 3 receivers do not work in parallel but one after another.

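For example, with a batch interval of 20 seconds, each receiver needs about 5 seconds to process its data on its own, but when all 3 receivers are enabled together their combined processing time is roughly 3 * 5 seconds instead of 5 seconds.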
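The arithmetic above is what you get when the per-receiver jobs run back to back. In Spark Streaming every output operation (`print`, `foreachRDD`, ...) generates a job for a batch, and the job scheduler runs those jobs on a thread pool whose size is `spark.streaming.concurrentJobs`, which defaults to 1. The sketch below is only a plain-Java model of that scheduling, not Spark code: `BatchMakespan` and `makespan` are hypothetical names, and each job is assumed to take a fixed time and to occupy one slot.

```java
import java.util.PriorityQueue;

// Hypothetical model, not Spark code: FIFO list scheduling of a batch's jobs
// onto a fixed number of parallel slots (think: spark.streaming.concurrentJobs).
class BatchMakespan {

    /** Returns the time at which the last job of the batch finishes. */
    static long makespan(long[] jobSeconds, int slots) {
        if (slots < 1) {
            throw new IllegalArgumentException("slots must be >= 1");
        }
        // Each entry is the time at which one slot becomes free again.
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int i = 0; i < slots; i++) {
            freeAt.add(0L);
        }
        long end = 0;
        for (long duration : jobSeconds) {
            long start = freeAt.poll();   // the earliest free slot takes the next job
            long finish = start + duration;
            freeAt.add(finish);
            end = Math.max(end, finish);
        }
        return end;
    }

    public static void main(String[] args) {
        long[] jobs = {5, 5, 5};                 // one ~5 s job per receiver
        System.out.println(makespan(jobs, 1));   // jobs run one after another
        System.out.println(makespan(jobs, 3));   // all three jobs overlap
    }
}
```

With one slot the three 5-second jobs finish at 15 seconds; with three slots they finish at 5 seconds, matching the two cases described in the question.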

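So I wrote the test below and saw the same behavior.
Environment: Core i5, 4 cores, 16 GB of memory. 2 UDP receivers on 4 cores (so there are enough cores for both receiving and processing). The DStream transformations are odd and not cached (persisted), but they are for testing purposes only.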
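The core budget behind that setup can be checked with simple arithmetic. As the Spark Streaming programming guide points out, each receiver runs as a long-running task that occupies one core, so with `local[*]` on 4 cores, 2 receivers leave 2 cores for processing. However many jobs are allowed to run at once (the test code sets `spark.streaming.concurrentJobs` to 10), at most those 2 free cores execute tasks at any moment. The helper below is a hypothetical illustration of that bound, not a Spark API, and it assumes jobs that each need one core at a time.

```java
// Hypothetical helper, not a Spark API: cores left for batch processing once
// every receiver has pinned one core, and the resulting cap on overlapping jobs.
class CoreBudget {

    static int processingCores(int totalCores, int receivers) {
        int free = totalCores - receivers;
        if (free < 1) {
            // With no free core the receivers ingest data but no batch is ever processed.
            throw new IllegalArgumentException(
                    "need more cores than receivers: cores=" + totalCores + ", receivers=" + receivers);
        }
        return free;
    }

    // Upper bound on how many one-core-at-a-time jobs can actually overlap.
    static int effectiveJobParallelism(int totalCores, int receivers, int concurrentJobs) {
        return Math.min(concurrentJobs, processingCores(totalCores, receivers));
    }

    public static void main(String[] args) {
        System.out.println(processingCores(4, 2));
        System.out.println(effectiveJobParallelism(4, 2, 1));
        System.out.println(effectiveJobParallelism(4, 2, 10));
    }
}
```

This is one plausible reason why raising `concurrentJobs` changes what the web UI shows without shortening the overall processing time much: the free cores, not the job slots, become the limit.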

Question: what is wrong, and how can I get the receivers' data processed in parallel?

The Spark web UI shows that the receivers' data is processed one after another.

[Image: Spark web UI]

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

@Slf4j
public class SparkApp {

    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf().setMaster("local[*]")
                .setAppName("ParallelReceiver");

        // no change in processing
        conf.set("spark.cores.max", "4");

        // undocumented, has some effect on parallel processing (Spark web UI),
        // but not on the overall processing time
        conf.set("spark.streaming.concurrentJobs", "10");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        RunCalc runCalc1 = new RunCalc(jssc, 5216, 2000, "1");
        runCalc1.service();

        RunCalc runCalc2 = new RunCalc(jssc, 5217, 2000, "2");
        runCalc2.service();

        jssc.start();
        jssc.awaitTermination();
    }
}

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

// NginxRaw and DebugUtil are application classes not shown in the question.
@Data
@Slf4j
public class RunCalc {

    private final JavaStreamingContext jssc;
    private final int port;
    private final Integer defaultBitrate;
    private final String suff;

    public void service() {

        // get the stream of nginx log data from UDP
        JavaReceiverInputDStream<NginxRaw> records = jssc.receiverStream(new UdpReceiver(port, defaultBitrate));

        records.print();
        calc(records, suff);
        records.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, suff));
    }

    private void calc(JavaReceiverInputDStream<NginxRaw> records, String suff) {

        // first operation: total bytes sent per batch
        JavaDStream<Integer> reduce = records.filter(r -> r.getChannelName() != null)
                .map(NginxRaw::getBytesSent)
                .reduce((r1, r2) -> r1 + r2);

        reduce.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "reduce" + "-" + suff));

        // second operation: records per MAC address over a 1-minute tumbling window
        JavaPairDStream<String, NginxRaw> uidRawPairs = records
                .mapToPair(r -> new Tuple2<>(r.getMac().toUpperCase(), r))
                .window(Durations.minutes(1), Durations.minutes(1));

        JavaPairDStream<String, Iterable<NginxRaw>> groups = uidRawPairs.groupByKey();

        JavaPairDStream<String, Long> uidSizePairs = groups.mapValues(v -> v.spliterator()
                .getExactSizeIfKnown());

        uidSizePairs.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "uidSizeWindowCalc" + "-" + suff));
    }
}

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import java.net.ConnectException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;

// NginxRaw and SyslogRaw are application classes not shown in the question.
@Slf4j
public class UdpReceiver extends Receiver<NginxRaw> {

    private final int port;

    private final int defaultBitrate;

    // shared between the receiving thread and onStop(); guarded by "this"
    private DatagramSocket socket;

    public UdpReceiver(int port, int defaultBitrate) {
        super(StorageLevel.MEMORY_AND_DISK());
        this.port = port;
        this.defaultBitrate = defaultBitrate;
    }

    @Override
    public void onStart() {
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // closing the socket unblocks a thread waiting in receive()
        closeSocket();
    }

    private void receive() {

        try {

            log.debug("receive");
            log.debug("thread: {}", Thread.currentThread());

            DatagramSocket s = initSocket();
            byte[] receiveData = new byte[5000];

            // Until stopped or connection broken continue reading
            while (!isStopped()) {

                DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);

                s.receive(receivePacket);

                String row = new String(receivePacket.getData(), 0, receivePacket.getLength());

                NginxRaw rawLine = new NginxRaw(row, defaultBitrate);

                filterAndSave(rawLine);
            }

            closeSocket();

            // Restart in an attempt to connect again when server is active again
            log.debug("Trying to connect again");
            restart("Trying to connect again");

        } catch (ConnectException e) {

            // restart if could not connect to server
            log.error("Could not connect", e);
            reportError("Could not connect: ", e);
            restart("Could not connect", e);

        } catch (Throwable e) {

            if (isStopped()) {
                // expected: onStop() closed the socket while receive() was blocked
                return;
            }

            // restart if there is any other error
            log.error("Error receiving data", e);
            reportError("Error receiving data: ", e);
            restart("Error receiving data", e);
        }
    }

    /**
     * Bind the UDP socket if it is not bound yet and return it.
     * Errors propagate so that receive() can report them and restart.
     */
    private synchronized DatagramSocket initSocket() throws SocketException {

        if (socket == null) {
            DatagramSocket s = new DatagramSocket(null);
            try {
                s.setReuseAddress(true);
                s.setBroadcast(true);
                s.bind(new InetSocketAddress(port));
            } catch (SocketException e) {
                s.close();
                throw e;
            }
            socket = s;
        }
        return socket;
    }

    /**
     * Close the socket and forget it, so that a restart binds a fresh one.
     */
    private synchronized void closeSocket() {
        if (socket != null) {
            socket.close();
            socket = null;
        }
    }

    private void filterAndSave(NginxRaw rawLine) {

        if (!rawLine.getMac().equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
                && !rawLine.getChannelName().equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
                && !rawLine.getChannelName().equals("vod")
                && !rawLine.getIp().equals("127.0.0.1")) {

            store(rawLine);
        }
    }
}
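A receiver thread that is blocked in `DatagramSocket.receive(...)` only sees `isStopped()` after the next packet arrives, unless something unblocks it. Closing the socket from the stopping thread does that: per the `DatagramSocket.close()` Javadoc, a thread blocked in `receive` then gets a `SocketException`. The plain-Java sketch below (no Spark) shows that pattern on an ephemeral loopback port; `StoppableUdpLoop`, `roundTrip`, and the `sink` callback standing in for `Receiver.store(...)` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java sketch (no Spark): a blocking UDP receive loop that can be stopped by closing its socket.
class StoppableUdpLoop {
    private final DatagramSocket socket;
    private final Consumer<String> sink;   // hypothetical stand-in for Receiver.store(...)
    private final Thread worker;
    private volatile boolean stopped;

    StoppableUdpLoop(int port, Consumer<String> sink) throws SocketException {
        // port 0 lets the OS pick a free ephemeral port
        this.socket = new DatagramSocket(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
        this.sink = sink;
        this.worker = new Thread(this::receiveLoop, "udp-receiver");
        this.worker.setDaemon(true);
    }

    int localPort() { return socket.getLocalPort(); }

    void start() { worker.start(); }

    void stop() throws InterruptedException {
        stopped = true;
        socket.close();   // a thread blocked in receive() now gets a SocketException
        worker.join();
    }

    private void receiveLoop() {
        byte[] buf = new byte[5000];
        while (!stopped) {
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            try {
                socket.receive(packet);
            } catch (IOException e) {
                if (stopped) {
                    return;                        // expected after stop()
                }
                throw new UncheckedIOException(e); // a Spark receiver would call restart(...) here
            }
            sink.accept(new String(packet.getData(), packet.getOffset(), packet.getLength(),
                    StandardCharsets.UTF_8));
        }
    }

    // Sends each message to a fresh loop on loopback and returns what the loop received.
    static List<String> roundTrip(String... messages) {
        List<String> received = new CopyOnWriteArrayList<>();
        try {
            StoppableUdpLoop loop = new StoppableUdpLoop(0, received::add);
            loop.start();
            try (DatagramSocket client = new DatagramSocket()) {
                for (String m : messages) {
                    byte[] b = m.getBytes(StandardCharsets.UTF_8);
                    client.send(new DatagramPacket(b, b.length,
                            InetAddress.getLoopbackAddress(), loop.localPort()));
                }
            }
            long deadline = System.currentTimeMillis() + 5_000;
            while (received.size() < messages.length && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            loop.stop();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("line 1", "line 2", "line 3"));
    }
}
```

The `stopped` flag is checked in the `catch` so that the exception caused by `stop()` is treated as a normal shutdown rather than an error.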

Best answer

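I had a similar problem: several receivers reading from the same queue, but the data was processed serially.
The fix was quite simple: I merged all the streams into a single stream!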

Before, I had this:

sizeStream.foreachRDD(rdd -> {
    ...
});
for (JavaPairDStream<String, Long> dstream : streams) {
    dstream.foreachRDD(rdd -> {
        ...
    });
}

Now I have this:

JavaPairDStream<String, Long> countStream =
        streamingContext.union(streams.get(0), streams.subList(1, streams.size()));
JavaPairDStream<String, Tuple2<Long, Long>> joinStream = sizeStream.join(countStream);
joinStream.foreachRDD(rdd -> {
    ...
});
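Within each batch, DStream `union` simply concatenates the records of all input streams (keys are not merged), and `join` is an inner join by key. One plausible reading of why the change helps is that there is now a single output operation, and so a single job per batch whose tasks can use all free cores, instead of one job per stream. The plain-Java sketch below models only the per-batch data semantics, not Spark's scheduling or parallelism; `UnionJoinSketch`, its methods, and the sample keys and values are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Spark) of one batch of:
//   countStream = union(streams...);  joinStream = sizeStream.join(countStream)
class UnionJoinSketch {

    // union: concatenate the records of all streams; duplicate keys are kept.
    @SafeVarargs
    static <K, V> List<Map.Entry<K, V>> union(List<Map.Entry<K, V>>... streams) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (List<Map.Entry<K, V>> s : streams) {
            out.addAll(s);
        }
        return out;
    }

    // Inner join by key: every matching (left, right) pair yields one output record.
    static <K, A, B> List<Map.Entry<K, Map.Entry<A, B>>> join(
            List<Map.Entry<K, A>> left, List<Map.Entry<K, B>> right) {
        List<Map.Entry<K, Map.Entry<A, B>>> out = new ArrayList<>();
        for (Map.Entry<K, A> l : left) {
            for (Map.Entry<K, B> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    Map.Entry<A, B> pair = new SimpleEntry<>(l.getValue(), r.getValue());
                    out.add(new SimpleEntry<>(l.getKey(), pair));
                }
            }
        }
        return out;
    }

    static <K, V> Map.Entry<K, V> e(K k, V v) { return new SimpleEntry<>(k, v); }

    static List<Map.Entry<String, Map.Entry<Long, Long>>> demo() {
        List<Map.Entry<String, Long>> stream1 = Arrays.asList(e("a", 1L), e("b", 2L));
        List<Map.Entry<String, Long>> stream2 = Arrays.asList(e("a", 5L));
        List<Map.Entry<String, Long>> sizeStream = Arrays.asList(e("a", 100L), e("b", 200L));
        List<Map.Entry<String, Long>> countStream = union(stream1, stream2);
        // key "a" comes from both input streams, so it is joined twice
        return join(sizeStream, countStream);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because `union` keeps duplicate keys, a key that arrives through several receivers in the same batch produces several joined records; if one record per key is wanted, a `reduceByKey` before the join would be needed.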

Regarding parallel-processing - Spark Streaming parallel processing with multiple receivers, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48024424/
