Index: src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java =================================================================== --- src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java (revision 18889) +++ src/main/java/info/magnolia/module/exchangesimple/ThreadPool.java (working copy) @@ -33,93 +33,45 @@ */ package info.magnolia.module.exchangesimple; -import java.util.Vector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import EDU.oswego.cs.dl.util.concurrent.Executor; +import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; /** * Simple thread pool used to speed up activation to multiple subscribers. * @author had * */ -public class ThreadPool { - - private static final Logger log = LoggerFactory.getLogger(ThreadPool.class); - - private ThreadGroup group = new ThreadGroup("PooledActivators"); +final class ThreadPool { - private Vector runnables = new Vector(); - - private static ThreadPool instance = new ThreadPool(); - /** - * Creates new pool of threads and starts them. + * Private constructor to prevent accidential instantiation. */ private ThreadPool() { - for (int i = 0; i < 10; i++) { - PooledThread pt = new PooledThread(group, "" + i); - pt.start(); - - } + throw new AssertionError("not instantiable"); } + + /** + * The fixes number of threads to use in the pool. + */ + private static final int NUM_THREADS = 10; + + private static final PooledExecutor instance; + static { + // possibly have an infinite number of jobs + instance = new PooledExecutor(new LinkedQueue()); + // always have NUM_THREADS running + instance.setMinimumPoolSize(NUM_THREADS); + instance.setMaximumPoolSize(NUM_THREADS); + } + /** * Gets single instance of the thread pool. * @return Single instance of pool per VM/classloader. */ - public static ThreadPool getInstance() { + public static Executor getInstance() { return instance; } - /** - * Schedules task for execution. Method returns immediatelly even when all worker threads are busy at the moment. - * @param r Runnable task. - */ - public void run(Runnable r) { - runnables .add(r); - synchronized (this) { - notifyAll(); - } - } - - /** - * Thread instance with infinite loop in run method to ensure periodic execution. - * @author had - * - */ - class PooledThread extends Thread { - - public PooledThread(ThreadGroup group, String name) { - super(group, name); - // make sure those threads don't stop VM from exiting. - setDaemon(true); - } - - public void run() { - while (true ) { - try { - Runnable r = null; - try { - r = (Runnable) runnables.remove(0); - } catch (IndexOutOfBoundsException e) { - // no item found sleep and retry - try { - // sleep for while - Thread.sleep(5000); - } catch (InterruptedException e1) { - // waked up from outside ... ignore - } - continue; - } - if (r != null) { - r.run(); - } - } catch (Throwable t) { - log.error("Activation error detected.", t); - } - } - } - - } } Index: src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java =================================================================== --- src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java (revision 18889) +++ src/main/java/info/magnolia/module/exchangesimple/SimpleSyndicator.java (working copy) @@ -37,36 +37,42 @@ import info.magnolia.cms.exchange.ExchangeException; import info.magnolia.cms.exchange.Subscriber; import info.magnolia.cms.exchange.Subscription; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; -import java.util.Hashtable; +import java.util.Collection; import java.util.Iterator; -import java.util.Vector; +import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; +import EDU.oswego.cs.dl.util.concurrent.CountDown; +import EDU.oswego.cs.dl.util.concurrent.Sync; + /** * * @author Sameer Charles $Id$ */ public class SimpleSyndicator extends BaseSyndicatorImpl { private static final Logger log = LoggerFactory.getLogger(SimpleSyndicator.class); - + public SimpleSyndicator() { } public void activate(final ActivationContent activationContent) throws ExchangeException { - Iterator subscribers = ActivationManagerFactory.getActivationManager().getSubscribers().iterator(); - final Vector batch = new Vector(); - final Hashtable errors = new Hashtable(); - while (subscribers.hasNext()) { - final Subscriber subscriber = (Subscriber) subscribers.next(); + Collection subscribers = ActivationManagerFactory.getActivationManager().getSubscribers(); + final Sync done = new CountDown(subscribers.size()); + final Map errors = new ConcurrentHashMap(subscribers.size()); + Iterator subscriberIterator = subscribers.iterator(); + while (subscriberIterator.hasNext()) { + final Subscriber subscriber = (Subscriber) subscriberIterator.next(); if (subscriber.isActive()) { // Create runnable task for each subscriber. Runnable r = new Runnable() { @@ -77,58 +83,87 @@ log.error("Failed to activate content.", e); errors.put(subscriber,e); } finally { - batch.remove(this); + done.release(); } } }; - batch.add(r); // execute task. - ThreadPool.getInstance().run(r); + executeInPool(r); + } else { + // count down directly + done.release(); } } //end of subscriber loop // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user. - while (!batch.isEmpty()) { - log.debug("Waiting for {} tasks to finish.", new Integer(batch.size())); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // waked up externally - ignore - } - } + acquireIgnoringInterruption(done); - String uuid = activationContent.getproperty(NODE_UUID); // collect all the errors and send them back. if (!errors.isEmpty()) { - Exception e = null; + Exception e = null; StringBuffer msg = new StringBuffer(errors.size() + " error").append( - errors.size() > 1 ? "s" : "").append(" detected: "); - Iterator iter = errors.entrySet().iterator(); - while (iter.hasNext()) { - Entry entry = (Entry) iter.next(); - e = (Exception) entry.getValue(); - Subscriber subscriber = (Subscriber) entry.getKey(); - msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName()); - log.error(e.getMessage(), e); - } + errors.size() > 1 ? "s" : "").append(" detected: "); + Iterator iter = errors.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = (Entry) iter.next(); + e = (Exception) entry.getValue(); + Subscriber subscriber = (Subscriber) entry.getKey(); + msg.append("\n").append(e.getMessage()).append(" on ").append(subscriber.getName()); + log.error(e.getMessage(), e); + } throw new ExchangeException(msg.toString(), e); } - ThreadPool.getInstance().run(new Runnable() { + // all task are done, no need to wait for anybody + executeInPool(new Runnable() { public void run() { - while (!batch.isEmpty()) { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // waked up from outside ... ignore - } - } cleanTemporaryStore(activationContent); } - }); } + + /** + * Runs a given job in the thread pool. + * + * @param job the job to run + * @throws ExchangeException if the job could not be put in the pool + */ + private static void executeInPool(Runnable job) throws ExchangeException { + try { + ThreadPool.getInstance().execute(job); + } catch (InterruptedException e) { + // this is kind of a problem, we could not add the job to the pool + // retrying might or might not work now that the interruption + // status is cleared but there is not much we can do so throwing + // an ExchangeException seems like the least bad choice + String message = "could not execute job in pool"; + log.error(message, e); + throw new ExchangeException(message, e); + } + } + + + /** + * Acquires a {@link Sync} ignoring any interruptions. Should any + * interruption occur the interruption status will be set. Might + * potentially block/wait forever + * + * @see Sync#acquire() + * + * @param latch the latch on which to wait + */ + private static void acquireIgnoringInterruption(Sync latch) { + try { + latch.acquire(); + } catch (InterruptedException e) { + // waked up externally - ignore + // try again + acquireIgnoringInterruption(latch); + // be a good citizen and set back the interruption status + Thread.currentThread().interrupt(); + } + } /** * Send activation request if subscribed to the activated URI @@ -197,7 +232,6 @@ String handle = getActivationURL(subscriber); - String versionName = null; try { // authentication headers if (subscriber.getAuthenticationMethod() != null && "form".equalsIgnoreCase(subscriber.getAuthenticationMethod())) { @@ -222,13 +256,16 @@ throw new ExchangeException(e); } } + + public void doDeactivate() throws ExchangeException { - Iterator subscribers = ActivationManagerFactory.getActivationManager().getSubscribers().iterator(); - final Vector batch = new Vector(); - final Hashtable errors = new Hashtable(); - while (subscribers.hasNext()) { - final Subscriber subscriber = (Subscriber) subscribers.next(); + Collection subscribers = ActivationManagerFactory.getActivationManager().getSubscribers(); + final Sync done = new CountDown(subscribers.size()); + final Map errors = new ConcurrentHashMap(subscribers.size()); + Iterator subscriberIterator = subscribers.iterator(); + while (subscriberIterator.hasNext()) { + final Subscriber subscriber = (Subscriber) subscriberIterator.next(); if (subscriber.isActive()) { // Create runnable task for each subscriber. Runnable r = new Runnable() { @@ -239,25 +276,19 @@ log.error("Failed to deactivate content.", e); errors.put(subscriber,e); } finally { - batch.remove(this); + done.release(); } } }; - batch.add(r); - // execute task. - ThreadPool.getInstance().run(r); + executeInPool(r); + } else { + // count down directly + done.release(); } } //end of subscriber loop // wait until all tasks are executed before returning back to user to make sure errors can be propagated back to the user. - while (!batch.isEmpty()) { - log.debug("Waiting for {} tasks to finish.", new Integer(batch.size())); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // waked up externally - ignore - } - } + acquireIgnoringInterruption(done); // collect all the errors and send them back. if (!errors.isEmpty()) {