
python - How to read a stream with Spark Streaming and find IPs within a time window?

Reposted · Author: 太空狗 · Updated: 2023-10-29 21:51:17

I'm new to Apache Spark and I'd like to write some code in Python with PySpark to read a stream and find the IP addresses in it.

I have a Java class that generates some fake IP addresses so I can process them later. The class is listed here:

import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class SocketNetworkTrafficSimulator {
    public static void main(String[] args) throws Exception {
        Random rn = new Random();
        ServerSocket welcomeSocket = new ServerSocket(9999);
        int[] possiblePortTypes = new int[]{21, 22, 80, 8080, 463};
        int numberOfRandomIps = 100;

        // Pre-generate a pool of random IP addresses to draw from
        String[] randomIps = new String[numberOfRandomIps];
        for (int i = 0; i < numberOfRandomIps; i++)
            randomIps[i] = (rn.nextInt(250) + 1) + "." +
                           (rn.nextInt(250) + 1) + "." +
                           (rn.nextInt(250) + 1) + "." +
                           (rn.nextInt(250) + 1);

        System.err.println("Server started");
        while (true) {
            try {
                Socket connectionSocket = welcomeSocket.accept();
                System.err.println("Server accepted connection");
                DataOutputStream outToClient = new DataOutputStream(connectionSocket.getOutputStream());
                // Emit one "port,srcIP,dstIP" line every 10 ms
                while (true) {
                    String str = "" + possiblePortTypes[rn.nextInt(possiblePortTypes.length)] + ","
                            + randomIps[rn.nextInt(numberOfRandomIps)] + ","
                            + randomIps[rn.nextInt(numberOfRandomIps)] + "\n";
                    outToClient.writeBytes(str);
                    Thread.sleep(10);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
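
Before wiring this into Spark, the simulator's output can be sanity-checked with a plain socket client. This is a minimal sketch, assuming the simulator is running locally on port 9999; each line it emits has the form port,srcIP,dstIP:

import socket

# Connect to the simulator (assumed to be listening on localhost:9999)
# and print the first few "port,srcIP,dstIP" lines it emits.
with socket.create_connection(("localhost", 9999)) as s:
    reader = s.makefile("r")
    for _ in range(5):
        print(reader.readline().strip())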

So far I have implemented the following word-counting script, which I run on macOS with the command spark-submit spark_streaming.py <host> <port> <folder_name> <file_name>. I managed to establish the connection between the two and listen to the generated IPs. My main question now is how to keep track of the items I receive.
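
(For example, assuming the simulator above is serving on the local machine and using hypothetical paths for the last two arguments: spark-submit spark_streaming.py localhost 9999 checkpoint out.txt.)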

from __future__ import print_function

import os
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# Get or register a Broadcast variable
def getWordBlacklist(sparkContext):
    if 'wordBlacklist' not in globals():
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']


# Get or register an Accumulator
def getDroppedWordsCounter(sparkContext):
    if 'droppedWordsCounter' not in globals():
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']


def createContext(host, port, outputPath):
    # If you do not see this printed, the StreamingContext has been loaded
    # from the checkpoint instead
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    # Create a socket stream on target ip:port and count the
    # words in the input stream of \n delimited text (e.g. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) in total" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        # with open(outputPath, 'a') as f:
        #     f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: recoverable_network_wordcount.py <hostname> <port> "
              "<checkpoint-directory> <output-file>", file=sys.stderr)
        sys.exit(-1)
    host, port, checkpoint, output = sys.argv[1:]
    ssc = StreamingContext.getOrCreate(checkpoint,
                                       lambda: createContext(host, int(port), output))
    ssc.start()
    ssc.awaitTermination()

Finally, I want to read the stream and find, for each port, the IP addresses that sent or received more than J packets in the last K seconds. J and K are parameters defined in my code (say J=10 and K=60, and so on).

Best Answer

I solved my problem with the following approach:

def getFrequentIps(stream, time_window, min_packets):
    # format_stream is a helper (not shown here) that parses each incoming
    # line into the pairs to be counted
    frequent_ips = (stream.flatMap(lambda line: format_stream(line))
                    # Count the occurrences of each specific pair over the window
                    .countByValueAndWindow(time_window, time_window, 4)
                    # Keep only the pairs above the threshold imposed by min_packets
                    .filter(lambda count: count[1] >= int(min_packets))
                    .transform(lambda record: record.sortBy(lambda x: x[1], ascending=False)))

    number_items = 20
    print("Every %s seconds the top-%s channels with more than %s packets will be shown:" %
          (time_window, number_items, min_packets))
    frequent_ips.pprint(number_items)
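
Since format_stream is not shown in the answer, here is a minimal sketch of what it might look like, together with the wiring into a StreamingContext; the helper body, the (port, ip) pair layout, and the app/host/port names are assumptions, not part of the original answer:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Hypothetical helper: parse a "port,srcIP,dstIP" line (the format produced by
# SocketNetworkTrafficSimulator above) into (port, ip) pairs.
def format_stream(line):
    port, src_ip, dst_ip = line.strip().split(",")
    return [(port, src_ip), (port, dst_ip)]


if __name__ == "__main__":
    sc = SparkContext(appName="FrequentIps")
    ssc = StreamingContext(sc, 1)  # 1-second batches
    ssc.checkpoint("checkpoint")   # windowed operations require checkpointing
    stream = ssc.socketTextStream("localhost", 9999)
    getFrequentIps(stream, time_window=60, min_packets=10)  # K=60 seconds, J=10 packets
    ssc.start()
    ssc.awaitTermination()

Emitting one (port, ip) pair per endpoint is what lets a single countByValueAndWindow call cover the "sent or received" requirement: each IP is counted under the same port key whether it appears as source or destination.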

Regarding python - How to read a stream with Spark Streaming and find IPs within a time window?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56690539/
