KeyedMessage<String, byte[]> keyedMessage =
        new KeyedMessage<String, byte[]>(request.getRequestTopicName(),
                SerializationUtils.serialize(message));
producer.send(keyedMessage);

Currently, I am sending messages without any key as part of keyed messages. Will it still work with delete.retention.ms? Do I need to send a key as part of the message? Is it a good idea to make the key part of the message?

Michael Heil
gaurav

3 Answers


Keys are mostly useful/necessary if you require strong order for a key and are developing something like a state machine. If you require that messages with the same key (for instance, a unique id) are always seen in the correct order, attaching a key to messages will ensure messages with the same key always go to the same partition in a topic. Kafka guarantees order within a partition, but not across partitions in a topic, so alternatively not providing a key - which will result in round-robin distribution across partitions - will not maintain such order.

In the case of a state machine, keys can be used with log.cleaner.enable to deduplicate entries with the same key. In that case, Kafka assumes that your application only cares about the most recent instance of a given key, and the log cleaner deletes older duplicates of a given key only if the key is not null. This form of log compaction is controlled by the log.cleaner.delete.retention.ms property and requires keys.

Alternatively, the more common property log.retention.hours, which is enabled by default, works by deleting complete segments of the log that are out of date. In this case keys do not have to be provided. Kafka will simply delete chunks of the log that are older than the given retention period.

That's all to say, if you've enabled log compaction or require strict order for messages with the same key then you should definitely be using keys. Otherwise, null keys may provide better distribution and prevent potential hot spotting issues in cases where some keys may appear more than others.
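The key-to-partition behaviour described above can be sketched in a few lines of plain Java. This is a toy model, not Kafka code: it uses `String.hashCode` as a stand-in for the murmur2 hash that Kafka's real `DefaultPartitioner` applies, but it illustrates why same-key messages always land in one partition while null keys get spread around:

```java
// Toy model of producer-side partition selection: keyed records are
// hashed to a fixed partition, null-keyed records rotate round-robin.
// (Illustrative only; Kafka's DefaultPartitioner hashes with murmur2.)
public class KeyPartitioningDemo {
    private static int roundRobinCounter = 0;

    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // no key: rotate across partitions
            return roundRobinCounter++ % numPartitions;
        }
        // same key -> same hash -> same partition, so per-key order holds
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // keyed: both calls return the same partition
        System.out.println(partitionFor("customer-1", 3));
        System.out.println(partitionFor("customer-1", 3));
        // null key: consecutive calls hit different partitions
        System.out.println(partitionFor(null, 3));
        System.out.println(partitionFor(null, 3));
    }
}
```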

kuujo
  • I am new to Kafka, hence all the questions. A couple of follow-ups: first, can we consume messages on the basis of the key? Currently I am consuming messages from MessageAndMetadata mm. Or is it fine to ignore the key at the time of consuming messages? I am using the high-level consumer API. – gaurav Apr 09 '15 at 05:26
  • @kuujo I am assuming this de-duplication is only for log entries; it does not necessarily de-duplicate messages on a topic queue? – user1658296 Oct 31 '16 at 10:45
  • @kuujo In the first paragraph of your answer, you say : `attaching a key to messages will ensure messages with the same key always go to the same partition in a topic`. I don't get this part and how/why is that a benefit or use-case. Isn't the **log compaction** supposed to clear messages with same key over time ? – oblivion Nov 11 '17 at 15:56
  • @oblivion having messages go into the same partition sequentially is important for handling non-idempotent updates, e.g. customer selects delivery date (one message) but changes mind later (second message). If the messages were to go to different partitions then either message may be processed first / last, e.g. with 2 consumers consuming from each partition. If both messages relating to the same Delivery go into the same partition then they're processed first-in-first-out, giving the correct final delivery date. – Kunal Dec 03 '18 at 23:05
  • The order guarantees come not from the key but from messages being in the same partition. The routing of messages to partitions doesn't have to be key-based. You can explicitly specify a partition when creating a `ProducerRecord`. – Malt Feb 07 '19 at 14:07
  • My understanding is the producer client is responsible for choosing the partition (http://kafka.apache.org/documentation.html#design_loadbalancing), which may or may not be based on key. So why do you say keys are necessary for ordering? – lfk Jun 06 '19 at 08:24
  • Producers default to round-robin allocation if there's no key for hash-based partitioning, but this can be explicitly overridden by the producer if manual partitioning is required. – boycy Aug 10 '21 at 10:54
  • You could specify a key as well as an explicit partition. From JavaDocs: https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html – Khanna111 May 03 '22 at 03:15

tl;dr No, a key is not required as part of sending messages to Kafka. But...


In addition to the very helpful accepted answer, I would like to add a few more details.

Partitioning

By default, Kafka uses the key of the message to select the partition of the topic it writes to. This is done in the DefaultPartitioner by

org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

If there is no key provided, then Kafka will partition the data in a round-robin fashion.
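As a side note on the toPositive part of that formula: it masks the sign bit rather than using Math.abs, because Math.abs overflows for Integer.MIN_VALUE and would yield a negative partition number. A self-contained sketch (the masking matches the behaviour of Kafka's Utils.toPositive; the hash value below is arbitrary):

```java
// Why the partitioner masks the sign bit instead of calling Math.abs:
// Math.abs(Integer.MIN_VALUE) overflows and stays negative, while
// (hash & 0x7fffffff) is always non-negative.
public class ToPositiveDemo {
    static int toPositive(int number) {
        return number & 0x7fffffff; // clears the sign bit
    }

    static int partition(int hash, int numPartitions) {
        return toPositive(hash) % numPartitions; // always in [0, numPartitions)
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE)); // still negative!
        System.out.println(toPositive(Integer.MIN_VALUE));
        System.out.println(partition(-123456789, 6));
    }
}
```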

In Kafka, it is possible to provide your own partitioning logic by implementing the Partitioner interface. For this, you need to override the partition method, which has the signature:

int partition(String topic, 
              Object key,
              byte[] keyBytes,
              Object value,
              byte[] valueBytes,
              Cluster cluster)

Usually, the key of a Kafka message is used to select the partition and the return value (of type int) is the partition number. Without a key, you need to rely on the value which might be much more complex to process.

Ordering

As stated in the given answer, Kafka has guarantees on ordering of the messages only at partition level.

Let's say you want to store financial transactions for your customers in a Kafka topic with two partitions. The messages could look like (key:value)

null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": -1337}
null:{"customerId": 1, "changeInBankAccount": +200}

As we have not defined a key, the two partitions will presumably look like

// partition 0
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}

// partition 1
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": -1337}

Your consumer reading that topic could end up telling you that the balance on the account is 600 at a particular point in time, although that was never the case, simply because it read all messages in partition 0 before the messages in partition 1.

With a sensible key (like customerId) this could be avoided, as the partitioning would look like this:

// partition 0
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": -1337}
1:{"customerId": 1, "changeInBankAccount": +200}

// partition 1
2:{"customerId": 2, "changeInBankAccount": +100}
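The effect can be replayed in a few lines of plain Java, with no Kafka involved. Using the numbers from the example above, the snippet compares the highest balance a consumer would ever observe for customer 1 under the true produce order versus a consumer that drains partition 0 before partition 1:

```java
import java.util.List;

// Simulates consumption order only: if customer 1's transactions are
// spread across partitions and a consumer drains partition 0 before
// partition 1, it observes a balance (600) that never actually existed.
public class OrderingDemo {
    static int maxObservedBalance(List<Integer> changes) {
        int balance = 0;
        int max = Integer.MIN_VALUE;
        for (int change : changes) {
            balance += change;
            max = Math.max(max, balance);
        }
        return max;
    }

    public static void main(String[] args) {
        // true produce order for customer 1: +200, +200, -1337, +200
        List<Integer> trueOrder = List.of(200, 200, -1337, 200);
        // partition 0 drained first (three +200), then partition 1 (-1337)
        List<Integer> partitionByPartition = List.of(200, 200, 200, -1337);
        System.out.println(maxObservedBalance(trueOrder));            // 400
        System.out.println(maxObservedBalance(partitionByPartition)); // 600
    }
}
```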

Remember that the ordering within a partition is only guaranteed with the producer configuration max.in.flight.requests.per.connection set to 1. The default value for that configuration is, however, 5, and it is described as:

"The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled)."

You can find more details on this in another Stack Overflow post on Kafka message ordering guarantees.
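As a sketch, the producer configuration for the strict variant might look like the following (the bootstrap address is a placeholder; the idempotence alternative mentioned in the comments below is shown commented out):

```java
import java.util.Properties;

// Producer settings relevant to per-partition ordering.
// These are standard Kafka producer config keys.
public class OrderingConfig {
    static Properties orderingSafeProducerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Strict choice: at most one in-flight request per connection,
        // so retries cannot reorder messages.
        props.put("max.in.flight.requests.per.connection", "1");
        // Alternative (Kafka >= 0.11): the idempotent producer keeps
        // ordering even with up to 5 in-flight requests.
        // props.put("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(orderingSafeProducerConfig());
    }
}
```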

Log compaction

Without a key as part of your messages, you will not be able to set the topic configuration cleanup.policy to compact. According to the documentation, "log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition."

This nice and helpful setting will not be available without any key.
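For reference, a sketch of what enabling compaction looks like at topic-creation time (topic name and broker address are made up; delete.retention.ms here controls how long tombstones for deleted keys are retained on a compacted topic, which is the setting the question asks about):

```shell
# Sketch: creating a compacted topic. Addresses and names are placeholders;
# on older Kafka versions, use --zookeeper instead of --bootstrap-server.
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic customer-balances \
  --partitions 2 \
  --config cleanup.policy=compact \
  --config delete.retention.ms=86400000
```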

Usage of Keys

In real-life use cases, the key of a Kafka message can have a huge influence on your performance and clarity of your business logic.

A key can, for example, be used naturally for partitioning your data. Since you can have your consumers read from particular partitions, this could serve as an efficient filter. Also, the key can include some metadata on the actual value of the message that helps you control the subsequent processing. Keys are usually smaller than values, so it is more convenient to parse a key instead of the whole value. At the same time, you can apply all serializations and schema registration to the key just as you do with the value.

As a note, there is also the concept of headers that can be used to store information; see the documentation.

Michael Heil
  • What could happen if a producer, for instance, was attempting to write messages to 100 topic partitions (ex.: messages with numerical keys ranging from 0 to 99) when the broker has only 10 topic partitions? Would the messages be distributed using the default mechanism by round robin? – dandev486 Mar 23 '21 at 21:22
  • @dandev486 not sure if I understand your question correctly. If you try to write a ProducerRecord to a non-existing TopicPartition, the producer will throw an exception. If you use numeric keys 0 to 99, then the messages get distributed across the 10 partitions based on `hash(key) % 10`, as described in my answer. – Michael Heil Mar 24 '21 at 09:57
  • @Mike, one follow-up question. I understand providing the key preserves the ordering of the messages. Is that true in "all" cases? Say the producer sent (k1,m1,t) and (k1,m2,t). Will it be guaranteed at all times that m1 gets a lower offset than m2 (meaning m2 will be considered the latest message compared with m1)? – Nag May 06 '21 at 07:29
  • What if the producer fails when sending m1 but successfully sent m2, and m1 is sent later as part of retries? – Nag May 06 '21 at 07:29
  • @Nag see my other answer [here](https://stackoverflow.com/questions/61832615/kafka-message-ordering-guarantees/61832820#61832820). – Michael Heil May 06 '21 at 07:43
  • :-), meaning it is NOT guaranteed at all times even if we send a key? – Nag May 06 '21 at 08:29
  • @Nag Yes, that is correct. I will make this clear in my answer. Thank you for pointing this out. – Michael Heil May 06 '21 at 08:59
  • "control your consumers to read from particular partitions this could serve as an efficient filter", this strategy can cause consumers to consume unintended events if `numPartitions` of the topic is changed. – CᴴᴀZ Sep 16 '21 at 14:16
  • You don't need to set "max.in.flight.requests.per.connection=1" for ordering if you enable idempotence with "enable.idempotence=true". Here is the [link](https://kafka-tutorials.confluent.io/message-ordering/kafka.html). – iesen Dec 01 '21 at 23:25

The key is basically sent with a message to get message ordering for a specific field.

  • If key=null, data is sent round-robin (to a different partition and to a different broker in a distributed environment, and of course to the same topic).
  • If a key is sent, then all messages for that key will always go to the same partition.

Explain and example

  • The key can be any string or integer, etc. Take an integer employee_id as an example key.
  • So, for example, employee_id 123 will always go to partition 0 and employee_id 345 will always go to partition 1. This is decided by the key-hashing algorithm, which depends on the number of partitions.
  • If you don't send any key, then the message can go to any partition using a round-robin technique.
Pradeep Singh