Caching - Part III - Distributed caches

In this part we'll talk about the general notion of a distributed cache and an actual implementation in EhCache using JGroups. A distributed cache is useful when you have a group of processes (possibly running on different machines) that need to share the data in the cache: you want every process to know when elements in the cache have been changed by one of the others. In EhCache or JBossCache, for example, each process holds a local copy of the data, and replication messages (element added, element removed, etc.) are sent from one process to the others whenever the cache is modified.

Another type of distributed cache is a partitioned cache, where part of the data lives in one process (computer) while other parts are distributed among other processes, sometimes with a backup copy of each partition held by yet another process. This is useful when the amount of data is very large, or when it is more efficient to partition the data and have the machines work on a query in parallel, each on its own subset.

A nice, simple-to-use open source solution that offers data structures like maps and lists with the data distributed across a cluster is Hazelcast. Another product, actually a full data grid solution, is Oracle Coherence (formerly Tangosol Coherence, the inventors of partitioned caching). It offers data distribution with backup copies, locality of data (sending the processing commands to the processes that already hold the needed data, instead of shipping that data over the network to a process that does not have it), and write-through caching to a persistent store (a database, for example). In a word: very powerful, but with a matching price tag.

Synchronous or Asynchronous replication?

The top issue when talking about data replication from one process to another is whether the replication should be synchronous or asynchronous.

  1. With synchronous replication, a put from one process blocks the other processes' access to the cache until the change has been successfully replicated to every other process using the cache. You can think of it in terms of a database transaction: the process updates its own cache and propagates the modification to the other processes in the same unit of work. This would be the ideal mode of operation, because every process sees the same data and nobody ever gets stale data from the cache. However, in a distributed cache the processes typically live on different machines connected through a network, and since a write in one process blocks all other reads, this method may not be efficient. All involved processes must also acknowledge the update before the lock is released. Caches are supposed to be fast, and network I/O is not, not to mention prone to failure, so it is unwise to be confident that all participants stay in sync unless you have some mechanism for failure notification.
    • Advantages : data kept in sync
    • Disadvantages : network I/O is not fast and is prone to failure
  2. By contrast, asynchronous replication does not propagate an update to the other processes in the same transaction. Instead, the replication messages are sent to the other processes some time after the local cache was updated. This could be implemented, for example, as a background thread that periodically wakes up and sends the queued replication messages to the other processes. An update to a process's local cache therefore finishes very fast, since it does not block waiting for acknowledgments from the other processes. If a peer does not respond to a replication message, the message can simply be retried later, without hindering or blocking the other processes.
    • Advantages : updates do not cause long blocks across processes. Failures are simpler to deal with: in case of a network failure, the modifications can simply be resent
    • Disadvantages : Data may not be in sync across processes
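The asynchronous scheme described above can be sketched with plain JDK classes: a put updates the local map immediately and only enqueues a replication event, while a background thread drains the queue later and notifies the peers. The `ReplicatedCache` class and its `Peer` interface are hypothetical simplifications for illustration, not how EhCache actually implements this.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

// Minimal sketch of asynchronous replication: the local update returns fast,
// peers are notified later by a background thread.
public class ReplicatedCache {

    // a peer is anything that can apply a remote update (a network call in real life)
    public interface Peer { void apply(String key, String value); }

    private final Map<String, String> local = new ConcurrentHashMap<>();
    private final BlockingQueue<String[]> replicationQueue = new LinkedBlockingQueue<>();
    private final List<Peer> peers;
    private final ScheduledExecutorService replicator = Executors.newSingleThreadScheduledExecutor();

    public ReplicatedCache(List<Peer> peers) {
        this.peers = peers;
        // wake up periodically and send the queued modifications to the peers
        replicator.scheduleWithFixedDelay(this::drainQueue, 100, 100, TimeUnit.MILLISECONDS);
    }

    public void put(String key, String value) {
        local.put(key, value);                             // fast local update
        replicationQueue.offer(new String[]{key, value});  // replicate some time later
    }

    public String get(String key) {
        return local.get(key);
    }

    private void drainQueue() {
        String[] event;
        while ((event = replicationQueue.poll()) != null) {
            for (Peer peer : peers) {
                peer.apply(event[0], event[1]);            // retry logic omitted for brevity
            }
        }
    }

    public void shutdown() {
        drainQueue();                                      // flush any remaining events
        replicator.shutdown();
        try {
            replicator.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A real implementation must of course also retry failed peers and preserve message ordering, which is exactly the kind of work the replication layer takes on.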

Considering that when using caches it is usually acceptable to receive stale data, asynchronous replication is preferred in most cases.

Protocol or method of replication - Multicast

The next important thing to decide is what protocol or method of replication to pick. The most widely used method, or at least one you should be aware exists, is multicasting. In IP multicast there is a specific range of multicast addresses, and a packet sent to one such address is propagated to all the computers that have processes listening on that same address. It is the same idea as a JMS topic: one message sent, multiple receivers who subscribed to the topic.
The nice thing about multicast is that you do not have to know your receivers beforehand: another computer can join the group simply by using the same multicast address and take part in the conversation, without any modification to the configuration files of the other computers to add the new computer's IP. This helps horizontal scalability: just add another computer to the network and it can join the cluster without changes to the configuration of the already running processes.
The problem with IP multicast, however, is that it is implemented on top of UDP, not TCP (UDP is not reliable by itself: packets might not reach the destination without the sender being aware of it, and the order in which packets arrive is not guaranteed), so the sender does not know whether any of its listeners missed a message. This is acceptable for what multicast was intended for, streaming audio and video, where a dropped frame is not catastrophic for the overall viewing experience, and making the player wait for the successful retransmission of a frame would degrade it more. But for distributed computing multicast has to be made reliable, in case a subscriber misses, say, the replication message for a put event on the cache. This can be achieved by having the listeners send acknowledgment messages back to the sender. Enter the open source Java library JGroups: its specialty is forming "groups" of processes whose members can send messages to the other participants.
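To make the one-sender-many-receivers idea concrete, here is a minimal sketch using plain java.net IP multicast, with no reliability at all (which is precisely the gap JGroups fills). The class name and the address/port are arbitrary picks for the example.

```java
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.nio.charset.StandardCharsets;

// A datagram sent to a multicast group address reaches every socket that
// joined that group -- no prior knowledge of the receivers is needed.
public class MulticastSketch {

    static final InetAddress GROUP;
    static final int PORT = 45588;  // arbitrary pick for the example
    static {
        try {
            GROUP = InetAddress.getByName("228.10.10.10"); // 224.0.0.0-239.255.255.255 is the multicast range
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // joins the group, multicasts one message and waits for it to come back
    public static String sendAndReceive(String message) throws Exception {
        MulticastSocket receiver = new MulticastSocket(PORT);
        receiver.setSoTimeout(2000);  // a real subscriber would never even notice a lost packet
        receiver.joinGroup(GROUP);    // subscribe: no sender-side configuration needed

        // the sender just fires a datagram at the group address
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        MulticastSocket sender = new MulticastSocket();
        sender.send(new DatagramPacket(payload, payload.length, GROUP, PORT));
        sender.close();

        byte[] buf = new byte[1024];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        receiver.receive(packet);     // throws SocketTimeoutException if the datagram was lost
        receiver.leaveGroup(GROUP);
        receiver.close();
        return new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendAndReceive("put key1"));
    }
}
```

Note that if the datagram is dropped, the sender has no way of knowing: that is exactly why a reliability layer is needed before multicast can carry cache replication messages.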

JGroups

JGroups exposes an API for high-level group communication, like sending messages between the participants in a group; internally it uses a stack of protocols layered one atop the other, which can be combined to implement the desired behavior. For example, a typical protocol stack (JGroups can be configured through an XML file):

 <config>
  <UDP mcast_addr="228.10.10.10" mcast_port="45588"/> <!-- sets the underlying transport to UDP multicast -->
  <PING timeout="2000"/> <!-- discovery of members through multicast ping -->
  <MERGE2/> <!-- handles regrouping of the members. Kicks in after a healed network partition, when subgroups might have been created -->
  <FD_SOCK/> <!-- failure detection of group members -->
  <VERIFY_SUSPECT timeout="1500"/> <!-- double check that a suspected failed member has really failed -->
  <pbcast.NAKACK retransmit_timeout="2400,4800"/> <!-- makes multicasting reliable and keeps the order of messages -->
  <UNICAST/> <!-- makes UDP unicast messages reliable and ordered; UDP unicast is still used for sending messages to a specific member of the group -->
  <pbcast.STABLE/> <!-- messages that have not been acknowledged as received are kept in memory for retransmission; this protocol removes those that have been acknowledged by all the subscribers -->
  <pbcast.GMS/> <!-- group membership, handles joining and leaving of members -->
 </config>

The comments are self-explanatory. We have added layers on top of standard UDP multicast to make it reliable and to be notified when another process joins or leaves our group.
Although it seems a bit complicated, these are mostly standard setups that can be reused with little change; but if you know what you are doing, you can tune the parameters of every layer, and some layers have plenty of parameters to play with.
The good thing is that changing the protocol stack, and thus the method of replication, only requires modifying the XML config file, without any change to the source code. For example, we could make the underlying replication mechanism use TCP instead of UDP multicast.
Using TCP, however, means that we need to configure the IP of every participant beforehand, so that the processes can open unicast connections to them. This is not a problem in setups where you do not need to dynamically add computers to the cluster, and you get the benefit of the reliability inherent in TCP.
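As a sketch (host names and ports are made up for the example, and parameters are illustrative rather than tuned), such a TCP-based stack could look roughly like this, with TCPPING carrying a static list of initial members instead of multicast discovery:

```xml
<config>
  <TCP bind_port="7800"/> <!-- reliable unicast transport instead of UDP multicast -->
  <TCPPING initial_hosts="hostA[7800],hostB[7800]" port_range="1"
           timeout="3000" num_initial_members="2"/> <!-- static discovery: peers listed beforehand -->
  <VERIFY_SUSPECT timeout="1500"/>
  <pbcast.NAKACK use_mcast_xmit="false" retransmit_timeout="2400,4800"/> <!-- ordering/retransmission still useful over TCP -->
  <pbcast.STABLE/>
  <pbcast.GMS join_timeout="5000"/>
</config>
```

Adding a new machine to this cluster means editing the initial_hosts list, which is the price paid for dropping multicast discovery.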


JGroups is a proven library used in many open source and commercial projects, and I encourage you to have a look at it, especially if you are interested in distributed computing. But JGroups is not the only replication method available for EhCache. Others include using a JMS server and having the processes listen for JMS messages, standard RMI among the processes, and finally using a Terracotta server. Since EhCache was acquired by Terracotta, I suppose they will try to make the integration with the Terracotta server very easy, but I hope they will not neglect the other replication methods just to push people toward Terracotta replication (currently the EhCache JGroups replication does not work with the latest JGroups 2.8.0 library version but with the previous 2.4.7 version).

Cache listener

The starting point for replication in EhCache is the ability to add a cache listener, which intercepts modifications to the cache. Since the cache listener is informed of every modification, it can send an update message to the other processes, which then apply the same action to their copy of the cache.

To add your own cache listener you implement the CacheEventListener interface in a custom class, and provide a factory class (extending the abstract class CacheEventListenerFactory) that returns your custom listener. You then register this factory with the cache, which can be done in the ehcache.xml configuration file. An example:

public class MyCacheEventListenerFactory extends CacheEventListenerFactory {

    @Override
    public CacheEventListener createCacheEventListener(Properties properties) {
        // properties configured on the factory in ehcache.xml are passed in here
        String configValue = properties.getProperty("someProperty1");
        return new MyCacheEventListener();
    }
}

public class MyCacheEventListener implements CacheEventListener {

    public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
        System.out.println("Element was removed");
    }

    public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
        System.out.println("Element was put");
    }

    public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
        System.out.println("Element was updated");
    }

    public void notifyElementExpired(Ehcache cache, Element element) {
        System.out.println("Element expired");
    }

    public void notifyElementEvicted(Ehcache cache, Element element) {
        System.out.println("Element evicted");
    }

    public void notifyRemoveAll(Ehcache cache) {
        System.out.println("Elements removed");
    }

    public void dispose() {
        // release any resources held by the listener
    }

    @Override
    public Object clone() throws CloneNotSupportedException {
        return super.clone(); // CacheEventListener extends Cloneable
    }
}

and ehcache.xml configuration:

    <cache name="customCache"
           maxElementsInMemory="30"
           eternal="false"
           timeToLiveSeconds="300"
           overflowToDisk="false">

        <cacheEventListenerFactory class="com.balamaci.MyCacheEventListenerFactory" properties="someProperty1=true"/>

    </cache>

Implementing JGroups replication

To set up replication, EhCache provides cache listener classes for every replication method (JMS, RMI and JGroups). Let's see how to configure JGroups replication. First download the jgroups-all.jar library and add it to the project classpath; ehcache-jgroupsreplication.jar must also be on the classpath:

<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="../config/ehcache.xsd"  
             updateCheck="false" monitoring="autodetect">

<cacheManagerPeerProviderFactory class="net.sf.ehcache.distribution.jgroups.JGroupsCacheManagerPeerProviderFactory"  
     properties="connect=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;
     mcast_send_buf_size=150000;mcast_recv_buf_size=80000):
     PING(timeout=2000;num_initial_members=6):
     MERGE2(min_interval=5000;max_interval=10000):
     FD_SOCK:VERIFY_SUSPECT(timeout=1500):
     pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):
     UNICAST(timeout=5000):
     pbcast.STABLE(desired_avg_gossip=20000):
     FRAG:
     pbcast.GMS(join_timeout=5000;join_retry_timeout=2000)"
 propertySeparator="::"
     />


    <cache name="customCache"
           maxElementsInMemory="30"
           eternal="false"
           timeToLiveSeconds="300"
           overflowToDisk="false">

        <cacheEventListenerFactory
           class="net.sf.ehcache.distribution.jgroups.JGroupsCacheReplicatorFactory"
           properties="replicateAsynchronously=true, replicatePuts=true,
           replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true,
           asynchronousReplicationIntervalMillis=1000" />

        <bootstrapCacheLoaderFactory class="net.sf.ehcache.distribution.jgroups.JGroupsBootstrapCacheLoaderFactory"/>
    </cache>
</ehcache>

First, the JGroups protocol stack we already talked about is configured through the JGroupsCacheManagerPeerProviderFactory.
We can see that the JGroupsCacheReplicatorFactory also receives a set of properties. Most are self-explanatory:

  • replicateAsynchronously = true or false. This is the choice between asynchronous and synchronous replication discussed earlier.
  • replicatePuts, replicateRemovals, ... = true or false, whether that type of modification is sent to the peer processes or not
  • replicateUpdatesViaCopy = true or false, whether the updated object is sent along with the update message; if false, a remove message is sent to the peer caches instead, so that a cache miss occurs there and the element is refreshed from the persistent store.
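For example, if the cached elements are expensive to serialize or can always be reloaded from the database, an invalidation-style configuration would look roughly like this sketch (same factory class as above, only the properties change; the exact values are illustrative):

```xml
<cacheEventListenerFactory
   class="net.sf.ehcache.distribution.jgroups.JGroupsCacheReplicatorFactory"
   properties="replicateAsynchronously=true, replicatePuts=false,
   replicateUpdates=true, replicateUpdatesViaCopy=false, replicateRemovals=true" />
```

With replicateUpdatesViaCopy=false, an update in one process only invalidates the element in the peers; each peer reloads it on its next cache miss.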

You may notice that we also introduced a bootstrapCacheLoaderFactory element. Having the cache listeners is not enough for a good replication implementation: not all the processes start at the same time. A process that starts late must first receive the current state of the cache (its elements) from the others to be in sync; afterwards it will receive the update messages. This initial state transfer is handled by the JGroupsBootstrapCacheLoaderFactory. Nothing else changes in the way the cache is handled programmatically; you still put and remove elements in the same way.

Conclusion

We saw how to implement a distributed cache in EhCache, and that a distributed cache is not hard to obtain. On the other hand, the realm of distributed caches brings additional problems you must be aware of: other processes can be cut off by a network failure, hold stale data, or fail to respond for some reason.
