高级Redis之Stream的用法示例

news/2024/7/7 1:27:46 标签: redis, 数据库

不想自己搭建一个mq怎么办?Redis的Stream 来帮你,Redis Stream 是 Redis 5.0 引入的一种新的数据结构,用于处理实时的、可持久化的、基于时间序列的数据流。它非常适合处理事件流、日志、消息队列等场景。下面是一个使用 Redis Stream 的具体应用场景:简单的消息队列系统。

应用场景:实时消息队列

假设你正在构建一个实时消息通知系统,多个服务需要向某个队列写入消息,多个消费者服务需要从这个队列中读取消息执行相应操作。这个消息队列需要有高性能和高可用性,并且能够应对突发流量。

以下是如何使用 Redis Stream 实现完成订单后通知会员服务加积分这个应用场景的步骤:

步骤 1: 添加必要的依赖

在你的 pom.xml 文件中添加 LettuceSpring Data Redis 依赖:

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Data Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- Lettuce dependency for Redis interaction -->
    <dependency>
        <groupId>io.lettuce.core</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.1.5</version>
    </dependency>
</dependencies>

步骤 2: 配置 Redis 连接

在你的 application.propertiesapplication.yml 文件中配置 Redis 连接:

spring:
  redis:
    host: localhost
    port: 6379

步骤 3: 创建订单服务 (生产者)

订单服务在订单完成后将订单信息写入 Redis Stream。可以使用 Lettuce 库来与 Redis 进行交互。

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class OrderService {

    private static final String STREAM_KEY = "order_stream";
    private RedisClient redisClient;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;

    public OrderService() {
        this.redisClient = RedisClient.create("redis://localhost:6379");
        this.connection = redisClient.connect();
        this.commands = connection.sync();
    }

    public void completeOrder(String orderId, String userId, int points) {
        Map<String, String> orderData = new HashMap<>();
        orderData.put("orderId", orderId);
        orderData.put("userId", userId);
        orderData.put("points", String.valueOf(points));

        String messageId = commands.xadd(STREAM_KEY, orderData);
        System.out.println("Order completed with messageId: " + messageId);
    }

    public void close() {
        connection.close();
        redisClient.shutdown();
    }
}

步骤 4: 创建会员服务 (消费者)

会员服务从 Redis Stream 中读取消息,并处理用户积分的增加。

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.StreamMessage;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

@Service
public class MemberService {

    private static final String STREAM_KEY = "order_stream";
    private static final String CONSUMER_GROUP = "member_group";
    private static final String CONSUMER_NAME = "member_service";

    private RedisClient redisClient;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;

    public MemberService() {
        this.redisClient = RedisClient.create("redis://localhost:6379");
        this.connection = redisClient.connect();
        this.commands = connection.sync();
        
        // 创建消费组
        try {
            commands.xgroupCreate(STREAM_KEY, CONSUMER_GROUP, io.lettuce.core.StreamOffset.from("0"), true);
        } catch (Exception e) {
            System.out.println("Consumer group already exists");
        }
    }

    public void consumeMessages() {
        while (true) {
            List<StreamMessage<String, String>> messages = commands.xreadgroup(
                    io.lettuce.core.Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
                    io.lettuce.core.XReadArgs.StreamOffset.lastConsumed(STREAM_KEY)
            );

            for (StreamMessage<String, String> message : messages) {
                Map<String, String> body = message.getBody();
                String orderId = body.get("orderId");
                String userId = body.get("userId");
                int points = Integer.parseInt(body.get("points"));
                
                // 处理用户积分增加逻辑
                System.out.println("Processing order: " + orderId + " for user: " + userId + ", adding points: " + points);

                // 确认处理完成
                commands.xack(STREAM_KEY, CONSUMER_GROUP, message.getId());
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public void close() {
        connection.close();
        redisClient.shutdown();
    }
}

步骤 5: 调整 Spring Boot 启动类

在 Spring Boot 启动类中启动订单服务和会员服务,演示消息的生产和消费:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RedisStreamDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisStreamDemoApplication.class, args);
    }

    @Bean
    public CommandLineRunner demo(OrderService orderService, MemberService memberService) {
        return args -> {
            // 模拟订单完成
            orderService.completeOrder("order123", "user1", 100);

            // 启动会员服务,处理消息
            new Thread(() -> memberService.consumeMessages()).start();

            // 等待一段时间,确保消息处理完成
            Thread.sleep(5000);

            orderService.close();
            memberService.close();
        };
    }
}

