小东子的个人技术专栏

重点关注Android、Java、智能硬件、JavaEE、react-native,Swift,微信小程序


  • 首页

  • 归档

  • 标签
小东子的个人技术专栏

dubbo2.5-spring4-mybastis3.2-springmvc4-mongodb3.4-redis3.2整合(八)SpringMVC上传文件到FastDFS

发表于 2017-02-27   |   字数统计: 2,964(字)   |   阅读时长: 13(分)

目前项目中需要存储一些文件、视频等。于是乎,查找了一些关于文件服务器资料。其中有Lustre、HDFS、Gluster、Alluxio、Ceph 、FastDFS。下面简单介绍一下:

  1. Lustre 是一个大规模的、安全可靠的、具备高可用性的集群文件系统,它是由SUN公司开发和维护的。该项目主要的目的就是开发下一代的集群文件系统,目前可以支持超过10000个节点,数以PB的数据存储量。

  2. HDFS Hadoop Distributed File System,简称HDFS,是一个分布式文件系统。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。

  3. GlusterFS 是一个集群的文件系统,支持PB级的数据量。GlusterFS 通过RDMA和TCP/IP方式将分布到不同服务器上的存储空间汇集成一个大的网络化并行文件系统。

  4. Alluxio 前身是Tachyon,是以内存为中心的分布式文件系统,拥有高性能和容错能力,能够为集群框架(如Spark、MapReduce)提供可靠的内存级速度的文件共享服务。

  5. Ceph 是新一代开源分布式文件系统,主要目标是设计成基于POSIX的没有单点故障的分布式文件系统,提高数据的容错性并实现无缝的复制。

  6. FastDFS是一个开源的轻量级分布式文件系统,它对文件进行管理,功能包括:文件存储、文件同步、文件访问(文件上传、文件下载)等,解决了大容量存储和负载均衡的问题。特别适合以文件为载体的在线服务,如相册网站、视频网站等等。FastDFS为互联网量身定制,充分考虑了冗余备份、负载均衡、线性扩容等机制,并注重高可用、高性能等指标,使用FastDFS很容易搭建一套高性能的文件服务器集群提供文件上传、下载等服务。
    通过以上6中文件的服务器的介绍,我们业务非常适合选择用FastDFS,所以就了解学习了一番,感觉确实颇为强大,在此再次感谢淘宝资深架构师余庆大神开源了如此优秀的轻量级分布式文件系统,本篇文章就记录一下FastDFS的最新版本5.0.9在CentOS7中的安装与配置。

1.Fastdfs的简介

了解一下基础概念,FastDFS是一个开源的轻量级分布式文件系统,由跟踪服务器(tracker server)、存储服务器(storage server)和客户端(client)三个部分组成,主要解决了海量数据存储问题,特别适合以中小文件(建议范围:4KB < file_size <500MB)为载体的在线服务。

FastDFS系统结构如下图所示:

这里写图片描述

跟踪器和存储节点都可以由一台多台服务器构成。跟踪器和存储节点中的服务器均可以随时增加或下线而不会影响线上服务。其中跟踪器中的所有服务器都是对等的,可以根据服务器的压力情况随时增加或减少。

为了支持大容量,存储节点(服务器)采用了分卷(或分组)的组织方式。存储系统由一个或多个卷组成,卷与卷之间的文件是相互独立的,所有卷 的文件容量累加就是整个存储系统中的文件容量。一个卷可以由一台或多台存储服务器组成,一个卷下的存储服务器中的文件都是相同的,卷中的多台存储服务器起 到了冗余备份和负载均衡的作用。

在卷中增加服务器时,同步已有的文件由系统自动完成,同步完成后,系统自动将新增服务器切换到线上提供服务。

当存储空间不足或即将耗尽时,可以动态添加卷。只需要增加一台或多台服务器,并将它们配置为一个新的卷,这样就扩大了存储系统的容量。

2.FastDFS的下载

Fastdfs的稳定版下载地址
这里写图片描述

3.FastDFS的 安装

详细见
CentOS 7 安装配置分布式文件系统 FastDFS 5.0.5

4.SpringMVC上传文件到FastDFS

####4.1 fast_client.cnf配置

1
2
3
4
5
6
7
8
9
10
11
connect_timeout = 2
#网络超时时间
network_timeout = 30
#字符集
charset = UTF-8
#跟踪服务器的端口
http.tracker_http_port = 9099
http.anti_steal_token = no
http.secret_key = FastDFS1234567890
#跟踪服务器地址 。跟踪服务器主要是起到负载均衡的作用
tracker_server = 192.168.0.116:22122

4.2 fastdfs文件上传的流程

