Some lessons are learnt the hard way. A datamanagement library (abstracting Jackrabbit-JCR) I designed a few months back showed tremendous weakness while handling large concurrency. My first instinct was to run for cover and make all access methods synchronized. Since the framework was a spring managed setup (some keys beans were singleton), the synchronization short cut only made things worse, performance dropped like a stone. Luckily, more for me and some for a handful of modules dependent on the reliability and accuracy of this datamanagement library, the solution was simple to implement.
The root cause of the problem was many re-entrant threads sharing the same session accessing data modification methods concurrently. Session sharing across threads is lamented as an absolute no-no in the JCR world. For some reason that I still have not debugged into, Jackrabbit has left the session management for the developers to perfect. To solve the problem I wrote a class that will manage sessions per thread using the Unique Thread ID. To avoid languishing sessions the sessions are managed through a LRU Map (A map of fixed size with an eviction policy based on Least Recently Used Session makes space for a new Session request).
RepositoryUtil.java
package org.boni.jrtutorial.util; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; import javax.jcr.Repository; import javax.jcr.Session; import javax.jcr.Workspace; import org.apache.commons.collections.map.LRUMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class RepositoryUtil { private static Map<Integer, Session> sessionMap; private static Log logger = LogFactory.getLog(RepositoryUtil.class); protected int size(){ return ((sessionMap == null)?0:sessionMap.size()); } public RepositoryUtil(int maxConcurrentSessions){ reinitializeSessionMapResources(maxConcurrentSessions); logger.debug("Adding a shutdown hook for JCR session cleanup"); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { logger.debug("Starting the shutdown hook"); cleanJcrSessions(); logger.debug("Shutdown hook finished running"); } }); logger.debug("shutdown hook added for JCR session cleanup"); } private void cleanJcrSessions(){ logger.debug("Starting the JCR session cleanup through shutdown hook invokation"); synchronized (sessionMap) { for (Integer aKey : sessionMap.keySet()) { try { sessionMap.get(aKey).logout(); logger.debug("Released a session..."); } catch (Exception e) { logger.error(e, e); } } } } @SuppressWarnings("unchecked") private void reinitializeSessionMapResources(int size){ logger.debug("Initializing Session Map"); sessionMap = (Map<Integer,Session>)(Collections.synchronizedMap(new LRUMap(size))); logger.debug("Succesfully initialized Session Map"); } public Session getSession( Repository repository, String workspace, javax.jcr.Credentials credentials ) { long threadId = Thread.currentThread().getId(); int sessionKeyId = new SessionKey(workspace,threadId).hashCode(); logger.debug("Session map is managing " + ((sessionMap == null)?0:sessionMap.size()) + " sessions"); synchronized (sessionMap) { if (sessionMap.containsKey(sessionKeyId)) { Session session = sessionMap.get(threadId); if (session.isLive()) { String[] locks = session.getLockTokens(); for (String aLock : locks){ session.removeLockToken(aLock); } return session; } } try { Session session = repository .login(credentials, workspace); sessionMap.put(sessionKeyId, session); return session; } catch (Exception e) { logger.debug(e.getMessage()); throw new RuntimeException(e); } } } class SessionKey{ int hashCode = -1; SessionKey(String workspace, Long thread){ workspace = (workspace == null)? "default" : workspace; thread = (thread == null)? UUID.randomUUID().clockSequence() : thread; hashCode = 17 * workspace.hashCode() + 19 * thread.hashCode(); } @Override public int hashCode() { return hashCode; } } }
RepositoryUtilTest.java
package org.boni.jrtutorial.util; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jcr.Node; import javax.jcr.Repository; import javax.jcr.Session; import javax.jcr.SimpleCredentials; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.jackrabbit.core.TransientRepository; import org.junit.BeforeClass; import org.junit.Test; public class RepositoryUtilTest { private static Log logger = LogFactory.getLog(RepositoryUtilTest.class); static RepositoryUtil ru = null; static Repository repository = null; static int MAX_SESSIONS=10; @BeforeClass public static void beforeClass() throws Exception{ ru = new RepositoryUtil(MAX_SESSIONS); repository = new TransientRepository( "classpath:repository.xml", "target/repository"); } @Test public void testSingleLogin(){ Session aSession = ru.getSession(repository, "default",new SimpleCredentials("username","password".toCharArray())); assertNotNull(aSession); assertTrue(ru.size() > 0 && ru.size() <= MAX_SESSIONS); } @Test public void testTrannsactionWith1000Threads(){ ExecutorService ex = Executors.newFixedThreadPool(1000); for (int i = 0 ; i < 1000 ; i++){ ex.execute(new Thread(){ @Override public void run() { testSingleLogin(); } }); } ex.shutdown(); } private Node testNode = null; @Test public void testUpdateOnSameNodeWith1000Threads() throws Exception{ this.testNode = createTestNode(); ExecutorService ex = Executors.newFixedThreadPool(1000); for (int i = 0 ; i < 1000 ; i++){ ex.execute(new Thread(){ @Override public void run() { Session aSession = ru.getSession(repository, "default",new SimpleCredentials("username","password".toCharArray())); addNode(aSession, testNode, UUID.randomUUID().toString()); } }); } ex.shutdown(); } private void addNode(Session s, Node aNode, String newNode){ try { long before = aNode.getNodes().getSize(); aNode.addNode(newNode); s.save(); assertTrue(before + 1 <= aNode.getNodes().getSize()); } catch (Exception e) { logger.error(e, e); fail(); } } private Node createTestNode() throws Exception { Session aSession = ru.getSession(repository, "default",new SimpleCredentials("username","password".toCharArray())); Node testNode = aSession.getRootNode().addNode("testNode"); return testNode; } } [/sourcecode] <code>pom.xml [only the relevant dependencies]</code> <dependency> <groupId>javax.jcr</groupId> <artifactId>jcr</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.1</version> </dependency>