I am using Spark-1.0.0 on a 3-node cluster with 1 master and 2 slaves. I am trying to run the LR (logistic regression) algorithm on Spark Streaming.
package org.apache.spark.examples.streaming;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* Logistic regression based classification using ML Lib.
*/
public final class StreamingJavaLR {
static int i = 1;
// static LogisticRegressionModel model;
// private static final Pattern SPACE = Pattern.compile(" ");
static class ParsePoint implements Function<String, LabeledPoint> {
private static final Pattern COMMA = Pattern.compile(",");
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public LabeledPoint call(String line) {
String[] parts = COMMA.split(line);
double y = Double.parseDouble(parts[0]);
String[] tok = SPACE.split(parts[1]);
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, Vectors.dense(x));
}
}
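// Note (illustrative): ParsePoint expects training lines of the form
// "<label>,<f1> <f2> ...". For example, "1.0,2.5 3.1" parses to
// LabeledPoint(1.0, [2.5, 3.1]).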
// Edited
static class ParsePointforInput implements Function<String, double[]> {
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public double[] call(String line) {
String[] tok = SPACE.split(line);
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return x;
}
}
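// Note (illustrative): ParsePointforInput handles unlabeled prediction
// lines, so "2.5 3.1" parses to the feature array [2.5, 3.1].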
public static void main(String[] args) {
if (args.length != 5) {
System.err
.println("Usage: JavaLR <master> <input_file_for_training> <step_size> <no_iters> <input_file_for_prediction>");
System.exit(1);
}
FileWriter file;
PrintWriter outputFile = null;
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
Calendar cal=Calendar.getInstance();
final Date startTime;
System.out.println("<<<<<Let's Print>>>>>");
// SparkConf conf = new SparkConf()
// .setMaster(args[0])
// .setAppName("StreamingJavaLR")
// .set("spark.cleaner.ttl", "1000")
// .set("spark.executor.uri", "hdfs://192.168.145.191:9000/user/praveshj/spark/spark-0.9.1.tar.gz")
// .setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
//
// JavaSparkContext sc = new JavaSparkContext(conf);
JavaSparkContext sc = new JavaSparkContext(args[0],
"StreamingJavaLR",
System.getenv("SPARK_HOME"),
JavaSparkContext.jarOfClass(StreamingJavaLR.class));
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Reading File");
JavaRDD<String> lines = sc.textFile(args[1]);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File has been Read now mapping");
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Mapping Done");
double stepSize = Double.parseDouble(args[2]);
int iterations = Integer.parseInt(args[3]);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Read the arguments. stepSize = "+stepSize+" and iterations = "+iterations);
BufferedReader br = null;
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Training the Model");
final LogisticRegressionModel model = LogisticRegressionWithSGD.train(
points.rdd(), iterations, stepSize);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Model Trained");
System.out.println("Final w: " + model.weights());
// printWeights(model.weights());
System.out.println("Intercept : " + model.intercept());
final Vector weightVector = model.weights();
// double[] weightArray = model.weights();
//
// final DoubleMatrix weightMatrix = new DoubleMatrix(weightArray);
sc.stop();
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// try {
// file = new FileWriter(args[5]);
// outputFile = new PrintWriter(file);
// cal = Calendar.getInstance();
// cal.getTime();
//// startTime = sdf.format(cal.getTime());
// startTime = cal.getTime();
// outputFile.println("Start Time : " + startTime);
// outputFile.flush();
// } catch (IOException E) {
// E.printStackTrace();
// }
// final JavaStreamingContext ssc = new JavaStreamingContext(sc,
// new Duration(1000));
startTime = cal.getTime();
final JavaStreamingContext ssc = new JavaStreamingContext(args[0],
"StreamingJavaLR", new Duration(1000),
System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(StreamingJavaLR.class));
JavaDStream<String> lines_2 = ssc.textFileStream(args[4]);
JavaDStream<double[]> points_2 = lines_2.map(new ParsePointforInput());
// points_2.print();
// System.out.print(lines_2.count());
// System.exit(0);
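// For each streaming batch: collect the batch's points to the driver,
// score each point with the trained model, and append the predictions
// (with timestamps) to a local output file.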
points_2.foreachRDD(new Function<JavaRDD<double[]>, Void>() {
@Override
public Void call(JavaRDD<double[]> rdd) {
List<double[]> temp = rdd.collect();
//If no more data is left for Prediction, Stop the Program
// if (rdd.count() == 0)
// ssc.stop();
FileWriter newfile = null;
BufferedWriter bw = null;
try {
newfile = new FileWriter(
"/home/pravesh/data/abc"
+ i++ + ".txt");
bw = new BufferedWriter(newfile);
} catch (IOException e) {
e.printStackTrace();
}
int inpNo = 0;
double result;
for (double[] dArray : temp) {
// double[][] dataArray = new double[1][2];
// for (int i = 0; i < dArray.length; i++)
//     dataArray[0][i] = dArray[i];
// DoubleMatrix dataMatrix = new DoubleMatrix(dataArray);
// result = model.predictPoint(dataMatrix, weightMatrix,
// model.intercept());
Vector dataVector = Vectors.dense(dArray);
result = model.predictPoint(dataVector, weightVector, model.intercept());
try {
Calendar cal2 = Calendar.getInstance();
// bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo + " (" + dataMatrix.get(0, 0)
// + ", " + dataMatrix.get(0, 1) + ")"
// + " belongs to : " + result + " and Start Time was " + startTime + "\n");
bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo + " (" + dataVector.toArray()[0]
+ ", " + dataVector.toArray()[1] + ")"
+ " belongs to : " + result + " and Start Time was " + startTime + "\n");
bw.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// newoutputFile.flush();
inpNo++;
}
try {
bw.close();
newfile.close();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
});
ssc.start();
ssc.awaitTermination();
// cal = Calendar.getInstance();
// outputFile.println(" End Time : " + cal.getTime());
// outputFile.flush();
System.exit(0);
}
}
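For reference, the class takes the five arguments from the usage string above; args[4] is handed to textFileStream, so it should be a directory that Spark Streaming monitors for newly created files. A launch might look roughly like the sketch below (the jar name and paths are placeholders, and using spark-submit is an assumption; since the code sets its master from args[0], a plain java -cp invocation would also work):
bin/spark-submit --class org.apache.spark.examples.streaming.StreamingJavaLR \
  streaming-lr.jar \
  spark://master-host:7077 /path/to/training.txt 0.1 100 /path/to/stream/input/dir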
The driver logs below are from two runs: the first (on testing8lk.txt) completes its collect job, while the second (on testing4lk.txt) submits the task but never finishes it in the captured output.
14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113609-0001/0 on hostPort host-DSRV05.host.co.in:55206 with 8 cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor added: app-20140606113609-0001/1 on worker-20140606114445-host-DSRV04.host.co.in-39342 (host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113609-0001/1 on hostPort host-DSRV04.host.co.in:39342 with 8 cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated: app-20140606113609-0001/0 is now RUNNING
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated: app-20140606113609-0001/1 is now RUNNING
14/06/06 11:36:09 INFO RecurringTimer: Started timer for JobGenerator at time 1402034770000
14/06/06 11:36:09 INFO JobGenerator: Started JobGenerator at 1402034770000 ms
14/06/06 11:36:09 INFO JobScheduler: Started JobScheduler
14/06/06 11:36:10 INFO FileInputDStream: Finding new files took 29 ms
14/06/06 11:36:10 INFO FileInputDStream: New files at time 1402034770000 ms:
file:/newdisk1/praveshj/pravesh/data/input/testing8lk.txt
14/06/06 11:36:10 INFO MemoryStore: ensureFreeSpace(33216) called with curMem=0, maxMem=309225062
14/06/06 11:36:10 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:36:10 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:36:10 INFO JobScheduler: Added jobs for time 1402034770000 ms
14/06/06 11:36:10 INFO JobScheduler: Starting job streaming job 1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:10 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:10 INFO DAGScheduler: Got job 0 (collect at StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:36:10 INFO DAGScheduler: Final stage: Stage 0(collect at StreamingJavaLR.java:170)
14/06/06 11:36:10 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:36:10 INFO DAGScheduler: Missing parents: List()
14/06/06 11:36:10 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:36:10 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:36:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV05.host.co.in:47657/user/Executor#-1277914179] with ID 0
14/06/06 11:36:10 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:36:10 INFO TaskSetManager: Serialized task 0.0:0 as 3544 bytes in 1 ms
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV04.host.co.in:46975/user/Executor#1659982546] with ID 1
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager host-DSRV05.host.co.in:52786 with 294.9 MB RAM
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager host-DSRV04.host.co.in:42008 with 294.9 MB RAM
14/06/06 11:36:11 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:11 INFO FileInputDStream: New files at time 1402034771000 ms:
14/06/06 11:36:11 INFO JobScheduler: Added jobs for time 1402034771000 ms
14/06/06 11:36:12 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:12 INFO FileInputDStream: New files at time 1402034772000 ms:
14/06/06 11:36:12 INFO JobScheduler: Added jobs for time 1402034772000 ms
14/06/06 11:36:13 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:13 INFO FileInputDStream: New files at time 1402034773000 ms:
14/06/06 11:36:13 INFO JobScheduler: Added jobs for time 1402034773000 ms
14/06/06 11:36:14 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:14 INFO FileInputDStream: New files at time 1402034774000 ms:
14/06/06 11:36:14 INFO JobScheduler: Added jobs for time 1402034774000 ms
14/06/06 11:36:15 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:15 INFO FileInputDStream: New files at time 1402034775000 ms:
14/06/06 11:36:15 INFO JobScheduler: Added jobs for time 1402034775000 ms
14/06/06 11:36:15 INFO BlockManagerInfo: Added taskresult_0 in memory on host-DSRV05.host.co.in:52786 (size: 19.9 MB, free: 275.0 MB)
14/06/06 11:36:15 INFO SendingConnection: Initiating connection to [host-DSRV05.host.co.in/192.168.145.195:52786]
14/06/06 11:36:15 INFO SendingConnection: Connected to [host-DSRV05.host.co.in/192.168.145.195:52786], 1 messages pending
14/06/06 11:36:15 INFO ConnectionManager: Accepted connection from [host-DSRV05.host.co.in/192.168.145.195]
14/06/06 11:36:15 INFO BlockManagerInfo: Removed taskresult_0 on host-DSRV05.host.co.in:52786 in memory (size: 19.9 MB, free: 294.9 MB)
14/06/06 11:36:15 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/06 11:36:15 INFO TaskSetManager: Finished TID 0 in 4961 ms on host-DSRV05.host.co.in (progress: 1/1)
14/06/06 11:36:15 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/06 11:36:15 INFO DAGScheduler: Stage 0 (collect at StreamingJavaLR.java:170) finished in 5.533 s
14/06/06 11:36:15 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 5.548644244 s
14/06/06 11:36:16 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:16 INFO FileInputDStream: New files at time 1402034776000 ms:
14/06/06 11:36:16 INFO JobScheduler: Added jobs for time 1402034776000 ms
14/06/06 11:36:17 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:17 INFO FileInputDStream: New files at time 1402034777000 ms:
14/06/06 11:36:17 INFO JobScheduler: Added jobs for time 1402034777000 ms
14/06/06 11:36:18 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:18 INFO FileInputDStream: New files at time 1402034778000 ms:
14/06/06 11:36:18 INFO JobScheduler: Added jobs for time 1402034778000 ms
14/06/06 11:36:19 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:19 INFO FileInputDStream: New files at time 1402034779000 ms:
14/06/06 11:36:19 INFO JobScheduler: Added jobs for time 1402034779000 ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 9.331 s for time 1402034770000 ms (execution: 9.274 s)
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 2.7293E-5 s
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 8.333 s for time 1402034771000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.4859E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 7.335 s for time 1402034772000 ms (execution: 0.002 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.5294E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 6.336 s for time 1402034773000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.117E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 5.337 s for time 1402034774000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were older than 1402034769000 ms:
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.1414E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 4.338 s for time 1402034775000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 4.2422E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 3.338 s for time 1402034776000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 3 from persistence list
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.1133E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 2.339 s for time 1402034777000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.124E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 1.340 s for time 1402034778000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.2101E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 0.341 s for time 1402034779000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO BlockManager: Removing RDD 3
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 2 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 2
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 1 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 1
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were older than 1402034770000 ms:
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 6 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 6
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 5 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 5
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 4 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 4
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034771000 ms: 1402034770000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 9 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 9
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 8 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 8
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 7 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 7
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034772000 ms: 1402034771000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 12 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 12
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 11 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 11
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 10 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 10
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034773000 ms: 1402034772000 ms
14/06/06 11:36:20 INFO JobScheduler: Finished job streaming job 1402034780000 ms.0 from job set of time 1402034780000 ms
The second run, roughly two and a half minutes later:
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added: app-20140606113855-0003/0 on worker-20140606114445-host-DSRV05.host.co.in-55206 (host-DSRV05.host.co.in:55206) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113855-0003/0 on hostPort host-DSRV05.host.co.in:55206 with 8 cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added: app-20140606113855-0003/1 on worker-20140606114445-host-DSRV04.host.co.in-39342 (host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113855-0003/1 on hostPort host-DSRV04.host.co.in:39342 with 8 cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated: app-20140606113855-0003/0 is now RUNNING
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated: app-20140606113855-0003/1 is now RUNNING
14/06/06 11:38:55 INFO RecurringTimer: Started timer for JobGenerator at time 1402034936000
14/06/06 11:38:55 INFO JobGenerator: Started JobGenerator at 1402034936000 ms
14/06/06 11:38:55 INFO JobScheduler: Started JobScheduler
14/06/06 11:38:56 INFO FileInputDStream: Finding new files took 31 ms
14/06/06 11:38:56 INFO FileInputDStream: New files at time 1402034936000 ms:
file:/newdisk1/praveshj/pravesh/data/input/testing4lk.txt
14/06/06 11:38:56 INFO MemoryStore: ensureFreeSpace(33216) called with curMem=0, maxMem=309225062
14/06/06 11:38:56 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:38:56 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:38:56 INFO JobScheduler: Added jobs for time 1402034936000 ms
14/06/06 11:38:56 INFO JobScheduler: Starting job streaming job 1402034936000 ms.0 from job set of time 1402034936000 ms
14/06/06 11:38:56 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:38:56 INFO DAGScheduler: Got job 0 (collect at StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:38:56 INFO DAGScheduler: Final stage: Stage 0(collect at StreamingJavaLR.java:170)
14/06/06 11:38:56 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:38:56 INFO DAGScheduler: Missing parents: List()
14/06/06 11:38:56 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:38:56 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:38:56 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/06 11:38:57 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:57 INFO FileInputDStream: New files at time 1402034937000 ms:
14/06/06 11:38:57 INFO JobScheduler: Added jobs for time 1402034937000 ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV05.host.co.in:39424/user/Executor#-500165450] with ID 0
14/06/06 11:38:57 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:38:57 INFO TaskSetManager: Serialized task 0.0:0 as 3544 bytes in 0 ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@host-DSRV04.host.co.in:45532/user/Executor#1654371091] with ID 1
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager host-DSRV05.host.co.in:53857 with 294.9 MB RAM
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager host-DSRV04.host.co.in:38057 with 294.9 MB RAM
14/06/06 11:38:58 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:38:58 INFO FileInputDStream: New files at time 1402034938000 ms:
14/06/06 11:38:58 INFO JobScheduler: Added jobs for time 1402034938000 ms
14/06/06 11:38:59 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:59 INFO FileInputDStream: New files at time 1402034939000 ms:
14/06/06 11:38:59 INFO JobScheduler: Added jobs for time 1402034939000 ms
14/06/06 11:39:00 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:00 INFO FileInputDStream: New files at time 1402034940000 ms:
14/06/06 11:39:00 INFO JobScheduler: Added jobs for time 1402034940000 ms
14/06/06 11:39:01 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:01 INFO FileInputDStream: New files at time 1402034941000 ms:
14/06/06 11:39:01 INFO JobScheduler: Added jobs for time 1402034941000 ms
14/06/06 11:39:02 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:02 INFO FileInputDStream: New files at time 1402034942000 ms:
14/06/06 11:39:02 INFO JobScheduler: Added jobs for time 1402034942000 ms
14/06/06 11:39:03 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:03 INFO FileInputDStream: New files at time 1402034943000 ms:
14/06/06 11:39:03 INFO JobScheduler: Added jobs for time 1402034943000 ms
14/06/06 11:39:04 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:04 INFO FileInputDStream: New files at time 1402034944000 ms:
14/06/06 11:39:04 INFO JobScheduler: Added jobs for time 1402034944000 ms
14/06/06 11:39:05 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:05 INFO FileInputDStream: New files at time 1402034945000 ms:
14/06/06 11:39:05 INFO JobScheduler: Added jobs for time 1402034945000 ms
14/06/06 11:39:06 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:06 INFO FileInputDStream: New files at time 1402034946000 ms:
Best answer
Well, I was able to get it working by running Spark on Mesos. But it looks like a bug when running Spark standalone.
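For context, running on Mesos matches the commented-out SparkConf in the question's code, which already carries a spark.executor.uri setting. A minimal sketch of that configuration follows; the Mesos master address and the HDFS path to the Spark distribution are placeholders, not values from the original post, and it reuses the SparkConf/JavaSparkContext imports already present in the question's code:
// Minimal sketch: point the same app at a Mesos master instead of the
// standalone master. spark.executor.uri tells the Mesos slaves where to
// fetch the Spark distribution from.
SparkConf conf = new SparkConf()
    .setMaster("mesos://mesos-master:5050")   // placeholder host:port
    .setAppName("StreamingJavaLR")
    .set("spark.executor.uri",
         "hdfs://namenode:9000/path/to/spark-1.0.0.tar.gz")   // placeholder path
    .setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
JavaSparkContext sc = new JavaSparkContext(conf);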
Regarding "streaming - Unable to process a specific number of lines with Spark Streaming", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24075604/