这里写图片描述

上传文件交互过程:

  1. client询问tracker上传到的storage,不需要附加参数;
  2. tracker返回一台可用的storage;
  3. client直接和storage通讯完成文件上传。

4.3 FastDFS文件下载的流程

这里写图片描述

下载文件交互过程:

  1. client询问tracker下载文件的storage,参数为文件标识(卷名和文件名);
  2. tracker返回一台可用的storage;
  3. client直接和storage通讯完成文件下载。

需要说明的是,client为使用FastDFS服务的调用方,client也应该是一台服务器,它对tracker和storage的调用均为服务器间的调用。

4.4 FastDFSUtil 的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
package com.lidong.dubbo.util;
import org.csource.common.MyException;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
/**
* @项目名称:lidong-dubbo
* @类名:FastDFSUtil
* @类的描述: FastDFS 上传文件到文件服务器
* @作者:lidong
* @创建时间:2017/2/6 下午5:23
* @公司:chni
* @QQ:1561281670
* @邮箱:lidong1665@163.com
*/
public class FastDFSUtil {
private final static
Logger logger = LoggerFactory.getLogger(FastDFSUtil.class);
/**
*上传服务器本地文件-通过Linux客户端,调用客户端命令上传
* @param filePath 文件绝对路径
* @return Map<String,Object> code-返回代码, group-文件组, msg-文件路径/错误信息
*/
public static Map<String, Object> uploadLocalFile(String filePath) {
Map<String, Object> retMap = new HashMap<String, Object>();
/**
* 1.上传文件的命令
*/
String command = "fdfs_upload_file /etc/fdfs/client.conf " + filePath;
/**
* 2.定义文件的返回信息
*/
String fileId = "";
InputStreamReader inputStreamReader = null;
BufferedReader bufferedReader = null;
try {
/**
* 3.通过调用api, 执行linux命令上传文件
*/
Process process = Runtime.getRuntime().exec(command);
/**
* 4.读取上传后返回的信息
*/
inputStreamReader = new InputStreamReader(process.getInputStream());
bufferedReader = new BufferedReader(inputStreamReader);
String line;
if ((line = bufferedReader.readLine()) != null) {
fileId = line;
}
/**
* 5.如果fileId包含M00,说明文件已经上传成功。否则文件上传失败
*/
if (fileId.contains("M00")) {
retMap.put("code", "0000");
retMap.put("group", fileId.substring(0, 6));
retMap.put("msg", fileId.substring(7, fileId.length()));
} else {
retMap.put("code", "0001"); //上传错误
retMap.put("msg", fileId); //返回信息
}
} catch (Exception e) {
logger.error("IOException:" + e.getMessage());
retMap.put("code", "0002");
retMap.put("msg", e.getMessage());
}finally {
if (inputStreamReader!=null){
try {
inputStreamReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return retMap;
}
/**
* Description: 直接通过fdfs java客户端上传到服务器-读取本地文件上传
*
* @param filePath 本地文件绝对路径
* @return Map<String,Object> code-返回代码, group-文件组, msg-文件路径/错误信息
*/
public static Map<String, Object> upload(String filePath) {
Map<String, Object> retMap = new HashMap<String, Object>();
File file = new File(filePath);
TrackerServer trackerServer = null;
StorageServer storageServer = null;
if (file.isFile()) {
try {
String tempFileName = file.getName();
byte[] fileBuff = FileUtil.getBytesFromFile(file);
String fileId = "";
//截取后缀
String fileExtName = tempFileName.substring(tempFileName.lastIndexOf(".") + 1);
ConfigAndConnectionServer configAndConnectionServer = new ConfigAndConnectionServer().invoke(1);
StorageClient1 storageClient1 = configAndConnectionServer.getStorageClient1();
storageServer = configAndConnectionServer.getStorageServer();
trackerServer = configAndConnectionServer.getTrackerServer();
/**
* 4.设置文件的相关属性。调用客户端的upload_file1的方法上传文件
*/
NameValuePair[] metaList = new NameValuePair[3];
//原始文件名称
metaList[0] = new NameValuePair("fileName", tempFileName);
//文件后缀
metaList[1] = new NameValuePair("fileExtName", fileExtName);
//文件大小
metaList[2] = new NameValuePair("fileLength", String.valueOf(file.length()));
//开始上传文件
fileId = storageClient1.upload_file1(fileBuff, fileExtName, metaList);
retMap = handleResult(retMap, fileId);
} catch (Exception e) {
e.printStackTrace();
retMap.put("code", "0002");
retMap.put("msg", e.getMessage());
}finally {
/**
* 5.关闭跟踪服务器的连接
*/
colse(storageServer, trackerServer);
}
} else {
retMap.put("code", "0001");
retMap.put("msg", "error:本地文件不存在!");
}
return retMap;
}
/**
* Description:远程选择上传文件-通过MultipartFile
*
* @param file 文件流
* @return Map<String,Object> code-返回代码, group-文件组, msg-文件路径/错误信息
*/
public static Map<String, Object> upload(MultipartFile file) {
Map<String, Object> retMap = new HashMap<String, Object>();
TrackerServer trackerServer = null;
StorageServer storageServer = null;
try {
if (file.isEmpty()) {
retMap.put("code", "0001");
retMap.put("msg", "error:文件为空!");
} else {
ConfigAndConnectionServer configAndConnectionServer = new ConfigAndConnectionServer().invoke(1);
StorageClient1 storageClient1 = configAndConnectionServer.getStorageClient1();
storageServer = configAndConnectionServer.getStorageServer();
trackerServer = configAndConnectionServer.getTrackerServer();
String tempFileName = file.getOriginalFilename();
//设置元信息
NameValuePair[] metaList = new NameValuePair[3];
//原始文件名称
metaList[0] = new NameValuePair("fileName", tempFileName);
//文件后缀
byte[] fileBuff = file.getBytes();
String fileId = "";
//截取后缀
String fileExtName = tempFileName.substring(tempFileName.lastIndexOf(".") + 1);
metaList[1] = new NameValuePair("fileExtName", fileExtName);
//文件大小
metaList[2] = new NameValuePair("fileLength", String.valueOf(file.getSize()));
/**
* 4.调用客户端呢的upload_file1的方法开始上传文件
*/
fileId = storageClient1.upload_file1(fileBuff, fileExtName, metaList);
retMap = handleResult(retMap, fileId);
}
} catch (Exception e) {
retMap.put("code", "0002");
retMap.put("msg", "error:文件上传失败!");
}finally {
/**
* 5.关闭跟踪服务器的连接
*/
colse(storageServer, trackerServer);
}
return retMap;
}
/**
* 下载文件
*
* @param response
* @param filepath 数据库存的文件路径
* @param downname 下载后的名称
* filepath M00/开头的文件路径
* group 文件所在的组 如:group0
* @throws IOException
*/
public static void download(HttpServletResponse response, String group, String filepath, String downname) {
StorageServer storageServer = null;
TrackerServer trackerServer = null;
try {
ConfigAndConnectionServer configAndConnectionServer = new ConfigAndConnectionServer().invoke(0);
StorageClient storageClient = configAndConnectionServer.getStorageClient();
storageServer = configAndConnectionServer.getStorageServer();
trackerServer = configAndConnectionServer.getTrackerServer();
/**
*4.调用客户端的下载download_file的方法
*/
byte[] b = storageClient.download_file(group, filepath);
if (b == null) {
logger.error("Error1 : file not Found!");
response.getWriter().write("Error1 : file not Found!");
} else {
logger.info("下载文件..");
downname = new String(downname.getBytes("utf-8"), "ISO8859-1");
response.setHeader("Content-Disposition", "attachment;fileName=" + downname);
OutputStream out = response.getOutputStream();
out.write(b);
out.close();
}
} catch (Exception e) {
e.printStackTrace();
try {
response.getWriter().write("Error1 : file not Found!");
} catch (IOException e1) {
e1.printStackTrace();
}
}finally {
/**
* 5.关闭跟踪服务器的连接
*/
colse(storageServer, trackerServer);
}
}
/**
* 删除文件
*
* @param group 文件分组, filepath 已M00/ 开头的文件路径
* @return Map<String,Object> code-返回代码, msg-错误信息
*/
public static Map<String, Object> delete(String group, String filepath) {
Map<String, Object> retMap = new HashMap<String, Object>();
StorageServer storageServer = null;
TrackerServer trackerServer = null;
try {
ConfigAndConnectionServer configAndConnectionServer = new ConfigAndConnectionServer().invoke(0);
StorageClient storageClient = configAndConnectionServer.getStorageClient();
storageServer = configAndConnectionServer.getStorageServer();
trackerServer = configAndConnectionServer.getTrackerServer();
/**
* 4.调用客户端的delete_file方法删除文件
*/
int i = storageClient.delete_file(group, filepath);
if (i == 0) {
retMap.put("code", "0000");
retMap.put("msg", "删除成功!");
} else {
retMap.put("code", "0001");
retMap.put("msg", "文件不存在!");
}
} catch (Exception e) {
e.printStackTrace();
retMap.put("code", "0002");
retMap.put("msg", "删除失败!");
} finally {
/**
* 5.关闭跟踪服务器的连接
*/
colse(storageServer, trackerServer);
}
return retMap;
}
/**
* 关闭服务器
*
* @param storageServer
* @param trackerServer
*/
private static void colse(StorageServer storageServer, TrackerServer trackerServer) {
if (storageServer != null && trackerServer != null) {
try {
storageServer.close();
trackerServer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理上传到文件服务器之后,返回来的结果
*
* @param retMap
* @param fileId
* @return
*/
private static Map<String, Object> handleResult(Map<String, Object> retMap, String fileId) {
if (!fileId.equals("") && fileId != null) {
retMap.put("code", "0000");
retMap.put("group", fileId.substring(0, 6));
retMap.put("msg", fileId.substring(7, fileId.length()));
} else {
retMap.put("code", "0003");
retMap.put("msg", "error:上传失败!");
}
return retMap;
}
/**
* @项目名称:lidong-dubbo
* @类名:FastDFSUtil
* @类的描述: ConfigAndConnectionServer
* @作者:lidong
* @创建时间:2017/2/7 上午8:47
* @公司:chni
* @QQ:1561281670
* @邮箱:lidong1665@163.com
*/
private static class ConfigAndConnectionServer {
private TrackerServer trackerServer;
private StorageServer storageServer;
private StorageClient storageClient;
private StorageClient1 storageClient1;
public TrackerServer getTrackerServer() {
return trackerServer;
}
public StorageServer getStorageServer() {
return storageServer;
}
public StorageClient getStorageClient() {
return storageClient;
}
public StorageClient1 getStorageClient1() {
return storageClient1;
}
public ConfigAndConnectionServer invoke(int flag) throws IOException, MyException {
/**
* 1.读取fastDFS客户端配置文件
*/
ClassPathResource cpr = new ClassPathResource("fdfs_client.conf");
/**
* 2.配置文件的初始化信息
*/
ClientGlobal.init(cpr.getClassLoader().getResource("fdfs_client.conf").getPath());
TrackerClient tracker = new TrackerClient();
/**
* 3.建立连接
*/
trackerServer = tracker.getConnection();
storageServer = null;
/**
* 如果flag=0时候,构造StorageClient对象否则构造StorageClient1
*/
if (flag == 0) {
storageClient = new StorageClient(trackerServer, storageServer);
} else {
storageClient1 = new StorageClient1(trackerServer, storageServer);
}
return this;
}
}
}

4.5 SpringMVC 上传文件到Fastdfs文件服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RequestMapping("/upload")
public String addUser(@RequestParam("file") CommonsMultipartFile[] files,
HttpServletRequest request){
for(int i = 0;i<files.length;i++){
logger.info("fileName-->" + files[i].getOriginalFilename()+" file-size--->"+files[i].getSize());
Map<String, Object> retMap = FastDFSUtil.upload(files[i]);
String code = (String) retMap.get("code");
String group = (String) retMap.get("group");
String msg = (String) retMap.get("msg");
if ("0000".equals(code)){
logger.info("文件上传成功");
//TODO:将上传文件的路径保存到mysql数据库
}else {
logger.info("文件上传失败");
}
}
return "/success";
}

基本上就这么多。大家在学习的过程中如果遇到问题。可以直接在下面评论、吐槽。

代码地址

小东子的个人技术专栏

RabbitMQ工作原理和Spring的集成

发表于 2017-02-04   |   字数统计: 2,108(字)   |   阅读时长: 9(分)

在介绍RadditMQ之前,先介绍一下AMQP。

1.什么是AMPQ?

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

2.AMPQ的特征

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

3.AMPQ的实现

3.1 OpenAMQ

AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS

3.2 Apache Qpid

Apache的开源项目,支持C++、Ruby、Java、JMS、Python和.NET

3.3 Redhat Enterprise MRG

实现了AMQP的最新版本0-10,提供了丰富的特征集,比如完全管理、联合、Active-Active集群,有Web控制台,还有许多企业级特征,客户端支持C++、Ruby、Java、JMS、Python和.NET

3.4 RabbitMQ

一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD、Centos中的实现

4.RabbitMQ是什么

RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。

5.RabbitMQ能为你做些什么?

消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接。消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶。

或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

6.RabbitMQ的结构图

这里写图片描述

几个概念说明:

  • Broker:简单来说就是消息队列服务器实体。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • Producer:消息生产者,就是投递消息的程序。
  • Consumer:消息消费者,就是接受消息的程序。
  • Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

  1. 客户端连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个exchange,并设置相关属性。
  3. 客户端声明一个queue,并设置相关属性。
  4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
  5. 客户端投递消息到exchange。
  6. exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:

  1. exchange持久化,在声明时指定durable => 1
  2. queue持久化,在声明时指定durable => 1
  3. 消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

7.RabbitMQ之交换机

消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。

RabbitMQ包含四种不同的交换机类型:

  1. Direct exchange:直连交换机,转发消息到routigKey指定的队列,如果消息的routigKey和binding的routigKey直接匹配的话,消息将会路由到该队列
  2. Fanout exchange:扇形交换机,转发消息到所有绑定队列(速度最快),不管消息的routigKey息和binding的参数表头部信息和值是什么,消息将会路由到所有的队列
  3. Topic exchange:主题交换机,按规则转发消息(最灵活),如果消息的routigKey和binding的routigKey符合通配符匹配的话,消息将会路由到该队列
  4. Headers exchange:首部交换机 ,如果消息的头部信息和binding的参数表中匹配的话,消息将会路由到该队列。

8.Spring的集成RabbitMQ

8.1 在pom.xml 配置依赖的包

1
2
3
4
5
6
7
8
9
10
11
12
<!-- rabbitmq -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq-client.version}</version>
</dependency>
<!-- spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit.version}</version>
</dependency>

8.2 rabbit.properties 配置文件

1
2
3
4
rabbit_username=guest
rabbit_password=guest
rabbit_host=127.0.0.1
rabbit_port=5672

8.3 创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.lidong.dubbo.core.spittle.service;
import com.lidong.dubbo.api.spittle.service.IMessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @项目名称:lidong-dubbo
* @类名:MessageProducerImp
* @类的描述:
* @作者:lidong
* @创建时间:2017/2/4 上午10:01
* @公司:chni
* @QQ:1561281670
* @邮箱:lidong1665@163.com
*/
@Service
public class MessageProducerServiceImp implements IMessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducerServiceImp.class);
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(Object message) {
logger.info("发送消息");
logger.info("to send message:",message);
amqpTemplate.convertAndSend("queueTestKey",message);
}
}

8.4 创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.lidong.dubbo.core.util.customer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @项目名称:lidong-dubbo
* @类名:MessageConsumer
* @类的描述: RabbitMQ 消息消费者
* @作者:lidong
* @创建时间:2017/2/4 上午9:33
* @公司:chni
* @QQ:1561281670
* @邮箱:lidong1665@163.com
*/
public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("------消费者处理消息------");
logger.info("receive message",message);
}
}

8.5 配置spring-rabbitmq.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="rabbitConnectionFactory"
username="${rabbit_username}"
password="${rabbit_password}"
host="${rabbit_host}"
port="${rabbit_port}" />
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
exchange="exchangeTest" />
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding
queue="queueTest"
key="queueTestKey">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean id="messageReceiver" class="com.lidong.dubbo.core.util.customer.MessageConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container
connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="queueTest"
ref="messageReceiver"/>
</rabbit:listener-container>
</beans>

8.6 测试消息

1
2
3
4
5
6
7
8
9
10
11
@RequestMapping("/sendRabbitMessage")
@ResponseBody
public void sendRabbitMessage(){
logger.info("--------------------");
User user = new User();
user.setName("lidong");
user.setAge(25);
user.setPassword("123456");
user.setId(12);
iMessageProducer.sendMessage(user);
}

注意:本例子采用的direct exchange交换机 。

代码地址

小东子的个人技术专栏

mac 安装消息中间件---ActiveMQ

发表于 2017-01-29   |   字数统计: 1,164(字)   |   阅读时长: 7(分)

一般在mac上安装软件大家都是比较喜欢用brew来安装,今天就用brew来安装ActiveMQ。

1.使用brew来安装 ActiveMQ

1
brew install activemq

