
streaming - Unable to process a particular number of rows with Spark Streaming


I am using Spark 1.0.0 on a 3-node cluster with one master and two slaves, and I am trying to run an LR (logistic regression) algorithm on Spark Streaming.

package org.apache.spark.examples.streaming;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Logistic regression based classification using MLlib.
 */
public final class StreamingJavaLR {

    // Counter used to name one output file per processed RDD.
    static int i = 1;

    // static LogisticRegressionModel model;

    // private static final Pattern SPACE = Pattern.compile(" ");

    // Parses a training line of the form "label,f1 f2 ..." into a LabeledPoint.
    static class ParsePoint implements Function<String, LabeledPoint> {
        private static final Pattern COMMA = Pattern.compile(",");
        private static final Pattern SPACE = Pattern.compile(" ");

        @Override
        public LabeledPoint call(String line) {
            String[] parts = COMMA.split(line);
            double y = Double.parseDouble(parts[0]);
            String[] tok = SPACE.split(parts[1]);
            double[] x = new double[tok.length];
            for (int i = 0; i < tok.length; ++i) {
                x[i] = Double.parseDouble(tok[i]);
            }
            return new LabeledPoint(y, Vectors.dense(x));
        }
    }

    // Edited
    // Parses a prediction line of space-separated features into a feature array.
    static class ParsePointforInput implements Function<String, double[]> {
        private static final Pattern SPACE = Pattern.compile(" ");

        @Override
        public double[] call(String line) {
            String[] tok = SPACE.split(line);
            double[] x = new double[tok.length];
            for (int i = 0; i < tok.length; ++i) {
                x[i] = Double.parseDouble(tok[i]);
            }
            return x;
        }
    }

    public static void main(String[] args) {

        if (args.length != 5) {
            System.err.println("Usage: JavaLR <master> <input_file_for_training> "
                    + "<step_size> <no_iters> <input_file_for_prediction>");
            System.exit(1);
        }

        FileWriter file;
        PrintWriter outputFile = null;
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        Calendar cal = Calendar.getInstance();

        final Date startTime;

        System.out.println("<<<<<Let's Print>>>>>");

        // SparkConf conf = new SparkConf()
        //     .setMaster(args[0])
        //     .setAppName("StreamingJavaLR")
        //     .set("spark.cleaner.ttl", "1000")
        //     .set("spark.executor.uri", "hdfs://192.168.145.191:9000/user/praveshj/spark/spark-0.9.1.tar.gz")
        //     .setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
        //
        // JavaSparkContext sc = new JavaSparkContext(conf);

        JavaSparkContext sc = new JavaSparkContext(args[0],
                "StreamingJavaLR",
                System.getenv("SPARK_HOME"),
                JavaSparkContext.jarOfClass(StreamingJavaLR.class));

        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Reading File");
        JavaRDD<String> lines = sc.textFile(args[1]);
        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File has been Read now mapping");
        JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Mapping Done");
        double stepSize = Double.parseDouble(args[2]);
        int iterations = Integer.parseInt(args[3]);
        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Read the arguments. stepSize = "
                + stepSize + " and iterations = " + iterations);

        BufferedReader br = null;

        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Training the Model");
        // Train the model in batch mode on the training file.
        final LogisticRegressionModel model = LogisticRegressionWithSGD.train(
                points.rdd(), iterations, stepSize);
        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Model Trained");

        System.out.println("Final w: " + model.weights());
        // printWeights(model.weights());
        System.out.println("Intercept : " + model.intercept());

        final Vector weightVector = model.weights();

        // double[] weightArray = model.weights();
        //
        // final DoubleMatrix weightMatrix = new DoubleMatrix(weightArray);

        // Stop the batch context before starting the streaming context.
        sc.stop();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }

        // try {
        //     file = new FileWriter(args[5]);
        //     outputFile = new PrintWriter(file);
        //     cal = Calendar.getInstance();
        //     cal.getTime();
        //     // startTime = sdf.format(cal.getTime());
        //     startTime = cal.getTime();
        //     outputFile.println("Start Time : " + startTime);
        //     outputFile.flush();
        // } catch (IOException E) {
        //     E.printStackTrace();
        // }

        // final JavaStreamingContext ssc = new JavaStreamingContext(sc,
        //     new Duration(1000));

        startTime = cal.getTime();

        final JavaStreamingContext ssc = new JavaStreamingContext(args[0],
                "StreamingJavaLR", new Duration(1000),
                System.getenv("SPARK_HOME"),
                JavaStreamingContext.jarOfClass(StreamingJavaLR.class));

        // Watch the prediction directory for new files, one batch per second.
        JavaDStream<String> lines_2 = ssc.textFileStream(args[4]);
        JavaDStream<double[]> points_2 = lines_2.map(new ParsePointforInput());
        // points_2.print();

        // System.out.print(lines_2.count());
        // System.exit(0);
        points_2.foreachRDD(new Function<JavaRDD<double[]>, Void>() {

            @Override
            public Void call(JavaRDD<double[]> rdd) {

                List<double[]> temp = rdd.collect();

                // If no more data is left for prediction, stop the program
                // if (rdd.count() == 0)
                //     ssc.stop();
                FileWriter newfile = null;
                BufferedWriter bw = null;

                try {
                    newfile = new FileWriter("/home/pravesh/data/abc" + i++ + ".txt");
                    bw = new BufferedWriter(newfile);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                int inpNo = 0;
                double result;
                for (double[] dArray : temp) {
                    double[][] dataArray = new double[1][2];
                    for (int i = 0; i < dArray.length; i++)
                        dataArray[0][i] = dArray[i];
                    // DoubleMatrix dataMatrix = new DoubleMatrix(dataArray);
                    // result = model.predictPoint(dataMatrix, weightMatrix,
                    //     model.intercept());

                    Vector dataVector = Vectors.dense(dArray);
                    result = model.predictPoint(dataVector, weightVector, model.intercept());

                    try {
                        Calendar cal2 = Calendar.getInstance();
                        // bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo + " (" + dataMatrix.get(0, 0)
                        //     + ", " + dataMatrix.get(0, 1) + ")"
                        //     + " belongs to : " + result + " and Start Time was " + startTime + "\n");

                        bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo
                                + " (" + dataVector.toArray()[0]
                                + ", " + dataVector.toArray()[1] + ")"
                                + " belongs to : " + result + " and Start Time was " + startTime + "\n");

                        bw.flush();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // newoutputFile.flush();
                    inpNo++;
                }
                try {
                    bw.close();
                    newfile.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            }
        });

        ssc.start();
        ssc.awaitTermination();

        // cal = Calendar.getInstance();
        // outputFile.println(" End Time : " + cal.getTime());
        // outputFile.flush();

        System.exit(0);
    }
}

As you can see, I take input from a file to train the model with a JavaSparkContext, and then test the model with a JavaStreamingContext.
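
For context, the five positional arguments from the usage message could be filled in along these lines (the host and paths are illustrative placeholders, not the exact values used; note that textFileStream monitors a directory for newly created files):

    <master>                     spark://host-DSRV05.host.co.in:7077
    <input_file_for_training>    /newdisk1/praveshj/pravesh/data/training.txt
    <step_size>                  0.1
    <no_iters>                   100
    <input_file_for_prediction>  /newdisk1/praveshj/pravesh/data/input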

I trained and tested with the data provided in $SPARK_HOME/mllib/data/lr-data/random.data, duplicating its lines to produce larger datasets. The code works in local mode for every dataset I tried. On the cluster, however, it fails to process a file containing 400,000 entries.
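
For reference, the ParsePoint class above implies that each training line consists of a label, a comma, and space-separated features, which is the layout of random.data. The values below are made-up placeholders that only illustrate the format:

    1.0,2.37 1.84
    0.0,-1.12 -0.63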

For every other dataset (the file here has 800,000 entries), the output after starting the StreamingContext looks like this:
14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113609-0001/0 on hostPort host-DSRV05.host.co.in:55206 with 8 cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor added: app-20140606113609-0001/1 on worker-20140606114445-host-DSRV04.host.co.in-39342 (host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113609-0001/1 on hostPort host-DSRV04.host.co.in:39342 with 8 cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated: app-20140606113609-0001/0 is now RUNNING
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated: app-20140606113609-0001/1 is now RUNNING
14/06/06 11:36:09 INFO RecurringTimer: Started timer for JobGenerator at time 1402034770000
14/06/06 11:36:09 INFO JobGenerator: Started JobGenerator at 1402034770000 ms
14/06/06 11:36:09 INFO JobScheduler: Started JobScheduler
14/06/06 11:36:10 INFO FileInputDStream: Finding new files took 29 ms
14/06/06 11:36:10 INFO FileInputDStream: New files at time 1402034770000 ms:
file:/newdisk1/praveshj/pravesh/data/input/testing8lk.txt
14/06/06 11:36:10 INFO MemoryStore: ensureFreeSpace(33216) called with curMem=0, maxMem=309225062
14/06/06 11:36:10 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:36:10 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:36:10 INFO JobScheduler: Added jobs for time 1402034770000 ms
14/06/06 11:36:10 INFO JobScheduler: Starting job streaming job 1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:10 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:10 INFO DAGScheduler: Got job 0 (collect at StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:36:10 INFO DAGScheduler: Final stage: Stage 0(collect at StreamingJavaLR.java:170)
14/06/06 11:36:10 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:36:10 INFO DAGScheduler: Missing parents: List()
14/06/06 11:36:10 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:36:10 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:36:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV05.host.co.in:47657/user/Executor#-1277914179] with ID 0
14/06/06 11:36:10 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:36:10 INFO TaskSetManager: Serialized task 0.0:0 as 3544 bytes in 1 ms
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV04.host.co.in:46975/user/Executor#1659982546] with ID 1
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager host-DSRV05.host.co.in:52786 with 294.9 MB RAM
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager host-DSRV04.host.co.in:42008 with 294.9 MB RAM
14/06/06 11:36:11 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:11 INFO FileInputDStream: New files at time 1402034771000 ms:

14/06/06 11:36:11 INFO JobScheduler: Added jobs for time 1402034771000 ms
14/06/06 11:36:12 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:12 INFO FileInputDStream: New files at time 1402034772000 ms:

14/06/06 11:36:12 INFO JobScheduler: Added jobs for time 1402034772000 ms
14/06/06 11:36:13 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:13 INFO FileInputDStream: New files at time 1402034773000 ms:

14/06/06 11:36:13 INFO JobScheduler: Added jobs for time 1402034773000 ms
14/06/06 11:36:14 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:14 INFO FileInputDStream: New files at time 1402034774000 ms:

14/06/06 11:36:14 INFO JobScheduler: Added jobs for time 1402034774000 ms
14/06/06 11:36:15 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:15 INFO FileInputDStream: New files at time 1402034775000 ms:

14/06/06 11:36:15 INFO JobScheduler: Added jobs for time 1402034775000 ms
14/06/06 11:36:15 INFO BlockManagerInfo: Added taskresult_0 in memory on host-DSRV05.host.co.in:52786 (size: 19.9 MB, free: 275.0 MB)
14/06/06 11:36:15 INFO SendingConnection: Initiating connection to [host-DSRV05.host.co.in/192.168.145.195:52786]
14/06/06 11:36:15 INFO SendingConnection: Connected to [host-DSRV05.host.co.in/192.168.145.195:52786], 1 messages pending
14/06/06 11:36:15 INFO ConnectionManager: Accepted connection from [host-DSRV05.host.co.in/192.168.145.195]
14/06/06 11:36:15 INFO BlockManagerInfo: Removed taskresult_0 on host-DSRV05.host.co.in:52786 in memory (size: 19.9 MB, free: 294.9 MB)
14/06/06 11:36:15 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/06 11:36:15 INFO TaskSetManager: Finished TID 0 in 4961 ms on host-DSRV05.host.co.in (progress: 1/1)
14/06/06 11:36:15 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/06 11:36:15 INFO DAGScheduler: Stage 0 (collect at StreamingJavaLR.java:170) finished in 5.533 s
14/06/06 11:36:15 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 5.548644244 s
14/06/06 11:36:16 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:16 INFO FileInputDStream: New files at time 1402034776000 ms:

14/06/06 11:36:16 INFO JobScheduler: Added jobs for time 1402034776000 ms
14/06/06 11:36:17 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:17 INFO FileInputDStream: New files at time 1402034777000 ms:

14/06/06 11:36:17 INFO JobScheduler: Added jobs for time 1402034777000 ms
14/06/06 11:36:18 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:18 INFO FileInputDStream: New files at time 1402034778000 ms:

14/06/06 11:36:18 INFO JobScheduler: Added jobs for time 1402034778000 ms
14/06/06 11:36:19 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:19 INFO FileInputDStream: New files at time 1402034779000 ms:

14/06/06 11:36:19 INFO JobScheduler: Added jobs for time 1402034779000 ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 9.331 s for time 1402034770000 ms (execution: 9.274 s)
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 2.7293E-5 s
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 8.333 s for time 1402034771000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.4859E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 7.335 s for time 1402034772000 ms (execution: 0.002 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.5294E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 6.336 s for time 1402034773000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.117E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 5.337 s for time 1402034774000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were older than 1402034769000 ms:
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.1414E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 4.338 s for time 1402034775000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 4.2422E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 3.338 s for time 1402034776000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 3 from persistence list
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.1133E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 2.339 s for time 1402034777000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.124E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 1.340 s for time 1402034778000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.2101E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 0.341 s for time 1402034779000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO BlockManager: Removing RDD 3
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 2 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 2
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 1 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 1
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were older than 1402034770000 ms:
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 6 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 6
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 5 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 5
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 4 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 4
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034771000 ms: 1402034770000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 9 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 9
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 8 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 8
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 7 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 7
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034772000 ms: 1402034771000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 12 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 12
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 11 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 11
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 10 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 10
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034773000 ms: 1402034772000 ms
14/06/06 11:36:20 INFO JobScheduler: Finished job streaming job 1402034780000 ms.0 from job set of time 1402034780000 ms

For the file with 400,000 entries, the output after starting the StreamingContext is:
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added: app-20140606113855-0003/0 on worker-20140606114445-host-DSRV05.host.co.in-55206 (host-DSRV05.host.co.in:55206) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113855-0003/0 on hostPort host-DSRV05.host.co.in:55206 with 8 cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added: app-20140606113855-0003/1 on worker-20140606114445-host-DSRV04.host.co.in-39342 (host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113855-0003/1 on hostPort host-DSRV04.host.co.in:39342 with 8 cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated: app-20140606113855-0003/0 is now RUNNING
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated: app-20140606113855-0003/1 is now RUNNING
14/06/06 11:38:55 INFO RecurringTimer: Started timer for JobGenerator at time 1402034936000
14/06/06 11:38:55 INFO JobGenerator: Started JobGenerator at 1402034936000 ms
14/06/06 11:38:55 INFO JobScheduler: Started JobScheduler
14/06/06 11:38:56 INFO FileInputDStream: Finding new files took 31 ms
14/06/06 11:38:56 INFO FileInputDStream: New files at time 1402034936000 ms:
file:/newdisk1/praveshj/pravesh/data/input/testing4lk.txt
14/06/06 11:38:56 INFO MemoryStore: ensureFreeSpace(33216) called with curMem=0, maxMem=309225062
14/06/06 11:38:56 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:38:56 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:38:56 INFO JobScheduler: Added jobs for time 1402034936000 ms
14/06/06 11:38:56 INFO JobScheduler: Starting job streaming job 1402034936000 ms.0 from job set of time 1402034936000 ms
14/06/06 11:38:56 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:38:56 INFO DAGScheduler: Got job 0 (collect at StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:38:56 INFO DAGScheduler: Final stage: Stage 0(collect at StreamingJavaLR.java:170)
14/06/06 11:38:56 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:38:56 INFO DAGScheduler: Missing parents: List()
14/06/06 11:38:56 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:38:56 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:38:56 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/06 11:38:57 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:57 INFO FileInputDStream: New files at time 1402034937000 ms:

14/06/06 11:38:57 INFO JobScheduler: Added jobs for time 1402034937000 ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV05.host.co.in:39424/user/Executor#-500165450] with ID 0
14/06/06 11:38:57 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:38:57 INFO TaskSetManager: Serialized task 0.0:0 as 3544 bytes in 0 ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV04.host.co.in:45532/user/Executor#1654371091] with ID 1
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager host-DSRV05.host.co.in:53857 with 294.9 MB RAM
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager host-DSRV04.host.co.in:38057 with 294.9 MB RAM
14/06/06 11:38:58 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:38:58 INFO FileInputDStream: New files at time 1402034938000 ms:

14/06/06 11:38:58 INFO JobScheduler: Added jobs for time 1402034938000 ms
14/06/06 11:38:59 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:59 INFO FileInputDStream: New files at time 1402034939000 ms:

14/06/06 11:38:59 INFO JobScheduler: Added jobs for time 1402034939000 ms
14/06/06 11:39:00 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:00 INFO FileInputDStream: New files at time 1402034940000 ms:

14/06/06 11:39:00 INFO JobScheduler: Added jobs for time 1402034940000 ms
14/06/06 11:39:01 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:01 INFO FileInputDStream: New files at time 1402034941000 ms:

14/06/06 11:39:01 INFO JobScheduler: Added jobs for time 1402034941000 ms
14/06/06 11:39:02 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:02 INFO FileInputDStream: New files at time 1402034942000 ms:

14/06/06 11:39:02 INFO JobScheduler: Added jobs for time 1402034942000 ms
14/06/06 11:39:03 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:03 INFO FileInputDStream: New files at time 1402034943000 ms:

14/06/06 11:39:03 INFO JobScheduler: Added jobs for time 1402034943000 ms
14/06/06 11:39:04 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:04 INFO FileInputDStream: New files at time 1402034944000 ms:

14/06/06 11:39:04 INFO JobScheduler: Added jobs for time 1402034944000 ms
14/06/06 11:39:05 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:05 INFO FileInputDStream: New files at time 1402034945000 ms:

14/06/06 11:39:05 INFO JobScheduler: Added jobs for time 1402034945000 ms
14/06/06 11:39:06 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:06 INFO FileInputDStream: New files at time 1402034946000 ms:

This goes on forever, and the predictions are never written to the files they are supposed to go to.
The worker logs show nothing different either.

Any idea what the problem might be?

Thanks

Best Answer

Well, I was able to get it working by running Spark on Mesos. It looks like a bug when running Spark standalone, though.
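
For anyone trying the same workaround: the commented-out SparkConf block in the question already sketches the Mesos-style setup. A minimal version might look like the following, where the Mesos master URL and the executor tarball URI are placeholders to adapt to your own cluster:

    SparkConf conf = new SparkConf()
            .setMaster("mesos://192.168.145.191:5050")   // placeholder Mesos master URL
            .setAppName("StreamingJavaLR")
            // executors fetch the Spark distribution from this URI; adjust to your HDFS path
            .set("spark.executor.uri", "hdfs://192.168.145.191:9000/user/praveshj/spark/spark-1.0.0.tar.gz")
            .setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));

    JavaSparkContext sc = new JavaSparkContext(conf);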

Regarding "streaming - Unable to process a particular number of rows with Spark Streaming", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24075604/
