
I have created a custom transformer that uses two state stores to delay nearly simultaneous stream inputs. It stores the inputs as KeyValue pairs, so the queue store has the signature KeyValueStore<String, KeyValue<K,V>>.

I need to store the input records as KeyValue pairs, and they have to be generic.

The code of the transformer:

public class Throttler<K, V> implements Transformer<K, V, KeyValue<K, V>>{
    private String queueStoreName;
    private KeyValueStore<String, KeyValue<K,V>> queue;
    private String recentlyStoreName;
    private KeyValueStore<Long, V> recently;
    
    private Serde<K> keySerde = null;
    private Serde<V> valueSerde = null;

    private ProcessorContext ctx;

    private static final Logger LOG = LoggerFactory.getLogger(Throttler.class);
    private Duration delay;
    
    public Throttler(String queueStorename, String recentlyStoreName, Duration delayBetweenRecords, Serde<K> keySerde, Serde<V> valueSerde){
        this.queueStoreName = queueStorename;
        this.recentlyStoreName = recentlyStoreName;
        this.delay = delayBetweenRecords;
        
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        ctx = context;
        queue = (KeyValueStore<String, KeyValue<K,V>>)ctx.getStateStore(queueStoreName);
        recently = (KeyValueStore<Long, V>)ctx.getStateStore(recentlyStoreName);

        ctx.schedule(Duration.ofMillis(300), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    public KeyValue<K, V> transform(K key, V value) {
        Long storeKey = Instant.now().toEpochMilli();
        
        // we want to put the current input within the queue if:
        // a) a record has been released recently
        // b) we have records within the queue because these should be released first
        KeyValueIterator<Long, V> itRecentlies = recently.range(Instant.now().minusMillis(delay.toMillis()).toEpochMilli(), storeKey);
        KeyValueIterator<String, KeyValue<K,V>> itQueue = queue.all();
        if(itRecentlies.hasNext() || itQueue.hasNext()) {
            queue.put(value.hashCode()+"::"+storeKey, KeyValue.pair(key,value));
            LOG.info(String.format("Storing: [%s],ts:[%s] in Queue", value,storeKey));
            
            itRecentlies.close();
            itQueue.close();

            return null;
        }

        itRecentlies.close();
        itQueue.close();

        recently.put(storeKey, value);
        LOG.info(String.format("Directly forwarding:[%s],ts:[%s]", value,storeKey));

         return KeyValue.pair(key, value);
    }

    private void punctuate(long ts) {
        try {
            releaseRecordsIfDue();
        }catch (Exception e) {
            LOG.error("ERROR releasing Messages from Queue...");
            LOG.error(e.getMessage());
            e.printStackTrace();
        }
    }
    private void releaseRecordsIfDue() {
        Instant now = Instant.now();
        Long tsNow = now.toEpochMilli();
        Long tsNowMinusDelay = now.minusMillis(delay.toMillis()).toEpochMilli();

        KeyValueIterator<Long, V> otherIt = recently.all();

        while(otherIt.hasNext()) {
            KeyValue<Long, V> entry = otherIt.next();
            if(entry.key.longValue() <= tsNowMinusDelay) {
                LOG.info(String.format("Cleaning 'recently-send':[%s],ts:[%s]", entry.value,entry.key));
                recently.delete(entry.key);
            }else {
                LOG.info(String.format("Record: [%s] NOT cleaned from 'recently-send'", entry.value));
            }

        }
        otherIt.close();

        KeyValueIterator<String, KeyValue<K,V>> it = queue.all();

        while(it.hasNext()) {
            KeyValue<String, KeyValue<K,V>> entry = it.next();
            Long tsEntry = Long.valueOf(entry.key.split("::")[1]);
            KeyValueIterator<Long, V> itRecentlies = recently.range(tsNowMinusDelay, tsNow);
            if(tsEntry.longValue() <= tsNowMinusDelay && !itRecentlies.hasNext()) {
                LOG.info(String.format("Queue releasing value:[%s], key:[%s]", entry.value,entry.key));
                LOG.info(String.format("k-type: %s v-type: %s",entry.value.key.getClass(), entry.value.value.getClass()));
                queue.delete(entry.key);
                recently.put(tsNow, entry.value.value);
                itRecentlies.close();
                ctx.forward(entry.value.key, entry.value.value);
                break;  // we only allow one entry to be sent at a time
            }
            itRecentlies.close();
        }
        it.close();
        
        long queueSize = queue.approximateNumEntries();
        LOG.info(String.format("Queue size: [%s]", queueSize));
    }

    public void close() {}
}

The transformer worked just fine as long as I only forwarded/stored the V with the corresponding serde set on the state store. But I need the key to be forwarded too, and it is dynamic.
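
To make the difference concrete, here is a simplified sketch of the two store variants (the store names are only illustrative; serdeValue and serdeKeyValue correspond to the serdes set up in the test further down):

// Value-only store: this variant worked with the plain value serde.
StoreBuilder<KeyValueStore<String, SendMessageCommand>> valueOnlyQueue = Stores
        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("value-only-queue"),
                Serdes.String(), serdeValue);

// Pair store: the key has to travel with the value, so this store needs a
// Serde<KeyValue<SendMessageCommandKey, SendMessageCommand>> for its values.
StoreBuilder<KeyValueStore<String, KeyValue<SendMessageCommandKey, SendMessageCommand>>> pairQueue = Stores
        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("pair-queue"),
                Serdes.String(), serdeKeyValue);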

To solve the problem I wrote a KeyValue serde myself.

KeyValueSerde

public class CustomSerdes {
    public static <K, V> Serde<KeyValue<K,V>> KayVee(Serializer<KeyValue<K, V>> serializer, Deserializer<KeyValue<K, V>> deserializer){
        return new KayVeeSerde<>(serializer, deserializer);
    }
    
    public static final class KayVeeSerde<K,V> extends WrapperSerde<KeyValue<K,V>> {
        public KayVeeSerde(Serializer<KeyValue<K, V>> serializer, Deserializer<KeyValue<K, V>> deserializer) {
            super(serializer, deserializer);
        }
    }
}

KeyValueSerializer

public class KayVeeSerializer<K, V> implements Serializer<KeyValue<K, V>> {
    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    private final ObjectMapper mapper = new ObjectMapper();
    private static final Logger LOG = LoggerFactory.getLogger(KayVeeSerializer.class);

    public KayVeeSerializer(Serializer<K> keySerializer, Serializer<V> valueSerializer, Class<K> classK, Class<V> classV) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        
        mapper.addMixIn(classK, IgnoreSchemaProperty.class);
        mapper.addMixIn(classV, IgnoreSchemaProperty.class);
    }

    @Override
    public byte[] serialize(String topic, KeyValue<K, V> data) {
        if (data == null)
            return null;

        byte[] keySerialized = keySerializer.serialize(topic, data.key);
        byte[] valueSerialized = valueSerializer.serialize(topic, data.value);
        KayVee<byte[], byte[]> out = KayVee.pair(keySerialized, valueSerialized);

        try {
            byte[] asBytes = mapper.writeValueAsBytes(out);
            return asBytes;
        } catch (Exception e) {
            LOG.error(e.getMessage());
            e.printStackTrace();

            return null;
        }
    }
    // had some issues with the schema-field on avro-generated data
    public abstract class IgnoreSchemaProperty {
        @JsonIgnore
        abstract void getSchema();
    }

}

KeyValueDeserializer

public class KayVeeDeserializer<K,V> implements Deserializer<KeyValue<K, V>> {

    private Deserializer<K> keyDeserializer;
    private Deserializer<V> valueDeserializer;
    private final ObjectMapper mapper = new ObjectMapper();
    private static final Logger LOG = LoggerFactory.getLogger(KayVeeDeserializer.class);

    public KayVeeDeserializer() {
        
    }
    public KayVeeDeserializer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
            this();
            this.keyDeserializer = keyDeserializer;
            this.valueDeserializer = valueDeserializer;
    }

    @Override
    public KeyValue<K, V> deserialize(String topic, byte[] data) {
        KeyValue<K, V> out = null;

        if (data != null) {
            try {
                KayVee<byte[],byte[]> temp = mapper.readValue(data, KayVee.class);
                out = KeyValue.pair(keyDeserializer.deserialize(topic,temp.key),valueDeserializer.deserialize(topic, temp.value));
                return out;
            } catch (Exception e) {
                LOG.error(e.getMessage());
                e.printStackTrace();

                return null;
            }
        }

        return out;
    }

}

Because the ObjectMapper complained about a missing no-arg constructor on KeyValue during deserialization, I created the simple KayVee class to work around this.

public class KayVee<K,V> {
    K key;
    V value;
    
    public KayVee() {}
    
    KayVee(K key, V value){
        this();
        this.key = key;
        this.value = value;
    }
    public static <K, V> KayVee<K, V> pair(K key, V value) {
        return new KayVee<>(key, value);
    }

    public K getKey() {
        return key;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public V getValue() {
        return value;
    }

    public void setValue(V value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "KayVee [key=" + key + ", value=" + value + "]";
    }
}

KayVee is only used as an intermediate representation between serialization and deserialization. The deserializer returns a KeyValue downstream and the serializer receives a KeyValue from upstream.
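
To illustrate the intended round trip (a simplified sketch; key, cmd and serdeKeyValue are the objects built in the test below, and "some-topic" is only a placeholder name):

KeyValue<SendMessageCommandKey, SendMessageCommand> original = KeyValue.pair(key, cmd);

// Upstream the store hands the serializer a KeyValue...
byte[] bytes = serdeKeyValue.serializer().serialize("some-topic", original);

// ...and downstream the deserializer should rebuild the KeyValue from those bytes.
// KayVee never leaves the serde - in practice this is the call that currently
// fails with the ClassCastException shown further down.
KeyValue<SendMessageCommandKey, SendMessageCommand> restored =
        serdeKeyValue.deserializer().deserialize("some-topic", bytes);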

The topology for testing is simple:

class SerdeTests {
    private static String schemaRegistryUrl = "http://localhost:8081";
    private static final Map<String, String> avroSerdeConf = Collections.singletonMap("schema.registry.url",
            schemaRegistryUrl);
    private static final String INP_TOPIC_NAME = "input-topic";
    private static final String OUT_TOPIC_NAME = "output-topic";

    private static TopologyTestDriver testDriver;
    private static Serde<SendMessageCommandKey> serdeKey;
    private static Serde<SendMessageCommand> serdeValue;
    private static TestInputTopic<SendMessageCommandKey, SendMessageCommand> inputTopic;
    private static TestOutputTopic<SendMessageCommandKey, SendMessageCommand> outputTopic;
    private static Serde<String> serdeString;
    private static Serde<Long> serdeLong;
    private static Serde<KeyValue<SendMessageCommandKey,SendMessageCommand>> serdeKeyValue;
    private static SendMessageCommand cmd;
    private static SendMessageCommandKey key;
    private static SendMessageCommand secondCmd;
    
    @SuppressWarnings("unchecked")
    @BeforeAll
    public static void setup() {
        long ts = Instant.now().toEpochMilli();
        cmd = SendMessageCommand.newBuilder().setMessage("Das ist eine Nachricht mit einem Umlauten äöü!")
                .setReceiver("Junit").setSender("Bugzilla").setTimestamp(ts).build();
        key = SendMessageCommandKey.newBuilder().setSender("Bugzilla").build();
        
        secondCmd = SendMessageCommand.newBuilder(cmd)
                .setTimestamp(ts+500)
                .build();
        serdeKey = new SpecificAvroSerde<>();
        serdeKey.configure(avroSerdeConf, true);

        serdeValue = new SpecificAvroSerde<>();
        serdeValue.configure(avroSerdeConf, false);
        serdeString = Serdes.String();
        serdeLong = Serdes.Long();
        serdeKeyValue = CustomSerdes.KayVee(
                new KayVeeSerializer<>(
                        serdeKey.serializer(),
                        serdeValue.serializer(),
                        SendMessageCommandKey.class,
                        SendMessageCommand.class
                ), new KayVeeDeserializer<>(
                        serdeKey.deserializer(),
                        serdeValue.deserializer()
                )
        );
        
        StreamsBuilder builder = new StreamsBuilder();
        Properties p = new Properties();
        p.put("application.id", "kafka-streams-utils-tests");
        p.put("bootstrap.servers","localhost:9092");

        StoreBuilder<KeyValueStore<String, KeyValue<SendMessageCommandKey, SendMessageCommand>>> keyValueStoreBuilder = Stores
                .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("throttler-queue"), serdeString,
                        serdeKeyValue).withLoggingDisabled();
                        
        StoreBuilder<KeyValueStore<Long, SendMessageCommand>> keyValueStoreBuilder2 = Stores
                .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("throttler-recently"), serdeLong,
                        serdeValue).withLoggingDisabled();

        builder.addStateStore(keyValueStoreBuilder);
        builder.addStateStore(keyValueStoreBuilder2);

        builder
        .stream(
            INP_TOPIC_NAME,
            Consumed.with(serdeKey, serdeValue)
        ).peek(
            (k,v)->System.out.println("K:["+k+"], V:["+v+"]")
        ).transform(() -> new Throttler<SendMessageCommandKey, SendMessageCommand>("throttler-queue", "throttler-recently",
                Duration.ofSeconds(120l), serdeKey, serdeValue), "throttler-queue", "throttler-recently"
        ).peek(
            (k,v)->System.out.println("K-after-queue:["+k+"], V-after-queue:["+v+"]")
        )
        .to(
            OUT_TOPIC_NAME,
            Produced.with(serdeKey, serdeValue)
        );
        
        
        
        Topology t = builder.build();
        testDriver = new TopologyTestDriver(t, p);
        
        inputTopic = testDriver.createInputTopic(INP_TOPIC_NAME, serdeKey.serializer(), serdeValue.serializer());
        outputTopic = testDriver.createOutputTopic(OUT_TOPIC_NAME, serdeKey.deserializer(), serdeValue.deserializer());
        
        
    }
    @AfterAll
    public static void tearDown() {

        testDriver.close();
        
        serdeKey.close();
        serdeValue.close();
        serdeString.close();
        serdeLong.close();
    }
    @AfterEach
    public void cleanup() {
        testDriver.advanceWallClockTime(Duration.ofHours(1));
    }
    @Test
    void onlyDirectlyForwardsFirst() {
        inputTopic.pipeInput(key, cmd);
        inputTopic.pipeInput(key, secondCmd);
        assertThat(outputTopic.readValuesToList()).hasSize(1).contains(cmd, Index.atIndex(0));
    }
    @Test
    void forwardsByWallclock() {
        inputTopic.pipeInput(key, cmd);
        inputTopic.pipeInput(key, secondCmd);
        inputTopic.pipeInput(key, secondCmd);
        assertThat(outputTopic.getQueueSize()).isEqualTo(2l);

        inputTopic.pipeInput(key, cmd);
        inputTopic.pipeInput(key, secondCmd);
        assertThat(outputTopic.getQueueSize()).isEqualTo(1l);
    }
}

I get the following Stacktrace:

K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699369}] K-after-queue:[{"sender": "Bugzilla"}], V-after-queue:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699369}] K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099698980}] K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699480}] K:[{"sender": "Bugzilla"}], V:[{"sender": "Bugzilla", "receiver": "Junit", "message": "Das ist eine Nachricht mit einem Umlauten äöü!", "timestamp": 1626099699480}] java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap') at org.an.company.kafka_streams.serialization.KayVeeDeserializer.deserialize(KayVeeDeserializer.java:40) at org.an.company.kafka_streams.serialization.KayVeeDeserializer.deserialize(KayVeeDeserializer.java:10) at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:207) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.access$100(MeteredKeyValueStore.java:47) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:245) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:222) at org.an.company.kafka_streams.processors.Throttler.releaseRecordsIfDue(Throttler.java:109) at org.an.company.kafka_streams.processors.Throttler.punctuate(Throttler.java:80) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445) at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54) at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868) at org.apache.kafka.streams.TopologyTestDriver.advanceWallClockTime(TopologyTestDriver.java:653) at org.an.company.kafka_streams.serialization.SerdeTests.cleanup(SerdeTests.java:141) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:126) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptAfterEachMethod(TimeoutExtension.java:108) at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:490) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeAfterEachMethodAdapter$20(ClassBasedTestDescriptor.java:480) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAfterEachMethods$9(TestMethodTestDescriptor.java:236) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAllAfterMethodsOrCallbacks$12(TestMethodTestDescriptor.java:269) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAllAfterMethodsOrCallbacks$13(TestMethodTestDescriptor.java:269) at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAllAfterMethodsOrCallbacks(TestMethodTestDescriptor.java:268) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAfterEachMethods(TestMethodTestDescriptor.java:234) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)

I really don't know why it complains with this ClassCastException:

java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.an.company.kafka_streams.serialization.KayVeeDeserializer.deserialize(KayVeeDeserializer.java:40)

The data should be byte[].

Thanks in advance for your help!

EDIT:

As suggested, my question is answered by: Jackson serializing and deserializing Map<String, byte[]>

The solution was to supply a TypeReference to the ObjectMapper: after serialization, the information about the complex type KayVee<byte[], byte[]> is lost, so without it Jackson binds the generic fields to String (the Base64 text it wrote for the byte arrays) instead of decoding them back into byte[], which led to the ClassCastException.

TypeReference<KayVee<byte[], byte[]>> tRef = new TypeReference<KayVee<byte[],byte[]>>() {};
KayVee<byte[],byte[]> temp = mapper.readValue(data, tRef);
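
For completeness, this is roughly how the fixed deserialize method looks now (a sketch, not the verbatim code; TypeReference is com.fasterxml.jackson.core.type.TypeReference):

@Override
public KeyValue<K, V> deserialize(String topic, byte[] data) {
    if (data == null) {
        return null;
    }
    try {
        // The TypeReference keeps the generic type information, so Jackson
        // decodes the Base64 strings back into byte[] instead of handing back Strings.
        TypeReference<KayVee<byte[], byte[]>> tRef = new TypeReference<KayVee<byte[], byte[]>>() {};
        KayVee<byte[], byte[]> temp = mapper.readValue(data, tRef);
        return KeyValue.pair(
                keyDeserializer.deserialize(topic, temp.getKey()),
                valueDeserializer.deserialize(topic, temp.getValue()));
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        return null;
    }
}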