「工程实践」Spring Boot - 使用RocketMQ实战样例

通过rocketmq-spring-boot-starter可以快速的搭建RocketMQ生产者和消费者服务。

  1. pom.xml引入组件rocketmq-spring-boot-starter依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
  1. 修改application.yml,添加RocketMQ相关配置
# 多个name-server(集群)使用英文;分割
rocketmq:
  name-server: 192.168.2.100:9876
  producer:
    group: test-group
  1. 发送消息与消费消息

使用RocketMQTemplate实现消息的发送; 使用实现RocketMQListener接口,并添加@RocketMQMessageListener注解,声明消费主题,消费者分组等,且默认消费模式是集群消费。

1. 普通消息

发送消息测试接口:http://localhost:8080/send/common

「工程实践」RocketMQ安装并整合SpringBoot

RocketMQ 是阿里巴巴团队使用Java语言开发的一个分布式、队列模型的消息中间件,后开源给Apache基金会成为了Apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

RocketMQ 主要由ProducerBrokerConsumerNameServer组成;其中Producer负责生产消息;Consumer负责消费消息;BrokerMQ服务,负责接收、分发消息;NameServer是路由中心,负责MQ服务之间的协调。

1. Centos安装RocketMQ

  1. 官网下载RocketMQ安装包
# 进入自定义软件安装目录
cd /mnt/newdatadrive/apps
# wget下载RocketMQ安装包
wget -c "https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip"
  1. 解压安装(环境基于JDK1.8或以上)
# 解压
unzip rocketmq-all-4.8.0-bin-release.zip
# 重命名为rocketmq
mv rocketmq-all-4.8.0-bin-release rocketmq
# 进入安装目录
cd rocketmq
  1. 修改配置
# RocketMQ的默认内存占用非常高,是4×4g的,将4g调整为512m
# 编辑runserver.sh
vim bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 编辑runbroker.sh
vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
  1. 配置RocketMQ的环境变量
# 编辑/etc/profile
vim /etc/profile
# 添加:
export ROCKETMQ_HOME=/mnt/newdatadrive/apps/rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
# 使rocketmq的配置生效
source /etc/profile
  1. 启动RockerMQ顺序
# 先启动 NameServer,然后启动 Broker
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &
  1. 关闭RockerMQ顺序
# 先关闭Broker,再关闭NameServer
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
  1. 启动日志
# 查看 Name Server 启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
# 查看 Broker Server 启动日志
tail -f ~/logs/rocketmqlogs/broker.log

# 若出现如下报错
file doesn't exist on this path: /root/store/commitlog
file doesn't exist on this path: /root/store/consumequeue
# 对应创建即可:
cd ~/store
mkdir commitlog consumequeue
  1. 防火墙
    • 若外网IP调试,关闭防火墙 或 开放防火墙端口9876,10911
# NameServer默认端口:9876
firewall-cmd --add-port=9876/tcp --permanent
# Broker对外服务的监听端口
firewall-cmd --add-port=10911/tcp --permanent
# 更新防火墙规则
firewall-cmd --reload

2. SpringBoot 整合 RocketMQ

  1. pom.xml引入组件rocketmq-spring-boot-starter依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
  1. 修改application.yml,添加RocketMQ相关配置
# 多个name-server(集群)使用英文;分割
rocketmq:
  name-server: 192.168.2.100:9876
  producer:
    group: test-group
  1. 消息生产者
@Component
public class MessageProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 生产者发送消息
     * @param topic   主题
     * @param message 消息体
     */
    public void sendMessage(String topic, String message){
        this.rocketMQTemplate.convertAndSend(topic, message);
    }
}
  1. 消费者