看到如下的结果时候,可以很庆幸的告诉你,你已经成功安装了activemq。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
localhost:~ lidong$ brew install activemq
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 1 tap (homebrew/core).
==> New Formulae
bit gobby molecule statik
carrot2 gsmartcontrol opencoarrays tarsnap-gui
cnats gtk-chtheme orc-tools terminator
easy-tag imagemagick@6 source-to-image todoman
geeqie klavaro speexdsp watchexec
gifcap mingw-w64-binutils sqlparse
==> Updated Formulae
abcde gradle opus
adns grafana orientdb
afl-fuzz groonga osc
amazon-ecs-cli grsync osquery
ammonite-repl gsoap packer
ansible gtk+ packetbeat
ansible-cmdb gtk+3 pandoc
ant h2o parallel
antigen hana pazpar2
apktool haproxy pbzip2
app-engine-go-64 harfbuzz pcsc-lite
arangodb heroku pdf2htmlex
aria2 hivemind pdfcrack
armor htmlcleaner pdftoedn
arping httrack pdns
asio hunspell pev
assh hyperscan pgformatter
aubio icarus-verilog pgrouting
autotrace icoutils pius
aws-elasticbeanstalk ievms pkcs11-helper
aws-sdk-cpp imagemagick plantuml
awscli influxdb pngcrush
b2-tools innotop pod2man
beansdb intercal poppler
berkeley-db ios-webkit-debug-proxy ✔ postgrest
bfg iperf3 pre-commit
bib-tool irssi prips
bibutils iso-codes prometheus
bind jack pstoedit
bitlbee javarepl pulseaudio
bitrise jdnssec-tools purescript
blockhash jenkins pushpin
bogofilter jid pwntools
buku jigdo pyenv
cabal-install joe pyqt5
caddy jruby qbs
cadubi kapacitor qcachegrind
caf kawa qemu
carina khal qjackctl
cattle kibana qscintilla2
cdk kobalt quantlib
certbot kotlin rabbitmq
clasp kubernetes-cli rancher-cli
cloc kubernetes-helm rancher-compose
cmake languagetool rancid
coffeescript lastpass-cli ranger
collectd lean-cli ripgrep
commandbox leptonica rocksdb
conan lft rpm
consul-template libass rswift
coturn libcec rtags
cromwell libcouchbase rtv
crystal-lang libdap ruby-build
curlpp libev rust
dar libfabric sbcl
darcs libgcrypt sdb
darkice libgit2 serd
datetime-fortran libgit2-glib sfk
datomic libgosu shadowsocks-libev
dbhash libgphoto2 shmcat
dbt libgtop sip
dbxml liblas snap7
dcmtk libmikmod snort
deis libmill soci
deisctl libming sops
dependency-check libmwaw sord
diff-pdf libosmium sourcekitten
diffoscope libphonenumber speedtest_cli
dirt libpng sphinx-doc
dmd libproxy sqldiff
dnscrypt-proxy libsass sqlite
docker libslax sqlite-analyzer
docker-compose libspectre sshguard
docker-machine libsvm sstp-client
docker-machine-nfs libtasn1 ✔ stern
docker-machine-parallels libtiff stoken
docker-swarm libupnp stormpath-cli
dockward libusb ✔ svtplay-dl
doitlive libvirt swaks
dpkg libvpx swift
dub libxc swiftformat
duplicity libxml2 ✔ swiftgen
dwarfutils lighttpd swiftlint
ecl link-grammar swig
eiffelstudio linkerd syncthing
ejabberd liquigraph synfig
eject lmdb syntaxerl
elasticsearch logentries tbox
elasticsearch@2.4 logstash tcpkali
elixir logtalk telegraf
elixirscript lrdf terraform
emscripten lsyncd terragrunt
etcd ltc-tools thefuck
euca2ools lz4 thrift
extract_url macvim tile38
fabio makeself tin
fdk-aac mariadb tintin
fftw mcabber tippecanoe
filebeat mediaconch tomcat
flatbuffers memcached ✔ transcrypt
flow ✔ memcacheq tty-clock
fluent-bit mercurial ttyd
fontforge metaproxy tvnamer
fonttools metricbeat twarc
fossil micropython typescript
fping ✔ mikutter u-boot-tools
fq minizip udunits
freeswitch mktorrent unittest-cpp
freetds mkvtoolnix unrar
fwup moc unshield
fzf mongo-c-driver vapoursynth
gammu mongodb ✔ vdirsyncer
gcal mongoose vice
gdb mono vim
geckodriver mpd wavpack
geoipupdate mpv webalizer
ghc msgpack weechat
ghostscript mypy whatmp3
ginac neofetch wireguard-tools
git-cola nexus xapian
git-lfs nghttp2 xmlrpc-c
git-subrepo nim xonsh
git-test no-more-secrets xqilla
git-tracker node ✔ xrootd
giter8 node-build xxhash
gitlab-ci-multi-runner node@0.12 xz
gitup node@4 yadm
gitversion node@6 yank
gmime nodeenv yarn
gnu-cobol notmuch yash
gnu-sed nss yaws
gnupg-pkcs11-scd nvc yaz
gnupg2 nvi yle-dl
gnuradio open-cobol you-get
gnutls open-jtalk youtube-dl
go open-mesh zabbix ✔
godep open-ocd zbar
gofabric8 openconnect zeromq
google-java-format opencore-amr zimg
gosu openshift-cli zplug
==> Renamed Formulae
eigen32 -> eigen@3.2 scala210 -> scala@2.10 scala211 -> scala@2.11
==> Deleted Formulae
cpp-netlib dmtx-utils dynamodb-local gcc@6 qtplay
==> Using the sandbox
==> Downloading https://www.apache.org/dyn/closer.cgi?path=/activemq/5.14.3/apac
==> Best Mirror http://mirrors.cnnic.cn/apache/activemq/5.14.3/apache-activemq-5
######################################################################## 100.0%
==> Caveats
To have launchd start activemq now and restart at login:
brew services start activemq
Or, if you don't want/need a background service you can just run:
activemq start
==> Summary
�� /usr/local/Cellar/activemq/5.14.3: 555 files, 59.7M, built in 1 minute 39 seconds
localhost:~ lidong$

2.使用activemq –version来查看安装的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
localhost:~ lidong$ activemq --version
INFO: Loading '/usr/local/Cellar/activemq/5.14.3/libexec//bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/bin/java'
Java Runtime: Oracle Corporation 1.8.0_92 /Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre
Heap sizes: current=62976k free=61648k max=932352k
JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/usr/local/Cellar/activemq/5.14.3/libexec//conf/login.config -Dactivemq.classpath=/usr/local/Cellar/activemq/5.14.3/libexec//conf:/usr/local/Cellar/activemq/5.14.3/libexec//../lib/: -Dactivemq.home=/usr/local/Cellar/activemq/5.14.3/libexec/ -Dactivemq.base=/usr/local/Cellar/activemq/5.14.3/libexec/ -Dactivemq.conf=/usr/local/Cellar/activemq/5.14.3/libexec//conf -Dactivemq.data=/usr/local/Cellar/activemq/5.14.3/libexec//data
Extensions classpath:
[/usr/local/Cellar/activemq/5.14.3/libexec/lib,/usr/local/Cellar/activemq/5.14.3/libexec/lib/camel,/usr/local/Cellar/activemq/5.14.3/libexec/lib/optional,/usr/local/Cellar/activemq/5.14.3/libexec/lib/web,/usr/local/Cellar/activemq/5.14.3/libexec/lib/extra]
ACTIVEMQ_HOME: /usr/local/Cellar/activemq/5.14.3/libexec
ACTIVEMQ_BASE: /usr/local/Cellar/activemq/5.14.3/libexec
ACTIVEMQ_CONF: /usr/local/Cellar/activemq/5.14.3/libexec/conf
ACTIVEMQ_DATA: /usr/local/Cellar/activemq/5.14.3/libexec/data
ActiveMQ 5.14.3
For help or more information please see: http://activemq.apache.org

