- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在为如何正确使用分区键机制而苦苦挣扎。我的逻辑是设置分区号为3,然后创建三个分区键为“0”、“1”、“2”,然后使用分区键创建三个KeyedMessage如
val props = new Properties()
val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("producer.type", if(synchronously) "sync" else "async")
props.put("metadata.broker.list", brokerList)
props.put("batch.num.messages", batchSize.toString)
props.put("message.send.max.retries", messageSendMaxRetries.toString)
props.put("request.required.acks",requestRequiredAcks.toString)
props.put("client.id",clientId.toString)
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
if (partition == null) {
new KeyedMessage(topic,message)
} else {
new KeyedMessage(topic,partition,message)
}
}
def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
def send(message: Array[Byte], partition: Array[Byte]): Unit = {
try {
producer.send(kafkaMesssage(message, partition))
} catch {
case e: Exception =>
e.printStackTrace
System.exit(1)
}
}
val testMessage = UUID.randomUUID().toString
val testTopic = "sample1"
val groupId_1 = "testGroup"
print("starting sample broker testing")
val producer = new KafkaProducer(testTopic, "localhost:9092")
val numList = List(0,1,2);
for (a <- numList) {
// Create a partition key as Byte Array
var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
//Here I give a Array[Byte] key
//so the second "send" function of producer will be called
producer.send(testMessage.getBytes("UTF8"), key)
}
最佳答案
人们通常认为分区是一种将业务数据按业务类别分开的方法,但这并不是查看分区的正确角度。
分区直接影响这些主题:
-性能(每个分区可以与其他分区并行使用)
-messages order(仅在分区级别保证的消息顺序)
我将举例说明我们如何创建分区:
你有一个话题,比如 MyMessagesToWorld
您想将此主题(所有 MyMessagesToWorld)传输给某个消费者。
您“称重”了 MyMessagesToWorld 的整个“质量”并发现,这是 10 公斤。
您在“MyMessagesToWorld”中有以下“业务”类别:
-给爸爸的信息 (D)
-给妈妈的信息 (M)
- 给姐姐的消息(S)
- 给奶奶的消息 (G)
-给老师的消息 (T)
-给女 friend 的信息 (F)
你想,谁是你的消费者,并发现你的消费者是侏儒,每个小时可以消耗 1 公斤消息。
你最多可以雇佣 2 个这样的侏儒。
1 个侏儒需要 10 小时才能消耗 10 公斤消息,2 个侏儒需要 5 小时。
所以你决定使用所有可用的侏儒来节省时间。
要为这 2 个侏儒创建 2 个“ channel ”,您需要在 Kafka 上创建此主题的 2 个分区。如果您预想更多侏儒,请创建更多分区。
您内部有 6 个业务类别和 2 个连续的独立消费者 - gnomes(消费者线程)。
该怎么办?
卡夫卡的做法如下:
假设您在集群中有 2 个 kafka 实例。
(同一个例子 OK ,如果集群中有更多的实例)
您在 Kafka 上将分区号设置为 2,例如(以 Kafka 0.8.2.1 为例):
您在 Kafka 中定义您的主题,告诉您该主题有 2 个分区:
kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld
KEY M, hashcode=12345, partition for M = 12345 % 2 = 1
public class CustomPartitioner {
private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
private static AtomicInteger sequence = new AtomicInteger();
private ReentrantLock lock = new ReentrantLock();
public int partition(ProducerRecord<String, Object> record, Cluster cluster) {
String key = record.key();
int seq = figureSeq(key);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
if (availablePartitions.size() > 0) {
int part = seq % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
// no partitions are available, give a non-available partition
return seq % numPartitions;
}
}
private int figureSeq(String key) {
int sequentualNumber = 0;
if(keyDistributionTable.containsKey(key)){
sequentualNumber = keyDistributionTable.get(key);
}else{//synchronized region
//used only for new Keys, so high waiting time for monitor expected only on start
lock.lock();
try{
if(keyDistributionTable.containsKey(key)){
sequentualNumber = keyDistributionTable.get(key);
}else{
int seq = sequence.incrementAndGet();
keyDistributionTable.put(key, seq);
sequentualNumber = seq;
}
}finally{
lock.unlock();
}
}
return sequentualNumber;
}
关于scala - Kafka 分区键无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27373594/
自从我 faced an issue由于背景图片对于不同分辨率的内容来说太短,我尝试将背景分成 3 部分并自动拉伸(stretch)中间部分以相应地填充顶部和底部图像之间的空间。不幸的是我没能在 CS
我从去年开始就在我的程序中运行这个函数(Linux 和 Windows)。 现在我需要实现一个新功能,我的新构建不再运行。 我还有其他使用 POST 的 CUrl 函数,结果是一样的:没问题,但我的
在评估函数应用方面,Haskell 是只支持普通降阶还是也支持应用降阶?我是否认为正常顺序是 Haskell 惰性的原因? 最佳答案 GHC 运行时不使用术语缩减策略,因为那会非常低效。事实上,GHC
怎么来的multi使用多处理池对多个“进程”上的数据进行分段和处理的函数比仅调用 map 慢(8 秒)。功能(6 秒)? from multiprocessing import Pool import
假设我正在渲染一个 3d GL_TRIANGLE。该对象需要 3 个顶点才能定义:A、B、C。我将此类数据放入缓冲区并通过 glVertexAttribPointer 将其绑定(bind)到着色器。
我有一个字体的三个文件,普通的,粗体的和浅色的。由于 font-weight:light 不存在,我该如何在 font-face 上设置 light 呢? 顺便问一下,font-weight:ligh
我是 C 的新手,我似乎无法弄清楚什么似乎是一个非常简单的指针问题。我的程序将行号添加到文件中。它逐行读入文件,然后在每行的开头添加一个行号。它在每个文件上都可以正常工作,如下所示: soccer@s
我有以下代码,我不确定为什么当它命中 Myclass 的析构函数时我会收到堆损坏检测错误。我相信我正在正确地释放内存?? #include #include using namespace std
有什么方法可以将“正常”数学符号解释为逆波兰符号 (RPN)..? 例如1) 2 + 3*4 - 1 = 234*+1-2) 5 (4-8) = 548- 你可以假设遵循 BODMAS 规则并且必须首
http://www.ergotopia.de/ergonomie-shop/ergonomische-kissen/orthopaedisches-sitzkissen的手机页面应该看起来像右边(检
我正在 Phonegap/Cordova 中构建一个应用程序。应用目前相当简单,但确实需要网络状态和地理定位插件才能工作。 到目前为止,我已经在 Android 上开发了该应用程序(目前它仅由一些基本
我一整天都在做这个,但没有运气 我设法在一行 TfidfVectorizer 中消除了问题 这是我的工作代码 from sklearn.feature_extraction.text import C
也许有人看到一个错误,问题是当我按btn2 (button 2)和btn3 (button 3)应用程序crashes时,但操作仍然有效,即video正在运行并且PDF打开,而button 1正常工作
我正在开发一个应用程序。它的第一页是登录屏幕。成功登录后,我想将用户带到选项卡式 Activity 。我怎样才能在安卓中做到这一点?谢谢 最佳答案 在 Android 中,启动 Activity 是通
我不确定我在这里做错了什么。 :normal! I### 当我对一个单词执行此命令时,我想要的最终结果是: ### word 但是我得到了这个: ###word 最佳答案 Vim 的 :normal是
我必须将 2 个静态矩阵发送到分配动态矩阵的函数,将矩阵 1 乘以矩阵 2,并返回新矩阵的地址。请注意,COMM 很常见。 我尝试删除 free_matrix 行,它工作正常。 void main()
我在我的一个项目中使用 Gnome libglib 并遇到了一个奇怪的错误。我可以输入 GList 的元素数量看起来仅限于 45 个。在第 45 个元素处,它给出了此错误 40 counter 41
我正在尝试获取“顶级”HWND 的尺寸。即,我想要 Firefox/Windows 资源管理器等的主 HWND 的当前尺寸。窗口。如果窗口最小化, GetWindowRect() 将不起作用。 Get
相同的标题:什么是索引 - 正常 - 全文 - 唯一? 最佳答案 普通索引用于通过仅包含行数据的切片或散列来加速操作。 全文索引向数据库的全文搜索 (FTS) 引擎指示它应该将数据存档在给定字段中,以
我正在使用 EnumParser来自 here它在 VC++ 中编译得很好,但是使用 gcc 我有这样的错误: ./Terminator.o: In function `EnumParser::Enu
我是一名优秀的程序员,十分优秀!