小东子的个人技术专栏

dubbo2.5-spring4-mybastis3.2-springmvc4-mongodb3.4-redis3.2整合(三)使用Spring AOP实现mysql的读写分离

1. 为什么要进行(主从复制)读写分离

分布式环境下数据库的读写分离策略是解决数据库读写性能瓶颈的一个关键解决方案,更是最大限度了提高了应用中读取 (Read)数据的速度和并发量。

在进行数据库读写分离的时候,我们首先要进行数据库的主从配置,最简单的是一台Master和一台Slave(大型网站系统的话,当然会很复杂,这里只是分析了最简单的情况)。通过主从配置主从数据库保持了相同的数据,我们在进行读操作的时候访问从数据库Slave,在进行写操作的时候访问主数据库Master。这样的话就减轻了一台服务器的压力。

在进行读写分离案例分析的时候。首先,配置数据库的主从复制,使用mysqlreplicate命令快速搭建 Mysql 主从复制。

2.MySQL主从复制的原理

2.1MySQL主从复制的原理

这里写图片描述

2.2MySQL主从复制的基本过程

MySQL主从复制的两种情况:同步复制和异步复制,实际复制架构中大部分为异步复制。复制的基本过程如下:

  1. Slave上面的IO进程连接上Master,并请求从指定日志文件的指定位置(或者从最开始的日志)之后的日志内容。
  2. Master接收到来自Slave的IO进程的请求后,负责复制的IO进程会根据请求信息读取日志指定位置之后的日志信息,返回给Slave的IO进程。返回信息中除了日志所包含的信息之外,还包括本次返回的信息已经到Master端的bin-log文件的名称以及bin-log的位置。
  3. Slave的IO进程接收到信息后,将接收到的日志内容依次添加到Slave端的relay-log文件的最末端,并将读取到的Master端的 bin-log的文件名和位置记录到master-info文件中,以便在下一次读取的时候能够清楚的告诉Master“我需要从某个bin-log的哪个位置开始往后的日志内容,请发给我”。
  4. Slave的Sql进程检测到relay-log中新增加了内容后,会马上解析relay-log的内容成为在Master端真实执行时候的那些可执行的内容,并在自身执行。

3.开始MySQL5.7.12的主从复制教程:

3.1 MySQL5.6开始主从复制有两种方式:基于日志(binlog);基于GTID(全局事务标示符)。

需要注意的是:GTID方式不支持临时表!所以如果你的业务系统要用到临时表的话就不要考虑这种方式了,至少目前最新版本MySQL5.6.12的GTID复制还是不支持临时表的。

所以此篇教程主要是告诉大家如何通过日志(binlog)方式做主从复制!

3.2 MySQL官方提供的MySQL Replication教程:

http://dev.mysql.com/doc/refman/5.6/en/replication.html

这个官方教程强烈建议大家阅读。

3.3、准备工作:

  1. 配置MySQL主从复制(读写分离)之前,需要在主从两台服务器先安装好MySQL5.7。
  2. 目前最新的MySQL5.7 GA版本是MySQL5.7.12(点此下载MySQL5.7.12源码包)。
  3. 需要注意如下两点:

(1)如果你需要用于生产环境,安教程安装MySQL时不要急着做mysql启动操作。建议把mysql初始化生成的/usr/local/mysql/mysql.cnf删除,然后把你优化好的mysql配置文件my.cnf放到/etc下。
(2)建议主备两台服务器在同一局域网,主备两台数据库网络需要互通。

  1. 我的所使用的环境:

主数据库IP:192.168.0.104

从数据库IP:192.168.0.105

3.4 修改主数据库的的配置文件:

1
2
3
4
5
6
7
8
9
10
1 [mysqld]
2 server-id=1
3 log-bin=mysqlmaster-bin.log
4 sync_binlog=1
5 #注意:下面这个参数需要修改为服务器内存的70%左右
6 innodb_buffer_pool_size = 512M
7 innodb_flush_log_at_trx_commit=1
8 sql_mode=STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO
9 lower_case_table_names=1
10 log_bin_trust_function_creators=1

修改之后要重启mysql服务:

1
# /etc/init.d/mysql restart

3.5、修改从数据库的的配置文件:

注意:(server-id配置为大于主数据库的server-id即可)

1
2
3
4
5
6
7
8
9
10
1 [mysqld]
2 server-id=2
3 log-bin=mysqlslave-bin.log
4 sync_binlog=1
5 #注意:下面这个参数需要修改为服务器内存的70%左右
6 innodb_buffer_pool_size = 512M
7 innodb_flush_log_at_trx_commit=1
8 sql_mode=STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO
9 lower_case_table_names=1
10 log_bin_trust_function_creators=1

修改之后要重启mysql:

1
# /etc/init.d/mysql restart

附一个我已优化过的从数据库配置文件:点此下载

3.6、SSH登录到主数据库:

3.6.1. 在主数据库上创建用于主从复制的账户(192.168.0.104换成你的从数据库IP):