3.activemq常用的命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Tasks:
browse - Display selected messages in a specified destination.
bstat - Performs a predefined query that displays useful statistics regarding the specified broker
consumer - Receives messages from the broker
create - Creates a runnable broker instance in the specified path.
decrypt - Decrypts given text
dstat - Performs a predefined query that displays useful tabular statistics regarding the specified destination type
encrypt - Encrypts given text
export - Exports a stopped brokers data files to an archive file
list - Lists all available brokers in the specified JMX context
producer - Sends messages to the broker
purge - Delete selected destination's messages that matches the message selector
query - Display selected broker component's attributes and statistics.
start - Creates and starts a broker using a configuration file, or a broker URI.
stop - Stops a running broker specified by the broker name.
Task Options (Options specific to each task):
--extdir <dir> - Add the jar files in the directory to the classpath.
--version - Display the version information.
-h,-?,--help - Display this help information. To display task specific help, use Main [task] -h,-?,--help

4.启动activeMQ服务

1
activemq start

看到如下信息,就表示已经安装成功

1
2
3
4
5
localhost:~ lidong$ activemq start
INFO: Loading '/usr/local/Cellar/activemq/5.14.3/libexec//bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/local/Cellar/ActiveMQ/5.14.3/libexec//data/activemq.pid' (pid '2402')

