Index: src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java =================================================================== --- src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java (revision 18889) +++ src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java (working copy) @@ -33,93 +33,30 @@ */ package info.magnolia.module.exchangesimple; -import java.util.Vector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; +import edu.emory.mathcs.backport.java.util.concurrent.Executors; /** * Simple thread pool used to speed up activation to multiple subscribers. * @author had * */ -public class ThreadPool { - - private static final Logger log = LoggerFactory.getLogger(ThreadPool.class); +class ThreadPool { - private ThreadGroup group = new ThreadGroup("PooledActivators"); + private static final int NUM_THREADS = 10; - private Vector runnables = new Vector(); + private static ExecutorService instance; - private static ThreadPool instance = new ThreadPool(); - - /** - * Creates new pool of threads and starts them. - */ - private ThreadPool() { - for (int i = 0; i < 10; i++) { - PooledThread pt = new PooledThread(group, "" + i); - pt.start(); - - } + static { + instance = Executors.newFixedThreadPool(NUM_THREADS); } - + /** * Gets single instance of the thread pool. * @return Single instance of pool per VM/classloader. */ - public static ThreadPool getInstance() { + public static ExecutorService getInstance() { return instance; } - /** - * Schedules task for execution. Method returns immediatelly even when all worker threads are busy at the moment. - * @param r Runnable task. - */ - public void run(Runnable r) { - runnables .add(r); - synchronized (this) { - notifyAll(); - } - } - - /** - * Thread instance with infinite loop in run method to ensure periodic execution. - * @author had - * - */ - class PooledThread extends Thread { - - public PooledThread(ThreadGroup group, String name) { - super(group, name); - // make sure those threads don't stop VM from exiting. - setDaemon(true); - } - - public void run() { - while (true ) { - try { - Runnable r = null; - try { - r = (Runnable) runnables.remove(0); - } catch (IndexOutOfBoundsException e) { - // no item found sleep and retry - try { - // sleep for while - Thread.sleep(5000); - } catch (InterruptedException e1) { - // waked up from outside ... ignore - } - continue; - } - if (r != null) { - r.run(); - } - } catch (Throwable t) { - log.error("Activation error detected.", t); - } - } - } - - } } Index: pom.xml =================================================================== --- pom.xml (revision 18889) +++ pom.xml (working copy) @@ -40,6 +40,11 @@ jcr + backport-util-concurrent + backport-util-concurrent + 3.1 + + commons-httpclient commons-httpclient Index: src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java =================================================================== --- src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java (revision 18889) +++ src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java (working copy) @@ -46,27 +46,32 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; -import java.util.Hashtable; +import java.util.Collection; import java.util.Iterator; -import java.util.Vector; +import java.util.Map; import java.util.Map.Entry; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; + /** * * @author Sameer Charles $Id$ */ public class SimpleSyndicator extends BaseSyndicatorImpl { private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class); - + public SimpleSyndicator() { } public void activate(final ActivationContent activationContent) throws ExchangeException { - Iterator subscribers = ActivationManagerFactory.getActivationManager().getSubscribers().iterator(); - final Vector batch = new Vector(); - final Hashtable errors = new Hashtable(); - while (subscribers.hasNext()) { - final Subscriber subscriber = (Subscriber) subscribers.next(); + Collection subscribers = ActivationManagerFactory.getActivationManager().getSubscribers(); + final CountDownLatch batch = new CountDownLatch(subscribers.size()); + final Map errors = new ConcurrentHashMap(); + Iterator subscriberIterator = subscribers.iterator(); + while (subscriberIterator.hasNext()) { + final Subscriber subscriber = (Subscriber) subscriberIterator.next(); if (subscriber.isActive()) { // Create runnable task for each subscriber. Runnable r = new Runnable() { @@ -77,58 +82,67 @@ log.error("Failed to activate content.", e); errors.put(subscriber,e); } finally { - batch.remove(this); + batch.countDown(); } } }; - batch.add(r); // execute task. - ThreadPool.getInstance().run(r); + ThreadPool.getInstance().submit(r); + } else { + // count down directly + batch.countDown(); } } //end of subscriber loop // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user. - while (!batch.isEmpty()) { - log.debug("Waiting for {} tasks to finish.", new Integer(batch.size())); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // waked up externally - ignore - } - } + awaitIgnoringInterruption(batch); - String uuid = activationContent.getproperty(NODE_UUID); // collect all the errors and send them back. if (!errors.isEmpty()) { - Exception e = null; + Exception e = null; StringBuffer msg = new StringBuffer(errors.size() + " error").append( - errors.size() > 1 ? "s" : "").append(" detected: "); - Iterator iter = errors.entrySet().iterator(); - while (iter.hasNext()) { - Entry entry = (Entry) iter.next(); - e = (Exception) entry.getValue(); - Subscriber subscriber = (Subscriber) entry.getKey(); - msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName()); - log.error(e.getMessage(), e); - } + errors.size() > 1 ? "s" : "").append(" detected: "); + Iterator iter = errors.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = (Entry) iter.next(); + e = (Exception) entry.getValue(); + Subscriber subscriber = (Subscriber) entry.getKey(); + msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName()); + log.error(e.getMessage(), e); + } throw new ExchangeException(msg.toString(), e); } - ThreadPool.getInstance().run(new Runnable() { + // all task are done, no need to wait for anybody + ThreadPool.getInstance().submit(new Runnable() { public void run() { - while (!batch.isEmpty()) { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // waked up from outside ... ignore - } - } cleanTemporaryStore(activationContent); } - }); } + + + /** + * Waits for a {@link CountDownLatch} to count down ignoring any + * interruptions. Should any interruption occur the interruption status + * will be set. + * + * @see CountDownLatch#await() + * + * @param latch the latch on which to wait + */ + private static void awaitIgnoringInterruption(CountDownLatch latch) { + try { + latch.await(); + } catch (InterruptedException e) { + // waked up externally - ignore + // try again + awaitIgnoringInterruption(latch); + // be a good citizen and set back the interruption status + Thread.currentThread().interrupt(); + } + } /** * Send activation request if subscribed to the activated URI @@ -197,7 +211,6 @@ String handle = getActivationURL(subscriber); - String versionName = null; try { // authentication headers if (subscriber.getAuthenticationMethod() != null && "form".equalsIgnoreCase(subscriber.getAuthenticationMethod())) { @@ -222,13 +235,16 @@ throw new ExchangeException(e); } } + + public void doDeactivate() throws ExchangeException { - Iterator subscribers = ActivationManagerFactory.getActivationManager().getSubscribers().iterator(); - final Vector batch = new Vector(); - final Hashtable errors = new Hashtable(); - while (subscribers.hasNext()) { - final Subscriber subscriber = (Subscriber) subscribers.next(); + Collection subscribers = ActivationManagerFactory.getActivationManager().getSubscribers(); + final CountDownLatch batch = new CountDownLatch(subscribers.size()); + final Map errors = new ConcurrentHashMap(); + Iterator subscriberIterator = subscribers.iterator(); + while (subscriberIterator.hasNext()) { + final Subscriber subscriber = (Subscriber) subscriberIterator.next(); if (subscriber.isActive()) { // Create runnable task for each subscriber. Runnable r = new Runnable() { @@ -239,25 +255,19 @@ log.error("Failed to deactivate content.", e); errors.put(subscriber,e); } finally { - batch.remove(this); + batch.countDown(); } } }; - batch.add(r); - // execute task. - ThreadPool.getInstance().run(r); + ThreadPool.getInstance().submit(r); + } else { + // count down directly + batch.countDown(); } } //end of subscriber loop // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user. - while (!batch.isEmpty()) { - log.debug("Waiting for {} tasks to finish.", new Integer(batch.size())); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // waked up externally - ignore - } - } + awaitIgnoringInterruption(batch); // collect all the errors and send them back. if (!errors.isEmpty()) {