用户工具

站点工具


java:redission

| Stable Release Version | Release Date | JDK Version compatibility | CompletionStage support | ProjectReactor version compatibility |
| 3.7.5 | 19.07.2018 | 1.8, 1.9, 1.10+ | Yes | 3.1.x |
| 2.12.5 | 19.07.2018 | 1.6, 1.7, 1.8, 1.9, 1.10, Android | No | 2.0.8 |

  • Maven
<!-- JDK 1.8+ compatible -->
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.7.5</version>
</dependency>
<!-- JDK 1.6+ compatible version=2.12.5 -->  
// Option 1: connect with the defaults (single server at 127.0.0.1:6379).
RedissonClient redisson = Redisson.create();
// Option 2 (use instead of option 1): connect with an explicit configuration.
// Other deployment modes are available too, e.g. config.useClusterServers().
Config config = new Config();
// Redisson 3.x parses the address as a URI, so it must carry the redis:// scheme
// (rediss:// for SSL); a bare "host:port" is rejected.
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
  • 订阅分发
// Subscriber side: build the listener first, then attach it to the "anyTopic" topic.
MessageListener<SomeObject> listener = new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
};
RTopic<SomeObject> topic = redisson.getTopic("anyTopic");
topic.addListener(listener);

// Publisher side: runs in another thread or on another JVM node.
RTopic<SomeObject> topic = redisson.getTopic("anyTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());
  • 分布式锁
RLock lock = redisson.getLock("anyLock");
// Most common usage: no lease time is given, so the lock watchdog keeps extending the
// lock's expiry (Config.lockWatchdogTimeout, 30 seconds by default) while the holder is
// alive. If the holder crashes, the lock expires instead of staying locked forever.
lock.lock();  // release with lock.unlock();
// Lock with a 10-second lease: released automatically, no manual unlock() call needed.
lock.lock(10, TimeUnit.SECONDS);
// Try to lock: wait at most 100 seconds, auto-release 10 seconds after acquiring.
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
// Asynchronous variants are provided as well, e.g. lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> asyncRes = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
// A multi-lock requires all of the given locks at once; a red lock only needs to succeed
// on the majority of nodes: new RedissonRedLock(lock1, lock2, lock3);
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);
// Read-write lock: many read locks may be held at the same time, but at most one write lock.
// It has to be obtained via getReadWriteLock(); getLock() returns a plain RLock.
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
rwlock.readLock().lock();
// or, for exclusive access:
rwlock.writeLock().lock();
  • 分布式并发远程调用RPC
// Server side: obtain the remote service and create the implementation to expose.
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceImpl someServiceImpl = new SomeServiceImpl();
// A remote service has to be registered before any of its remote methods is invoked.
// Registers a single server-side worker instance: only one invocation can run at a time.
remoteService.register(SomeServiceInterface.class, someServiceImpl);
// Registers 12 server-side worker instances: up to 12 invocations can run concurrently.
remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
// Client side: obtain a proxy for the interface and call it like a local object.
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);
String result = service.doSomeStuff(1L, "secondParam", new AnyParam());
// Invocation options: the ack timeout is 1 second; noResult() does not wait for the execution
// result; noAck() skips the acknowledgement; expectAckWithin(1, TimeUnit.MINUTES) sets a
// 1-minute ack timeout.
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();
YourService service = remoteService.get(YourService.class, options);
//远程过程调用也可以采用异步的方式执行
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync { //重复RemoteInterface的接口方法 }
// Asynchronous invocation: MyRemoteInterfaceAsync has the same method signatures as MyRemoteInterface.
MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class);
RFuture<Long> future = asyncService.myBusyMethod(1L, "someparam");
// Cancel the asynchronous invocation.
future.cancel(true);
// 远程接口的实现,支持cancel取消
public class MyRemoteServiceImpl implements MyRemoteInterface {
   public Long myBusyMethod(Long param1, String param2) {
       for (long i = 0; i < Long.MAX_VALUE; i++) {
           iterations.incrementAndGet();
           if (Thread.currentThread().isInterrupted()) {
                System.out.println("interrupted! " + i);
                return;
           }
       }
   }
}
  • 分布式共享实时对象
// Live object entity: the class is annotated with @REntity and its identifier field with @RId.
@REntity
public class MyLiveObject {
    @RId
    private String name;

    // Accessors for the identifier; the usage example calls setName("myName").
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
    // other fields with their getters and setters
}
RLiveObjectService service = redisson.getLiveObjectService();
// Build a plain (detached) instance and set its identifier.
MyLiveObject detached = new MyLiveObject();
detached.setName("myName");
// persist() hands back the instance to work with from here on; the type arguments are inferred.
MyLiveObject myObject1 = service.persist(detached);
// Alternatively, fetch an already existing live object by id; every get() builds a new object.
MyLiveObject existing = service.get(MyLiveObject.class, "myName");

