I have created a custom transformer that uses two state stores to delay stream inputs that arrive nearly simultaneously. It stores the inputs as KeyValue pairs, so the queue has the signature KeyValueStore<String, KeyValue<K,V>>.
I need to store the input records as KeyValue pairs, and they have to be generic.
The Code of the Transformer
import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Throttler<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private String queueStoreName;
    private KeyValueStore<String, KeyValue<K, V>> queue;
    private String recentlyStoreName;
    private KeyValueStore<Long, V> recently;
    private Serde<K> keySerde = null;
    private Serde<V> valueSerde = null;
    private ProcessorContext ctx;
    private static final Logger LOG = LoggerFactory.getLogger(Throttler.class);
    private Duration delay;

    public Throttler(String queueStorename, String recentlyStoreName, Duration delayBetweenRecords,
            Serde<K> keySerde, Serde<V> valueSerde) {
        this.queueStoreName = queueStorename;
        this.recentlyStoreName = recentlyStoreName;
        this.delay = delayBetweenRecords;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        ctx = context;
        queue = (KeyValueStore<String, KeyValue<K, V>>) ctx.getStateStore(queueStoreName);
        recently = (KeyValueStore<Long, V>) ctx.getStateStore(recentlyStoreName);
        // check the queue every 300 ms of wall-clock time
        ctx.schedule(Duration.ofMillis(300), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    public KeyValue<K, V> transform(K key, V value) {
        Long storeKey = Instant.now().toEpochMilli();
        // we want to put the current input into the queue if:
        // a) a record has been released recently
        // b) there are records in the queue, because these should be released first
        KeyValueIterator<Long, V> itRecentlies = recently.range(
                Instant.now().minusMillis(delay.toMillis()).toEpochMilli(), storeKey);
        KeyValueIterator<String, KeyValue<K, V>> itQueue = queue.all();
        if (itRecentlies.hasNext() || itQueue.hasNext()) {
            queue.put(value.hashCode() + "::" + storeKey, KeyValue.pair(key, value));
            LOG.info(String.format("Storing: [%s],ts:[%s] in Queue", value, storeKey));
            itRecentlies.close();
            itQueue.close();
            return null;
        }
        itRecentlies.close();
        itQueue.close();
        recently.put(storeKey, value);
        LOG.info(String.format("Directly forwarding:[%s],ts:[%s]", value, storeKey));
        return KeyValue.pair(key, value);
    }

    private void punctuate(long ts) {
        try {
            releaseRecordsIfDue();
        } catch (Exception e) {
            LOG.error("ERROR releasing Messages from Queue...");
            LOG.error(e.getMessage());
            e.printStackTrace();
        }
    }

    private void releaseRecordsIfDue() {
        Instant now = Instant.now();
        Long tsNow = now.toEpochMilli();
        Long tsNowMinusDelay = now.minusMillis(delay.toMillis()).toEpochMilli();

        // remove entries from 'recently' that are older than the delay
        KeyValueIterator<Long, V> otherIt = recently.all();
        while (otherIt.hasNext()) {
            KeyValue<Long, V> entry = otherIt.next();
            if (entry.key.longValue() <= tsNowMinusDelay) {
                LOG.info(String.format("Cleaning 'recently-send':[%s],ts:[%s]", entry.value, entry.key));
                recently.delete(entry.key);
            } else {
                LOG.info(String.format("Record: [%s] NOT cleaned from 'recently-send'", entry.value));
            }
        }
        otherIt.close();

        // release at most one queued record if it is due and nothing was sent recently
        KeyValueIterator<String, KeyValue<K, V>> it = queue.all();
        while (it.hasNext()) {
            KeyValue<String, KeyValue<K, V>> entry = it.next();
            Long tsEntry = Long.valueOf(entry.key.split("::")[1]);
            KeyValueIterator<Long, V> itRecentlies = recently.range(tsNowMinusDelay, tsNow);
            if (tsEntry.longValue() <= tsNowMinusDelay && !itRecentlies.hasNext()) {
                LOG.info(String.format("Queue releasing value:[%s], key:[%s]", entry.value, entry.key));
                LOG.info(String.format("k-type: %s v-type: %s", entry.value.key.getClass(), entry.value.value.getClass()));
                queue.delete(entry.key);
                recently.put(tsNow, entry.value.value);
                itRecentlies.close();
                ctx.forward(entry.value.key, entry.value.value);
                break; // we only allow one entry to be sent at a time
            }
            itRecentlies.close();
        }
        it.close();
        long queueSize = queue.approximateNumEntries();
        LOG.info(String.format("Queue size: [%s]", queueSize));
    }

    public void close() {}
}
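The queueing rules in transform() and releaseRecordsIfDue() can be modelled without Kafka to check the intended timing in isolation. This is a minimal sketch, not the code above: the class name `ThrottleSketch` is hypothetical, the two state stores are replaced by an in-memory `ArrayDeque` plus a single last-release timestamp, and time is passed in explicitly instead of being read from `Instant.now()`. One difference worth noting: an in-memory key-value store iterates in key order, and the queue keys above start with `value.hashCode()`, so the store does not necessarily release in arrival order, whereas the deque below is strictly FIFO.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, Kafka-free model of the throttling rules: a record passes straight
// through only if nothing was released within the last delayMs and nothing is waiting;
// otherwise it is queued, and each punctuation releases at most one due record.
public class ThrottleSketch<V> {
    private static final class Entry<T> {
        final long ts;
        final T value;

        Entry(long ts, T value) {
            this.ts = ts;
            this.value = value;
        }
    }

    private final long delayMs;
    private final Deque<Entry<V>> queue = new ArrayDeque<>(); // arrival order
    private long lastReleaseMs = Long.MIN_VALUE;               // sentinel: nothing released yet

    public ThrottleSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    // Plays the role of the 'recently' store: was something released less than delayMs ago?
    private boolean releasedRecently(long nowMs) {
        return lastReleaseMs != Long.MIN_VALUE && lastReleaseMs > nowMs - delayMs;
    }

    /** Returns the value to forward immediately, or null if it was queued. */
    public V transform(long nowMs, V value) {
        if (releasedRecently(nowMs) || !queue.isEmpty()) {
            queue.addLast(new Entry<>(nowMs, value));
            return null;
        }
        lastReleaseMs = nowMs;
        return value;
    }

    /** Releases at most one queued value per call; returns null if nothing is due. */
    public V punctuate(long nowMs) {
        Entry<V> head = queue.peekFirst();
        if (head != null && head.ts <= nowMs - delayMs && !releasedRecently(nowMs)) {
            queue.removeFirst();
            lastReleaseMs = nowMs;
            return head.value;
        }
        return null;
    }

    public int queued() {
        return queue.size();
    }
}
```

With `delayMs = 1000`, a record at t=0 passes straight through, records at t=100 and t=200 are queued, and punctuations at t=1100 and t=2100 release them one at a time.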
The transformer worked fine as long as I only forwarded/stored V with the corresponding serde set on the state store. But I also need the key to be forwarded, and it is dynamic.
To solve this, I wrote my own KeyValue serde.
KeyValueSerde
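import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes.WrapperSerde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;

public class CustomSerdes {

    public static <K, V> Serde<KeyValue<K, V>> KayVee(Serializer<KeyValue<K, V>> serializer,
            Deserializer<KeyValue<K, V>> deserializer) {
        return new KayVeeSerde<>(serializer, deserializer);
    }

    public static final class KayVeeSerde<K, V> extends WrapperSerde<KeyValue<K, V>> {
        public KayVeeSerde(Serializer<KeyValue<K, V>> serializer, Deserializer<KeyValue<K, V>> deserializer) {
            super(serializer, deserializer);
        }
    }
}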
KeyValueSerializer
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KayVeeSerializer<K, V> implements Serializer<KeyValue<K, V>> {

    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    private final ObjectMapper mapper = new ObjectMapper();
    private static final Logger LOG = LoggerFactory.getLogger(KayVeeSerializer.class);

    public KayVeeSerializer(Serializer<K> keySerializer, Serializer<V> valueSerializer,
            Class<K> classK, Class<V> classV) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        mapper.addMixIn(classK, IgnoreSchemaProperty.class);
        mapper.addMixIn(classV, IgnoreSchemaProperty.class);
    }

    @Override
    public byte[] serialize(String topic, KeyValue<K, V> data) {
        if (data == null)
            return null;
        byte[] keySerialized = keySerializer.serialize(topic, data.key);
        byte[] valueSerialized = valueSerializer.serialize(topic, data.value);
        KayVee<byte[], byte[]> out = KayVee.pair(keySerialized, valueSerialized);
        try {
            byte[] asBytes = mapper.writeValueAsBytes(out);
            return asBytes;
        } catch (Exception e) {
            LOG.error(e.getMessage());
            e.printStackTrace();
            return null;
        }
    }

    // had some issues with the schema field on Avro-generated data
    public abstract class IgnoreSchemaProperty {
        @JsonIgnore
        abstract void getSchema();
    }
}
KeyValueDeserializer
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KayVeeDeserializer<K, V> implements Deserializer<KeyValue<K, V>> {

    private Deserializer<K> keyDeserializer;
    private Deserializer<V> valueDeserializer;
    private final ObjectMapper mapper = new ObjectMapper();
    private static final Logger LOG = LoggerFactory.getLogger(KayVeeDeserializer.class);

    public KayVeeDeserializer() {
    }

    public KayVeeDeserializer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this();
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    @Override
    public KeyValue<K, V> deserialize(String topic, byte[] data) {
        KeyValue<K, V> out = null;
        if (data != null) {
            try {
                KayVee<byte[], byte[]> temp = mapper.readValue(data, KayVee.class);
                out = KeyValue.pair(keyDeserializer.deserialize(topic, temp.key),
                        valueDeserializer.deserialize(topic, temp.value));
                return out;
            } catch (Exception e) {
                LOG.error(e.getMessage());
                e.printStackTrace();
                return null;
            }
        }
        return out;
    }
}
Because the ObjectMapper complained about a missing no-arg constructor on KeyValue during deserialization, I created the simple KayVee class to work around this problem.
public class KayVee<K, V> {

    K key;
    V value;

    public KayVee() {}

    KayVee(K key, V value) {
        this();
        this.key = key;
        this.value = value;
    }

    public static <K, V> KayVee<K, V> pair(K key, V value) {
        return new KayVee<>(key, value);
    }

    public K getKey() {
        return key;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public V getValue() {
        return value;
    }

    public void setValue(V value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "KayVee [key=" + key + ", value=" + value + "]";
    }
}
KayVee is used only as an intermediate between serialization and deserialization: the serializer receives a KeyValue from upstream, and the deserializer returns a KeyValue downstream.
The topology used for testing is simple:
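Since KayVee only carries two already-serialized byte arrays from serialize() to deserialize(), one alternative (not what this post uses) is to skip JSON altogether and frame the bytes with length prefixes, which removes Jackson, Base64 and generic type information from the picture. A minimal sketch with a hypothetical helper `PairFraming`, using only `java.nio.ByteBuffer`; in KayVeeSerializer one would pass the results of `keySerializer.serialize(topic, data.key)` and `valueSerializer.serialize(topic, data.value)` to `pack()`, and call `unpack()` in KayVeeDeserializer.

```java
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical alternative to the JSON-based KayVee wrapper.
// Layout: [int keyLength][int valueLength][key bytes][value bytes], where a
// length of -1 marks a null array so that null and empty stay distinguishable.
public final class PairFraming {
    private PairFraming() {}

    public static byte[] pack(byte[] key, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(8 + len(key) + len(value));
        buf.putInt(key == null ? -1 : key.length);
        buf.putInt(value == null ? -1 : value.length);
        if (key != null) buf.put(key);
        if (value != null) buf.put(value);
        return buf.array();
    }

    public static Map.Entry<byte[], byte[]> unpack(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int keyLen = buf.getInt();
        int valueLen = buf.getInt();
        byte[] key = read(buf, keyLen);
        byte[] value = read(buf, valueLen);
        // SimpleImmutableEntry accepts null keys and values
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    private static int len(byte[] bytes) {
        return bytes == null ? 0 : bytes.length;
    }

    private static byte[] read(ByteBuffer buf, int length) {
        if (length < 0) return null;
        byte[] out = new byte[length];
        buf.get(out);
        return out;
    }
}
```

Because the format is fixed and binary, the deserializer never has to guess what type a field holds.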
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.assertj.core.data.Index;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

class SerdeTests {

    private static String schemaRegistryUrl = "http://localhost:8081";
    private static final Map<String, String> avroSerdeConf = Collections.singletonMap("schema.registry.url",
            schemaRegistryUrl);
    private static final String INP_TOPIC_NAME = "input-topic";
    private static final String OUT_TOPIC_NAME = "output-topic";
    private static TopologyTestDriver testDriver;
    private static Serde<SendMessageCommandKey> serdeKey;
    private static Serde<SendMessageCommand> serdeValue;
    private static TestInputTopic<SendMessageCommandKey, SendMessageCommand> inputTopic;
    private static TestOutputTopic<SendMessageCommandKey, SendMessageCommand> outputTopic;
    private static Serde<String> serdeString;
    private static Serde<Long> serdeLong;
    private static Serde<KeyValue<SendMessageCommandKey, SendMessageCommand>> serdeKeyValue;
    private static SendMessageCommand cmd;
    private static SendMessageCommandKey key;
    private static SendMessageCommand secondCmd;

    @SuppressWarnings("unchecked")
    @BeforeAll
    public static void setup() {
        long ts = Instant.now().toEpochMilli();
        cmd = SendMessageCommand.newBuilder().setMessage("Das ist eine Nachricht mit einem Umlauten äöü!")
                .setReceiver("Junit").setSender("Bugzilla").setTimestamp(ts).build();
        key = SendMessageCommandKey.newBuilder().setSender("Bugzilla").build();
        secondCmd = SendMessageCommand.newBuilder(cmd)
                .setTimestamp(ts + 500)
                .build();

        serdeKey = new SpecificAvroSerde<>();
        serdeKey.configure(avroSerdeConf, true);
        serdeValue = new SpecificAvroSerde<>();
        serdeValue.configure(avroSerdeConf, false);
        serdeString = Serdes.String();
        serdeLong = Serdes.Long();
        serdeKeyValue = CustomSerdes.KayVee(
                new KayVeeSerializer<>(
                        serdeKey.serializer(),
                        serdeValue.serializer(),
                        SendMessageCommandKey.class,
                        SendMessageCommand.class
                ), new KayVeeDeserializer<>(
                        serdeKey.deserializer(),
                        serdeValue.deserializer()
                )
        );

        StreamsBuilder builder = new StreamsBuilder();
        Properties p = new Properties();
        p.put("application.id", "kafka-streams-utils-tests");
        p.put("bootstrap.servers", "localhost:9092");

        StoreBuilder<KeyValueStore<String, KeyValue<SendMessageCommandKey, SendMessageCommand>>> keyValueStoreBuilder = Stores
                .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("throttler-queue"), serdeString,
                        serdeKeyValue).withLoggingDisabled();
        StoreBuilder<KeyValueStore<Long, SendMessageCommand>> keyValueStoreBuilder2 = Stores
                .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("throttler-recently"), serdeLong,
                        serdeValue).withLoggingDisabled();
        builder.addStateStore(keyValueStoreBuilder);
        builder.addStateStore(keyValueStoreBuilder2);

        builder
                .stream(
                        INP_TOPIC_NAME,
                        Consumed.with(serdeKey, serdeValue)
                ).peek(
                        (k, v) -> System.out.println("K:[" + k + "], V:[" + v + "]")
                ).transform(() -> new Throttler<SendMessageCommandKey, SendMessageCommand>("throttler-queue", "throttler-recently",
                        Duration.ofSeconds(120L), serdeKey, serdeValue), "throttler-queue", "throttler-recently"
                ).peek(
                        (k, v) -> System.out.println("K-after-queue:[" + k + "], V-after-queue:[" + v + "]")
                )
                .to(
                        OUT_TOPIC_NAME,
                        Produced.with(serdeKey, serdeValue)
                );

        Topology t = builder.build();
        testDriver = new TopologyTestDriver(t, p);
        inputTopic = testDriver.createInputTopic(INP_TOPIC_NAME, serdeKey.serializer(), serdeValue.serializer());
        outputTopic = testDriver.createOutputTopic(OUT_TOPIC_NAME, serdeKey.deserializer(), serdeValue.deserializer());
    }

    @AfterAll
    public static void tearDown() {
        testDriver.close();
        serdeKey.close();
        serdeValue.close();
        serdeString.close();
        serdeLong.close();
    }

    @AfterEach
    public void cleanup() {
        testDriver.advanceWallClockTime(Duration.ofHours(1));
    }

    @Test
    void onlyDirectlyForwardsFirst() {
        KeyValueOnWire<SendMessageCommandKey, SendMessageCommand> kvo = new KeyValueOnWire<>(key, cmd);
        KeyValue<SendMessageCommandKey, SendMessageCommand> kv = KeyValue.pair(key, cmd);
        inputTopic.pipeInput(key, cmd);
        inputTopic.pipeInput(key, secondCmd);
        assertThat(outputTopic.readValuesToList()).hasSize(1).contains(cmd, Index.atIndex(0));
    }

    @Test
    void forwardsByWallclock() {
        inputTopic.pipeInput(key, cmd);
        inputTopic.pipeInput(key, secondCmd);
        inputTopic.pipeInput(key, secondCmd);
        assertThat(outputTopic.getQueueSize()).isEqualTo(2L);
        inputTopic.pipeInput(key, cmd);
        inputTopic.pipeInput(key, secondCmd);
        assertThat(outputTopic.getQueueSize()).isEqualTo(1L);
    }
}
I get the following output and stack trace:
K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699369}]
K-after-queue:[{"sender": "Bugzilla"}], V-after-queue:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699369}]
K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099698980}]
K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699480}]
K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699480}]
java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.an.company.kafka_streams.serialization.KayVeeDeserializer.deserialize(KayVeeDeserializer.java:40)
    at org.an.company.kafka_streams.serialization.KayVeeDeserializer.deserialize(KayVeeDeserializer.java:10)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:207)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.access$100(MeteredKeyValueStore.java:47)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:245)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:222)
    at org.an.company.kafka_streams.processors.Throttler.releaseRecordsIfDue(Throttler.java:109)
    at org.an.company.kafka_streams.processors.Throttler.punctuate(Throttler.java:80)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)
    at org.apache.kafka.streams.TopologyTestDriver.advanceWallClockTime(TopologyTestDriver.java:653)
    at org.an.company.kafka_streams.serialization.SerdeTests.cleanup(SerdeTests.java:141)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:126)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptAfterEachMethod(TimeoutExtension.java:108)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:490)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeAfterEachMethodAdapter$20(ClassBasedTestDescriptor.java:480)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAfterEachMethods$9(TestMethodTestDescriptor.java:236)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAllAfterMethodsOrCallbacks$12(TestMethodTestDescriptor.java:269)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAllAfterMethodsOrCallbacks$13(TestMethodTestDescriptor.java:269)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAllAfterMethodsOrCallbacks(TestMethodTestDescriptor.java:268)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAfterEachMethods(TestMethodTestDescriptor.java:234)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
I really don't understand why it throws a ClassCastException:
java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.an.company.kafka_streams.serialization.KayVeeDeserializer.deserialize(KayVeeDeserializer.java:40)
The data should be a byte[].
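A stdlib-only sketch of what is most likely going on (class and method names are hypothetical, and Jackson itself is not involved): by default Jackson writes a byte[] as a Base64 JSON string, and readValue(data, KayVee.class) supplies no type arguments, so the erased fields of type K and V are bound as plain Object, which for JSON text means a String. The unchecked cast to KayVee<byte[], byte[]> succeeds at runtime; the ClassCastException only appears where a field is first used as a byte[], presumably the call that passes temp.key to the key deserializer.

```java
import java.util.Base64;

// Hypothetical model of binding JSON through a raw Class object.
public class ErasureDemo {
    // Stands in for KayVee<K, V>: after erasure, the field is just an Object.
    static class Holder<K> {
        K key;
    }

    // Jackson's default JSON representation of a byte[] is a Base64 string.
    static String asJsonText(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }

    // Models readValue(data, KayVee.class): with no type arguments the JSON
    // string is stored as-is, and the unchecked cast does not fail here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Holder<byte[]> bindRaw(Object jsonValue) {
        Holder raw = new Holder();
        raw.key = jsonValue;
        return (Holder<byte[]>) raw;
    }

    // The compiler inserts a cast to byte[] when the field is read as byte[];
    // with a String inside, this line throws ClassCastException.
    static int keyLength(Holder<byte[]> holder) {
        byte[] key = holder.key;
        return key.length;
    }
}
```

The failure surfaces at the point of use, not at deserialization, which is why the stack trace points into the deserializer's own code rather than into Jackson.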
Thanks in advance for your help!
EDIT:
As suggested, my question was answered by: Jackson serializing and deserializing Map<String, byte[]>
The solution was to supply a TypeReference to the ObjectMapper. Passing the raw KayVee.class gives Jackson no information about the type arguments of KayVee<byte[], byte[]> (they are erased at runtime), so the Base64 strings that Jackson writes for byte[] fields were read back as plain Strings, which led to the issue.
TypeReference<KayVee<byte[], byte[]>> tRef = new TypeReference<KayVee<byte[],byte[]>>() {};
KayVee<byte[],byte[]> temp = mapper.readValue(data, tRef);
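TypeReference works because of the "super type token" pattern: an anonymous subclass such as new TypeReference<KayVee<byte[], byte[]>>() {} records its full generic superclass in its class file, and that type can be read back via reflection, whereas KayVee.class carries no type arguments. A minimal stdlib sketch of the pattern, using a hypothetical TypeToken class (Jackson's own TypeReference differs in detail):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical class illustrating the super type token pattern.
public abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // For `new TypeToken<X>() {}`, the anonymous subclass's generic
        // superclass is TypeToken<X>, so X is still available at runtime.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("TypeToken must be created with a type argument");
        }
        this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    public Type getType() {
        return type;
    }
}
```

This is why the type arguments survive in mapper.readValue(data, tRef) but not in mapper.readValue(data, KayVee.class).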