- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在尝试为 Structured Streaming
编写一个自定义接收器,它将使用来自 RabbitMQ
的消息。Spark
recently released DataSource V2 API,这看起来很有前途。由于它抽象了很多细节,我想使用这个 API 是为了简单和性能。但是,由于它很新,因此可用的资源不多。我需要经验丰富的 Spark
人员进行一些说明,因为他们会更容易掌握关键点。我们开始吧:
我的起点是博客文章系列,第一部分 here .它展示了如何在没有流功能的情况下实现数据源。为了制作一个流媒体源,我稍微改变了它们,因为我需要实现 MicroBatchReadSupport代替(或补充)DataSourceV2 .
为了提高效率,明智的做法是让多个 spark 执行器同时使用 RabbitMQ
,即来自同一个队列。如果我没有混淆的话,输入的每个分区 - 在 Spark
的术语中 - 对应于来自队列的消费者 - 在 RabbitMQ
的术语中。因此,我们需要为输入流设置多个分区,对吧?
类似于part 4 of the series , 我实现了 MicroBatchReader如下:
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
int partition = options.getInt(RMQ.PARTITICN, 5);
List<DataReaderFactory<Row>> factories = new LinkedList<>();
for (int i = 0; i < partition; i++) {
factories.add(new RMQDataReaderFactory(options));
}
return factories;
}
我正在返回一个工厂列表,希望列表中的每个实例都被用来创建一个读者,这也是一个消费者。这种方法正确吗?
我希望我的接收者是可靠的,即在每条处理过的消息之后(或者至少写入 chekpoint 目录以供进一步处理),我需要将其确认回 RabbitMQ
。问题从这里开始:这些工厂是在驱动程序中创建的,实际的读取过程通过 DataReader 在执行程序中进行。秒。然而,commit方法是 MicroBatchReader
的一部分,而不是 DataReader
。由于每个 MicroBatchReader
我有很多 DataReader
,我应该如何将这些消息返回给 RabbitMQ
?或者我应该在 next 时确认在 DataReader
上调用方法?安全吗?如果是这样,那么 commit
函数的目的是什么?
CLARIFICATION: OBFUSCATION: 答案中提供的关于重命名某些类/函数的链接(除了那里的解释)使一切都更加清晰 比以前更糟。引自 there :
Renames:
DataReaderFactory
toInputPartition
DataReader
toInputPartitionReader
...
InputPartition
's purpose is to manage the lifecycle of the associated reader, which is now calledInputPartitionReader
, with an explicit create operation to mirror the close operation. This was no longer clear from the API becauseDataReaderFactory
appeared to be more generic than it is and it isn't clear why a set of them is produced for a read.
编辑: 然而,docs清楚地说“读取器工厂将被序列化并发送给执行器,然后数据读取器将在执行器上创建并进行实际读取。”
为了使消费者可靠,我必须仅在特定消息在 Spark 端提交后才确认该消息。 Note that必须在传递消息的同一连接上确认消息,但在驱动程序节点调用提交函数。我如何在工作节点/执行节点上提交?
最佳答案
<删除>> 我正在返回一个工厂列表,希望列表中的每个实例都被用来创建一个读者,这也是一个消费者。这种做法正确吗?源 [socket][1] 源实现有一个线程将消息推送到内部 ListBuffer。换句话说,有一个消费者(线程)填满了内部 ListBuffer,它**然后**由 `planInputPartitions`(`createDataReaderFactories` 已 [重命名][2] 为 `planInputPartitions`)分成多个分区。此外,根据 [MicroBatchReadSupport][3] 的 Javadoc> 执行引擎将在流式查询开始时创建一个微批处理读取器,交替调用 setOffsetRange 和 createDataReaderFactories 以处理每个批处理,然后在执行完成时调用 stop()。请注意,由于重启或故障恢复,单个查询可能会多次执行。换句话说,应该调用 `createDataReaderFactories` **多次** 次,根据我的理解,这表明每个 `DataReader` 负责一个静态输入分区,这意味着 DataReader 不应该是消费者。----------> 但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader ... 如果是这样,那么 commit 函数的目的是什么?提交函数的部分原因可能是防止 MicroBatchReader 的内部缓冲区变大。通过提交偏移量,您可以有效地从缓冲区中删除小于偏移量的元素,因为您 promise 不再处理它们。您可以使用 `batches.trimStart(offsetDiff)` 在套接字源代码中看到这种情况
编辑
我只研究了 socket , 和 wiki-edit来源。这些资源还没有准备好生产,这是问题所在没有寻找的东西。相反,kafka source 是更好的起点,与上述来源不同,它有多个像作者正在寻找的消费者。
但是,也许如果您正在寻找不可靠的来源,上面的 socket 和 wikiedit 来源提供了一个不太复杂的解决方案。
关于java - Spark Structured Streaming with RabbitMQ 源码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50684667/
我想读取 RabbitMQ 队列中未确认消息的负载或 messageId。这可能吗? 我想这样做的原因是我尝试使用 RabbitMQ 死信功能来构建一个循环以定期自动生成消息。简而言之,创建两个队列
除了 vFabric 由 VMWare 提供商业支持之外,vFabric RabbitMQ 和 RabbitMQ 之间的主要区别是什么? 最佳答案 来自 source : We also produc
RabbitMQ 集群中有如下三个节点。 在 RabbitMQ 中,有两个队列,q1 和 q2。 q1 和q2 的主副本分布在不同的节点上。两个队列都被其他节点镜像。 三个节点前面有一个负载均衡器。
我希望在谷歌计算引擎上实现 rabbitmq 来处理我的 android 和 ios 消息传递应用程序上的消息。我听说 rabbitmq 可能非常耗电,所以我想知道解决这个问题的最佳解决方案是什么?我
是否可以在 RabbitMQ 服务器(管理插件 View )中查看连接单元的主机名,而不仅仅是 IP/端口?我们使用动态连接位置,这样更容易识别客户...... 最佳答案 不,没有这样的内置功能。 作
我正在阅读 RabbitMQ in Action 书,仍在第 2 章中,但作者说的一件事让我感到困惑。您设置了一个交换并发送了一条消息,两个订阅者正在监听队列。当第一条消息进来时,第一个订阅者得到它,
我正在使用 RabbitMQ 将所有消息排队,并将消息作为 SMS 发送给各个消费者。我正在使用直接交换,并且我已经正确地创建了一个到带有路由键的队列的绑定(bind)。问题是,当我尝试发布消息时,我
我们正在使用微服务架构在 nodejs 中实现 Web-API。每个服务都会公开 HTTP 端点,以便应用程序/网站可以与其交互。为了同步不同的数据库,我们目前使用 RabbitMQ。微服务可以在扇出
我计划在 RabbitMQ 消息头中存储堆栈跟踪。消息 header 是否有大小限制? 最佳答案 RabbitMQ 默认使用 AMQP 版本 0.9.1。根据AMQP protocol specifi
无法理解 exclusive queue 和 exclusive consumer 之间的区别,想知道我是否理解正确。 假设我有一个 queue、consumer1 和 consumer2。 我的理解
发布到 RabbitMQ 队列(发布/订阅模型)时消息的最大大小是多少? 我在文档中看不到任何明确的限制,但我认为有一些指导方针。 提前致谢。 最佳答案 我在做比较亚马逊队列服务和 RabbitMQ
我可以使用 Publish/Subscribe 创建扇出交换RabbitMQ Java 教程,任何连接的消费者都会收到一条消息的副本。我不想以动态/编程方式声明交换和绑定(bind),而是想在连接任何
java的 native rabbitmq客户端允许在连接设置上设置心跳,例如: import com.rabbitmq.client.ConnectionFactory; ... Connectio
我开始着手一个新项目,我们被要求将系统构建为一系列微服务,使用 RabbitMQ 作为它们之间的通信层。 在开发 REST API 时,我倾向于使用接受 HTTP header 来控制版本控制,我看到
在 Rabbit MQ 中使用集群时,我计划使用竞争订阅者模式。 Producer : 1 Exchange : 1 direct Queue : 1 Consumers : n (multiple)
是否可以实现 aggregator pattern在 RabbitMQ 中? 我有 A … N在发送到另一个队列之前我需要等待/聚合的消息 X . 所以我想我会有一些 唯一 ID 确保消息被路由 独家
我正在使用RabbitMQ向用户发送通知。用户可以随时读取其队列。 我面临的问题是,队列在夜间充满了很多通知,而当用户在早上返回时,他必须顺序处理这些消息。这些通知中有很多甚至是重复的。 我认为在发布
是否可以延迟通过 RabbitMQ 发送消息? 例如,我想在 30 分钟后使客户端 session 过期,并且我发送了一条将在 30 分钟后处理的消息。 最佳答案 您可以尝试两种方法: 旧方法:在每个
RabbitMQ 在单个服务器上可以处理的最大队列数是多少? 这取决于内存吗?它取决于 erlang 进程吗? 最佳答案 RabbitMQ 代理内部没有任何硬编码限制。代理将利用所有可用资源(除非您对
RabbitMQ Server 使用或需要在防火墙上为节点集群打开哪些端口? 我的 /usr/lib/rabbitmq/bin/rabbitmq-env 设置如下,我假设需要 (35197)。 SER
我是一名优秀的程序员,十分优秀!