1
2
1 # mysql -uroot -p
2 mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'192.168.0.104' IDENTIFIED BY 'repl';

3.6.2. 主数据库锁表(禁止再插入数据以获取主数据库的的二进制日志坐标):

mysql> FLUSH TABLES WITH READ LOCK;

3.6.3. 然后克隆一个SSH会话窗口,在这个窗口打开MySQL命令行:

这里写图片描述

在这个例子中,二进制日志文件是mysqlmaster-bin.000001,位置是332,记录下这两个值,稍后要用到。

3.6.4 在主数据库上使用mysqldump命令创建一个数据快照:

mysqldump -uroot -p -h127.0.0.1 -P3306 –all-databases –triggers –routines –events >all.sql

注意:接下来会提示你输入mysql数据库的root密码,输入完成后,如果当前数据库不大,很快就能导出完成。

3.6.5 解锁第2.6.2步主数据的锁表操作:

mysql> UNLOCK TABLES;

3.7、SSH登录到从数据库:

(1)通过FTP、SFTP或其他方式,将上一步备份的主数据库快照all.sql上传到从数据库某个路径,例如我放在了/home/lidong/目录下;

(2)从导入主的快照:

cd /home/lidong

# mysql -uroot -p -h127.0.0.1 -P3306 < all.sql

注意:接下来会提示你输入mysql数据库的root密码,输入完成后,如果当前数据库不大,很快就能导入完成。

(3)给从数据库设置复制的主数据库信息(注意修改MASTER_LOG_FILE和MASTER_LOG_POS的值):

1
2
3
4
5
6
7
8
9
# mysql -uroot -p
mysql> CHANGE MASTER TO MASTER_HOST='192.168.100.2',MASTER_USER='repl',MASTER_PASSWORD='repl',MASTER_LOG_FILE='mysqlmaster-bin.000001',MASTER_LOG_POS=332;
# 然后启动从数据库的复制线程:
mysql> START slave;
# 接着查询数据库的slave状态:
mysql> SHOW slave STATUS \G
# 如果下面两个参数都是Yes,则说明主从配置成功!
Slave_IO_Running: Yes
Slave_SQL_Running: Yes

(4)接下来我们可以在主数据库上创建数据库、表、插入数据,然后看从数据库是否同步了这些操作。

一主一从的主从复制就实现了。

4、实现读写分离的两种方法

  1. 第一种方式是我们最常用的方式,就是定义2个数据库连接,一个是MasterDataSource,另一个是SlaveDataSource。更新数据时我们读取MasterDataSource,查询数据时我们读取SlaveDataSource。这种方式很简单。
  2. 第二种方式动态数据源切换,就是在程序运行时,把数据源动态织入到程序中,从而选择读取主库还是从库。主要使用的技术是:Annotation,spring AOP ,反射。

    5.Spring AOP 实现MySQL读写分离的具体实现

    5.1创建用于配置动态分配的读写的数据源ChooseDataSource.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.lidong.util.aspect;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
/**
* 获取数据源,用于动态切换数据源
*/
public class ChooseDataSource extends AbstractRoutingDataSource {
public static Map<String, List<String>> METHOD_TYPE_MAP = new HashMap<String, List<String>>();
/**
* 实现父类中的抽象方法,获取数据源名称
* @return
*/
protected Object determineCurrentLookupKey() {
return DataSourceHandler.getDataSource();
}
// 设置方法名前缀对应的数据源
public void setMethodType(Map<String, String> map) {
for (String key : map.keySet()) {
List<String> v = new ArrayList<String>();
String[] types = map.get(key).split(",");
for (String type : types) {
if (StringUtils.isNotBlank(type)) {
v.add(type);
}
}
METHOD_TYPE_MAP.put(key, v);
}
}
}

