Archive | July, 2009

Heavy Concurrency: A better way to manage JCR Sessions

27 Jul

Some lessons are learnt the hard way. A data-management library (abstracting Jackrabbit/JCR) that I designed a few months back showed tremendous weakness under heavy concurrency. My first instinct was to run for cover and make all access methods synchronized. Since the framework was a Spring-managed setup (some key beans were singletons), the synchronization shortcut only made things worse: every caller now queued on the same singleton's lock, and performance dropped like a stone. Luckily for me, and for the handful of modules that depend on the reliability and accuracy of this library, the solution was simple to implement.

The root cause of the problem was many re-entrant threads sharing the same session while calling data-modification methods concurrently. Sharing a session across threads is considered an absolute no-no in the JCR world, yet for reasons I have not dug into yet, Jackrabbit leaves session management for developers to perfect. To solve the problem I wrote a class that manages one session per thread, keyed by the thread's unique ID and the workspace name. To avoid languishing sessions, the sessions are held in an LRU map: a fixed-size map whose eviction policy discards the least recently used session to make room for a new session request.
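The per-thread LRU idea can be sketched with nothing but the JDK. This is a minimal sketch under stated assumptions, not the RepositoryUtil class itself: java.util.LinkedHashMap in access order stands in for commons-collections LRUMap, and the hypothetical PooledSession interface stands in for javax.jcr.Session (its close() plays the role of logout()). The class name ThreadSessionCache and the Supplier-based login callback are also illustrative. Two choices here go beyond the original class: the map is keyed on a (workspace, thread ID) object that implements equals(), and a session is logged out when it is evicted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

public class ThreadSessionCache {

    /** Hypothetical stand-in for javax.jcr.Session; close() plays the role of logout(). */
    public interface PooledSession {
        boolean isLive();
        void close();
    }

    /** Cache key: workspace + thread ID, compared by value, not by hash code alone. */
    static final class Key {
        final String workspace;
        final long threadId;

        Key(String workspace, long threadId) {
            this.workspace = (workspace == null) ? "default" : workspace;
            this.threadId = threadId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return threadId == k.threadId && workspace.equals(k.workspace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(workspace, threadId);
        }
    }

    private final Map<Key, PooledSession> sessions;

    public ThreadSessionCache(final int maxSessions) {
        // accessOrder = true: iteration runs from least to most recently used.
        this.sessions = new LinkedHashMap<Key, PooledSession>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Key, PooledSession> eldest) {
                if (size() > maxSessions) {
                    eldest.getValue().close(); // log out the evicted session
                    return true;
                }
                return false;
            }
        };
    }

    /** Returns the calling thread's live session for the workspace, logging in if needed. */
    public synchronized PooledSession get(String workspace, Supplier<PooledSession> login) {
        Key key = new Key(workspace, Thread.currentThread().getId());
        PooledSession session = sessions.get(key);
        if (session == null || !session.isLive()) {
            session = login.get();
            sessions.put(key, session);
        }
        return session;
    }

    public synchronized int size() {
        return sessions.size();
    }

    /**
     * Logs out every cached session. Iterates values() directly: in an access-ordered
     * map, calling get() inside a keySet() loop is a structural change and throws
     * ConcurrentModificationException.
     */
    public synchronized void closeAll() {
        for (PooledSession s : sessions.values()) {
            s.close();
        }
        sessions.clear();
    }
}
```

Closing the evicted entry inside removeEldestEntry is what keeps a bounded cache from leaking logged-in sessions; whether that is safe depends on no other code still holding the evicted session, which this sketch simply assumes.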

RepositoryUtil.java

package org.boni.jrtutorial.util;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.jcr.Repository;
import javax.jcr.Session;
import javax.jcr.Workspace;

import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RepositoryUtil {
	private static Map<Integer, Session> sessionMap;
	private static Log logger = LogFactory.getLog(RepositoryUtil.class);
	
	protected int size(){
		return ((sessionMap == null)?0:sessionMap.size());
	}
	
	public RepositoryUtil(int maxConcurrentSessions){
		reinitializeSessionMapResources(maxConcurrentSessions);
		logger.debug("Adding a shutdown hook for JCR session cleanup");
		Runtime.getRuntime().addShutdownHook(new Thread(){
			@Override
			public void run() {
				logger.debug("Starting the shutdown hook");
				cleanJcrSessions();
				logger.debug("Shutdown hook finished running");
			}
		});
		logger.debug("shutdown hook added for JCR session cleanup");
	}
	
	private void cleanJcrSessions(){
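		logger.debug("Starting the JCR session cleanup through shutdown hook invocation");
		synchronized (sessionMap) {
			// Iterate over values() instead of calling get() inside a keySet() loop:
			// LRUMap.get() moves the entry to the most-recently-used position, which
			// modifies the map mid-iteration (ConcurrentModificationException).
			for (Session aSession : sessionMap.values()) {
				try {
					aSession.logout();
					logger.debug("Released a session...");
				} catch (Exception e) {
					logger.error(e, e);
				}
			}
		}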
	}
	
	@SuppressWarnings("unchecked")
	private void reinitializeSessionMapResources(int size){
	    logger.debug("Initializing Session Map");
		sessionMap = (Map<Integer,Session>)(Collections.synchronizedMap(new LRUMap(size)));
		logger.debug("Successfully initialized Session Map");
	}
    public Session getSession(	Repository repository,
    								String workspace,
    								javax.jcr.Credentials credentials 
    								) {
		long threadId = Thread.currentThread().getId();
		int sessionKeyId = new SessionKey(workspace,threadId).hashCode();
		logger.debug("Session map is managing " + size() + " sessions");
		synchronized (sessionMap) {
		if (sessionMap.containsKey(sessionKeyId)) {
			// Look up with the same Integer key used in put(); get(threadId) boxed a Long,
			// never matched, and returned null (NullPointerException on isLive()).
			Session session = sessionMap.get(sessionKeyId);
			if (session.isLive()) {
				String[] locks = session.getLockTokens();
				for (String aLock : locks){
					session.removeLockToken(aLock);
				}
				return session;
			}
		}
		try {
			Session session = repository
					.login(credentials, workspace);
			sessionMap.put(sessionKeyId, session);
			return session;
		} catch (Exception e) {
			logger.debug(e.getMessage());
			throw new RuntimeException(e);
		}
	}
   }
   class SessionKey{
	   int hashCode = -1;
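	   SessionKey(String workspace, Long thread){
		   workspace = (workspace == null)? "default" : workspace;
		   // Fall back to the calling thread's ID; UUID.randomUUID().clockSequence()
		   // throws UnsupportedOperationException for random (version 4) UUIDs.
		   thread = (thread == null)? Thread.currentThread().getId() : thread;
		   hashCode =
		   17 * workspace.hashCode() +
		   19 * thread.hashCode();
	   }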
		@Override
		public int hashCode() {
			return hashCode;
		}
   }
}
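One thing to watch in RepositoryUtil: sessions are stored under SessionKey.hashCode(), a bare Integer, and distinct workspace/thread pairs can produce the same hash. The snippet below copies only SessionKey's formula (17 * workspace.hashCode() + 19 * thread.hashCode()) into a hypothetical static helper, sessionKeyHash; the workspace names "a" and "t" and the thread IDs 18 and 1 are chosen purely to force a collision.

```java
public class SessionKeyCollision {

    /** Hypothetical helper reproducing the hash formula used by SessionKey. */
    static int sessionKeyHash(String workspace, Long thread) {
        return 17 * workspace.hashCode() + 19 * thread.hashCode();
    }

    public static void main(String[] args) {
        // "a".hashCode() == 97 and "t".hashCode() == 116; small Long values hash to themselves.
        int h1 = sessionKeyHash("a", 18L); // 17 * 97  + 19 * 18 = 1991
        int h2 = sessionKeyHash("t", 1L);  // 17 * 116 + 19 * 1  = 1991
        System.out.println(h1 + " " + h2 + " collide: " + (h1 == h2)); // prints "1991 1991 collide: true"
    }
}
```

Because getSession only checks containsKey(sessionKeyId), the second thread would be handed the first thread's live session, which is exactly the cross-thread sharing the class exists to prevent. Keying the map on the SessionKey object itself, with equals() implemented alongside hashCode(), would rule this out.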

RepositoryUtilTest.java

package org.boni.jrtutorial.util;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.jackrabbit.core.TransientRepository;
import org.junit.BeforeClass;
import org.junit.Test;

public class RepositoryUtilTest {
	private static Log logger = LogFactory.getLog(RepositoryUtilTest.class);
	static RepositoryUtil ru = null;
	static Repository repository = null;
	static int MAX_SESSIONS=10;
	@BeforeClass
	public static void beforeClass() throws Exception{
		ru = new RepositoryUtil(MAX_SESSIONS);
		repository = new TransientRepository(
				"classpath:repository.xml",
				"target/repository");
		
	}
	
	@Test
	public void testSingleLogin(){
		Session aSession = ru.getSession(repository,  "default",new SimpleCredentials("username","password".toCharArray()));
		assertNotNull(aSession);
		assertTrue(ru.size() > 0 && ru.size() <= MAX_SESSIONS);
	}
	
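	@Test
	public void testTransactionWith1000Threads() throws Exception {
		ExecutorService ex = Executors.newFixedThreadPool(1000);
		java.util.List<java.util.concurrent.Future<?>> results =
				new java.util.ArrayList<java.util.concurrent.Future<?>>();
		for (int i = 0 ; i < 1000 ; i++){
			results.add(ex.submit(new Runnable(){
				public void run() {
					testSingleLogin();
				}
			}));
		}
		ex.shutdown();
		// Wait for every task; Future.get() rethrows a worker's assertion failure
		// (wrapped in an ExecutionException), so the test cannot pass silently.
		for (java.util.concurrent.Future<?> result : results) {
			result.get();
		}
	}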

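	@Test
	public void testUpdateOnSameNodeWith1000Threads() throws Exception{
		// Create and save the parent node once and keep only its path: a Node object
		// belongs to the session that loaded it, so each worker looks the node up
		// again through its own session instead of sharing this one.
		final String testNodePath = createTestNode().getPath();
		ExecutorService ex = Executors.newFixedThreadPool(1000);
		java.util.List<java.util.concurrent.Future<?>> results =
				new java.util.ArrayList<java.util.concurrent.Future<?>>();
		for (int i = 0 ; i < 1000 ; i++){
			results.add(ex.submit(new Runnable(){
				public void run() {
					Session aSession = ru.getSession(repository,  "default",new SimpleCredentials("username","password".toCharArray()));
					try {
						Node testNode = (Node) aSession.getItem(testNodePath);
						addNode(aSession, testNode, UUID.randomUUID().toString());
					} catch (javax.jcr.RepositoryException e) {
						throw new RuntimeException(e);
					}
				}
			}));
		}
		ex.shutdown();
		// Future.get() rethrows any worker failure, so the test cannot pass silently.
		for (java.util.concurrent.Future<?> result : results) {
			result.get();
		}
	}

	
	private void addNode(Session s, Node aNode, String newNode){
		try {
			long before = aNode.getNodes().getSize();
			aNode.addNode(newNode);
			s.save();
			assertTrue(before + 1 <=  aNode.getNodes().getSize());
		} catch (Exception e) {
			logger.error(e, e);
			fail();
		} 
		
	}
	
	private Node createTestNode() throws Exception {
		Session aSession = ru.getSession(repository,  "default",new SimpleCredentials("username","password".toCharArray()));
		Node testNode = aSession.getRootNode().addNode("testNode");
		// Persist the node so the other sessions (one per worker thread) can see it.
		aSession.save();
		return testNode;
	}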
	
}

pom.xml (only the relevant dependencies; the test also needs jackrabbit-core and JUnit 4 on the test classpath)

  	<dependency>
  		<groupId>javax.jcr</groupId>
  		<artifactId>jcr</artifactId>
  		<version>1.0.1</version>
  	</dependency>
  	<dependency>
  		<groupId>commons-collections</groupId>
  		<artifactId>commons-collections</artifactId>
  		<version>3.2.1</version>
  	</dependency>

Learning Scala

27 Jul

This weekend I took some baby steps in Scala. The ultimate target is to learn Scala and to build a cool folder synchronizer in it that works fast and furious at keeping remote folders in sync.

The code can be checked out from Google Code:

svn checkout http://whiteboardjunkie.googlecode.com/svn/trunk/foldersync foldersync

From what I have understood so far, Scala is an excellent language improvement over Java. Scala source is compiled into ordinary .class files in JVM bytecode format, so to the JVM (and to Java code) compiled Scala classes look just like compiled Java classes. For me this was the ‘gotcha’ moment, and armed with this knowledge I was able to set up the development environment just the way I wanted it:

  • Scala can be built and tested with Maven 2.
  • Test cases for Scala code can be written in Scala or in Java (and vice versa).
  • The final artifact can be run from a batch file.
  • The application can be packaged with an IzPack installer (the installer build can be invoked right within Maven 2; nothing Scala-specific there).

Important Tools:

  1. Scala IDE for Eclipse
  2. scala-maven-plugin
  3. izpack-maven-plugin

Using the scala-maven-plugin, I could extend the standard Maven 2 source layout to Scala, with src/main/scala for Scala code and src/test/scala for Scala test cases. Another important aspect is that Java code can coexist with it: I followed the plugin's recommendation and used src/main/java and src/test/java for the Java sources.


Another great feature of this plugin is that it is completely self-sufficient in its relationship with the Scala compiler. It locates the compiler through the Maven dependency tree, where the Scala library is declared like any other dependency:

		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
			<version>${scala-version}</version>
		</dependency>

The fun part was realizing that test cases could be written in either Java or Scala.

JavaFolderTest.java

package org.boni.java.foldersync.domain;
import org.junit.Test;
import static org.junit.Assert.*;
public class JavaFolderTest {
	@Test
	public void testCreateJavaFolder(){
		assertNotNull(new Folder());
	}

	@Test
	public void testCreateScalaFolder(){
		org.boni.scala.foldersync.domain.Folder folder = new org.boni.scala.foldersync.domain.Folder();
		assertNotNull(folder);
	}
}

And the same tests written in Scala:

package org.boni.scala.foldersync.domain
import org.junit._
import Assert._
class ScalaFolderTest {
	@Test
	def testCreateJavaFolder(){
		assertNotNull(new org.boni.java.foldersync.domain.Folder());
	}

	@Test
	def testCreateScalaFolder(){
		assertNotNull(new org.boni.scala.foldersync.domain.Folder());
	}
}

While writing code is fun, you also need a solid vehicle for packaging and deploying the final product. IzPack came in handy with its amazingly simple yet powerful XML-driven install scripts, and it worked perfectly well with the Scala classes and with the Maven build process. For this project I wanted a simple batch-file-based invocation that sets up the libraries it needs and then invokes the class containing main:

package org.boni.scala.foldersync.runner

object FolderSynchronizer {
	def main(args: Array[String]){
	  println("Starting folder synchronizer...");
	}
}

The build process I hacked together copies all the necessary dependencies, along with the other files I want in the installation (README, License, Documents), to a staging folder. It then invokes the IzPack installer plugin, which packages everything together as directed by an install.xml script. The installer also bundles the batch file I want to run and substitutes some variables in it with the actual installation values. The installer can be launched by typing java -jar [folder-sync-standard.jar].

Once it is installed, the user can run the application from the command line by typing run.bat. The complete build, including the installer creation, is driven through the Maven life cycle.

In the coming days, as I learn more Scala, I intend to extend this rudimentary framework into a full-scale folder synchronizer written in Scala. I will keep posting updates, so stay tuned on both WordPress and Google Code.