Class KafkaChannelDefinitionProcessor
java.lang.Object
org.flowable.eventregistry.spring.kafka.KafkaChannelDefinitionProcessor
- All Implemented Interfaces:
EventListener, ChannelModelProcessor, Aware, BeanFactoryAware, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>
public class KafkaChannelDefinitionProcessor
extends Object
implements BeanFactoryAware, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, ChannelModelProcessor
A ChannelModelProcessor which is responsible for configuring Kafka Event registry integration.
This class is not meant to be extended.
- Author:
- Filip Hrisafov
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected static classprotected static classprotected static class -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected ApplicationContextprotected BeanFactorystatic final Stringprotected KafkaListenerContainerFactory<?>protected Stringprotected booleanprotected static final intprotected StringValueResolverprotected KafkaListenerEndpointRegistryprotected BeanExpressionContextprotected KafkaAdminOperationsprotected KafkaConsumerBackoffManagerprotected KafkaOperations<Object,Object> protected final org.slf4j.Loggerprotected com.fasterxml.jackson.databind.ObjectMapperprotected BeanExpressionResolverprotected Map<String,Collection<String>> -
Constructor Summary
ConstructorsConstructorDescriptionKafkaChannelDefinitionProcessor(com.fasterxml.jackson.databind.ObjectMapper objectMapper) -
Method Summary
Modifier and TypeMethodDescriptionbooleancanProcess(ChannelModel channelModel) booleancanProcessIfChannelModelAlreadyRegistered(ChannelModel channelModel) createEndpointConfigurations(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry, KafkaListenerEndpoint mainEndpoint, KafkaListenerContainerFactory<?> containerFactory) protected KafkaListenerEndpointcreateKafkaListenerEndpoint(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) protected ListenerContainerFactoryConfigurercreateListenerContainerFactoryConfigurer(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration, BackOff backOff, DefaultDestinationTopicResolver topicResolver) protected GenericMessageListener<org.apache.kafka.clients.consumer.ConsumerRecord<Object,Object>> createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) protected voidcreateNewTopics(Collection<String> topics, int numPartitions, short replicationFactor) protected org.springframework.retry.backoff.SleepingBackOffPolicy<?>protected RetryTopicConfigurationcreateRetryTopicConfiguration(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) protected KafkaListenerContainerFactory<?>decorateFactory(DestinationTopic.Properties destinationTopicProperties, ListenerContainerFactoryConfigurer factoryConfigurer, RetryTopicConfiguration retryTopicConfiguration) protected StringgetEndpointGroupId(KafkaInboundChannelModel channelDefinition, String id) protected StringgetEndpointId(ChannelModel channelModel, String tenantId) protected KafkaConsumerBackoffManagerprotected TaskSchedulerprotected Consumer<Collection<String>>getTopicCreationFunction(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) protected static Collection<TopicPartitionOffset>getTopicPartitions(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset[] topicPartitionOffsets) protected 
static TopicPartitionOffsetgetTPOForMainTopic(Suffixer suffixer, TopicPartitionOffset tpo) protected static TopicPartitionOffsetgetTPOForRetryTopics(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset tpo) voidparsePartitions(String partsString) Parse a list of partitions into aList.protected voidprocessAndRegisterEndpoints(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) protected voidprocessOutboundDefinition(KafkaOutboundChannelModel channelModel) voidregisterChannelModel(ChannelModel channelModel, String tenantId, EventRegistry eventRegistry, EventRepositoryService eventRepositoryService, boolean fallbackToDefaultTenant) protected voidregisterEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) Register a newKafkaListenerEndpointalongside theKafkaListenerContainerFactoryto use to create the underlying container.protected Stringprotected KafkaListenerContainerFactory<?>resolveContainerFactory(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> containerFactory) protected ObjectresolveExpression(String value) protected <T> TresolveExpression(String expression, Class<T> type) protected BooleanresolveExpressionAsBoolean(String value, String attribute) protected BooleanresolveExpressionAsBoolean(String value, String attribute, Boolean defaultValue) protected DoubleresolveExpressionAsDouble(String value, String attribute) protected IntegerresolveExpressionAsInteger(String value, String attribute) protected IntegerresolveExpressionAsInteger(String value, String attribute, Integer defaultValue) protected LongresolveExpressionAsLong(String value, String attribute) protected StringresolveExpressionAsString(String value, String attribute) protected KafkaMessageKeyProvider<?>resolveKafkaMessageKeyProvider(KafkaOutboundChannelModel channelModel) protected KafkaPartitionProviderresolveKafkaPartitionProvider(KafkaOutboundChannelModel channelModel) protected 
voidresolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result) protected PatternresolvePattern(KafkaInboundChannelModel channelModel) protected PropertiesresolveProperties(List<KafkaInboundChannelModel.CustomProperty> consumerProperties) resolveRetryConfiguration(KafkaInboundChannelModel channelModel) protected Collection<TopicPartitionOffset>resolveTopicPartitions(KafkaInboundChannelModel channelModel) protected voidresolveTopics(Object resolvedValue, List<String> result, KafkaInboundChannelModel channelDefinition) protected Collection<String>resolveTopics(KafkaInboundChannelModel channelDefinition) voidsetApplicationContext(ApplicationContext applicationContext) voidsetBeanFactory(BeanFactory beanFactory) voidsetContainerFactory(KafkaListenerContainerFactory<?> containerFactory) voidsetContainerFactoryBeanName(String containerFactoryBeanName) voidsetEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) voidsetKafkaAdminOperations(KafkaAdminOperations kafkaAdminOperations) voidsetKafkaConsumerBackoffManager(KafkaConsumerBackoffManager kafkaConsumerBackoffManager) voidsetKafkaOperations(KafkaOperations<Object, Object> kafkaOperations) voidunregisterChannelModel(ChannelModel channelModel, String tenantId, EventRepositoryService eventRepositoryService) protected voidunregisterEndpoint(String endpointId, ChannelModel channelModel, String tenantId) Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecution
-
Field Details
-
CHANNEL_ID_PREFIX
- See Also:
-
DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT
protected static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT
- See Also:
-
logger
protected final org.slf4j.Logger logger -
kafkaOperations
-
kafkaAdminOperations
-
endpointRegistry
-
containerFactoryBeanName
-
containerFactory
-
kafkaConsumerBackoffManager
-
beanFactory
-
applicationContext
-
contextRefreshed
protected boolean contextRefreshed -
objectMapper
protected com.fasterxml.jackson.databind.ObjectMapper objectMapper -
resolver
-
embeddedValueResolver
-
expressionContext
-
retryEndpointsByMainEndpointId
-
-
Constructor Details
-
KafkaChannelDefinitionProcessor
public KafkaChannelDefinitionProcessor(com.fasterxml.jackson.databind.ObjectMapper objectMapper)
-
-
Method Details
-
canProcess
- Specified by:
canProcess in interface ChannelModelProcessor
-
canProcessIfChannelModelAlreadyRegistered
- Specified by:
canProcessIfChannelModelAlreadyRegistered in interface ChannelModelProcessor
-
registerChannelModel
public void registerChannelModel(ChannelModel channelModel, String tenantId, EventRegistry eventRegistry, EventRepositoryService eventRepositoryService, boolean fallbackToDefaultTenant) - Specified by:
registerChannelModel in interface ChannelModelProcessor
-
createKafkaListenerEndpoint
protected KafkaListenerEndpoint createKafkaListenerEndpoint(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) -
processAndRegisterEndpoints
protected void processAndRegisterEndpoints(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) -
createEndpointConfigurations
protected Collection<KafkaChannelDefinitionProcessor.Configuration> createEndpointConfigurations(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry, KafkaListenerEndpoint mainEndpoint, KafkaListenerContainerFactory<?> containerFactory) -
getTopicPartitions
protected static Collection<TopicPartitionOffset> getTopicPartitions(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset[] topicPartitionOffsets) -
getTPOForRetryTopics
protected static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset tpo) -
getTPOForMainTopic
protected static TopicPartitionOffset getTPOForMainTopic(Suffixer suffixer, TopicPartitionOffset tpo) -
getTopicCreationFunction
protected Consumer<Collection<String>> getTopicCreationFunction(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) -
createNewTopics
protected void createNewTopics(Collection<String> topics, int numPartitions, short replicationFactor) -
createListenerContainerFactoryConfigurer
protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfigurer(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration, BackOff backOff, DefaultDestinationTopicResolver topicResolver) -
getOrCreateKafkaConsumerBackoffManager
-
getOrCreateRetryTopicTaskScheduler
-
decorateFactory
protected KafkaListenerContainerFactory<?> decorateFactory(DestinationTopic.Properties destinationTopicProperties, ListenerContainerFactoryConfigurer factoryConfigurer, RetryTopicConfiguration retryTopicConfiguration) -
processOutboundDefinition
-
resolveExpressionAsInteger
-
resolveExpressionAsInteger
-
resolveExpressionAsLong
-
resolveExpressionAsDouble
-
resolveExpressionAsBoolean
-
resolveExpressionAsBoolean
-
resolveExpressionAsString
-
resolveTopics
-
resolveTopics
protected void resolveTopics(Object resolvedValue, List<String> result, KafkaInboundChannelModel channelDefinition) -
resolvePattern
-
resolveTopicPartitions
protected Collection<TopicPartitionOffset> resolveTopicPartitions(KafkaInboundChannelModel channelModel) -
resolvePartitionAsInteger
protected void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result) -
parsePartitions
Parse a list of partitions into a List. Example: "0-5,10-15". This parsing is the same as is done in the Spring KafkaListenerAnnotationBeanPostProcessor.
- Parameters:
partsString - the comma-delimited list of partitions/ranges.
- Returns:
- the stream of partition numbers, sorted and de-duplicated.
-
resolveKafkaPartitionProvider
protected KafkaPartitionProvider resolveKafkaPartitionProvider(KafkaOutboundChannelModel channelModel) -
resolveKafkaMessageKeyProvider
protected KafkaMessageKeyProvider<?> resolveKafkaMessageKeyProvider(KafkaOutboundChannelModel channelModel) -
resolveExpression
-
resolveExpression
-
createMessageListener
protected GenericMessageListener<org.apache.kafka.clients.consumer.ConsumerRecord<Object,Object>> createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) -
unregisterChannelModel
public void unregisterChannelModel(ChannelModel channelModel, String tenantId, EventRepositoryService eventRepositoryService) - Specified by:
unregisterChannelModel in interface ChannelModelProcessor
-
unregisterEndpoint
-
registerEndpoint
protected void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory)
Register a new KafkaListenerEndpoint alongside the KafkaListenerContainerFactory to use to create the underlying container.
The factory may be null if the default factory has to be used for that endpoint.
resolveContainerFactory
protected KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> containerFactory) -
getEndpointId
-
getEndpointGroupId
-
resolve
-
resolveProperties
protected Properties resolveProperties(List<KafkaInboundChannelModel.CustomProperty> consumerProperties) -
setBeanFactory
- Specified by:
setBeanFactory in interface BeanFactoryAware
- Throws:
BeansException
-
setApplicationContext
- Specified by:
setApplicationContext in interface ApplicationContextAware
- Throws:
BeansException
-
onApplicationEvent
- Specified by:
onApplicationEvent in interface ApplicationListener<ContextRefreshedEvent>
-
getKafkaOperations
-
setKafkaOperations
-
getKafkaAdminOperations
-
setKafkaAdminOperations
-
getEndpointRegistry
-
setEndpointRegistry
-
getContainerFactoryBeanName
-
setContainerFactoryBeanName
-
getContainerFactory
-
setContainerFactory
-
getKafkaConsumerBackoffManager
-
setKafkaConsumerBackoffManager
-
createRetryTopicConfiguration
protected RetryTopicConfiguration createRetryTopicConfiguration(KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration retryConfiguration) -
resolveRetryConfiguration
protected KafkaChannelDefinitionProcessor.ResolvedRetryConfiguration resolveRetryConfiguration(KafkaInboundChannelModel channelModel) -
createNonBlockingBackOffPolicy
protected org.springframework.retry.backoff.SleepingBackOffPolicy<?> createNonBlockingBackOffPolicy(KafkaInboundChannelModel.NonBlockingRetryBackOff backOff)
-