使用Spring Boot搭配Kafka实现RPC调用

发布时间:2021-04-05 11:30
最后更新:2021-04-05 11:30
所属分类:
JVM Spring

Kafka是目前十分流行的分布式消息队列,但是如何利用Kafka搭配Spring for Apache Kafka实现一个基于消息队列的RPC基础功能呢?

服务架构

Spring for Apache Kafka 中提供了以下几个概念来构建 Kafka 中的生产者和消费者。

  • ProducerFactory<K, V>,用于构建一个生产者实例的工厂类。
  • KafkaTemplate<K, V>,执行发送消息的功能类。
  • ReplyingKafkaTemplate<K, V, R>,具备发送消息和回收消息的功能类。
  • ConsumerFactory<K, V>,用于构建一个消费者的工厂类。
  • KafkaMessageListenerContainer<K, V>,用于持有消费者的容器类。
  • KafkaListenerContainerFactory<K, V>,用于构建只有消费者的容器的工厂类。
  • NewTopic,程序运行时自动构建的 Topic,如果 Topic 已经存在则跳过构建。
  • Message<?>,用于承载对象的消息。

生产方架构

生产方的架构十分简单,只需要在生产方的类构造函数中注入 KafkaTemplate<K, V> Bean 即可。当不使用事务时,ProducerFactory 的默认实现 DefaultKafkaProducerFactory 会创建一个单例的生产者。

要创建一个 ProducerFactory 需要一个类型为 Map<String, Object> 的配置集,以及一个键序列化器和一个值序列化器。配置集中的各个配置项名称在 ProducerConfig 类中定义。KafkaTemplate<K, V> 实例中可以注入一个 RecordMessageConverter 实例,用来对复杂的对象进行承载传输。

@startuml
skinparam {
    componentStyle uml2
    monochrome false
    shadowing false
    backgroundColor transparent
    classBackgroundColor transparent
}
hide fields

