gpt4 book ai didi

scala - Kafka 分区键无法正常工作

转载 作者:行者123 更新时间:2023-12-04 17:00:01 26 4
gpt4 key购买 nike

我正在为如何正确使用分区键机制而苦苦挣扎。我的逻辑是设置分区号为3,然后创建三个分区键为“0”、“1”、“2”,然后使用分区键创建三个KeyedMessage如

  • KeyedMessage(主题,“0”,消息)
  • KeyedMessage(主题,“1”,消息)
  • KeyedMessage(主题,“2”,消息)

  • 在此之后,创建一个生产者实例来发送所有的 KeyedMessage。

    我期望每个 KeyedMessage 应该根据不同的分区键进入不同的分区,这意味着
  • KeyedMessage(topic, "0", message) 转到分区 0
  • KeyedMessage(topic, "1", message) 转到分区 1
  • KeyedMessage(topic, "2", message) 转到分区 2

  • 我正在使用 Kafka-web-console 来查看主题状态,但结果并不像我期望的那样。 KeyedMessage 仍然随机进入分区,有时两个 KeyedMessage 会进入同一个分区,即使它们具有不同的分区键。

    为了让我的问题更清楚,我想发布一些我目前拥有的 Scala 代码,以及 我正在使用 Kafka 0.8.2-beta 和 Scala 2.10.4 .

    这是生产者代码, 我没有使用自定义 partitioner.class :
      val props = new Properties()

    val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

    props.put("compression.codec", codec.toString)
    props.put("producer.type", if(synchronously) "sync" else "async")
    props.put("metadata.broker.list", brokerList)
    props.put("batch.num.messages", batchSize.toString)
    props.put("message.send.max.retries", messageSendMaxRetries.toString)
    props.put("request.required.acks",requestRequiredAcks.toString)
    props.put("client.id",clientId.toString)

    val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

    def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
    if (partition == null) {
    new KeyedMessage(topic,message)
    } else {
    new KeyedMessage(topic,partition,message)
    }
    }

    def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

    def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
    producer.send(kafkaMesssage(message, partition))
    } catch {
    case e: Exception =>
    e.printStackTrace
    System.exit(1)
    }
    }

    这是我如何使用生产者,创建一个生产者实例,然后使用这个实例发送三个消息。目前我将分区键创建为整数,然后将其转换为字节数组:
      val testMessage = UUID.randomUUID().toString
    val testTopic = "sample1"
    val groupId_1 = "testGroup"

    print("starting sample broker testing")
    val producer = new KafkaProducer(testTopic, "localhost:9092")

    val numList = List(0,1,2);
    for (a <- numList) {
    // Create a partition key as Byte Array
    var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    //Here I give a Array[Byte] key
    //so the second "send" function of producer will be called
    producer.send(testMessage.getBytes("UTF8"), key)
    }

    不确定我的逻辑是否不正确,或者我没有正确理解分区键机制。任何人都可以提供一些示例代码或解释会很棒!

    最佳答案

    人们通常认为分区是一种将业务数据按业务类别分开的方法,但这并不是查看分区的正确角度。

    分区直接影响这些主题:

    -性能(每个分区可以与其他分区并行使用)

    -messages order(仅在分区级别保证的消息顺序)

    我将举例说明我们如何创建分区:

    你有一个话题,比如 MyMessagesToWorld

    您想将此主题(所有 MyMessagesToWorld)传输给某个消费者。

    您“称重”了 MyMessagesToWorld 的整个“质量”并发现,这是 10 公斤。

    您在“MyMessagesToWorld”中有以下“业务”类别:

    -给爸爸的信息 (D)

    -给妈妈的信息 (M)

    - 给姐姐的消息(S)

    - 给奶奶的消息 (G)

    -给老师的消息 (T)

    -给女 friend 的信息 (F)

    你想,谁是你的消费者,并发现你的消费者是侏儒,每个小时可以消耗 1 公斤消息。

    你最多可以雇佣 2 个这样的侏儒。

    1 个侏儒需要 10 小时才能消耗 10 公斤消息,2 个侏儒需要 5 小时。

    所以你决定使用所有可用的侏儒来节省时间。

    要为这 2 个侏儒创建 2 个“ channel ”,您需要在 Kafka 上创建此主题的 2 个分区。如果您预想更多侏儒,请创建更多分区。

    您内部有 6 个业务类别和 2 个连续的独立消费者 - gnomes(消费者线程)。

    该怎么办?

    卡夫卡的做法如下:

    假设您在集群中有 2 个 kafka 实例。
    (同一个例子 OK ,如果集群中有更多的实例)

    您在 Kafka 上将分区号设置为 2,例如(以 Kafka 0.8.2.1 为例):

    您在 Kafka 中定义您的主题,告诉您该主题有 2 个分区:

    kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld 

    现在主题 MyMessagesToWorld 有 2 个分区:P(0) 和 P(1)。

    您选择了数字 2(分区),因为您知道,您(洞察)只有 2 个消耗侏儒。

    您可以稍后添加更多分区,届时将使用更多消费者侏儒。

    不要将 Kafka 消费者与此类侏儒混淆。

    Kafka 消费者可以雇佣 N 个侏儒。 (N个平行线程)

    现在您为您的消息创建 key 。

    您需要 KEYS 在分区之间分发您的消息。

    键将是您之前定义的“业务类别”的这些字母:
    D、M、S、G、T、F,你认为这样的字母可以作为ID。

    但在一般情况下,任何可以用作 Key 的东西:
    (复杂的对象和字节数组,任何东西......)

    如果您不创建分区程序,则将使用默认分区程序。

    默认分区器有点愚蠢。

    它获取每个 KEY 的哈希码并将其除以可用分区数,“提醒”将定义该键的分区数。

    例子:
    KEY M, hashcode=12345, partition for M = 12345 % 2 = 1

    可以想象,在最好的情况下,使用这种分区器,每个分区中有 3 个业务类别。

    在更糟糕的情况下,您可以将所有业务类别放在 1 个分区中。

    如果你有 100000 个业务类别,按照这样的算法分配它们在统计上是可以的。

    但是只有少数几个类别,你的分配可能不太公平。

    因此,您可以更明智地重写分区程序并分配您的业务类别。

    有一个例子:

    此分区器在可用分区之间平均分配业务类别。
    public class CustomPartitioner {

    private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
    private static AtomicInteger sequence = new AtomicInteger();
    private ReentrantLock lock = new ReentrantLock();

    public int partition(ProducerRecord<String, Object> record, Cluster cluster) {

    String key = record.key();
    int seq = figureSeq(key);

    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());

    if (availablePartitions.size() > 0) {
    int part = seq % availablePartitions.size();
    return availablePartitions.get(part).partition();
    } else {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
    int numPartitions = partitions.size();
    // no partitions are available, give a non-available partition
    return seq % numPartitions;
    }
    }


    private int figureSeq(String key) {
    int sequentualNumber = 0;
    if(keyDistributionTable.containsKey(key)){
    sequentualNumber = keyDistributionTable.get(key);
    }else{//synchronized region
    //used only for new Keys, so high waiting time for monitor expected only on start
    lock.lock();
    try{
    if(keyDistributionTable.containsKey(key)){
    sequentualNumber = keyDistributionTable.get(key);
    }else{
    int seq = sequence.incrementAndGet();
    keyDistributionTable.put(key, seq);
    sequentualNumber = seq;
    }
    }finally{
    lock.unlock();
    }
    }
    return sequentualNumber;
    }

    }

    关于scala - Kafka 分区键无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27373594/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com