- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有卡夫卡记录:
ConsumerRecords<String, Events> records = kafkaConsumer.poll(POLL_TIMEOUT);
我想使用并行流而不是多线程运行以下代码。
records.forEach((record) -> {
Event event = record.value();
HTTPSend.send(event);
});
我尝试过多线程,但我想尝试并行流:
for (ConsumerRecord<String, Event> record : records) {
executor.execute(new Runnable() {
@Override
public void run() {
HTTPSend.send(Event);
}
});
}
实际上,我面临着多线程 HTTP.send 的问题(即使线程池为 1 个线程)。我得到了
“由以下原因引起:sun.security.validator.ValidatorException:PKIX 路径构建失败:sun.security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径”
。这是通过 https 的请求。此错误仅在第一次发出请求时出现。之后,异常消失。噗!
对于我正在使用的多线程:
int threadCOunt=1;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(threadCOunt, true);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executor = new ThreadPoolExecutor(threadCOunt, threadCOunt, 0L, TimeUnit.MILLISECONDS, queue, handler);
HTTPSend.send() 是:
long sizeSend = 0;
SSLContext sc = null;
try {
sc = SSLContext.getInstance("TLS");
sc.init(null, TRUST_ALL_CERTS, new SecureRandom());
} catch (NoSuchAlgorithmException | KeyManagementException e) {
LOGGER.error("Failed to create SSL context", e);
}
// Ignore differences between given hostname and certificate hostname
HostnameVerifier hv = (hostname, session) -> true;
// Create the REST client and configure it to connect meta
Client client = ClientBuilder.newBuilder()
.hostnameVerifier(hv)
.sslContext(sc).build();
WebTarget baseTarget = client.target(getURL()).path(HTTP_PATH);
Response jsonResponse = null;
try {
StringBuilder eventsBatchString = new StringBuilder();
eventsBatchString.append(this.getEvent(event));
Entity<String> entity = Entity.entity(eventsBatchString.toString(), MediaType.APPLICATION_JSON_TYPE);
builder = baseTarget.request();
LOGGER.debug("about to send the event {} and URL {}", entity, getURL());
jsonResponse = builder.header(HTTP_ACK_CHANNEL, guid.toString())
.header("Content-type", MediaType.APPLICATION_JSON)
.header("Authorization", String.format("Meta %s", eventsModuleConfig.getSecretKey()))
.post(entity);
最佳答案
我明白你想做什么,但我不确定这是最好的主意(我也不确定这是否不是)。
Kafka 的 poll
/commit
模型允许简单的背压,并在崩溃时保留最后处理的项目。通过“立即”返回轮询循环,您告诉 Kafka“我已准备好接受更多”,并提交偏移量(手动或自动)告诉 Kafka 您已成功读取到该点。
您似乎想要做的是尽快读取 Kafka,提交偏移量,然后将 Kafka 记录放入执行器队列中,然后从中平衡每秒的请求等。
我不能 100% 确定这是一个好主意:如果您的应用程序崩溃了会发生什么?您可能已经提交了一些实际上并未到达上游的 Kafka 消息。如果您确实想这样做,我建议在完成 Runnable
后手动提交偏移量(通过 commitSync
),而不是让高级使用者为您执行此操作。
为什么你可能想要使用线程执行器:我认为这些也可以通过 Kafka 来完成。
您可能想同时向网络服务器发布多条消息。分区良好的 Kafka 主题将允许多个消费者/消费者组消费多个分区,因此 - 假设一个完美扩展的 HTTP 服务器 - 将允许您并行地将消息发布到您的服务器。基于进程的并发性太棒了!
也许 Web 服务器不是完全可扩展的,或者该请求速度很慢(假设每个请求需要 1 秒):您需要限制 Web 服务器每秒处理的请求数,如果您有一个队列,则可能有几个线程在不备份 Kafka 的情况下发布。
在这种情况下,您可以设置 max.poll.records为您的 Web 服务器所需的可扩展值。可能还有更好的方法来做到这一点,尽管目前我还没有想到。
如果您的网络服务器需要很长时间才能响应,您可能会收到与心跳失败相关的错误。在这种情况下,我会引导您访问 this SO answer on the timeout / heartbeat topic .
我不会使用线程执行器,从而使同步 HTTP 请求看起来是异步的,而是使用像 Netty 这样的事件 HTTP 客户端。 ,从而无需基于线程的并发即可实现并行性。
关于java - 与 kafka 消费者记录并行流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47250781/
我有一个网站。 必须登录才能看到里面的内容。 但是,我使用此代码登录。 doc = Jsoup.connect("http://46.137.207.181/Account/Login.aspx")
我正在尝试为我的域创建一个 SPF 记录并使我的邮件服务器能够对其进行评估。我在邮件服务器上使用 Postfix 并使用 policyd-spf (Python) 来评估记录。目前,我通过我的私有(p
我需要为负载平衡的 AWS 站点 mywebsite.com 添加 CName 记录。记录应该是: @ CNAME mywebsite.us-east-1.elb.amazon
我目前正在开发一个相当大的多层应用程序,该应用程序将部署在海外。虽然我希望它在解聚后不会折叠或爆炸,但我不能 100% 确定这一点。因此,如果我知道我可以请求日志文件,以准确找出问题所在以及原因,那就
我使用以下命令从我的网络摄像头录制音频和视频 gst-launch-0.10 v4l2src ! video/x-raw-yuv,width=640,height=480,framerate=30/1
我刚刚开始使用 ffmpeg 将视频分割成图像。我想知道是否可以将控制台输出信息保存到日志文件中。我试过“-v 10”参数,也试过“-loglevel”参数。我在另一个 SO 帖子上看到使用 ffmp
我想针对两个日期查询我的表并检索其中的记录。 我这样声明我的变量; DECLARE @StartDate datetime; DECLARE @EndDate datetime; 并像这样设置我的变量
在 javascript 中,我可以使用简单的 for 循环访问对象的每个属性,如下所示 var myObj = {x:1, y:2}; var i, sum=0; for(i in myObj) s
最近加入了一个需要处理大量代码的项目,我想开始记录和可视化调用图的一些流程,让我更好地理解一切是如何组合在一起的。这是我希望在我的理想工具中看到的: 每个节点都是一个函数/方法 如果一个函数可以调用另
如何使用反射在F#中创建记录类型?谢谢 最佳答案 您可以使用 FSharpValue.MakeRecord [MSDN]创建一个记录实例,但是我认为F#中没有任何定义记录类型的东西。但是,记录会编译为
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 3年前关闭。 Improve thi
我是 Sequelize 的新手并且遇到了一些语法问题。我制作了以下模型: // User sequelize.define('user', { name: { type: DataTyp
${student.name} Notify 这是我的output.jsp。请注意,我已经放置了一个链接“Notify”以将其转发到 display.jsp 上。但我不确定如何将 Stud
例如,这是我要做的查询: server:"xxx.xxx.com" AND request_url:"/xxx/xxx/xxx" AND http_X_Forwarded_Proto:(https O
我一直在开发大量 Java、PHP 和 Python。所有这些都提供了很棒的日志记录包(分别是 Log4J、Log 或logging)。这在调试应用程序时有很大帮助。特别是当应用程序 headless
在我的Grails应用程序中,我异步运行一些批处理过程,并希望该过程记录各种状态消息,以便管理员以后可以检查它们。 我考虑过将log4j JDBC附加程序用作最简单的解决方案,但是据我所知,它不使用D
我想将进入 MQ 队列的消息记录到数据库/文件或其他日志队列,并且我无法修改现有代码。是否有任何方法可以实现某种类似于 HTTP 嗅探器的消息记录实用程序?或者也许 MQ 有一些内置的功能来记录消息?
如果我有一条包含通用字段的记录,在更改通用字段时是否有任何方法可以模仿方便的 with 语法? 即如果我有 type User = // 'photo can be Bitmap or Url {
假设我有一个名为 Car 的自定义对象。其中的所有字段都是私有(private)的。 public class Car { private String mName; private
当记录具有特定字段时,我需要返回 true 的函数,反之亦然。示例: -record(robot, {name, type=industrial, ho
我是一名优秀的程序员,十分优秀!