6. 优点

使用 Redis Stream 实现消息队列有以下几个优点:

  1. 高性能:Redis Stream 提供了高性能的读写操作,适用于高吞吐量的场景。
  2. 持久化:Redis Stream 支持数据持久化,不会因为 Redis 重启而丢失数据。
  3. 消费组:支持创建消费者组,多消费者可以协同工作,提高消费效率。
  4. 自动化管理:Redis 可以自动管理消息的 ID、时间戳等,简化开发。

7. 缺点

  • 内存占用:Redis 是内存数据库,若消息量过大,可能会占用大量内存。
  • 学习曲线:Stream API 的使用相对于其他简单数据结构较为复杂,需要一定的学习成本。

总结

通过上述示例,我们展示了如何使用 Redis Stream 实现一个简单的消息队列系统,包括生产者发布消息、消费者读取消息和处理以及消费组的管理。Redis Stream 的高性能、持久化和自动管理特性使其非常适合处理实时数据流、消息队列等场景。希望这个示例能够帮助你更好地理解如何使用 Redis Stream 应对实际开发中的问题。


http://www.niftyadmin.cn/n/5534897.html

相关文章

CSS-position/transform

1 需求 2 语法 在CSS中&#xff0c;positioning 和 transform 是两个非常重要的概念&#xff0c;它们分别用于控制元素在页面上的布局和变换。 Positioning CSS中的position属性用于设置元素的定位类型。它有几个值&#xff0c;包括&#xff1a; static&#xff1a;这是默认…

【入门】5分钟了解卷积神经网络CNN是什么

本文来自《老饼讲解-BP神经网络》https://www.bbbdata.com/ 目录 一、卷积神经网络的结构1.1.卷积与池化的作用2.2.全连接层的作用 二、卷积神经网络的运算2.1.卷积层的运算2.2.池化的运算2.3.全连接层运算 三、pytorch实现一个CNN例子3.1.模型的搭建3.2.CNN完整训练代码 CNN神…

DFS,BFS最短路,树与图的深度/广度优先遍历,拓扑排序

DFS 例题&#xff1a;排列数字 在排列组合问题中&#xff0c;每个位置需要尝试多个不同的数字组合&#xff0c;需要回溯以尝试不同的可能性。因此&#xff0c;需要显式地恢复现场&#xff08;撤销标记&#xff09;&#xff0c;以确保每个可能的路径都被探索。 #include <b…

【3分钟准备前端面试】vue3

目录 Vue3比vue2有什么优势vue3升级了哪些重要功能生命周期变化Options APIComposition APIreftoRef和toRefstoReftoRefsHooks (代码复用)Vue3 script setupsetupdefineProps和defineEmitsdefineExposeVue3比vue2有什么优势 性能更好体积更小更好的TS支持更好的代码组织更好的逻…

2024.06.27 校招 实习 内推 面经

绿*泡*泡VX&#xff1a; neituijunsir 交流*裙 &#xff0c;内推/实习/校招汇总表格 1、提前批 | 禾赛科技2025届校招/提前批招聘 提前批 | 禾赛科技2025届校招提前批招聘 2、提前批 | CVTE2025校园招聘/提前批正式启动&#xff08;内推&#xff09; 提前批 | CVTE2025校园…

RPC远程过程调用--Thrift

RPC远程过程调用–Thrift 简介 Thrift是一个由Facebook开发的轻量级、跨语言的远程服务调用框架&#xff0c;后进入Apache开源项目。支持通过自身接口定义语言IDL定义RPC接口和数据类型&#xff0c;然后通过编译器生成不同语言代码&#xff0c;用于构建抽象易用、可互操作的R…

Python 学习之常用第三方库(五)

Python 常用第三方库 Python 是一门功能强大的编程语言&#xff0c;其生态系统中包含了许多优秀的第三方库&#xff0c;这些库极大地扩展了 Python 的功能。以下是一些常用的 Python 第三方库&#xff1a; 1. NumPy&#xff1a; a. 用于数值计算的库&#xff0c;提供了大量的…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 英文单词联想(100分) - 三语言AC题解(Python/Java/Cpp)

🍭 大家好这里是清隆学长 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 💻 ACM银牌🥈| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 📎在线评测链接 https://app5938.acapp.acwing.com.cn/contest/2/problem/OD…