基于kafka+ehcache+redis完成缓存数据生产服务的开发

发布 : 2017-07-04 分类 : 大数据 浏览 :

将kafka整合到spring boot中

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1</version>
</dependency>

项目结构图

ProductInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package com.matrix.eshop.cache.model;

/**
* 商品信息
*
* @author Administrator
*
*/
public class ProductInfo {

private Long id;
private String name;
private Double price;
private String pictureList;
private String specification;
private String service;
private String color;
private String size;
private Long shopId;

public ProductInfo() {

}

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Double getPrice() {
return price;
}

public void setPrice(Double price) {
this.price = price;
}

public String getPictureList() {
return pictureList;
}

public void setPictureList(String pictureList) {
this.pictureList = pictureList;
}

public String getSpecification() {
return specification;
}

public void setSpecification(String specification) {
this.specification = specification;
}

public String getService() {
return service;
}

public void setService(String service) {
this.service = service;
}

public String getColor() {
return color;
}

public void setColor(String color) {
this.color = color;
}

public String getSize() {
return size;
}

public void setSize(String size) {
this.size = size;
}

public Long getShopId() {
return shopId;
}

public void setShopId(Long shopId) {
this.shopId = shopId;
}

@Override
public String toString() {
return "ProductInfo [id=" + id + ", name=" + name + ", price=" + price + ", pictureList=" + pictureList
+ ", specification=" + specification + ", service=" + service + ", color=" + color + ", size=" + size
+ ", shopId=" + shopId + "]";
}

}

ShopInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.matrix.eshop.cache.model;

/**
* 店铺信息
*
* @author matrix
*
*/
public class ShopInfo {
private Long id;
private String name;
private Integer level;
private Double goodCommentRate;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getLevel() {
return level;
}

public void setLevel(Integer level) {
this.level = level;
}

public Double getGoodCommentRate() {
return goodCommentRate;
}

public void setGoodCommentRate(Double goodCommentRate) {
this.goodCommentRate = goodCommentRate;
}

@Override
public String toString() {
return "ShopInfo [id=" + id + ", name=" + name + ", level=" + level + ", goodCommentRate=" + goodCommentRate
+ "]";
}
}

Application

1
2
3
4
5
6
7
8
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ServletListenerRegistrationBean servletListenerRegistrationBean() {
ServletListenerRegistrationBean servletListenerRegistrationBean =
new ServletListenerRegistrationBean();
servletListenerRegistrationBean.setListener(new InitListener());
return servletListenerRegistrationBean;
}

SpringContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.matrix.eshop.cache.spring;

import org.springframework.context.ApplicationContext;

/**
* spring上下文
*
* @author Administrator
*
*/
public class SpringContext {

private static ApplicationContext applicationContext;

public static ApplicationContext getApplicationContext() {
return applicationContext;
}

public static void setApplicationContext(ApplicationContext applicationContext) {
SpringContext.applicationContext = applicationContext;
}

}

编写业务逻辑

1
2
3
4
5
(1).两种服务会发送来数据变更消息:商品信息服务,商品店铺信息服务,每个消息都包含服务名以及商品id
(2).接收到消息之后,根据商品id到对应的服务拉取数据,这一步,在代码里面写死,会获取到什么数据,不去实际再写其他的服务去调用了
(3).商品信息:id,名称,价格,图片列表,商品规格,售后信息,颜色,尺寸
(4).商品店铺信息:id,店铺名称,店铺等级,店铺好评率
(5).分别拉取到了数据之后,将数据组织成json串,然后分别存储到ehcache中,和redis缓存中

测试业务逻辑

1
2
3
4
5
6
(1).创建一个kafka topic
(2).在命令行启动一个kafka producer
(3).启动系统,消费者开始监听kafka topic
(4).在producer中,分别发送两条消息,一个是商品信息服务的消息,一个是商品店铺信息服务的消息
(5).能否接收到两条消息,并模拟拉取到两条数据,同时将数据写入ehcache中,并写入redis缓存中
(6).ehcache通过打印日志方式来观察,redis通过手工连接上去来查询

生产数据

1
2
3
4
[root@matrix-cache02 ~]# cd /usr/local/kafka/
[root@matrix-cache02 kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.31.231:9092,192.168.31.232:9092,192.168.31.233:9092 --topic cache-message
{"serviceId":"productInfoService","productId":1}
{"serviceId":"shopInfoService","shopId":1}

启动程序

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2017/07/04/基于kafka+ehcache+redis完成缓存数据生产服务的开发/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