5.2 DataSourceAspect进行具体方法的AOP拦截

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.lidong.util.aspect;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;
/**
* 切换数据源(不同方法调用不同数据源)
*/
@Aspect
@Component
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class DataSourceAspect {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Pointcut("execution(* com.lidong.core.*.dao.*.*(..))")
public void aspect() {
}
/**
* 配置前置通知,使用在方法aspect()上注册的切入点
*/
@Before("aspect()")
public void before(JoinPoint point) {
String className = point.getTarget().getClass().getName();
String method = point.getSignature().getName();
logger.info(className + "." + method + "(" + StringUtils.join(point.getArgs(), ",") + ")");
try {
for (String key : ChooseDataSource.METHOD_TYPE_MAP.keySet()) {
for (String type : ChooseDataSource.METHOD_TYPE_MAP.get(key)) {
if (method.startsWith(type)) {
DataSourceHandler.putDataSource(key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

5.3 数据源的Handler类 DataSourceHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lidong.util.aspect;
/**
* 数据源的Handler类
* @author lidong
*
*/
public class DataSourceHandler {
// 数据源名称线程池
public static final ThreadLocal<String> holder = new ThreadLocal<String>();
/**
* 在项目启动的时候将配置的读、写数据源加到holder中
*/
public static void putDataSource(String datasource) {
holder.set(datasource);
}
/**
* 从holer中获取数据源字符串
*/
public static String getDataSource() {
return holder.get();
}
}

5.4 spring-db.xml读写数据源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
driver=com.mysql.jdbc.Driver
url=jdbc:mysql://127.0.0.1:3306/hms
url1=jdbc:mysql://192.168.0.105:3306/hms
username=root
password=123456
password1=
initialSize=0
maxActive=20
maxIdle=20
minIdle=1
maxWait=60000
timeBetweenEvictionRunsMillis=3000
minEvictableIdleTimeMillis=300000
maxPoolPreparedStatementPerConnectionSize=20
druid.filters=
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd">
<!-- 引入配置文件 <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:jdbc.properties" /> </bean> -->
<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
<value>classpath:mongo.properties</value>
<value>classpath:redis.properties</value>
</list>
</property>
</bean>
<bean id="statFilter" class="com.alibaba.druid.filter.stat.StatFilter"
lazy-init="true">
<property name="logSlowSql" value="true" />
<property name="mergeSql" value="true" />
</bean>
<bean id="readDataSource" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<property name="url" value="${url}" />
<property name="username" value="${username}" />
<property name="password" value="${password}" />
<property name="filters" value="stat" />
<property name="maxActive" value="20" />
<property name="initialSize" value="1" />
<property name="maxWait" value="60000" />
<property name="minIdle" value="1" />
<property name="timeBetweenEvictionRunsMillis" value="3000" />
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="SELECT 'x'" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize"
value="20" />
</bean>
<bean id="writeDataSource" class="com.alibaba.druid.pool.DruidDataSource"
destroy-method="close" init-method="init" lazy-init="true">
<property name="driverClassName" value="${driver}" />
<property name="url" value="${url1}" />
<property name="username" value="${username}" />
<property name="password" value="${password1}" />
<property name="initialSize" value="${initialSize}" />
<property name="maxActive" value="${maxActive}" />
<property name="minIdle" value="${minIdle}" />
<property name="maxWait" value="${maxWait}" />
<property name="proxyFilters">
<list>
<ref bean="statFilter" />
</list>
</property>
<property name="filters" value="${druid.filters}" />
<property name="connectionProperties" value="password=${password1}" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="validationQuery" value="SELECT 'x'" />
<property name="timeBetweenLogStatsMillis" value="60000" />
<property name="minEvictableIdleTimeMillis" value="${minEvictableIdleTimeMillis}" />
<property name="timeBetweenEvictionRunsMillis" value="${timeBetweenEvictionRunsMillis}" />
</bean>
<!-- 配置动态分配的读写 数据源 -->
<bean id="dataSource" class="com.lidong.util.aspect.ChooseDataSource"
lazy-init="true">
<property name="targetDataSources">
<map key-type="java.lang.String" value-type="javax.sql.DataSource">
<!-- write -->
<entry key="write" value-ref="writeDataSource" />
<!-- read -->
<entry key="read" value-ref="readDataSource" />
</map>
</property>
<property name="defaultTargetDataSource" ref="writeDataSource" />
<property name="methodType">
<map key-type="java.lang.String">
<!-- read -->
<entry key="read" value=",get,select,count,list,query" />
<!-- write -->
<entry key="write" value=",add,create,update,delete,remove," />
</map>
</property>
</bean>
</beans>

配置了readDataSource和writeDataSource两个数据源,但是交给
SqlSessionFactoryBean进行管理的只有dataSource,使用了com.lidong.util.aspect.ChooseDataSource来进行数据源的选择,默认的数据源是writeDataSource。methodType是定义了方法的关键字,那些是选择读库,那个是写库。

5.5 在Dao层通过切面选择数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.lidong.core.user.service;
import java.util.List;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.lidong.api.service.user.IUserService;
import com.lidong.core.user.dao.IUserDao;
import com.lidong.model.user.User;
@Service("userService")
public class UserServiceImp implements IUserService {
@Resource
IUserDao mIUserDao;
@Override
public User getUserById(int userId) {
return mIUserDao.selectByPrimaryKey(userId);
}
@Override
public User getUserByUsername(String username) {
return mIUserDao.selectByPrimaryUsername(username);
}
@Override
public void addUser(User user) {
mIUserDao.insert(user);
}
@Override
public List<User> getAllUser() {
return mIUserDao.selectAllUsers();
}
@Override
public int delUserById(Integer userId) {
return mIUserDao.deleteByPrimaryKey(userId);
}
@Override
public int updateUser(User user) {
return mIUserDao.updateByPrimaryKey(user);
}
}

注意:通过看com.alibaba.druid.pool.DruidDataSourceStatLoggerImpl 是一分钟发一次心跳,监听写数据源有没有宕机。如果宕机会进行重连。

[代码地址]