`
xiaobian
  • 浏览: 579194 次
  • 来自: 北京
社区版块
存档分类
最新评论

Java对象缓存系统的实现,实现了LRU算法,并可以进行集群同步

    博客分类:
  • Java
阅读更多

LRU算法实现:

package com.javaeye.xiaobian.jgroups;

import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A size-bounded cache with least-recently-used eviction, built on an
 * access-ordered {@link LinkedHashMap}.
 * <p>
 * The methods declared in this class are guarded by a single
 * {@link ReentrantLock}. Methods inherited from {@code LinkedHashMap} that
 * are not overridden here (for example {@code putAll}, {@code keySet},
 * {@code values}, {@code entrySet}) are NOT guarded and must not be used
 * when the cache is shared between threads.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class LRUCache<K, V> extends LinkedHashMap<K, V> {
	/** Maximum number of entries kept before the eldest one is evicted. */
	private final int maxCapacity;

	private static final float DEFAULT_LOAD_FACTOR = 0.75f;

	/** Guards every operation declared in this class. */
	private final Lock lock = new ReentrantLock();

	/**
	 * Creates an empty cache.
	 *
	 * @param maxCapacity maximum number of entries to retain; also used as
	 *                    the initial capacity of the backing map
	 */
	public LRUCache(int maxCapacity) {
		// The third argument (true) selects access order, which is what
		// turns the map into an LRU structure.
		super(maxCapacity, DEFAULT_LOAD_FACTOR, true);
		this.maxCapacity = maxCapacity;
	}

	/**
	 * Tells the backing map to drop its eldest entry once the cache holds
	 * more than {@code maxCapacity} entries.
	 */
	@Override
	protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
		return size() > maxCapacity;
	}

	/**
	 * Reports whether the cache holds an entry for the given key.
	 *
	 * @param key the key to look up
	 * @return {@code true} if an entry exists for {@code key}
	 */
	@Override
	public boolean containsKey(Object key) {
		lock.lock();
		try {
			return super.containsKey(key);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Returns the value cached under the given key and marks the entry as
	 * most recently used.
	 *
	 * @param key the key to look up
	 * @return the cached value, or {@code null} if there is none
	 */
	@Override
	public V get(Object key) {
		lock.lock();
		try {
			return super.get(key);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Stores a value in the cache, evicting the least recently used entry
	 * if the capacity is exceeded.
	 *
	 * @param key   the key to store under
	 * @param value the value to cache
	 * @return the value previously cached under {@code key}, or {@code null}
	 */
	@Override
	public V put(K key, V value) {
		lock.lock();
		try {
			return super.put(key, value);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes the entry for the given key under the cache lock, so that
	 * callers using the standard {@code Map} API do not bypass the locking
	 * applied by {@link #delete(Object)}.
	 *
	 * @param key the key whose entry should be removed
	 * @return the removed value, or {@code null} if there was none
	 */
	@Override
	public V remove(Object key) {
		lock.lock();
		try {
			return super.remove(key);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Returns the number of entries currently cached.
	 */
	@Override
	public int size() {
		lock.lock();
		try {
			return super.size();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes every entry from the cache.
	 */
	@Override
	public void clear() {
		lock.lock();
		try {
			super.clear();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Deletes one entry from the cache.
	 *
	 * @param key the key whose entry should be removed
	 */
	public void delete(Object key) {
		remove(key);
	}

	/**
	 * Deletes several entries from the cache while holding the lock once
	 * for the whole batch.
	 *
	 * @param objArrs the keys to remove; {@code null} is treated as an
	 *                empty batch
	 */
	public void batchDelete(Object[] objArrs) {
		if (objArrs == null) {
			return;
		}
		lock.lock();
		try {
			for (Object key : objArrs) {
				super.remove(key);
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Returns a snapshot of all cache entries, ordered from least to most
	 * recently used.
	 * <p>
	 * The returned set is an independent copy: iterating it is safe while
	 * other threads keep using the cache, and later cache changes are not
	 * reflected in it. Handing out the live entry set instead would let
	 * callers iterate without the lock and fail with
	 * {@code ConcurrentModificationException} as soon as another thread
	 * touched the cache.
	 *
	 * @return a copy of the current entries
	 */
	public Set<Map.Entry<K, V>> getAll() {
		lock.lock();
		try {
			Set<Map.Entry<K, V>> snapshot = new LinkedHashSet<Map.Entry<K, V>>();
			for (Map.Entry<K, V> entry : super.entrySet()) {
				snapshot.add(new AbstractMap.SimpleImmutableEntry<K, V>(entry));
			}
			return snapshot;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Replaces the value cached under a key by removing the old entry and
	 * inserting the new one as a single locked step.
	 *
	 * @param key   the key to update
	 * @param value the new value
	 */
	public void update(K key, V value) {
		lock.lock();
		try {
			super.remove(key);
			this.put(key, value);
		} finally {
			lock.unlock();
		}
	}

}

 

 

利用Jgroups实现集群同步:

 

package com.javaeye.xiaobian.jgroups;

import java.io.Serializable;

import org.jgroups.Channel;
import org.jgroups.ExtendedReceiverAdapter;
import org.jgroups.JChannel;
import org.jgroups.Message;

public class CacheSynchronizer {

	private static String protocolStackString = "UDP(mcast_addr=236.11.11.11;mcast_port=32767;ip_ttl=64;"
			+ "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):"
			+ "PING(timeout=2000;num_initial_members=3):"
			+ "MERGE2(min_interval=5000;max_interval=10000):"
			+ "FD_SOCK:"
			+ "VERIFY_SUSPECT(timeout=1500):"
			+ "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
			+ "pbcast.STABLE(desired_avg_gossip=20000):"
			+ "UNICAST(timeout=2500,5000):"
			+ "FRAG:"
			+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false)";
	private static String groupName = "CACHEGROUP";

	private Channel channel = null;

	private ReceiveCallback recvCallback = null;

	// inner class ,receive the broadcast and handle
	private class ReceiveCallback extends ExtendedReceiverAdapter {
		private Cache cache = null;

		public void setCache(Cache baseCache) {
			cache = baseCache;
		}

		public void receive(Message msg) {
			if (cache == null)
				return;
			CacheMessage message = (CacheMessage) msg.getObject();
			Object key = message.getKey();
			Object value = message.getValue();

			if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_ADD) {
				cache.put(key, value);
			} else if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_DELETE) {
				cache.delete(key);
			} else if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_DELETE_SOME) {
				cache.batchDelete((Object[]) key);
			} else if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_UPDATE) {
				cache.update(key, value);
			}

		}
	}

	public CacheSynchronizer(Cache cache) throws Exception {
		channel = new JChannel(protocolStackString);
		recvCallback = new ReceiveCallback();
		recvCallback.setCache(cache);
		channel.setReceiver(recvCallback);
		channel.connect(groupName);
	}

	/**
	 * Send broadcast message
	 * 
	 * @param msg
	 * @throws Exception
	 */

	public void sendCacheFlushMessage(Object msg) throws Exception {
		channel.send(null, null, (Serializable) msg); // 发送广播
	}

}

 

 

辅助类:

 

package com.javaeye.xiaobian.jgroups;

/**
 * Operation codes carried by cluster cache messages.
 */
public class CacheConstants {

	/** Operation code: add an entry. */
	public static final int CLUSTER_ENTRY_ADD = 10;

	/** Operation code: update an entry. */
	public static final int CLUSTER_ENTRY_UPDATE = 20;

	/** Operation code: delete a single entry. */
	public static final int CLUSTER_ENTRY_DELETE = 30;

	/** Operation code: delete several entries at once. */
	public static final int CLUSTER_ENTRY_DELETE_SOME = 40;
}

 

package com.javaeye.xiaobian.jgroups;

import java.io.Serializable;

/**
 * Base type for serializable messages that describe one cache operation.
 */
public abstract class CacheMessage implements Serializable {

	/**
	 * @return the key carried by this message
	 */
	public abstract Object getKey();

	/**
	 * @return the value carried by this message
	 */
	public abstract Object getValue();

	/**
	 * @return the integer operation code carried by this message
	 */
	public abstract int getOperation();
}

 

package com.javaeye.xiaobian.jgroups;

import java.io.Serializable;

public class SynCacheMessage extends CacheMessage implements Serializable {

	private Object key;
	private Object value;
	private int op;

	public SynCacheMessage() {

	}

	public SynCacheMessage(Object key, Object value, int op) {
		this.key = key;
		this.op = op;
		this.value = value;
	}

	public Object getKey() {
		return key;
	}

	public int getOperation() {
		return op;
	}

	public Object getValue() {
		return value;
	}

}

 

 

实际使用的Cache类:

 

package com.javaeye.xiaobian.jgroups;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * Process-wide cache facade: wraps one {@link LRUCache} and owns the
 * {@link CacheSynchronizer} that is constructed around this instance.
 */
public class Cache {
	private static Cache singleton = null;
	private LRUCache<Object, Object> lruCache = null;
	private CacheSynchronizer cacheSyncer = null;

	private Cache(int size) {
		lruCache = new LRUCache<Object, Object>(size);
		createSynchronizer();
	}

	/**
	 * Returns the single shared instance, creating it with a capacity of
	 * 100000 entries on first use. The method is synchronized so that two
	 * threads racing on the first call cannot each build a cache and a
	 * synchronizer.
	 *
	 * @return the shared cache
	 */
	public static synchronized Cache getInstance() {
		if (singleton == null) {
			singleton = new Cache(100000);
		}
		return singleton;
	}

	/**
	 * @return the synchronizer, or {@code null} if it has not been created
	 *         successfully (see {@link #createSynchronizer()})
	 */
	public CacheSynchronizer getSynchronizer() {
		return cacheSyncer;
	}

	/**
	 * @return the number of entries currently cached
	 */
	public int size() {
		return lruCache.size();
	}

	/**
	 * Creates the synchronizer for this cache. A failure is only printed
	 * and leaves the synchronizer field unchanged, so the cache itself
	 * stays usable.
	 */
	public void createSynchronizer() {
		try {
			cacheSyncer = new CacheSynchronizer(this);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Removes several entries.
	 *
	 * @param objArrs the keys to remove
	 */
	public void batchDelete(Object[] objArrs) {
		lruCache.batchDelete(objArrs);
	}

	/**
	 * Removes one entry. Goes through {@code LRUCache.delete}, which takes
	 * the cache lock, instead of calling {@code remove} on the map.
	 *
	 * @param key the key to remove
	 */
	public void delete(Object key) {
		lruCache.delete(key);
	}

	/**
	 * Removes every entry.
	 */
	public void deleteAll() {
		lruCache.clear();
	}

	/**
	 * @param key the key to look up
	 * @return the cached value, or {@code null} if there is none
	 */
	public Object get(Object key) {
		return lruCache.get(key);
	}

	/**
	 * Stores a value under the given key.
	 */
	public void put(Object key, Object value) {
		lruCache.put(key, value);
	}

	/**
	 * Replaces the value stored under the given key.
	 */
	public void update(Object key, Object value) {
		lruCache.update(key, value);
	}

	/**
	 * @return the set of entries as provided by the underlying LRU cache
	 */
	public Set getAll() {
		return lruCache.getAll();
	}

	/**
	 * Renders every entry as {@code key == K  value == V}, one per line.
	 * <p>
	 * NOTE(review): the entry set is iterated without any lock held here;
	 * whether that is safe under concurrent modification depends on what
	 * {@code LRUCache.getAll()} hands out. Wrapping the set in
	 * {@code Collections.synchronizedSet} does not protect iteration, so
	 * that wrapper is not used.
	 */
	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		for (Object element : getAll()) {
			Map.Entry<?, ?> entry = (Map.Entry<?, ?>) element;
			sb.append("key == ").append(entry.getKey());
			sb.append("  value == ").append(entry.getValue());
			sb.append("\n");
		}
		return sb.toString();
	}
}

 

测试:

 

package com.javaeye.xiaobian.jgroups;

/**
 * Manual demo: fills the local cache, broadcasts each change to the
 * cluster, then updates every entry and broadcasts those changes too.
 */
public class Test {

	/** Pause after each step. */
	private static final long PAUSE_MILLIS = 1000 * 5;

	/**
	 * Runs the add phase followed by the update phase, ten keys each.
	 *
	 * @param args unused
	 * @throws Exception if a message cannot be sent, the thread is
	 *                   interrupted, or the synchronizer is unavailable
	 */
	public static void main(String[] args) throws Exception {
		Cache cache = Cache.getInstance();
		CacheSynchronizer synchronizer = cache.getSynchronizer();
		if (synchronizer == null) {
			// Fail with a clear message instead of a bare
			// NullPointerException at the first send.
			throw new IllegalStateException(
					"Cache synchronizer is not available");
		}

		for (int i = 0; i < 10; i++) {
			String key = String.valueOf(i);
			String value = "cache" + key;
			cache.put(key, value);
			CacheMessage msg = new SynCacheMessage(key, value,
					CacheConstants.CLUSTER_ENTRY_ADD);
			synchronizer.sendCacheFlushMessage(msg);
			reportAndPause(cache);
		}

		// A batch delete can be broadcast the same way: put the keys into
		// an Object[] and send
		// new SynCacheMessage(keys, "cache",
		//         CacheConstants.CLUSTER_ENTRY_DELETE_SOME).

		for (int i = 0; i < 10; i++) {
			String key = String.valueOf(i);
			String value = "cacheUpdate" + key;
			CacheMessage msg = new SynCacheMessage(key, value,
					CacheConstants.CLUSTER_ENTRY_UPDATE);
			cache.update(key, value);
			synchronizer.sendCacheFlushMessage(msg);
			reportAndPause(cache);
		}

	}

	/** Prints the cache size and contents, then pauses. */
	private static void reportAndPause(Cache cache)
			throws InterruptedException {
		System.out.println(cache.size());
		System.out.println(cache.toString());
		// sleep is static; call it on the class, not on an instance.
		Thread.sleep(PAUSE_MILLIS);
	}
}

 

 

 

 参考:

http://618119.com/tag/jgroups

http://bypassfacebook.net/browse.php?b=5&u=Oi8vcmVuZXh1LmJsb2dzcG90LmNvbS8yMDA2LzA4L211bHRpY2FzdC1hbmQtamdyb3Vwc18yOC5odG1s

http://www.blogjava.net/swingboat/archive/2007/07/16/130565.html

http://bbs.81tech.com/read.php?tid=84461

 

http://whitesock.iteye.com/blog/199956

  • lib.rar (2.8 MB)
  • 下载次数: 91
1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics