- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
redis redisson 集合操作
相关类及接口
Rlist:链表 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public
interface
RList<V>
extends
List<V>, RExpirable, RListAsync<V>, RSortable<List<V>>, RandomAccess {
List<V> get(
int
... var1);
//获取指定的节点值
int
addAfter(V var1, V var2);
//在var1前添加var2
int
addBefore(V var1, V var2);
//在var1后添加var2
void
fastSet(
int
var1, V var2);
//修改var1处的只为var2
List<V> readAll();
//获取链表的所有值
RList<V> subList(
int
var1,
int
var2);
//获取var1到var2的子链表
List<V> range(
int
var1);
//返回var1往后的链表
List<V> range(
int
var1,
int
var2);
//返回var1到var2的链表
void
trim(
int
var1,
int
var2);
//保留var1到var2处的链表,其余删除
void
fastRemove(
int
var1);
//删除var1处的值
boolean
remove(Object var1,
int
var2);
//判断元素是否删除
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
//mapreduce操作
}
|
RSet:无序集合 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public
interface
RSet<V>
extends
Set<V>, RExpirable, RSetAsync<V>, RSortable<Set<V>> {
V removeRandom();
Set<V> removeRandom(
int
var1);
//删除对象
V random();
Set<V> random(
int
var1);
//随机返回对象
boolean
move(String var1, V var2);
//判断集合var1中是否存在var2,类似contains()方法
Set<V> readAll();
//获取所有对象
int
union(String... var1);
//集合并集对象个数
Set<V> readUnion(String... var1);
//集合并集
int
diff(String... var1);
//集合差集对象个数
Set<V> readDiff(String... var1);
//集合差集
int
intersection(String... var1);
//集合交集的对象个数
Set<V> readIntersection(String... var1);
//集合交集
Iterator<V> iterator(
int
var1);
Iterator<V> iterator(String var1,
int
var2);
Iterator<V> iterator(String var1);
//遍历集合
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
RSemaphore getSemaphore(V var1);
RCountDownLatch getCountDownLatch(V var1);
RPermitExpirableSemaphore getPermitExpirableSemaphore(V var1);
//信号量
RLock getLock(V var1);
RLock getFairLock(V var1);
RReadWriteLock getReadWriteLock(V var1);
//锁操作
Stream<V> stream(
int
var1);
Stream<V> stream(String var1,
int
var2);
Stream<V> stream(String var1);
//流操作
}
|
RMap:键值对 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public
interface
RMap<K, V>
extends
ConcurrentMap<K, V>, RExpirable, RMapAsync<K, V> {
void
loadAll(
boolean
var1,
int
var2);
void
loadAll(Set<?
extends
K> var1,
boolean
var2,
int
var3);
V get(Object var1);
//获取var1的值
V put(K var1, V var2);
//添加对象
V putIfAbsent(K var1, V var2);
//对象不存在则设置
V replace(K var1, V var2);
//替换对象
boolean
replace(K var1, V var2, V var3);
//替换对象
V remove(Object var1);
//移除对象
boolean
remove(Object var1, Object var2);
//移除对象
void
putAll(Map<?
extends
K, ?
extends
V> var1);
void
putAll(Map<?
extends
K, ?
extends
V> var1,
int
var2);
//添加对象
Map<K, V> getAll(Set<K> var1);
//获取key在集合var1中的键值对
int
valueSize(K var1);
//key为var1的value大小
V addAndGet(K var1, Number var2);
//key为var1的value加var2
long
fastRemove(K... var1);
//移除对象
boolean
fastPut(K var1, V var2);
//添加对象
boolean
fastReplace(K var1, V var2);
//替换key为var1的值为var2
boolean
fastPutIfAbsent(K var1, V var2);
//如果不存在则设置
Set<K> readAllKeySet();
//获取所有key,以set形式返回
Collection<V> readAllValues();
//获取所有value,以collection返回
Set<Entry<K, V>> readAllEntrySet();
//遍历键值对
Map<K, V> readAllMap();
//集合形式转换为map类型
Set<K> keySet();
Set<K> keySet(
int
var1);
Set<K> keySet(String var1,
int
var2);
Set<K> keySet(String var1);
//获取key集合
Collection<V> values();
Collection<V> values(String var1);
Collection<V> values(String var1,
int
var2);
Collection<V> values(
int
var1);
//获取所有value
Set<Entry<K, V>> entrySet();
Set<Entry<K, V>> entrySet(String var1);
Set<Entry<K, V>> entrySet(String var1,
int
var2);
Set<Entry<K, V>> entrySet(
int
var1);
//遍历键值对
<KOut, VOut> RMapReduce<K, V, KOut, VOut> mapReduce();
RSemaphore getSemaphore(K var1);
RCountDownLatch getCountDownLatch(K var1);
RPermitExpirableSemaphore getPermitExpirableSemaphore(K var1);
//信号量操作
RLock getLock(K var1);
RLock getFairLock(K var1);
RReadWriteLock getReadWriteLock(K var1);
//锁操作
}
|
使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public
class
MyTest {
public
static
void
main(String[] args){
Config config=
new
Config();
config.useSingleServer().setAddress(
"redis://******:6379"
).setPassword(
"123456"
);
RedissonClient client= Redisson.create(config);
RList<String> list=client.getList(
"list"
);
for
(
int
i=
0
;i<
10
;i++){
list.add(
"瓜田李下 "
+i);
}
list.readAll().forEach(System.out::println);
System.out.println(
"list的数量为:"
+list.size()+
"\n"
);
RSet<String> set=client.getSet(
"set"
);
for
(
int
i=
0
;i<
10
;i++){
set.add(
"瓜田李下 "
+i);
}
for
(String s : set) {
System.out.println(s);
}
System.out.println(
"set的大小为:"
+set.size()+
"\n"
);
RMap<Integer,String> map=client.getMap(
"map"
);
for
(
int
i=
0
;i<
10
;i++){
map.put(i,
"瓜田李下 "
+i);
}
for
(Map.Entry<Integer,String> entry:map.entrySet()){
System.out.println(entry.getKey()+
" ==> "
+entry.getValue());
}
System.out.println(
"map的大小为:"
+map.size());
}
}
|
控制台输出 。
瓜田李下 0 瓜田李下 1 瓜田李下 2 瓜田李下 3 瓜田李下 4 瓜田李下 5 瓜田李下 6 瓜田李下 7 瓜田李下 8 瓜田李下 9 list的数量为:10 瓜田李下 0 瓜田李下 1 瓜田李下 7 瓜田李下 3 瓜田李下 5 瓜田李下 4 瓜田李下 9 瓜田李下 8 瓜田李下 6 瓜田李下 2 set的大小为:10 0 ==> 瓜田李下 0 1 ==> 瓜田李下 1 2 ==> 瓜田李下 2 3 ==> 瓜田李下 3 4 ==> 瓜田李下 4 5 ==> 瓜田李下 5 6 ==> 瓜田李下 6 7 ==> 瓜田李下 7 8 ==> 瓜田李下 8 9 ==> 瓜田李下 9 map的大小为:10 。
Redisson使用注意事项
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格,相较于暴露底层操作的Jedis,Redisson提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务.
特性 & 功能:
- 支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式
- 程序接口调用方式采用异步执行和异步流执行两种方式
- 数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储
- 单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能
- 提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
- 提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
- 分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过期性信号锁(PermitExpirableSemaphore)等
- 提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射归纳服务(MapReduce)
- 更多特性和功能,请关注官网:http://redisson.org
实现原理
redis本身是不支持上述的分布式对象和集合,Redisson是通过利用redis的特性在客户端实现了高级数据结构和特性,例如优先队列的实现,是通过客户端排序整理后再存入redis.
客户端实现,意味着当没有任何客户端在线时,这些所有的数据结构和特性都不会保留,也不会自动生效,例如过期事件的触发或原来优先队列的元素增加.
注意事项
实时性
RMap中有一个功能是可以设置键值对的过期时间的,并可以注册键值对的事件监听器 。
- 元素淘汰功能(Eviction)
- Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。
- 目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
正如官方wiki所述,这个功能是通过后台线程定时去清理的, 所以这个是非实时的(issue-1234:on expired event is not executed in real-time.),延迟在5秒到2小时之间,因此对实时性要求比较高的场景就得自己衡量了.
由于过期时间的非实时性,所以导致过期事件的发生也是非实时的,相应的监听器可能会延迟了一会儿才收到通知,在我的测试中,ttl设置在秒级误差是比较大的,分钟级别的ttl倒还好(左侧设置值,右侧实际耗时):
1s _ 5s 3s _ 5s 4s _ 5s 5s _ 9s 6s _ 10s 10s _ 15s 1m _ 1m11s 。
序列化
由Redisson默认的编码器为JsonJacksonCodec,JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环.
在我的情况中,我序列化了一个service,这个service已被spring托管,而且和另一个service之间也相互注入了,用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常.
为了序列化后的内容可见,所以不用redission其他自带的二进制编码器,自行实现编码器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import
com.alibaba.fastjson.JSON;
import
com.alibaba.fastjson.serializer.SerializerFeature;
import
io.netty.buffer.ByteBuf;
import
io.netty.buffer.ByteBufAllocator;
import
io.netty.buffer.ByteBufInputStream;
import
io.netty.buffer.ByteBufOutputStream;
import
org.redisson.client.codec.BaseCodec;
import
org.redisson.client.protocol.Decoder;
import
org.redisson.client.protocol.Encoder;
import
java.io.IOException;
public
class
FastjsonCodec
extends
BaseCodec {
private
final
Encoder encoder = in -> {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try
{
ByteBufOutputStream os =
new
ByteBufOutputStream(out);
JSON.writeJSONString(os, in,SerializerFeature.WriteClassName);
return
os.buffer();
}
catch
(IOException e) {
out.release();
throw
e;
}
catch
(Exception e) {
out.release();
throw
new
IOException(e);
}
};
private
final
Decoder<Object> decoder = (buf, state) ->
JSON.parseObject(
new
ByteBufInputStream(buf), Object.
class
);
@Override
public
Decoder<Object> getValueDecoder() {
return
decoder;
}
@Override
public
Encoder getValueEncoder() {
return
encoder;
}
}
|
订阅发布
Redisson对订阅发布的封装是RTopic,这也是Redisson中很多事件监听的实现原理(例如键值对的事件监听).
使用单元测试时发现,在事件发布后,订阅方需要延时一下才能收到事件。具体原因待查 。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持我.
我是一名优秀的程序员,十分优秀!