class KafkaProperties {
    + buildProducerProperties()
}
class KafkaTemplate {
    + setMessageConverter(RecordMessageConverter)
    + send(String, K, V)
    + send(String, V)
    + send(ProducerRecord)
    + send(Message)
}
class KeySerializer {
    + serialize(String, Header, T)
    + serialize(String, T
}
class ValueSerializer {
    + serialize(String, Header, T)
    + serialize(String, T)
}
interface ProducerFactory {
    + createProducer()
}
interface MessageConverter {
    + commonHeaders()
}
interface RecordMessageConverter {
    + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
    + fromMessage(Message, String)
}

ProducerFactory --* KafkaTemplate
KeySerializer --* ProducerFactory
ValueSerializer --* ProducerFactory
KafkaProperties --* ProducerFactory
MessageConverter <|-- RecordMessageConverter
RecordMessageConverter --* KafkaTemplate
@enduml

消费方架构

消费方的架构要略复杂,由于消费方需要对 Kafka 传递来的消息进行监听,所以需要将监听器(Listener)置入容器中,由容器负载并进行处理。常用的监听器接口主要有 MessageListener<K, V>AcknowledgingMessageListener<K, V> 等,或者使用 @KafkaListener 注解标记处理方法或者处理类。容器根据功能需要,常用的则有两种 KafkaMessageListenerContainerConcurrentMessageListenerContainer,分别用于单线程监听和多线程监听。

与生产方相同,消费方也需要使用工厂类来创建消费方实例。消费方工厂类一般都实现了接口 ConsumerFactory<K ,V>,常用的是 DefaultKafkaConsumerFactory<K ,V>。监听容器的构建需要同时提供消费方工厂类实例和容器配置集。

@startuml
skinparam {
    componentStyle uml2
    monochrome false
    shadowing false
    backgroundColor transparent
    classBackgroundColor transparent
}
hide fields

class ContainerProperties {
    + ContainerProperties(String...)
    + setMessageListener(MessageListener)
}
interface MessageListener {
    + onMessage(ConsumerRecord)
}
interface ConsumerFactory {
    + createConsumer()
}
class KafkaProperties {
    + buildConsumerProperties()
}
class KeyDeserializer {
    + deserialize(String, Header, T)
    + deserialize(String, T
}
class ValueDeserializer {
    + deserialize(String, Header, T)
    + deserialize(String, byte[])
}
class KafkaMessageListenerContainer {
    # doStart()
}
interface KafkaListenerContainerFactory {
    + createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
    + setConsumerFactory(ConsumerFactory)
    + setMessageConverter(MessageConverter)
}
interface MessageConverter {
    + commonHeaders()
}
interface RecordMessageConverter {
    + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
    + fromMessage(Message, String)
}

MessageListener --* ContainerProperties
KafkaProperties --* ConsumerFactory
KeyDeserializer --* ConsumerFactory
ValueDeserializer --* ConsumerFactory
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
ConsumerFactory --* AbstractKafkaListenerContainerFactory
ContainerProperties --* AbstractKafkaListenerContainerFactory
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 >
MessageConverter <|-- RecordMessageConverter
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
@enduml

RPC 架构

在使用 Kafka 执行 RPC 调用时,被调用的消费方的建立与其他用途中没有太多区别,只是需要在 AbstractKafkaListenerContainerFactory<C, K, V> 中加入一个用于发送消息的 KafkaTemplate<K, V> 实例即可,并在使用 @KafkaListener 注解的监听器上增加 @SendTo 注解,并使监听器返回要发回的对象即可。但是生产方的配置就相应的要复杂许多了,除了要配置专用的 ReplyingKafkaTemplate<K, V, R> 以外,还需要配置针对返回消息的消费方设置。

总起来说,在使用 RPC 调用时,无论调用方还是被调用方,都是一个集成了生产方和消费方的全功能 Kafka 客户端。

调用方架构

@startuml
skinparam {
    componentStyle uml2
    monochrome false
    shadowing false
    backgroundColor transparent
    classBackgroundColor transparent
}
hide fields

interface ProducerFactory {
    + createProducer()
}
interface RecordMessageConverter {
    + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
    + fromMessage(Message, String)
}
class ReplyingKafkaTemplate {
    + sendAndReceive(ProducerRecord)
    + sendAndReceive(ProducerRecord, Duration)
}
class KafkaMessageListenerContainer {
    # doStart()
}
interface KafkaListenerContainerFactory {
    + createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
    + setConsumerFactory(ConsumerFactory)
    + setMessageConverter(MessageConverter)
}
interface ConsumerFactory {
    + createConsumer()
}
interface MessageListener {
    + onMessage(ConsumerRecord)
}
class ContainerProperties {
    + ContainerProperties(String...)
    + setMessageListener(MessageListener)
}

MessageListener --* ContainerProperties
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
ProducerFactory --* ReplyingKafkaTemplate
ContainerProperties --* AbstractKafkaListenerContainerFactory
KafkaListenerContainerFactory <|.. AbstractKafkaListenerContainerFactory
KafkaMessageListenerContainer - AbstractKafkaListenerContainerFactory : 生成 <
ConsumerFactory --* AbstractKafkaListenerContainerFactory
KafkaMessageListenerContainer --* ReplyingKafkaTemplate
@enduml

被调用方架构

@startuml
skinparam {
    componentStyle uml2
    monochrome false
    shadowing false
    backgroundColor transparent
    classBackgroundColor transparent
}
hide fields

class ContainerProperties {
    + ContainerProperties(String...)
    + setMessageListener(MessageListener)
}
interface MessageListener {
    + onMessage(ConsumerRecord)
}
interface ConsumerFactory {
    + createConsumer()
}
class KafkaMessageListenerContainer {
    # doStart()
}
interface KafkaListenerContainerFactory {
    + createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
    + setConsumerFactory(ConsumerFactory)
    + setMessageConverter(MessageConverter)
    + setReplyTemplate(KafkaTemplate)
}
interface RecordMessageConverter {
    + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
    + fromMessage(Message, String)
}
class KafkaTemplate {
    + setMessageConverter(RecordMessageConverter)
    + send(String, K, V)
    + send(String, V)
    + send(ProducerRecord)
    + send(Message)
}

MessageListener --* ContainerProperties
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
ConsumerFactory --* AbstractKafkaListenerContainerFactory
ContainerProperties --* AbstractKafkaListenerContainerFactory
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 >
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
RecordMessageConverter --* KafkaTemplate
KafkaTemplate --* AbstractKafkaListenerContainerFactory
@enduml

配置项内容的获取

Spring Boot 所接管的配置项内容可以通过依赖注入获取,而不必像说明手册中描述的一样需要在程序中手动置入。要获取 Kafka 的配置,只需要声明并注入一个 KafkaProperties 类型的属性即可。

单向发送字符串

单向发送功能需要在发送方创建 KafkaTemplate<K, V> 的实例,需要注意的是, Spring Boot 已经内置提供了 KafkaTemplate<String, String> 的 Bean,对于字符串信息可以直接发送。

以下是发送方的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Component
public class MessageProducer {
    private final KafkaTemplate<String, String> template;

    @Autowired
    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.template = kafkaTemplate;
    }