/**
 * 消费者监听消息
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "test-topic",          // 指定topic
        consumerGroup = "test-group",  // 指定消费组
        selectorExpression = "*"
)
public class MessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("收到的消息是: {}", message);
    }
}
  1. 测试类
@Slf4j
@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private MessageProducer messageProducer;

    @Test
    void testMQ() {
        String message = "Hello RocketMQ!";
        messageProducer.sendMessage("test-topic",message);
        log.info("生产一条消息:" + message);
    }
}
  1. 运行结果
2021-06-10 14:56:25.180  INFO 17720 --- [           main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='test-group', nameServer='192.168.2.100:9876', topic='test-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
2021-06-10 14:56:25.188  INFO 17720 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:messageConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
2021-06-10 14:56:25.230  INFO 17720 --- [           main] c.e.fastdfsdemo.DemoApplicationTests     : Started DemoApplicationTests in 10.371 seconds (JVM running for 13.888)
2021-06-10 14:56:26.410  INFO 17720 --- [           main] c.e.fastdfsdemo.DemoApplicationTests     : 生产一条消息:Hello RocketMQ!
2021-06-10 14:56:26.426  INFO 17720 --- [MessageThread_1] c.e.f.rocketmq.MessageConsumer           : 收到的消息是: Hello RocketMQ!
2021-06-10 14:56:26.496  INFO 17720 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[192.168.2.100:10911] result: true
2021-06-10 14:56:26.496  INFO 17720 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[192.168.2.100:9876] result: true

「踩坑记录」记一次生产事故引发的FastDFS图片迁移

  • 事件经过:由于前端同学不小心把上传图片服务器地址写死了测试域名(指向测试服务器),然后项目上到正式环境,一段时间后,发现用户发布商品时的商品详情富文本中的图片全部指向测试图片服务器域名,然后图片又太多了,手动逐条修复数据不太现实。

  • 解决思路

    • 从数据库中查询商品详情富文本,分析出所有测试域名图片的Url地址;
    • 通过Url下载图片到本地服务器并保持图片存放路径与图片文件名和原本一致;
    • 通过rsync远程同步命令同步到正式服务器;
    • 修改数据库中商品图片Url地址指向正式服务器域名(因为路径和文件名与测试服务器一致,只需替换域名即可)。
  1. 引入pom.xml依赖
<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>1.11.2</version>
</dependency>
  1. 提供出临时接口以备调用同步图片
/**
 * 提供出临时接口以备调用修复
 */
@PostMapping("/product/img/repair")
public void repairProductImg() {
    // 获取带有测试域名图片的富文本列表
    List<String> infoList = productDao.getProductInfo();
    // 下载图片到本地服务器
    infoList.forEach(RepairImgUtil::saveProductImg);
}
<!-- 这里测试图片服务器域名为:img-test.abc.com -->
<select id="getProductInfo" resultType="java.lang.String">
      select detail from productInfo where detail like '%img-test.abc.com%'
</select>
@Slf4j
public class RepairImgUtil {
    /**
     * 保存图片到本地服务器
     * @param textBody 富文本
     */
    public static void saveProductImg(String textBody) {
        // 解析富文本
        Element doc = Jsoup.parseBodyFragment(textBody).body();
        Elements images = doc.select("img[src]");
        List<String> srcList = new ArrayList<>();
        for (Element element : images) {
            String imgUrl = element.attr("src");
            // 筛选测试服务器的图片路径
            if (imgUrl.indexOf("img-test.abc.com") > 0) {
                srcList.add(imgUrl);
            }
        }
        // 本地存放路径
        String basePath = "/apps/fdfs/storage/data/";
        srcList.forEach(img -> {
            try {
                // 下载图片,并根据路径规律保持原有路径
                RepairImgUtil.downloadImage(img, img.substring(45), basePath+img.substring(39, 45));
            } catch (Exception e) {
                log.error("try-catch:",e);
            }
        });
    }

    /**
     * 下载图片
     *
     * @param urlString 图片链接
     * @param filename  图片名称
     * @param savePath  保存路径
     */
    private static void downloadImage(String urlString, String filename, String savePath) throws Exception {
        // 构造URL打开连接,并设置输入流与缓冲
        URL url = new URL(urlString);
        URLConnection con = url.openConnection();
        con.setConnectTimeout(5*1000);
        InputStream is = con.getInputStream();
        byte[] bs = new byte[1024];
        // 读取到的数据长度
        int len;
        // 输出的文件流
        File sf=new File(savePath);
        if(!sf.exists()){
           sf.mkdirs();
        }
        OutputStream os = new FileOutputStream(sf.getPath()+"/"+filename);
        // 开始读取
        while ((len = is.read(bs)) != -1) {
            os.write(bs, 0, len);
        }
        os.close();
        is.close();
        // 打印图片链接
        log.info(urlString);
    }
}
  1. 调用接口后,图片会先备份到本地服务器,通过rsync远程同步命令同步到正式服务器
# 登录远程服务器,进入图片服务器目录
cd /apps/fdfs
# 远程登陆下载图片的服务器并同步数据
rsync -avz myuser@119.29.36.15:/apps/fdfs/storage  ./