Redisson分布式实时对象服务自动将以下普通Java对象转换成与之匹配的Redisson分布式对象RObject

普通Java类 转换后的Redisson类
SortedSet.class RedissonSortedSet.class
Set.class RedissonSet.class
ConcurrentMap.class RedissonMap.class
Map.class RedissonMap.class
BlockingDeque.class RedissonBlockingDeque.class
Deque.class RedissonDeque.class
BlockingQueue.class RedissonBlockingQueue.class
Queue.class RedissonQueue.class
List.class RedissonList.class

  • 分布式执行服务Executor Service
/**
 * Task for the distributed executor service that sums all values of the map "myMap".
 */
public class CallableTask implements Callable<Long> {
    // Annotated with @RInject so the client is supplied to the task rather than constructed here.
    @RInject
    private RedissonClient redissonClient;

    /**
     * Adds up every value stored in "myMap".
     *
     * @return the sum of all values, or {@code null} if the task was cancelled (thread interrupted)
     */
    @Override
    public Long call() throws Exception {
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        // Primitive accumulator: "Long result = 0" does not compile (int is not convertible
        // to Long) and a boxed accumulator would be re-boxed on every addition.
        long result = 0L;
        // The map may contain a large number of entries.
        for (Integer value : map.values()) {
            if (Thread.currentThread().isInterrupted()) {
                // The task has been cancelled.
                return null;
            }
            result += value;
        }
        return result;
    }
}
// Declared as RScheduledExecutorService: the cron-based schedule(...) and
// cancelScheduledTask(...) calls below are scheduling operations.
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
Future<Long> future = executorService.submit(new CallableTask()); // asynchronous variant: submitAsync; a RunnableTask is submitted the same way
future.cancel(true);  // or wait for the result: future.get();
// Scheduled task driven by a cron expression. RScheduledFuture exposes the task id,
// which a plain ScheduledFuture does not.
RScheduledFuture<?> scheduledFuture = executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));
String taskId = scheduledFuture.getTaskId();
executorService.cancelScheduledTask(taskId);  // cancel the scheduled task by its id
  • MapReduce
    /**
     * Mapper for word counting: emits the pair (word, 1) for every word of an input line.
     */
    public class WordMapper implements RCollectionMapper<String, String, Integer> {
        /**
         * Splits one input line into words and emits each with a count of 1.
         *
         * @param value     one element of the source collection (a line of text)
         * @param collector receives the emitted (word, 1) pairs
         */
        @Override
        public void map(String value, RCollector<String, Integer> collector) {
            // Split on runs of non-letters so consecutive delimiters (", " or double
            // spaces) do not produce empty tokens.
            String[] words = value.split("[^a-zA-Z]+");
            for (String word : words) {
                // A leading delimiter still yields one empty first token; never count it as a word.
                if (word.isEmpty()) {
                    continue;
                }
                collector.emit(word, 1);
            }
        }
    }
    /**
     * Reducer for word counting: adds up all counts that were emitted for one word.
     */
    public class WordReducer implements RReducer<String, Integer> {
        /**
         * @param reducedKey the word being reduced
         * @param iter       the counts emitted for that word
         * @return the sum of the counts
         */
        @Override
        public Integer reduce(String reducedKey, Iterator<Integer> iter) {
            int total = 0;
            for (; iter.hasNext(); ) {
                total += iter.next();
            }
            return total;
        }
    }
    // Fill the list "myList" with the lines of text to analyse.
    RList<String> list = redisson.getList("myList");
    list.add("Alice was beginning to get very tired"); 
    list.add("of sitting by her sister on the bank and");
    list.add("of having nothing to do once or twice she");
    list.add("had peeped into the book her sister was reading");
    list.add("but it had no pictures or conversations in it");
    list.add("and what is the use of a book");
    list.add("thought Alice without pictures or conversation");
    // Configure the map-reduce job over the list with the word mapper and reducer.
    RCollectionMapReduce<String, String, Integer> mapReduce
             = list.<String, Integer>mapReduce()
                   .mapper(new WordMapper())
                   .reducer(new WordReducer());
    // Word frequency: number of occurrences per word.
    Map<String, Integer> mapToNumber = mapReduce.execute();
    // Total word count: a collator folds the per-word counts into one number.
    public class WordCollator implements RCollator<String, Integer, Integer> {
        /**
         * @param resultMap the per-word counts produced by the reduce phase
         * @return the sum of all counts
         */
        @Override
        public Integer collate(Map<String, Integer> resultMap) {
            int total = 0;
            for (Map.Entry<String, Integer> entry : resultMap.entrySet()) {
                total += entry.getValue();
            }
            return total;
        }
    }
    Integer totalWordsAmount = mapReduce.execute(new WordCollator());
java/redission.txt · 最后更改: 2018/08/21 22:32 由 admin