Class KafkaChannelDefinitionProcessor
java.lang.Object
org.flowable.eventregistry.spring.kafka.KafkaChannelDefinitionProcessor
- All Implemented Interfaces:
EventListener, ChannelModelProcessor, Aware, BeanFactoryAware, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>
public class KafkaChannelDefinitionProcessor
extends Object
implements BeanFactoryAware, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, ChannelModelProcessor
A ChannelModelProcessor which is responsible for configuring Kafka Event registry integration.
This class is not meant to be extended.
- Author:
- Filip Hrisafov
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
protected static class
protected static class
protected static class
-
Field Summary
Fields
Modifier and Type | Field | Description
protected ApplicationContext
protected BeanFactory
static final String
protected KafkaListenerContainerFactory<?>
protected String
protected boolean
protected static final int
protected StringValueResolver
protected KafkaListenerEndpointRegistry
protected BeanExpressionContext
protected KafkaAdminOperations
protected KafkaConsumerBackoffManager
protected KafkaOperations<Object,Object>
protected final org.slf4j.Logger
protected com.fasterxml.jackson.databind.ObjectMapper
protected BeanExpressionResolver
protected Map<String,Collection<String>>
-
Constructor Summary
Constructors
Constructor | Description
KafkaChannelDefinitionProcessor
(com.fasterxml.jackson.databind.ObjectMapper objectMapper) -
Method Summary
Modifier and Type | Method | Description
boolean
canProcess
(ChannelModel channelModel) boolean
canProcessIfChannelModelAlreadyRegistered
(ChannelModel channelModel) createEndpointConfigurations
(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry, KafkaListenerEndpoint mainEndpoint, KafkaListenerContainerFactory<?> containerFactory) protected KafkaListenerEndpoint
createKafkaListenerEndpoint
(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) protected ListenerContainerFactoryConfigurer
createListenerContainerFactoryConfigurer
(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration, BackOff backOff, DefaultDestinationTopicResolver topicResolver) protected GenericMessageListener<org.apache.kafka.clients.consumer.ConsumerRecord<Object,
Object>> createMessageListener
(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) protected void
createNewTopics
(Collection<String> topics, int numPartitions, short replicationFactor) protected org.springframework.retry.backoff.SleepingBackOffPolicy<?>
protected RetryTopicConfiguration
createRetryTopicConfiguration
(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) protected KafkaListenerContainerFactory<?>
decorateFactory
(DestinationTopic.Properties destinationTopicProperties, ListenerContainerFactoryConfigurer factoryConfigurer, RetryTopicConfiguration retryTopicConfiguration) protected String
getEndpointGroupId
(KafkaInboundChannelModel channelDefinition, String id) protected String
getEndpointId
(ChannelModel channelModel, String tenantId) protected KafkaConsumerBackoffManager
protected TaskScheduler
protected Consumer<Collection<String>>
getTopicCreationFunction
(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) protected static Collection<TopicPartitionOffset>
getTopicPartitions
(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset[] topicPartitionOffsets) protected static TopicPartitionOffset
getTPOForMainTopic
(Suffixer suffixer, TopicPartitionOffset tpo) protected static TopicPartitionOffset
getTPOForRetryTopics
(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset tpo) void
parsePartitions
(String partsString) Parse a list of partitions into a List.
protected void
processAndRegisterEndpoints
(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) protected void
processOutboundDefinition
(KafkaOutboundChannelModel channelModel) void
registerChannelModel
(ChannelModel channelModel, String tenantId, EventRegistry eventRegistry, EventRepositoryService eventRepositoryService, boolean fallbackToDefaultTenant) protected void
registerEndpoint
(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) Register a new KafkaListenerEndpoint alongside the KafkaListenerContainerFactory to use to create the underlying container.
protected String
protected KafkaListenerContainerFactory<?>
resolveContainerFactory
(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> containerFactory) protected Object
resolveExpression
(String value) protected <T> T
resolveExpression
(String expression, Class<T> type) protected Boolean
resolveExpressionAsBoolean
(String value, String attribute) protected Boolean
resolveExpressionAsBoolean
(String value, String attribute, Boolean defaultValue) protected Double
resolveExpressionAsDouble
(String value, String attribute) protected Integer
resolveExpressionAsInteger
(String value, String attribute) protected Integer
resolveExpressionAsInteger
(String value, String attribute, Integer defaultValue) protected Long
resolveExpressionAsLong
(String value, String attribute) protected String
resolveExpressionAsString
(String value, String attribute) protected KafkaMessageKeyProvider<?>
resolveKafkaMessageKeyProvider
(KafkaOutboundChannelModel channelModel) protected KafkaPartitionProvider
resolveKafkaPartitionProvider
(KafkaOutboundChannelModel channelModel) protected void
resolvePartitionAsInteger
(String topic, Object resolvedValue, List<TopicPartitionOffset> result) protected Pattern
resolvePattern
(KafkaInboundChannelModel channelModel) protected Properties
resolveProperties
(List<KafkaInboundChannelModel.CustomProperty> consumerProperties) resolveRetryConfiguration
(KafkaInboundChannelModel channelModel) protected Collection<TopicPartitionOffset>
resolveTopicPartitions
(KafkaInboundChannelModel channelModel) protected void
resolveTopics
(Object resolvedValue, List<String> result, KafkaInboundChannelModel channelDefinition) protected Collection<String>
resolveTopics
(KafkaInboundChannelModel channelDefinition) void
setApplicationContext
(ApplicationContext applicationContext) void
setBeanFactory
(BeanFactory beanFactory) void
setContainerFactory
(KafkaListenerContainerFactory<?> containerFactory) void
setContainerFactoryBeanName
(String containerFactoryBeanName) void
setEndpointRegistry
(KafkaListenerEndpointRegistry endpointRegistry) void
setKafkaAdminOperations
(KafkaAdminOperations kafkaAdminOperations) void
setKafkaConsumerBackoffManager
(KafkaConsumerBackoffManager kafkaConsumerBackoffManager) void
setKafkaOperations
(KafkaOperations<Object, Object> kafkaOperations) void
unregisterChannelModel
(ChannelModel channelModel, String tenantId, EventRepositoryService eventRepositoryService) protected void
unregisterEndpoint
(String endpointId, ChannelModel channelModel, String tenantId)
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecution
-
Field Details
-
CHANNEL_ID_PREFIX
- See Also:
-
DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT
protected static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT
- See Also:
-
logger
protected final org.slf4j.Logger logger -
kafkaOperations
-
kafkaAdminOperations
-
endpointRegistry
-
containerFactoryBeanName
-
containerFactory
-
kafkaConsumerBackoffManager
-
beanFactory
-
applicationContext
-
contextRefreshed
protected boolean contextRefreshed -
objectMapper
protected com.fasterxml.jackson.databind.ObjectMapper objectMapper -
resolver
-
embeddedValueResolver
-
expressionContext
-
retryEndpointsByMainEndpointId
-
-
Constructor Details
-
KafkaChannelDefinitionProcessor
public KafkaChannelDefinitionProcessor(com.fasterxml.jackson.databind.ObjectMapper objectMapper)
-
-
Method Details
-
canProcess
- Specified by:
canProcess
in interface ChannelModelProcessor
-
canProcessIfChannelModelAlreadyRegistered
- Specified by:
canProcessIfChannelModelAlreadyRegistered
in interface ChannelModelProcessor
-
registerChannelModel
public void registerChannelModel(ChannelModel channelModel, String tenantId, EventRegistry eventRegistry, EventRepositoryService eventRepositoryService, boolean fallbackToDefaultTenant) - Specified by:
registerChannelModel
in interface ChannelModelProcessor
-
createKafkaListenerEndpoint
protected KafkaListenerEndpoint createKafkaListenerEndpoint(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) -
processAndRegisterEndpoints
protected void processAndRegisterEndpoints(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) -
createEndpointConfigurations
protected Collection<KafkaChannelDefinitionProcessor.Configuration> createEndpointConfigurations(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry, KafkaListenerEndpoint mainEndpoint, KafkaListenerContainerFactory<?> containerFactory) -
getTopicPartitions
protected static Collection<TopicPartitionOffset> getTopicPartitions(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset[] topicPartitionOffsets) -
getTPOForRetryTopics
protected static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset tpo) -
getTPOForMainTopic
protected static TopicPartitionOffset getTPOForMainTopic(Suffixer suffixer, TopicPartitionOffset tpo) -
getTopicCreationFunction
protected Consumer<Collection<String>> getTopicCreationFunction(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) -
createNewTopics
protected void createNewTopics(Collection<String> topics, int numPartitions, short replicationFactor) -
createListenerContainerFactoryConfigurer
protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfigurer(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration, BackOff backOff, DefaultDestinationTopicResolver topicResolver) -
getOrCreateKafkaConsumerBackoffManager
-
getOrCreateRetryTopicTaskScheduler
-
decorateFactory
protected KafkaListenerContainerFactory<?> decorateFactory(DestinationTopic.Properties destinationTopicProperties, ListenerContainerFactoryConfigurer factoryConfigurer, RetryTopicConfiguration retryTopicConfiguration) -
processOutboundDefinition
-
resolveExpressionAsInteger
-
resolveExpressionAsInteger
-
resolveExpressionAsLong
-
resolveExpressionAsDouble
-
resolveExpressionAsBoolean
-
resolveExpressionAsBoolean
-
resolveExpressionAsString
-
resolveTopics
-
resolveTopics
protected void resolveTopics(Object resolvedValue, List<String> result, KafkaInboundChannelModel channelDefinition) -
resolvePattern
-
resolveTopicPartitions
protected Collection<TopicPartitionOffset> resolveTopicPartitions(KafkaInboundChannelModel channelModel) -
resolvePartitionAsInteger
protected void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result) -
parsePartitions
Parse a list of partitions into a List. Example: "0-5,10-15". This parsing is the same as the one done in the Spring KafkaListenerAnnotationBeanPostProcessor.
- Parameters:
partsString - the comma-delimited list of partitions/ranges.
- Returns:
- the stream of partition numbers, sorted and de-duplicated.
-
resolveKafkaPartitionProvider
protected KafkaPartitionProvider resolveKafkaPartitionProvider(KafkaOutboundChannelModel channelModel) -
resolveKafkaMessageKeyProvider
protected KafkaMessageKeyProvider<?> resolveKafkaMessageKeyProvider(KafkaOutboundChannelModel channelModel) -
resolveExpression
-
resolveExpression
-
createMessageListener
protected GenericMessageListener<org.apache.kafka.clients.consumer.ConsumerRecord<Object,Object>> createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) -
unregisterChannelModel
public void unregisterChannelModel(ChannelModel channelModel, String tenantId, EventRepositoryService eventRepositoryService) - Specified by:
unregisterChannelModel
in interface ChannelModelProcessor
-
unregisterEndpoint
-
registerEndpoint
protected void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory)
Register a new KafkaListenerEndpoint alongside the KafkaListenerContainerFactory to use to create the underlying container.
The factory may be null if the default factory has to be used for that endpoint. -
resolveContainerFactory
protected KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> containerFactory) -
getEndpointId
-
getEndpointGroupId
-
resolve
-
resolveProperties
protected Properties resolveProperties(List<KafkaInboundChannelModel.CustomProperty> consumerProperties) -
setBeanFactory
- Specified by:
setBeanFactory
in interface BeanFactoryAware
- Throws:
BeansException
-
setApplicationContext
- Specified by:
setApplicationContext
in interface ApplicationContextAware
- Throws:
BeansException
-
onApplicationEvent
- Specified by:
onApplicationEvent
in interface ApplicationListener<ContextRefreshedEvent>
-
getKafkaOperations
-
setKafkaOperations
-
getKafkaAdminOperations
-
setKafkaAdminOperations
-
getEndpointRegistry
-
setEndpointRegistry
-
getContainerFactoryBeanName
-
setContainerFactoryBeanName
-
getContainerFactory
-
setContainerFactory
-
getKafkaConsumerBackoffManager
-
setKafkaConsumerBackoffManager
-
createRetryTopicConfiguration
protected RetryTopicConfiguration createRetryTopicConfiguration(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) -
resolveRetryConfiguration
protected KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration resolveRetryConfiguration(KafkaInboundChannelModel channelModel) -
createNonBlockingBackOffPolicy
protected org.springframework.retry.backoff.SleepingBackOffPolicy<?> createNonBlockingBackOffPolicy(KafkaInboundChannelModel.NonBlockingRetryBackOff backOff)
-