# 根据提示,确认连接输入 yes,输入本地服务器(下载图片的服务器)用户密码后开始同步

The authenticity of host '119.29.36.15 (119.29.36.15)' can't be established.
ECDSA key fingerprint is SHA256:5m4KgPF0QgBO1xE7Tz1RT7U/tfCue+QBE/t4zEDEDJQ.
Are you sure you want to continue connecting (yes/no/[fingerprint])? yes
Warning: Permanently added '119.29.36.15' (ECDSA) to the list of known hosts.
myuser@119.29.36.15's password: 
receiving incremental file list
storage/
storage/data/
storage/data/03/
storage/data/03/08/
storage/data/03/08/Cmgy61-BU3GANDoCAAGOnj-x-ws715.jpg
storage/data/03/08/Cmgy61-BU8SAeTvqAAGOnj-x-ws868.jpg
storage/data/03/08/Cmgy61-BVn-AdXoKAA2Msh3VVsk076.jpg
...
storage/data/03/3C/Cmgy62CwSTOARHdWABLV4kLKua4925.png

sent 15,786 bytes  received 422,224,711 bytes  10,425,691.28 bytes/sec
total size is 448,804,964  speedup is 1.06
  1. 把测试域名换成正式域名访问图片成功!最后修改数据库商品详情:
update productInfo set detail = REPLACE(detail, 'img-test.abc.com%'', 'img.abc.com%'') where detail like like '%img-test.abc.com%'

「工程实践」Spring Boot + Thymeleaf页面静态化实现

页面静态化是指把动态生成的HTML页面变为静态文件保存,当请求到来,直接访问静态文件,而不需要经过项目服务器的渲染。

1. 配置Nginx代理静态页面

location / {
    root D:/temp/static;                  # 自定义静态文件存放根目录
    set $www_temp_path $request_filename; # 设置请求的文件名到临时变量
    if ($uri = '/') {                     # 若为根目录则加上/index.html
        set $www_temp_path $request_filename/index.html;
    }
    if (!-f $www_temp_path) {             # 若请求的文件不存在,就反向代理服务器的渲染
        proxy_pass http://127.0.0.1:8080;
    }
    # 其他配置...
}

然后重启Nginx

「踩坑记录」Redis故障处理-持久化时内存不足

问题描述

# Java错误日志:
redis.clients.jedis.exceptions.JedisDataException: MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.

# Redis错误日志:
Can't save in background: fork: Resource temporaily unavailable
# 或
Can’t save in background: fork: Cannot allocate memory

Redis在个默认情况下,如果在RDB snapshots持久化过程中出现问题,Redis不允许用户进行任何更新操作;即:stop-writes-on-bgsave-error yes

「工程实践」Spring Boot - Java接口幂等性设计

幂等性:多次调用方法或者接口不会改变业务状态,可以保证重复调用的结果和单次调用的结果一致。 selectdelete操作具有天然幂等性:select多次结果总是一致,delete第一次执行后继续再执行也不会对数据有影响; 一般没有幂等性而出现异常的操作:insert操作,update操作,混合类型操作(同时包含增删改等)。

1. 使用幂等的场景

  1. 前端重复提交:前端瞬时点击多次造成表单重复提交;
  2. 接口超时重试:接口可能会因为某些原因而调用失败,出于容错性考虑会加上失败重试的机制。如果接口调用一半,再次调用就会因为脏数据的存在而出现异常。
  3. 消息重复消费:在使用消息中间件来处理消息队列,且手动ack确认消息被正常消费时。如果消费者突然断开连接,那么已经执行了一半的消息会重新放回队列。被其他消费者重新消费时就会导致结果异常,如数据库重复数据,数据库数据冲突,资源重复等。
  4. 请求重发:网络抖动引发的nginx重发请求,造成重复调用;

2. 幂等性设计

  1. update操作
    • 根据唯一业务id去更新数据。
    • 使用乐观锁(增加版本号或修改时间字段)。
  2. insert操作
    • 若该操作具有唯一业务号,则可通过数据库层面的唯一/联合唯一索引来限制重复数据;或通过分布式锁来保证接口幂等性。
    • 若该操作没有唯一业务号,可以使用Token机制,保证幂等性。
  3. 混合操作(一个接口包含多种操作)
    • 使用Token机制,或使用Token + 分布式锁的方案来解决幂等性问题。

3. 解决方案

3.1 Token机制实现

通过Token 机制实现接口的幂等性,这是一种比较通用性的实现方法。

0%