
样例的消息队列采用kafka。相关软件(kafka软件和zookeeper软件)的安装见初探一(安装篇)。
注意:新的版本,kafka不采用sink和source(注解被弃用),而是改用supplier和consumer方式。
演示架构
左侧分支是publisher启动即发布一个主题关联的消息到stream-kafka,然后subscriber(服务本身)从队列获取该主题的消息并打印出来;
右侧分支是访问http://localhost:7888/request,则发布另外一个主题相关的消息到stream-kafka,然后subscriber A(另一个服务)从队列获取该主题的消息并打印出来。
一、Publisher
pom.xml
引入spring-cloud-stream和spring-cloud-stream-binder-kafka
4.0.0 org.springframework.boot spring-boot-starter-parent2.6.1 com.abcd MessageStreamPublisher0.0.1-SNAPSHOT MessageStreamPublisher MessageStreamPublisher 1.8 2021.0.0 org.springframework.cloud spring-cloud-starter-netflix-eureka-clientorg.springframework.boot spring-boot-starter-weborg.springframework.cloud spring-cloud-streamorg.springframework.cloud spring-cloud-stream-binder-kafkaorg.springframework.boot spring-boot-starter-testtest org.springframework.cloud spring-cloud-streamtest test-binder test-jar org.springframework.cloud spring-cloud-dependencies${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin
application.yml
server:
port: 7888
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
spring:
application:
name: msg_producer
cloud:
stream:
function:
definition: testSupplier;testConsumer;publishMsg
bindings:
testSupplier-out-0:
destination: topic-hello
content-type: text/plain
testConsumer-in-0:
destination: topic-hello
publishMsg-out-0:
destination: topic-user-request
content-type: text/plain
kafka:
binder:
brokers: localhost:9092
#zk-nodes: localhost:2181
auto-create-topics: true
required-acks: 1
java
application实现supplier和consumer同体,即Publisher和Subscriber的URL和端口一样。
特别注意:方法返回采用Supplier和Consumer(java.util.function.Supplier/Consumer)
Application代码 @EnableEurekaClient @SpringBootApplication public class MessageStreamPublisherApplication { private static final Logger log = LoggerFactory.getLogger(MessageStreamPublisherApplication.class); public static void main(String[] args) { SpringApplication.run(MessageStreamPublisherApplication.class, args); } @Bean public SuppliertestSupplier(){ return () -> { String message = "www.test.com"; log.info("Sending value: " + message); return message; }; } @Bean public Consumer testConsumer(){ return message -> { log.info("Received message: " + message); }; } }
以下service和controller实现supplier和consumer不同体(不同微服务实现)
Publisher Controller代码
@RestController
public class PublishController {
@Autowired
public PublishService publishService;
@GetMapping("request")
public void log(){
publishService.log("userRequest");
}
}
Publisher service代码
@Service
public class PublishService {
private BlockingQueue mRequestList = new linkedBlockingDeque<>();
public void log(String userRequest){
mRequestList.offer(userRequest);
}
@Bean
public Supplier publishMsg(){
return () -> {
String request = mRequestList.poll();
System.out.println("publish msg: " + request);
return request;
};
}
}
二、Subscriber A
pom.xml
引入spring-cloud-stream和spring-cloud-stream-binder-kafka
4.0.0 org.springframework.boot spring-boot-starter-parent2.6.1 com.abcd MessageStreamSubscriberA0.0.1-SNAPSHOT MessageStreamSubscriberA MessageStreamSubscriberA 1.8 2021.0.0 org.springframework.cloud spring-cloud-starter-netflix-eureka-clientorg.springframework.boot spring-boot-starter-weborg.springframework.cloud spring-cloud-streamorg.springframework.cloud spring-cloud-stream-binder-kafkaorg.springframework.boot spring-boot-starter-testtest org.springframework.cloud spring-cloud-streamtest test-binder test-jar org.springframework.cloud spring-cloud-dependencies${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin
application.yml
server:
port: 7889
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
spring:
application:
name: msg_consumer_a
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
#zk-nodes: localhost:2181
auto-create-topics: true
bindings:
receiveMsg-in-0:
destination: topic-user-request
function:
definition: receiveMsg
java
Application代码
@SpringBootApplication
public class MessageStreamSubscriberAApplication {
public static void main(String[] args) {
SpringApplication.run(MessageStreamSubscriberAApplication.class, args);
}
}
Service代码
@Service
public class SubscriberAService {
@Bean
public Consumer receiveMsg(){
return request -> {
System.out.println("SubscriberA receive msg: " + request);
};
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)