Post

redis 发布订阅模式笔记

redis 发布订阅模式笔记

通常来说,发布订阅(又叫 pub/sub)的特点是订阅者(listener)负责订阅频道(channel),发布者(publisher)负责向频道发送二进制字符串消息(binary string message)。每当有消息被发送给特定的频道,频道的所有订阅者都会收到消息。

命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 订阅一个或者多个频道
SUBSCRIBE
SUBSCRIBE channel [channel ...] 

# 退订给定的一个或者多个频道
# 如果不指定频道名称,则退订所有频道
UNSUBSCRIBE
UNSUBSCRIBE [channel [channel ...]]

# 订阅给定模式的频道
PSUBSCRIBE
PSUBSCRIBE pattern [pattern ...]

# 退订给定模式的频道
# 如果不指定模式,则退订所有频道
PUNSUBSCRIBE 
PUNSUBSCRIBE [pattern [pattern ...]

# 向给定的频道发送消息
PUBLISH
PUBLISH <channel> <message>

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
		
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

Jedis

Jedis中的JedisPubSub抽象类提供了订阅和取消的功能。想处理订阅和取消订阅某些channel的相关事件,我们得扩展JedisPubSub类并实现相关的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {//注意这里继承了抽象类JedisPubSub

    private static final Logger LOGGER = Logger.getLogger(Subscriber.class);

    @Override
    public void onMessage(String channel, String message) {
    	LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message));
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
    	LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s", 
    	    pattern, channel, message));
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onSubscribe");
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onUnsubscribe");
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPUnsubscribe");
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPSubscribe");
    }
}

有了订阅者,我们还需要一个发布者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.demo.redis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;

public class Publisher {

    private static final Logger LOGGER = Logger.getLogger(Publisher.class);
    private final Jedis publisherJedis;
    private final String channel;

    public Publisher(Jedis publisherJedis, String channel) {
        this.publisherJedis = publisherJedis;
        this.channel = channel;
    }

    /**
     * 不停的读取输入,然后发布到channel上面,遇到quit则停止发布。
     */
    public void startPublish() {
    	LOGGER.info("Type your message (quit for terminate)");
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String line = reader.readLine();
                if (!"quit".equals(line)) {
                    publisherJedis.publish(channel, line);
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            LOGGER.error("IO failure while reading input", e);
        }
    }
}

为简单起见,这个发布者接收控制台的输入,然后将输入的消息发布到指定的channel上面,如果输入quit,则停止发布消息。

接下来是主函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Program {
    
    public static final String CHANNEL_NAME = "MyChannel";
    //我这里的Redis是一个集群,192.168.56.101和192.168.56.102都可以使用
    public static final String REDIS_HOST = "192.168.56.101";
    public static final int REDIS_PORT = 7000;
    
    private final static Logger LOGGER = Logger.getLogger(Program.class);
    private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
    private final static JedisPool JEDIS_POOL = 
            new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);
    
    public static void main(String[] args) throws Exception {
        final Jedis subscriberJedis = JEDIS_POOL.getResource();
        final Jedis publisherJedis = JEDIS_POOL.getResource();
        final Subscriber subscriber = new Subscriber();
        //订阅线程:接收消息
        new Thread(new Runnable() {
            public void run() {
                try {
                    LOGGER.info("Subscribing to \"MyChannel\". This thread will be blocked.");
                    //使用subscriber订阅CHANNEL_NAME上的消息,这一句之后,线程进入订阅模式,阻塞。
                    subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                    
                    //当unsubscribe()方法被调用时,才执行以下代码
                    LOGGER.info("Subscription ended.");
                } catch (Exception e) {
                    LOGGER.error("Subscribing failed.", e);
                }
            }
        }).start();
        
        //主线程:发布消息到CHANNEL_NAME频道上
        new Publisher(publisherJedis, CHANNEL_NAME).startPublish();
        publisherJedis.close();
        
        //Unsubscribe
        subscriber.unsubscribe();
        subscriberJedis.close();
    }
}

参考: https://my.oschina.net/itblog/blog/601284

Spring

config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/**
 * @author zhangsi
 * @date created in 2018/3/1 15:30
 */
@Configuration
public class RedisConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisConfig.class);

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(redisConnectionFactory);
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public Jedis jedis(RedisProperties redisProperties) {

//        Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis");
//
//        ReflectionUtils.makeAccessible(jedisField);
//
//        Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, redisConnectionFactory.getConnection());

        Jedis jedis = new Jedis(redisProperties.getHost(), redisProperties.getPort());

        if (redisProperties.getPassword() != null) {
            jedis.auth(redisProperties.getPassword());
        }

        return jedis;
    }


    /**
     * 本模块中,创建 jedis 客户端的目的是为了使用原生的 api 来完成上锁和解锁的操作
     * 见 {@code CtrlCommandHelper#tryGetDistributedLock() } 和
     * {@code CtrlCommandHelper.releaseDistributedLock() }方法
     *
     * 因为 RedisTemplate 提供的 api 不足以完成原子操作的同时并返回执行结果
     * spring boot2 后 redis 默认的客户端由 jedis 变为 Lettuce
     * 所以理论上可以不创建 jedis 客户端,所以建议通过从
     * RedisTemplate 中获取 nativeConnection 来获取 lettuce 原生 api 来完成加锁和解锁的操作。
     *
     * @see  cn.futuremove.tsp.vehicle.control.service.CommandHelper#tryGetDistributedLock
     * @see  cn.futuremove.tsp.vehicle.control.service.CommandHelper#releaseDistributedLock
     */
    @Bean
    public JedisPool jedisPool(RedisProperties redisProperties) {


        JedisPool jedisPool = new JedisPool(new GenericObjectPoolConfig(),
                redisProperties.getHost(),
                redisProperties.getPort() == 0 ? Protocol.DEFAULT_PORT : redisProperties.getPort(),
                Protocol.DEFAULT_TIMEOUT,
                redisProperties.getPassword());

        LOGGER.debug("JedisPool : {}", jedisPool);

        return jedisPool;
    }

    @Bean
    @ConditionalOnProperty(value = {"spring.redis.cluster"})
    public JedisCluster jedisCluster(RedisProperties redisProperties) {

        List<String> nodes = redisProperties.getCluster().getNodes();

        Set<HostAndPort> nodeSet = new HashSet<>(nodes.size());

        for (String node : nodes) {

            String[] nodeArr = node.split(":");

            nodeSet.add(new HostAndPort(nodeArr[0], Integer.parseInt(nodeArr[1])));
        }

        JedisCluster jedisCluster = new JedisCluster(nodeSet);

        LOGGER.debug("JedisCluster : {}", jedisCluster);

        return jedisCluster;
    }

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private OfflineNotifyRedisMessageListener offlineNotifyRedisMessageListener;

    @Autowired
    private CommandResultMessageListener commandResultMessageListener;

    /**
     *
     * @return
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler scheduler =
                new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(20);
        //scheduler.setPoolSize(50);
        scheduler.setThreadGroupName("chaos");
        scheduler.setThreadNamePrefix("chaos-");
        return scheduler;
    }

    /**
     * 定义 Redis 的监听容器
     * @return offlineNotifyRedisMessageListener
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {

        RedisMessageListenerContainer container =
                new RedisMessageListenerContainer();

        container.setConnectionFactory(redisConnectionFactory);
        container.setTaskExecutor(threadPoolTaskScheduler());

        Topic appOfflineTopic = new ChannelTopic("app_offline");
        container.addMessageListener(offlineNotifyRedisMessageListener, appOfflineTopic);

        Topic ctrlNotify = new ChannelTopic(RedisKeyType.VEHICLE_CTRL_NOTIFY_CHANNEL_TOPIC);
        container.addMessageListener(commandResultMessageListener, ctrlNotify);

        return container;
    }
}

OfflineNotifyRedisMessageListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * <p>
 * <strong>
 *     一个 Redis 消息监听器
 * </strong><br /><br />
 * 用于监听 Redis 中的消息, 当有用户异地登录, 通知原来登录的用户,
 * 并提示下线.
 *
 *
 * </p>
 *
 * @author chengchao - 2018-12-15 16:43 <br />
 * @see [相关类方法]
 * @since [产品模块版本]
 */
@Component
public class OfflineNotifyRedisMessageListener implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OfflineNotifyRedisMessageListener.class);

    @Override
    public void onMessage(Message message, byte[] bytes) {

        String topic = new String(message.getChannel(), StandardCharsets.UTF_8);
        String body = new String(message.getBody(), StandardCharsets.UTF_8);

        if (StringUtils.isAnyBlank(body, topic)) {
            LOGGER.warn("body or topic is null or empty !");
            return;
        }
        LOGGER.debug("topic: {}, message body: {}", topic, body);


        VehicleControlContext.sendOfflineNotify(body);
    }
}


public

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void sendOfflineNotifyMessage(BoundValueOperations<String, String> operations) {
        String userLoginInfos = operations.get();

        if (StringUtils.isBlank(userLoginInfos)) {
           return;
        }
        RedisUserDto redisUserDto;

        try {
            redisUserDto = JSON.parseObject(userLoginInfos, RedisUserDto.class);
            if (Objects.isNull(redisUserDto)) {
                return;
            }
        } catch (Exception e) {
            // 解析失败, 无所谓
            LOGGER.warn("exception : {} -> {}", userLoginInfos, e.getMessage());
            return;
        }

        String token = redisUserDto.getToken();
        if (StringUtils.isBlank(token)) {
            return;
        }

        redisTemplate.convertAndSend("app_offline", redisUserDto.getUser().getId() + ":"+ token);

    }

« EOF »

If you like TeXt, don’t forget to give me a star :star2:.

This post is licensed under CC BY 4.0 by the author.