然后就可以访问管理web console。在浏览器中输入url: http://localhost:8161/

这里写图片描述

点击 Manager ActiveMQ boker 输入用户名:admin 密码admin

这里写图片描述

看到这个页面,就可以到ActiveMQ 启动成功了。

小东子的个人技术专栏

mac 安装RabbitMQ

发表于 2017-01-29   |   字数统计: 516(字)   |   阅读时长: 3(分)

一般在mac上安装软件大家都是比较喜欢用brew来安装,今天就用brew来安装RabbitMQ。详细信息可以查看官网http://www.rabbitmq.com/install-standalone-mac.html

1.使用brew来安装 RabbitMQ

1
brew install rabbitmq

看到如下的代码表示RabbitMQ安装成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
localhost:~ lidong$ brew install rabbitmq
Updating Homebrew...
==> Installing dependencies for rabbitmq: jpeg, libpng, libtiff, wxmac, erlang
==> Installing rabbitmq dependency: jpeg
==> Downloading https://homebrew.bintray.com/bottles/jpeg-8d.sierra.bottle.2.tar
######################################################################## 100.0%
==> Pouring jpeg-8d.sierra.bottle.2.tar.gz
🍺 /usr/local/Cellar/jpeg/8d: 19 files, 708.3K
==> Installing rabbitmq dependency: libpng
==> Downloading https://homebrew.bintray.com/bottles/libpng-1.6.28.sierra.bottle
######################################################################## 100.0%
==> Pouring libpng-1.6.28.sierra.bottle.tar.gz
🍺 /usr/local/Cellar/libpng/1.6.28: 26 files, 1.2M
==> Installing rabbitmq dependency: libtiff
==> Downloading https://homebrew.bintray.com/bottles/libtiff-4.0.7_1.sierra.bott
######################################################################## 100.0%
==> Pouring libtiff-4.0.7_1.sierra.bottle.tar.gz
🍺 /usr/local/Cellar/libtiff/4.0.7_1: 248 files, 3.4M
==> Installing rabbitmq dependency: wxmac
==> Downloading https://homebrew.bintray.com/bottles/wxmac-3.0.2_4.sierra.bottle
######################################################################## 100.0%
==> Pouring wxmac-3.0.2_4.sierra.bottle.tar.gz
🍺 /usr/local/Cellar/wxmac/3.0.2_4: 810 files, 24.6M
==> Installing rabbitmq dependency: erlang
==> Downloading https://homebrew.bintray.com/bottles/erlang-19.2.sierra.bottle.t
######################################################################## 100.0%
==> Pouring erlang-19.2.sierra.bottle.tar.gz
==> Caveats
Man pages can be found in:
/usr/local/opt/erlang/lib/erlang/man
Access them with `erl -man`, or add this directory to MANPATH.
==> Summary
🍺 /usr/local/Cellar/erlang/19.2: 7,310 files, 280.9M
==> Installing rabbitmq
==> Using the sandbox
==> Downloading https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitm
######################################################################## 100.0%
==> /usr/bin/unzip -qq -j /usr/local/Cellar/rabbitmq/3.6.6/plugins/rabbitmq_mana
==> Caveats
Management Plugin enabled by default at http://localhost:15672
Bash completion has been installed to:
/usr/local/etc/bash_completion.d
To have launchd start rabbitmq now and restart at login:
brew services start rabbitmq
Or, if you don't want/need a background service you can just run:
rabbitmq-server
==> Summary
🍺 /usr/local/Cellar/rabbitmq/3.6.6: 188 files, 5.8M, built in 9 minutes 12 seconds

