- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有以下 Spark 代码:
package my.spark;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
public class ExecutionTest {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("ExecutionTest")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
int slices = 2;
int n = slices;
List<String> list = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
list.add("" + i);
}
JavaRDD<String> dataSet = jsc.parallelize(list, slices);
dataSet.foreach(str -> {
System.out.println("value: " + str);
Thread.sleep(10000);
});
System.out.println("done");
spark.stop();
}
}
我已经使用以下命令运行主节点和两个工作节点(本地主机上的所有内容;Windows):
bin\spark-class org.apache.spark.deploy.master.Master
和(两次):
bin\spark-class org.apache.spark.deploy.worker.Worker spark://<local-ip>:7077
一切都正确开始。
使用命令提交作业后:
bin\spark-submit --class my.spark.ExecutionTest --master spark://<local-ip>:7077 file:///<pathToFatJar>/FatJar.jar
命令已启动,但 value: 0
和 value: 1
输出由其中一名工作人员写入(如与工作人员关联的页面上的Logs > stdout
上所示)。第二个工作人员在 Logs > stdout
中没有任何内容。据我了解,这意味着每次迭代都是由同一个工作人员完成的。
如何在两个不同的正在运行的工作线程上运行这些任务?
最佳答案
这是可能的,但我不确定它是否每次都可以正常工作。然而,在测试时,每次都按预期工作。
我已经使用 Windows 10 x64 主机和 4 个虚拟机 (VM) 测试了我的代码:具有 Debian 9(延伸)内核 4.9.0 x64 的 VirtualBox、仅主机网络、Java 1.8.0_144、适用于 Hadoop 2.7 的 Apache Spark 2.2.0 (spark-2.2.0-bin-hadoop2.7.tar.gz)。
我一直在虚拟机上使用主服务器和 3 个从服务器,在 Windows 上使用另外一个从服务器:
我正在将作业从 Windows 计算机提交到位于虚拟机上的主服务器。
开头和之前一样:
SparkSession spark = SparkSession
.builder()
.config("spark.cores.max", coresCount) // not necessary
.appName("ExecutionTest")
.getOrCreate();
[重要] coresCount
对于分区至关重要 - 我必须使用已用核心的数量来对数据进行分区,而不是工作线程/执行程序的数量。
接下来,我必须创建 JavaSparkContext 和 RDD。重用 RDD 允许多次执行同一组工作线程。
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Integer> rddList
= jsc.parallelize(
IntStream.range(0, coresCount * 2)
.boxed().collect(Collectors.toList()))
.repartition(coresCount);
我创建了包含 coresCount * 2
元素的 rddList
。等于 coresCount 的元素数量不允许在所有关联的工作线程上运行(在我的例子中)。也许,coresCount + 1
就足够了,但我还没有测试它,因为 coresCount * 2
也不够。
接下来要做的是运行命令:
List<String> hostsList
= rddList.map(value -> {
Thread.sleep(3_000);
return InetAddress.getLocalHost().getHostAddress();
})
.distinct()
.collect();
System.out.println("-----> hostsList = " + hostsList);
Thread.sleep(3_000)
对于正确分配任务是必要的。 3秒对我来说就足够了。可能该值可能更小,有时可能需要更高的值(我猜该值取决于工作人员从主机获取任务执行的速度)。
上述代码将在与工作线程关联的每个核心上运行,因此每个工作线程不止一个。为了在每个工作线程上运行一个命令,我使用了以下代码:
/* as static field of class */
private static final AtomicBoolean ONE_ON_WORKER = new AtomicBoolean(false);
...
long nodeCount
= rddList.map(value -> {
Thread.sleep(3_000);
if (ONE_ON_WORKER.getAndSet(true) == false) {
System.out.println("Executed on "
+ InetAddress.getLocalHost().getHostName());
return 1;
} else {
return 0;
}
})
.filter(val -> val != 0)
.count();
System.out.println("-----> finished using #nodes = " + nodeCount);
当然,最后,停止:
spark.stop();
关于java - Spark中如何在不同的worker上运行任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46031549/
如果一个域有多个团队和多个 Web 应用程序,那么注册 Service Worker 来管理整个站点的最佳建议是什么?具有范围的顶级服务 worker /或子域中的多个服务 worker ?由于一个域
我开发了一个应用程序来分析播放 YouTube 视频时的网络流量。它使用 chrome.webRequest,我使用 onHeadersReceived 事件计算流量。 我想使用 service wo
假设我提供了不同网站使用的推送通知服务。此服务需要在我的客户站点上安装服务 worker 。我希望架构具有一些属性: 完全静态资源。安装service worker文件和配置JS片段等过程只需要完成一
我要缓存某人网站中的特定请求 ,那么我发现 service worker 是一个不错的选择。但我找不到任何方法 通过 tampermonkey 注入(inject)一个 service worker
当 Service Worker 更新时,它不会以正确的方式控制页面;它进入“等待”状态,等待被激活。 令人惊讶的是,更新后的 Service Worker 甚至在刷新页面后都无法控制选项卡。谷歌解释
有谁知道是否有办法在 service worker 中获取此号码或日期: 将我的服务 worker 缓存命名为 cache-1182 会很方便或 cache-20171127171448 我想在安装事
这link说: Workers may spawn more workers if they wish. So-called sub-workers must be hosted within the
有许多关于使用 ngsw-worker.js 安装 ServiceWorker 的分步指南;然而,甚至没有关于使用 safety-worker.js 卸载 ServiceWorker 的分步指南。 s
我正在尝试为我的网站使用后台定期同步。我正在使用 localhost 并在 1*1000 毫秒时注册 periodicsync 事件,但这根本不会触发。 我看过这个demo ,但即使我将该网站安装为应
我试图让用户安排一个周期性任务。我还在一个容器中运行多个 celery worker 。我对该容器的命令过去是这样的: celery worker -c 4 -B -l INFO -A my.cele
从我所看到的,你甚至可以缓存一个网页。根据此文档:https://www.mnot.net/cache_docs/#BROWSER ,表示可以缓存在浏览器缓存中。我看到即使是 serviceworke
我只是在测试 Service Worker 的功能以了解其工作原理。所以现在我遇到了一个问题。 var CACHE_NAME = 'my-site-cache-v1'; var urlsToCache
下图显示安装了两名工作人员 - 一名处于事件状态,另一名未处于事件状态(刚刚安装)。 注册 service worker 更改 service-worker.js并重新加载页面。 逻辑是 Servic
我正在尝试学习渐进式 Web 应用程序的一些基础知识,并且在我阅读的其中一篇教程中学习 [在安装了 service worker 并且用户导航到不同的页面或刷新后,service worker 将开始
我正在开发一个应用程序,其目标是定期(例如每小时)向用户发送通知。 我的想法是使用一个可以在选项卡关闭后运行的服务 worker ,并继续向用户发送这些通知。 网页需要能够与 Service Work
我正在尝试为一个简单但旧的 Django Web 应用程序安装 ServiceWorker。我开始使用示例 read-through caching example from the Chrome t
在我们开发的情况下,我们提供来自 https://localhost 的文件因为该应用程序托管在 salesforce.com 中。在 chrome service worker 中,chrome 会
我是服务人员的新手,并且浏览了各种文档(Google,Mozilla,serviceworke.rs,Github,StackOverflow questions)。最有用的是ServiceWorke
我正在解决一个问题,我有一组“热情的 worker ”。这意味着它们被维护在内存中,维护自己的上下文并且是可调用的。我一直在研究各种 Go Worker 实现,但都依赖于闭包或返回结果的简单计算函数。
我有一个部署到静态服务器的非根路径的网络应用程序。即MyApp构建时部署到路径/文件夹 https://example.com/myapp . MyApp正在使用 vue 和 webpack 所以我添
我是一名优秀的程序员,十分优秀!