Index: src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java
===================================================================
--- src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java (revision 18889)
+++ src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java (working copy)
@@ -33,93 +33,30 @@
*/
package info.magnolia.module.exchangesimple;
-import java.util.Vector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
/**
* Simple thread pool used to speed up activation to multiple subscribers.
* @author had
*
*/
-public class ThreadPool {
-
- private static final Logger log = LoggerFactory.getLogger(ThreadPool.class);
+class ThreadPool {
- private ThreadGroup group = new ThreadGroup("PooledActivators");
+ private static final int NUM_THREADS = 10;
- private Vector runnables = new Vector();
+ private static ExecutorService instance;
- private static ThreadPool instance = new ThreadPool();
-
- /**
- * Creates new pool of threads and starts them.
- */
- private ThreadPool() {
- for (int i = 0; i < 10; i++) {
- PooledThread pt = new PooledThread(group, "" + i);
- pt.start();
-
- }
+ static {
+ instance = Executors.newFixedThreadPool(NUM_THREADS);
}
-
+
/**
* Gets single instance of the thread pool.
* @return Single instance of pool per VM/classloader.
*/
- public static ThreadPool getInstance() {
+ public static ExecutorService getInstance() {
return instance;
}
- /**
- * Schedules task for execution. Method returns immediatelly even when all worker threads are busy at the moment.
- * @param r Runnable task.
- */
- public void run(Runnable r) {
- runnables .add(r);
- synchronized (this) {
- notifyAll();
- }
- }
-
- /**
- * Thread instance with infinite loop in run method to ensure periodic execution.
- * @author had
- *
- */
- class PooledThread extends Thread {
-
- public PooledThread(ThreadGroup group, String name) {
- super(group, name);
- // make sure those threads don't stop VM from exiting.
- setDaemon(true);
- }
-
- public void run() {
- while (true ) {
- try {
- Runnable r = null;
- try {
- r = (Runnable) runnables.remove(0);
- } catch (IndexOutOfBoundsException e) {
- // no item found sleep and retry
- try {
- // sleep for while
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- // waked up from outside ... ignore
- }
- continue;
- }
- if (r != null) {
- r.run();
- }
- } catch (Throwable t) {
- log.error("Activation error detected.", t);
- }
- }
- }
-
- }
}
Index: pom.xml
===================================================================
--- pom.xml (revision 18889)
+++ pom.xml (working copy)
@@ -40,6 +40,11 @@
jcr
+ backport-util-concurrent
+ backport-util-concurrent
+ 3.1
+
+
commons-httpclient
commons-httpclient
Index: src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java
===================================================================
--- src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java (revision 18889)
+++ src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java (working copy)
@@ -46,27 +46,32 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
-import java.util.Hashtable;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.Vector;
+import java.util.Map;
import java.util.Map.Entry;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+
/**
*
* @author Sameer Charles $Id$
*/
public class SimpleSyndicator extends BaseSyndicatorImpl {
private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class);
-
+
public SimpleSyndicator() {
}
public void activate(final ActivationContent activationContent) throws ExchangeException {
- Iterator subscribers = ActivationManagerFactory.getActivationManager().getSubscribers().iterator();
- final Vector batch = new Vector();
- final Hashtable errors = new Hashtable();
- while (subscribers.hasNext()) {
- final Subscriber subscriber = (Subscriber) subscribers.next();
+ Collection subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
+ final CountDownLatch batch = new CountDownLatch(subscribers.size());
+ final Map errors = new ConcurrentHashMap();
+ Iterator subscriberIterator = subscribers.iterator();
+ while (subscriberIterator.hasNext()) {
+ final Subscriber subscriber = (Subscriber) subscriberIterator.next();
if (subscriber.isActive()) {
// Create runnable task for each subscriber.
Runnable r = new Runnable() {
@@ -77,58 +82,67 @@
log.error("Failed to activate content.", e);
errors.put(subscriber,e);
} finally {
- batch.remove(this);
+ batch.countDown();
}
}
};
- batch.add(r);
// execute task.
- ThreadPool.getInstance().run(r);
+ ThreadPool.getInstance().submit(r);
+ } else {
+ // count down directly
+ batch.countDown();
}
} //end of subscriber loop
// wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user.
- while (!batch.isEmpty()) {
- log.debug("Waiting for {} tasks to finish.", new Integer(batch.size()));
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // waked up externally - ignore
- }
- }
+ awaitIgnoringInterruption(batch);
- String uuid = activationContent.getproperty(NODE_UUID);
// collect all the errors and send them back.
if (!errors.isEmpty()) {
- Exception e = null;
+ Exception e = null;
StringBuffer msg = new StringBuffer(errors.size() + " error").append(
- errors.size() > 1 ? "s" : "").append(" detected: ");
- Iterator iter = errors.entrySet().iterator();
- while (iter.hasNext()) {
- Entry entry = (Entry) iter.next();
- e = (Exception) entry.getValue();
- Subscriber subscriber = (Subscriber) entry.getKey();
- msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
- log.error(e.getMessage(), e);
- }
+ errors.size() > 1 ? "s" : "").append(" detected: ");
+ Iterator iter = errors.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry entry = (Entry) iter.next();
+ e = (Exception) entry.getValue();
+ Subscriber subscriber = (Subscriber) entry.getKey();
+ msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName());
+ log.error(e.getMessage(), e);
+ }
throw new ExchangeException(msg.toString(), e);
}
- ThreadPool.getInstance().run(new Runnable() {
+ // all task are done, no need to wait for anybody
+ ThreadPool.getInstance().submit(new Runnable() {
public void run() {
- while (!batch.isEmpty()) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- // waked up from outside ... ignore
- }
- }
cleanTemporaryStore(activationContent);
}
-
});
}
+
+
+ /**
+ * Waits for a {@link CountDownLatch} to count down ignoring any
+ * interruptions. Should any interruption occur the interruption status
+ * will be set.
+ *
+ * @see CountDownLatch#await()
+ *
+ * @param latch the latch on which to wait
+ */
+ private static void awaitIgnoringInterruption(CountDownLatch latch) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ // waked up externally - ignore
+ // try again
+ awaitIgnoringInterruption(latch);
+ // be a good citizen and set back the interruption status
+ Thread.currentThread().interrupt();
+ }
+ }
/**
* Send activation request if subscribed to the activated URI
@@ -197,7 +211,6 @@
String handle = getActivationURL(subscriber);
- String versionName = null;
try {
// authentication headers
if (subscriber.getAuthenticationMethod() != null && "form".equalsIgnoreCase(subscriber.getAuthenticationMethod())) {
@@ -222,13 +235,16 @@
throw new ExchangeException(e);
}
}
+
+
public void doDeactivate() throws ExchangeException {
- Iterator subscribers = ActivationManagerFactory.getActivationManager().getSubscribers().iterator();
- final Vector batch = new Vector();
- final Hashtable errors = new Hashtable();
- while (subscribers.hasNext()) {
- final Subscriber subscriber = (Subscriber) subscribers.next();
+ Collection subscribers = ActivationManagerFactory.getActivationManager().getSubscribers();
+ final CountDownLatch batch = new CountDownLatch(subscribers.size());
+ final Map errors = new ConcurrentHashMap();
+ Iterator subscriberIterator = subscribers.iterator();
+ while (subscriberIterator.hasNext()) {
+ final Subscriber subscriber = (Subscriber) subscriberIterator.next();
if (subscriber.isActive()) {
// Create runnable task for each subscriber.
Runnable r = new Runnable() {
@@ -239,25 +255,19 @@
log.error("Failed to deactivate content.", e);
errors.put(subscriber,e);
} finally {
- batch.remove(this);
+ batch.countDown();
}
}
};
- batch.add(r);
- // execute task.
- ThreadPool.getInstance().run(r);
+ ThreadPool.getInstance().submit(r);
+ } else {
+ // count down directly
+ batch.countDown();
}
} //end of subscriber loop
// wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user.
- while (!batch.isEmpty()) {
- log.debug("Waiting for {} tasks to finish.", new Integer(batch.size()));
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // waked up externally - ignore
- }
- }
+ awaitIgnoringInterruption(batch);
// collect all the errors and send them back.
if (!errors.isEmpty()) {