    public void sendMessage(String message) {
        this.template.send("some-topic", message);
    }
}

以下是消费方的示例。

1
2
3
4
5
6
7
8
@Component
@Slf4j
public class MessageConsumer {
    @KafkaListener(id = "client_grp", topic = "some-topic")
    public void consumeMessage(String message) {
        log.info(message);
    }
}

双向发送字符串

与单向发送字符串功能相似,针对字符串的发送和接收,Spring for Kafka 已经提供了许多已经配置好的现成 Bean 可供使用,但是需要注意的是,RPC 调用方的 ReplyingKafkaTemplate<K, V, R> 是需要手工配置的。

以下是调用方的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// 应用主类
@SpringBootApplication
public class RpcRequestApplication {
    private final KafkaProperties kProperties;

    @Autowired
    public RpcRequestApplication(
        KafkaProperties properties
    ) {
        this.kProperties = properties;
    }

    public static void main(String[] args) {
        SpringApplication.run(RpcRequestApplication.class, args).close();
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyTemplate(
        ProducerFactory<String, String> factory,
        ConcurrentMessageListenerContainer<String, String> repliesContainer
    ) {
        return new ReplyingKafkaTemplate<>(factory, repliesContainer);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = this.kProperties.buildProducerProperties();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
        KafkaTemplate<String, String> kafkaTemplate
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
            this.kProperties.buildConsumerProperties(),
            StringDeserializer::new,
            StringDeserializer::new
        );
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> messageContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory
    ) {
        ConcurrentMessageListenerContainer<String, String> container =
            factory.createContainer("RPC-Response");
        container.getContainerProperties().setGroupId("replies");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public NewTopic rpcRequestTopic() {
        return TopicBuilder.name("RPC-Request")
            .partitions(1)
            .replicas(3)
            .build();
    }

    @Bean
    public NewTopic rpcReplyTopic() {
        return TopicBuilder.name("RPC-Response")
            .partitions(1)
            .replicas(3)
            .build();
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 功能类
@Component
@Slf4j
public class RpcRequester implements CommandLineRunner {
    private final ReplyingKafkaTemplate<String, String> template;

    @Autowired
    public RpcRequester(
        ReplyingKafkaTemplate<String, String> template
    ) {
        this.template = template;
    }

    @Override
    public void run(String... args) throws Exception {
        try {
            RequestReplyFuture<String, String, String> reply = this.template.sendAndReceive(
                new ProducerRecord<>("RPC-Request", "greeting")
            );
            String result = reply.get().value();
            log.info("Hello from " + result);
        } catch (InterruptedException | ExecutionException e) {
            log.error(e.getMessage());
        }
    }
}

以下是响应方的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Component
@Slf4j
public class RpcReplier {
    @KafkaListener(id="rpc-server", topic="RPC-Request")
    @SendTo
    public String replyGreeting(String message) {
        log.info("Requester send: " + message);
        return "Replier";
    }
}

在响应方中,与单向发送唯一的不同是添加了 @SendTo 注解并在监听器上增加了返回值类型。

单向发送自定义对象

单向发送自定义对象需要自行配置完整的业务链条,其中生产方需要配置 ProducerFactory<K, V>KafkaTemplate<K, V>,而消费方则需要配置 ConsumerFactory<K, V>,以及监听器容器工厂和容器。

以下给出生产方的示例代码。

1
2
3
4
5
6
7
# 生产方配置文件
spring:
    kafka:
        bootstrap-servers: 192.168.1.1:9092
        producer:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 自定义载荷类
@Data
@Builder
public class Cargo {
    @NonNull private final String action;
    private Object payload;

    public Cargo(
        @JsonProperty("action") String action,
        @JsonProperty("payload") @Nullable Object payload
    ) {
        this.action = action;
        this.payload = payload;
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 主类文件
@SpringBootApplication
public class SenderApplication {

    private final KafkaProperties kProperties;

    @Autowired
    public SenderApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }

    @Bean
    public ProducerFactory<String, Cargo> producerFactory() {
        Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
        // 以下两条语句与上面配置文件中的 producer 的配置功能相同
        // 择一使用即可,一般不建议在此进行硬编码
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public KafkaTemplate<String, Cargo> sendTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 功能类
@Component
@Slf4j
public class Requester implements CommandLineRunner {

    private final KafkaTemplate<String, Cargo> sendTemplate;

    @Autowired
    public Requester(KafkaTemplate<String, Cargo> template) {
        this.sendTemplate = template;
    }

    @Override
    public void run(String... args) throws Exception {
        Cargo load = Cargo.builder().action("test").build();
        ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
        this.sendTemplate.send(request);
        log.info("Custom package sent.");
    }
}

以下是消费方示例代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 消费方配置文件
spring:
    kafka:
        bootstrap-servers: 192.168.1.1:9092
        consumer:
            group-id: response-group
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            properties:
                spring.json:
                    trusted.packages: '*'
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 主类文件
@SpringBootApplication
public class ReceiverApplication {

    private final KafkaProperties kProperties;

    @Autowired
    public ReceiverApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }

    @Bean
    public ConsumerFactory<String, Cargo> consumerFactory() {
        Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
        // 以下三条语句与上面配置文件中的 consumer 的配置功能相同
        // 择一使用即可,一般不建议在此进行硬编码
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaProducerFactory<>(consumerProps);
    }

    // 这里有一个小坑,如果生成容器工厂的 Bean 方法名不是 kafkaListenerContainerFactory,
    // 就必须将 Bean 的名称设置为 kafkaListenerContainerFactory,
    // 否则将提示无法找到类型为 ConsumerFactory<Object, Object> 的 Bean,
    // 但实际上是没有找到监听器容器工厂 Bean。
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
        ConcurrentMessageListenerContainer<String, Cargo> container =
            kafkaListenerContainerFactory().createContainer("RPC-Request");
        container.getContainerProperties().setGroupId("replies");
        container.setAutoStartup(false);
        return container;
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 功能类
@Component
@Slf4j
public class Receiver {

    @KafkaListener(id = "rpc-server", topics = "RPC-Request")
    public void receive(Cargo cargo) {
        log.info("Received: {}", cargo.getAction());
    }
}

双向发送自定义对象

双向发送自定义对象实际上与双向发送字符串一样,需要将生产方和消费方结合起来,形成 RPC 的调用方和被调用方。在以下示例中,调用方和被调用方都采用如下的配置文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
spring:
    kafka:
        bootstrap-servers: 192.168.1.1:9092
        producer:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        consumer:
            # 针对调用方和被调用方,group-id 可以不相同,也尽量不要相同
            group-id: response-group
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            properties:
                spring.json:
                    trusted.packages: '*'

以下是调用方的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 主类文件
@SpringBootApplication
public class SenderApplication {

    private final KafkaProperties kProperties;

    @Autowired
    public SenderApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }

    @Bean
    public ProducerFactory<String, Cargo> producerFactory() {
        Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public ConsumerFactory<String, Cargo> consumerFactory() {
        Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
        return new DefaultKafkaProducerFactory<>(consumerProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
        ConcurrentMessageListenerContainer<String, Cargo> container =
            kafkaListenerContainerFactory().createContainer("RPC-Response");
        container.getContainerProperties().setGroupId("requests");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, Cargo, Cargo> replyingTemplate(
        ProducerFactory<String, Cargo> factory,
        ConcurrentMessageListenerContainer<String, Cargo> container
    ) {
        return new ReplyingKafkaTemplate<>(factory, container);
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 功能类
@Component
@Slf4j
public class Requester implements CommandLineRunner {

    private final ReplyingKafkaTemplate<String, Cargo, Cargo> replyTemplate;

    @Autowired
    public Requester(ReplyingKafkaTemplate<String, Cargo, Cargo> template) {
        this.replyTemplate = template;
    }

    @Override
    public void run(String... args) throws Exception {
        try {
            Cargo load = Cargo.builder().action("request").build();
            ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
            RequestReplyFuture<String, Cargo, Cargo> requestFuture = this.replyTemplate.sendAndReceive(request);
            Cargo response = requestFuture.get().value();
            log.info("Received: {}", response.getAction());
        } catch (InterruptedException | ExecutionException e) {
            log.error(e.getMessage());
        }
    }
}

以下是被调用方的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 主类文件
@SpringBootApplication
public class ReceiverApplication {

    private final KafkaProperties kProperties;

    @Autowired
    public ReceiverApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }

    @Bean
    public ProducerFactory<String, Cargo> producerFactory() {
        Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public ConsumerFactory<String, Cargo> consumerFactory() {
        Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
        return new DefaultKafkaProducerFactory<>(consumerProps);
    }

    @Bean
    public KafkaTemplate<String, Cargo> sendTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(sendTemplate());
        return factory;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
        ConcurrentMessageListenerContainer<String, Cargo> container =
            kafkaListenerContainerFactory().createContainer("RPC-Request");
        container.getContainerProperties().setGroupId("replies");
        container.setAutoStartup(false);
        return container;
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 功能类
@Component
@Slf4j
public class Receiver {

    @KafkaListener(id = "rpc-server", topics = "RPC-Request")
    @SendTo
    public Cargo receive(Cargo cargo) {
        log.info("Received: {}", cargo.getAction());
        return Cargo.builder().action("response").build();
    }
}

索引标签
JVM
Spring Boot
Java
Kafka
RPC
分布式通信