注意: rabbitmq的安装目录: /usr/local/Cellar/rabbitmq/3.6.6

3.RabbitMQ 的启动

进入到 /usr/local/Cellar/rabbitmq/3.6.6,执行

1
localhost:3.6.6 lidong$ sbin/rabbitmq-server

4.RabbitMQ 启动插件

待RabbitMQ 的启动完毕之后,另起终端进入cd /Users/lidong/javaEE/rabbitmq_server-3.6.6/sbin 。启动插件:

1
sudo ./rabbitmq-plugins enable rabbitmq_management(执行一次以后不用再次执行)

5.登陆管理界面

浏览器输入:http://localhost:15672/

这里写图片描述

账号密码初始默认都为guest

这里写图片描述

小东子的个人技术专栏

MySQL配置远程登录

发表于 2017-01-23   |   字数统计: 152(字)   |   阅读时长: 1(分)

1、修改数据表

可能是你的帐号不允许从远程登陆,只能在localhost。这个时候只要在 localhost 的那台电脑,登入MySQL后,更改 “MySQL” 数据库里的 “user” 表里的 “Host” 项,从“localhost”改称“%”,%表示所有机器都允许。

1
2
3
mysql> use mysql
mysql> update user set Host='%' where User='root';

2、授权权限

允许任何主机使用“myuser”账号和“mypwd”密码连接到 MySQL 服务器。

1
mysql> GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypwd' WITH GRANT OPTION;

即可生效。

1
mysql> FLUSH PRIVILEGES;

只要完成以上两个步骤就可以远程访问mysql数据库了。

1…345…10
李东

李东

细节决定成败,点滴铸就辉煌

46 文章
18 标签
© 2017 李东
由 Hexo 强力驱动
主题 - NexT.Pisces