From 640888051a8fffb692042216430555f2490f6b6f Mon Sep 17 00:00:00 2001 From: Heng Sin Low Date: Wed, 14 Nov 2012 10:11:17 +0800 Subject: [PATCH] IDEMPIERE-502 Use single thread pool for client and server process --- .../src/org/compiere/Adempiere.java | 25 ++- .../src/org/compiere/util/Trx.java | 39 ++-- .../org/compiere/server/AcctProcessor.java | 6 +- .../org/compiere/server/AdempiereServer.java | 173 ++++++--------- .../compiere/server/AdempiereServerMgr.java | 204 +++++++++--------- .../org/compiere/server/AlertProcessor.java | 4 +- .../org/compiere/web/AdempiereMonitor.java | 85 ++++---- .../webui/dashboard/DashboardRunnable.java | 86 +------- .../webui/desktop/DashboardController.java | 28 +-- 9 files changed, 260 insertions(+), 390 deletions(-) diff --git a/org.adempiere.base/src/org/compiere/Adempiere.java b/org.adempiere.base/src/org/compiere/Adempiere.java index e97a159f6d..b30722ce88 100644 --- a/org.adempiere.base/src/org/compiere/Adempiere.java +++ b/org.adempiere.base/src/org/compiere/Adempiere.java @@ -23,8 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.Properties; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -50,6 +49,7 @@ import org.compiere.util.Ini; import org.compiere.util.Login; import org.compiere.util.SecureEngine; import org.compiere.util.SecureInterface; +import org.compiere.util.Trx; import org.compiere.util.Util; import org.eclipse.core.runtime.IProduct; import org.eclipse.core.runtime.Platform; @@ -111,7 +111,7 @@ public final class Adempiere private static CLogger log = null; /** Thread pool **/ - private static ThreadPoolExecutor threadPoolExecutor = null; + private static ScheduledThreadPoolExecutor threadPoolExecutor = null; /** A list of event listeners for this component. */ private static EventListenerList m_listenerList = new EventListenerList(); @@ -553,8 +553,10 @@ public final class Adempiere } // startup private static void createThreadPool() { - int min = 10; - int max = 200; + int max = Runtime.getRuntime().availableProcessors() * 3; + int min = max / 2; + int defaultMax = max; + int defaultMin = min; Properties properties = Ini.getProperties(); String maxSize = properties.getProperty("MaxThreadPoolSize"); String minSize = properties.getProperty("MinThreadPoolSize"); @@ -572,14 +574,19 @@ public final class Adempiere max = min; } if (max <= 0) { - max = 200; + max = defaultMax; } if (min < 0) { - min = 10; + min = defaultMin; } + // start thread pool - threadPoolExecutor = new ThreadPoolExecutor(min, max, 1, TimeUnit.MINUTES, new LinkedBlockingQueue()); + threadPoolExecutor = new ScheduledThreadPoolExecutor(min); + threadPoolExecutor.setMaximumPoolSize(max); + threadPoolExecutor.setKeepAliveTime(10, TimeUnit.MINUTES); threadPoolExecutor.allowCoreThreadTimeOut(true); + + Trx.startTrxMonitor(); } /** @@ -660,7 +667,7 @@ public final class Adempiere } } - public static ThreadPoolExecutor getThreadPoolExecutor() { + public static ScheduledThreadPoolExecutor getThreadPoolExecutor() { return threadPoolExecutor; } diff --git a/org.adempiere.base/src/org/compiere/util/Trx.java b/org.adempiere.base/src/org/compiere/util/Trx.java index 4c409e9037..98ab7cbe11 100644 --- a/org.adempiere.base/src/org/compiere/util/Trx.java +++ b/org.adempiere.base/src/org/compiere/util/Trx.java @@ -25,9 +25,11 @@ import java.sql.Savepoint; import java.util.Collection; import java.util.Date; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.adempiere.exceptions.AdempiereException; +import org.compiere.Adempiere; import org.compiere.model.PO; /** @@ -85,13 +87,9 @@ public class Trx implements VetoableChangeListener private static Trx.TrxMonitor s_monitor = new Trx.TrxMonitor(); - static + public static void startTrxMonitor() { - Thread monitorThread = new Thread(s_monitor); - monitorThread.setDaemon(true); - monitorThread.setPriority(Thread.MIN_PRIORITY); - monitorThread.setName("Trx-Monitor"); - monitorThread.start(); + Adempiere.getThreadPoolExecutor().scheduleWithFixedDelay(s_monitor, 5, 5, TimeUnit.MINUTES); } /** @@ -611,32 +609,23 @@ public class Trx implements VetoableChangeListener public void run() { - for(;;) + if (Trx.s_cache != null && !Trx.s_cache.isEmpty()) { - if (Trx.s_cache != null && !Trx.s_cache.isEmpty()) + Trx[] trxs = Trx.s_cache.values().toArray(new Trx[0]); + for(int i = 0; i < trxs.length; i++) { - Trx[] trxs = Trx.s_cache.values().toArray(new Trx[0]); - for(int i = 0; i < trxs.length; i++) - { - if (trxs[i].m_startTime <= 0) - continue; + if (trxs[i].m_startTime <= 0) + continue; - long since = System.currentTimeMillis() - trxs[i].m_startTime; - if (since > trxs[i].getTimeout() * 1000) - { - trxs[i].log.log(Level.WARNING, "Transaction timeout. Name="+trxs[i].getTrxName() + ", timeout(sec)="+(since / 1000)); - trxs[i].close(); - } + long since = System.currentTimeMillis() - trxs[i].m_startTime; + if (since > trxs[i].getTimeout() * 1000) + { + trxs[i].log.log(Level.WARNING, "Transaction timeout. Name="+trxs[i].getTrxName() + ", timeout(sec)="+(since / 1000)); + trxs[i].close(); } } - try { - Thread.sleep(1000 * 10); - } catch (InterruptedException e) { - Thread.interrupted(); - } } } - } private boolean isLocalTrx(String trxName) diff --git a/org.adempiere.server/src/main/server/org/compiere/server/AcctProcessor.java b/org.adempiere.server/src/main/server/org/compiere/server/AcctProcessor.java index 5c2d2bdc5a..28c4b226ef 100644 --- a/org.adempiere.server/src/main/server/org/compiere/server/AcctProcessor.java +++ b/org.adempiere.server/src/main/server/org/compiere/server/AcctProcessor.java @@ -26,13 +26,11 @@ import java.util.List; import java.util.logging.Level; import org.compiere.acct.DocManager; -import org.compiere.model.AdempiereProcessor2; import org.compiere.model.MAcctProcessor; import org.compiere.model.MAcctProcessorLog; import org.compiere.model.MAcctSchema; import org.compiere.model.MClient; import org.compiere.model.MCost; -import org.compiere.model.MSchedule; import org.compiere.util.DB; import org.compiere.util.Env; import org.compiere.util.TimeUtil; @@ -130,7 +128,7 @@ public class AcctProcessor extends AdempiereServer pstmt.setInt(1, m_model.getAD_Client_ID()); pstmt.setBigDecimal(2, value); rs = pstmt.executeQuery(); - while (!isInterrupted() && rs.next()) + while (!Thread.currentThread().isInterrupted() && rs.next()) { BigDecimal processedOn = rs.getBigDecimal(1); if (!listProcessedOn.contains(processedOn)) @@ -198,7 +196,7 @@ public class AcctProcessor extends AdempiereServer } catch (Exception e) { - log.log(Level.SEVERE, getName() + ": " + TableName, e); + log.log(Level.SEVERE, TableName, e); ok = false; } if (!ok) diff --git a/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServer.java b/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServer.java index b1386b31b0..330100a4e2 100644 --- a/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServer.java +++ b/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServer.java @@ -43,7 +43,7 @@ import org.compiere.wf.MWorkflowProcessor; * @author Jorg Janke * @version $Id: AdempiereServer.java,v 1.3 2006/10/09 00:23:26 jjanke Exp $ */ -public abstract class AdempiereServer extends Thread +public abstract class AdempiereServer implements Runnable { /** * Create New Server Thead @@ -78,7 +78,6 @@ public abstract class AdempiereServer extends Thread */ protected AdempiereServer (AdempiereProcessor model, int initialNap) { - super (AdempiereServerGroup.get(), null, model.getName(), 0); p_model = model; m_ctx = new Properties(model.getCtx()); if (p_system == null) @@ -86,7 +85,16 @@ public abstract class AdempiereServer extends Thread p_client = MClient.get(m_ctx); Env.setContext(m_ctx, "#AD_Client_ID", p_client.getAD_Client_ID()); m_initialNap = initialNap; - // log.info(model.getName() + " - " + getThreadGroup()); + + Timestamp dateNextRun = getDateNextRun(true); + if (dateNextRun != null) + m_nextWork = dateNextRun.getTime(); + + long now = System.currentTimeMillis(); + if (m_nextWork > now) + { + m_sleepMS = m_nextWork - now; + } } // ServerBase /** The Processor Model */ @@ -97,7 +105,7 @@ public abstract class AdempiereServer extends Thread /** Milliseconds to sleep - 10 Min default */ protected long m_sleepMS = 600000; /** Sleeping */ - private volatile boolean m_sleeping = false; + private volatile boolean m_sleeping = true; /** Server start time */ protected long m_start = 0; /** Number of Work executions */ @@ -136,41 +144,18 @@ public abstract class AdempiereServer extends Thread { return m_sleepMS; } // getSleepMS - - - /** - * Sleep for set time - * @return true if not interrupted - */ - public boolean sleep() - { - if (isInterrupted()) - { - log.info (getName() + ": interrupted"); - return false; - } - log.fine(getName() + ": sleeping " + TimeUtil.formatElapsed(m_sleepMS)); - m_sleeping = true; - try - { - sleep (m_sleepMS); - } - catch (InterruptedException e) - { - log.info (getName() + ": interrupted"); - m_sleeping = false; - return false; - } - m_sleeping = false; - return true; - } // sleep + + public long getInitialNap() + { + return m_initialNap; + } /** * Run Now */ public void runNow() - { - log.info(getName()); + { + m_sleeping = false; p_startWork = System.currentTimeMillis(); doWork(); long now = System.currentTimeMillis(); @@ -182,8 +167,10 @@ public abstract class AdempiereServer extends Thread // p_model.setDateLastRun(new Timestamp(now)); p_model.saveEx(); - // - log.fine(getName() + ": " + getStatistics()); + // + if (log.isLoggable(Level.FINE)) + log.fine(getStatistics()); + m_sleeping = true; } // runNow /************************************************************************** @@ -191,76 +178,43 @@ public abstract class AdempiereServer extends Thread */ public void run () { - try - { - log.fine(getName() + ": pre-nap - " + m_initialNap); - sleep (m_initialNap * 1000); - } - catch (InterruptedException e) - { - log.log(Level.SEVERE, getName() + ": pre-nap interrupted", e); - return; - } + m_sleeping = false; + if (m_start == 0) + m_start = System.currentTimeMillis(); + + // --------------- + p_startWork = System.currentTimeMillis(); + doWork(); + long now = System.currentTimeMillis(); + // --------------- - m_start = System.currentTimeMillis(); - while (true) - { - if (m_nextWork == 0) - { - Timestamp dateNextRun = getDateNextRun(true); - if (dateNextRun != null) - m_nextWork = dateNextRun.getTime(); - } - long now = System.currentTimeMillis(); - if (m_nextWork > now) - { - m_sleepMS = m_nextWork - now; - if (!sleep ()) - break; - } - if (isInterrupted()) - { - log.info (getName() + ": interrupted"); - break; - } + p_runCount++; + m_runLastMS = now - p_startWork; + m_runTotalMS += m_runLastMS; - // --------------- - p_startWork = System.currentTimeMillis(); - doWork(); - now = System.currentTimeMillis(); - // --------------- - - p_runCount++; - m_runLastMS = now - p_startWork; - m_runTotalMS += m_runLastMS; - - // Finished work - calculate datetime for next run - Timestamp lastRun = new Timestamp(now); - if (p_model instanceof AdempiereProcessor2) + // Finished work - calculate datetime for next run + Timestamp lastRun = new Timestamp(now); + if (p_model instanceof AdempiereProcessor2) + { + AdempiereProcessor2 ap = (AdempiereProcessor2) p_model; + if (ap.isIgnoreProcessingTime()) { - AdempiereProcessor2 ap = (AdempiereProcessor2) p_model; - if (ap.isIgnoreProcessingTime()) - { - lastRun = new Timestamp(p_startWork); - } + lastRun = new Timestamp(p_startWork); } - - m_nextWork = MSchedule.getNextRunMS(lastRun.getTime(), - p_model.getScheduleType(), p_model.getFrequencyType(), - p_model.getFrequency(), p_model.getCronPattern()); + } + + m_nextWork = MSchedule.getNextRunMS(lastRun.getTime(), + p_model.getScheduleType(), p_model.getFrequencyType(), + p_model.getFrequency(), p_model.getCronPattern()); - m_sleepMS = m_nextWork - now; - log.info(getName() + " Next run: " + new Timestamp(m_nextWork) + " sleep " + m_sleepMS); - // - p_model.setDateLastRun(lastRun); - p_model.setDateNextRun(new Timestamp(m_nextWork)); - p_model.saveEx(); - // - log.fine(getName() + ": " + getStatistics()); - if (!sleep()) - break; - } - m_start = 0; + m_sleepMS = m_nextWork - now; + if (log.isLoggable(Level.INFO)) + log.info(" Next run: " + new Timestamp(m_nextWork) + " sleep " + m_sleepMS); + // + p_model.setDateLastRun(lastRun); + p_model.setDateNextRun(new Timestamp(m_nextWork)); + p_model.saveEx(); + m_sleeping = true; } // run /** @@ -347,11 +301,8 @@ public abstract class AdempiereServer extends Thread */ public String toString () { - StringBuffer sb = new StringBuffer (getName()) - .append (",Prio=").append(getPriority()) - .append (",").append (getThreadGroup()) - .append (",Alive=").append(isAlive()) - .append (",Sleeping=").append(m_sleeping) + StringBuffer sb = new StringBuffer () + .append ("Sleeping=").append(m_sleeping) .append (",Last=").append(getDateLastRun()); if (m_sleeping) sb.append (",Next=").append(getDateNextRun(false)); @@ -392,6 +343,16 @@ public abstract class AdempiereServer extends Thread } // getLogs + protected boolean isInterrupted() { + return Thread.currentThread().isInterrupted(); + } + + + public String getName() { + return p_model.getName(); + } + + public static boolean isOKtoRunOnIP(AdempiereProcessor model) { if (model instanceof AdempiereProcessor2) { int AD_Schedule_ID = ((AdempiereProcessor2)model).getAD_Schedule_ID(); diff --git a/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServerMgr.java b/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServerMgr.java index 0d376ebc40..5c332b498f 100644 --- a/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServerMgr.java +++ b/org.adempiere.server/src/main/server/org/compiere/server/AdempiereServerMgr.java @@ -20,6 +20,8 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.adempiere.base.Service; @@ -75,7 +77,7 @@ public class AdempiereServerMgr } // AdempiereServerMgr /** The Servers */ - private ArrayList m_servers = new ArrayList(); + private ArrayList m_servers = new ArrayList(); /** Context */ private Properties m_ctx = Env.getCtx(); /** Start */ @@ -107,7 +109,7 @@ public class AdempiereServerMgr { log.info(""); int noServers = 0; - m_servers=new ArrayList(); + m_servers=new ArrayList(); // Accounting MAcctProcessor[] acctModels = MAcctProcessor.getActive(m_ctx); for (int i = 0; i < acctModels.length; i++) @@ -115,9 +117,9 @@ public class AdempiereServerMgr MAcctProcessor pModel = acctModels[i]; AdempiereServer server = AdempiereServer.create(pModel); if (server != null) { - server.start(); - server.setPriority(Thread.NORM_PRIORITY-2); - m_servers.add(server); +// server.start(); +// server.setPriority(Thread.NORM_PRIORITY-2); + m_servers.add(new ServerWrapper(server)); } } // Request @@ -127,9 +129,9 @@ public class AdempiereServerMgr MRequestProcessor pModel = requestModels[i]; AdempiereServer server = AdempiereServer.create(pModel); if (server != null) { - server.start(); - server.setPriority(Thread.NORM_PRIORITY-2); - m_servers.add(server); +// server.start(); +// server.setPriority(Thread.NORM_PRIORITY-2); + m_servers.add(new ServerWrapper(server)); } } // Workflow @@ -139,9 +141,9 @@ public class AdempiereServerMgr MWorkflowProcessor pModel = workflowModels[i]; AdempiereServer server = AdempiereServer.create(pModel); if (server != null) { - server.start(); - server.setPriority(Thread.NORM_PRIORITY-2); - m_servers.add(server); +// server.start(); +// server.setPriority(Thread.NORM_PRIORITY-2); + m_servers.add(new ServerWrapper(server)); } } // Alert @@ -151,9 +153,9 @@ public class AdempiereServerMgr MAlertProcessor pModel = alertModels[i]; AdempiereServer server = AdempiereServer.create(pModel); if (server != null) { - server.start(); - server.setPriority(Thread.NORM_PRIORITY-2); - m_servers.add(server); +// server.start(); +// server.setPriority(Thread.NORM_PRIORITY-2); + m_servers.add(new ServerWrapper(server)); } } // Scheduler @@ -163,9 +165,9 @@ public class AdempiereServerMgr MScheduler pModel = schedulerModels[i]; AdempiereServer server = AdempiereServer.create(pModel); if (server != null) { - server.start(); - server.setPriority(Thread.NORM_PRIORITY-2); - m_servers.add(server); +// server.start(); +// server.setPriority(Thread.NORM_PRIORITY-2); + m_servers.add(new ServerWrapper(server)); } } // LDAP @@ -175,9 +177,9 @@ public class AdempiereServerMgr MLdapProcessor lp = ldapModels[i]; AdempiereServer server = AdempiereServer.create(lp); if (server != null) { - server.start(); - server.setPriority(Thread.NORM_PRIORITY-1); - m_servers.add(server); +// server.start(); +// server.setPriority(Thread.NORM_PRIORITY-1); + m_servers.add(new ServerWrapper(server)); } } @@ -192,9 +194,9 @@ public class AdempiereServerMgr { for (AdempiereServer server : servers) { - server.start(); - server.setPriority(Thread.NORM_PRIORITY-1); - m_servers.add(server); +// server.start(); +// server.setPriority(Thread.NORM_PRIORITY-1); + m_servers.add(new ServerWrapper(server)); } } } @@ -220,47 +222,17 @@ public class AdempiereServerMgr public boolean startAll() { log.info (""); - AdempiereServer[] servers = getInActive(); + ServerWrapper[] servers = getInActive(); for (int i = 0; i < servers.length; i++) { - AdempiereServer server = servers[i]; + ServerWrapper server = servers[i]; try { - if (server.isAlive()) + if (server.scheduleFuture != null || !server.scheduleFuture.isDone()) continue; - // Wait until dead - if (server.isInterrupted()) - { - int maxWait = 10; // 10 iterations = 1 sec - while (server.isAlive()) - { - if (maxWait-- == 0) - { - log.severe ("Wait timeout for interruped " + server); - break; - } - try - { - Thread.sleep(100); // 1/10 sec - } - catch (InterruptedException e) - { - log.log(Level.SEVERE, "While sleeping", e); - } - } - } // Do start - if (!server.isAlive()) - { - // replace - server = AdempiereServer.create (server.getModel()); - if (server == null) - m_servers.remove(i); - else - m_servers.set(i, server); - server.start(); - server.setPriority(Thread.NORM_PRIORITY-2); - } + // replace + server.start(); } catch (Exception e) { @@ -273,10 +245,10 @@ public class AdempiereServerMgr int noStopped = 0; for (int i = 0; i < servers.length; i++) { - AdempiereServer server = servers[i]; + ServerWrapper server = servers[i]; try { - if (server.isAlive()) + if (server.scheduleFuture != null && !server.scheduleFuture.isDone()) { log.info("Alive: " + server); noRunning++; @@ -294,7 +266,6 @@ public class AdempiereServerMgr } } log.fine("Running=" + noRunning + ", Stopped=" + noStopped); - AdempiereServerGroup.get().dump(); return noStopped == 0; } // startAll @@ -305,24 +276,16 @@ public class AdempiereServerMgr */ public boolean start (String serverID) { - AdempiereServer server = getServer(serverID); + ServerWrapper server = getServer(serverID); if (server == null) return false; - if (server.isAlive()) + if (server.scheduleFuture != null && !server.scheduleFuture.isDone()) return true; try { // replace - int index = m_servers.indexOf(server); - server = AdempiereServer.create (server.getModel()); - if (server == null) - m_servers.remove(index); - else - m_servers.set(index, server); server.start(); - server.setPriority(Thread.NORM_PRIORITY-2); - Thread.yield(); } catch (Exception e) { @@ -330,10 +293,7 @@ public class AdempiereServerMgr return false; } log.info(server.toString()); - AdempiereServerGroup.get().dump(); - if (server == null) - return false; - return server.isAlive(); + return (server.scheduleFuture != null && !server.scheduleFuture.isDone()); } // startIt /** @@ -343,17 +303,16 @@ public class AdempiereServerMgr public boolean stopAll() { log.info (""); - AdempiereServer[] servers = getActive(); + ServerWrapper[] servers = getActive(); // Interrupt for (int i = 0; i < servers.length; i++) { - AdempiereServer server = servers[i]; + ServerWrapper server = servers[i]; try { - if (server.isAlive() && !server.isInterrupted()) + if (server.scheduleFuture != null && !server.scheduleFuture.isDone()) { - server.setPriority(Thread.MAX_PRIORITY-1); - server.interrupt(); + server.scheduleFuture.cancel(true); } } catch (Exception e) @@ -366,11 +325,11 @@ public class AdempiereServerMgr // Wait for death for (int i = 0; i < servers.length; i++) { - AdempiereServer server = servers[i]; + ServerWrapper server = servers[i]; try { int maxWait = 10; // 10 iterations = 1 sec - while (server.isAlive()) + while (server.scheduleFuture != null && !server.scheduleFuture.isDone()) { if (maxWait-- == 0) { @@ -391,10 +350,10 @@ public class AdempiereServerMgr int noStopped = 0; for (int i = 0; i < servers.length; i++) { - AdempiereServer server = servers[i]; + ServerWrapper server = servers[i]; try { - if (server.isAlive()) + if (server.scheduleFuture != null && !server.scheduleFuture.isDone()) { log.warning ("Alive: " + server); noRunning++; @@ -423,15 +382,15 @@ public class AdempiereServerMgr */ public boolean stop (String serverID) { - AdempiereServer server = getServer(serverID); + ServerWrapper server = getServer(serverID); if (server == null) return false; - if (!server.isAlive()) + if (server.scheduleFuture == null || server.scheduleFuture.isDone()) return true; try { - server.interrupt(); + server.scheduleFuture.cancel(true); Thread.sleep(10); // 1/100 sec } catch (Exception e) @@ -440,8 +399,7 @@ public class AdempiereServerMgr return false; } log.info(server.toString()); - AdempiereServerGroup.get().dump(); - return !server.isAlive(); + return (server.scheduleFuture == null || server.scheduleFuture.isDone()); } // stop @@ -459,16 +417,16 @@ public class AdempiereServerMgr * Get Active Servers * @return array of active servers */ - protected AdempiereServer[] getActive() + protected ServerWrapper[] getActive() { - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList(); for (int i = 0; i < m_servers.size(); i++) { - AdempiereServer server = (AdempiereServer)m_servers.get(i); - if (server != null && server.isAlive() && !server.isInterrupted()) + ServerWrapper server = (ServerWrapper)m_servers.get(i); + if (server != null && server.scheduleFuture != null && !server.scheduleFuture.isDone()) list.add (server); } - AdempiereServer[] retValue = new AdempiereServer[list.size ()]; + ServerWrapper[] retValue = new ServerWrapper[list.size ()]; list.toArray (retValue); return retValue; } // getActive @@ -477,16 +435,16 @@ public class AdempiereServerMgr * Get InActive Servers * @return array of inactive servers */ - protected AdempiereServer[] getInActive() + protected ServerWrapper[] getInActive() { - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList(); for (int i = 0; i < m_servers.size(); i++) { - AdempiereServer server = (AdempiereServer)m_servers.get(i); - if (server != null && (!server.isAlive() || !server.isInterrupted())) + ServerWrapper server = m_servers.get(i); + if (server != null && (server.scheduleFuture == null || server.scheduleFuture.isDone())) list.add (server); } - AdempiereServer[] retValue = new AdempiereServer[list.size()]; + ServerWrapper[] retValue = new ServerWrapper[list.size()]; list.toArray (retValue); return retValue; } // getInActive @@ -495,9 +453,9 @@ public class AdempiereServerMgr * Get all Servers * @return array of servers */ - public AdempiereServer[] getAll() + public ServerWrapper[] getAll() { - AdempiereServer[] retValue = new AdempiereServer[m_servers.size()]; + ServerWrapper[] retValue = new ServerWrapper[m_servers.size()]; m_servers.toArray (retValue); return retValue; } // getAll @@ -507,14 +465,14 @@ public class AdempiereServerMgr * @param serverID server id * @return server or null */ - public AdempiereServer getServer (String serverID) + public ServerWrapper getServer (String serverID) { if (serverID == null) return null; for (int i = 0; i < m_servers.size(); i++) { - AdempiereServer server = (AdempiereServer)m_servers.get(i); - if (serverID.equals(server.getServerID())) + ServerWrapper server = m_servers.get(i); + if (serverID.equals(server.server.getServerID())) return server; } return null; @@ -553,8 +511,8 @@ public class AdempiereServerMgr int noStopped = 0; for (int i = 0; i < m_servers.size(); i++) { - AdempiereServer server = (AdempiereServer)m_servers.get(i); - if (server.isAlive()) + ServerWrapper server = m_servers.get(i); + if (server.scheduleFuture != null && !server.scheduleFuture.isDone()) noRunning++; else noStopped++; @@ -574,4 +532,38 @@ public class AdempiereServerMgr return m_start; } // getStartTime + public static class ServerWrapper implements Runnable + { + + protected AdempiereServer server; + protected volatile ScheduledFuture scheduleFuture; + + public ServerWrapper(AdempiereServer server) { + this.server = server; + start(); + } + + public void start() { + scheduleFuture = Adempiere.getThreadPoolExecutor().schedule(this, server.getInitialNap() * 1000 + server.getSleepMS(), TimeUnit.MILLISECONDS); + } + + @Override + public void run() { + server.run(); + scheduleFuture = Adempiere.getThreadPoolExecutor().schedule(this, server.getSleepMS(), TimeUnit.MILLISECONDS); + } + + public AdempiereServer getServer() { + return server; + } + + public boolean isAlive() { + return scheduleFuture != null && !scheduleFuture.isDone(); + } + + public boolean isInterrupted() { + return scheduleFuture != null && scheduleFuture.isCancelled(); + } + + } } // AdempiereServerMgr diff --git a/org.adempiere.server/src/main/server/org/compiere/server/AlertProcessor.java b/org.adempiere.server/src/main/server/org/compiere/server/AlertProcessor.java index 2591b9c3f7..2a3375af9c 100644 --- a/org.adempiere.server/src/main/server/org/compiere/server/AlertProcessor.java +++ b/org.adempiere.server/src/main/server/org/compiere/server/AlertProcessor.java @@ -454,8 +454,8 @@ public class AlertProcessor extends AdempiereServer Adempiere.startup(true); MAlertProcessor model = new MAlertProcessor (Env.getCtx(), 100, null); AlertProcessor ap = new AlertProcessor(model); - ap.start(); - + AdempiereServerMgr.ServerWrapper wrapper = new AdempiereServerMgr.ServerWrapper(ap); + wrapper.start(); } diff --git a/org.adempiere.server/src/main/servlet/org/compiere/web/AdempiereMonitor.java b/org.adempiere.server/src/main/servlet/org/compiere/web/AdempiereMonitor.java index 690fe7c908..f83eb0371a 100644 --- a/org.adempiere.server/src/main/servlet/org/compiere/web/AdempiereMonitor.java +++ b/org.adempiere.server/src/main/servlet/org/compiere/web/AdempiereMonitor.java @@ -20,7 +20,6 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.lang.management.ManagementFactory; @@ -65,9 +64,9 @@ import org.compiere.model.AdempiereProcessorLog; import org.compiere.model.MClient; import org.compiere.model.MStore; import org.compiere.model.MSystem; -import org.compiere.server.AdempiereServer; import org.compiere.server.AdempiereServerGroup; import org.compiere.server.AdempiereServerMgr; +import org.compiere.server.AdempiereServerMgr.ServerWrapper; import org.compiere.util.CLogFile; import org.compiere.util.CLogMgt; import org.compiere.util.CLogger; @@ -181,8 +180,8 @@ public class AdempiereMonitor extends HttpServlet return false; log.info ("ServerID=" + serverID); - AdempiereServer server = m_serverMgr.getServer(serverID); - if (server == null) + ServerWrapper server = m_serverMgr.getServer(serverID); + if (server == null || server.getServer() == null) { m_message = new p(); m_message.addElement(new strong("Server not found: ")); @@ -199,7 +198,7 @@ public class AdempiereMonitor extends HttpServlet para.addElement(link); b.addElement(para); // - b.addElement(new h2(server.getName())); + b.addElement(new h2(server.getServer().getName())); // table table = new table(); table.setBorder(1); @@ -216,7 +215,7 @@ public class AdempiereMonitor extends HttpServlet // line.addElement(new th().addElement("Description")); table.addElement(line); - AdempiereProcessorLog[] logs = server.getLogs(); + AdempiereProcessorLog[] logs = server.getServer().getLogs(); for (int i = 0; i < logs.length; i++) { AdempiereProcessorLog pLog = logs[i]; @@ -252,8 +251,8 @@ public class AdempiereMonitor extends HttpServlet return false; log.info ("ServerID=" + serverID); - AdempiereServer server = m_serverMgr.getServer(serverID); - if (server == null) + ServerWrapper server = m_serverMgr.getServer(serverID); + if (server == null || server.getServer() == null) { m_message = new p(); m_message.addElement(new strong("Server not found: ")); @@ -261,7 +260,7 @@ public class AdempiereMonitor extends HttpServlet return false; } // - server.runNow(); + server.getServer().runNow(); // return true; } // processRunParameter @@ -306,8 +305,8 @@ public class AdempiereMonitor extends HttpServlet this.createSummaryPage(request, response,true); m_dirAccessList = getDirAcessList(); } else { - AdempiereServer server = m_serverMgr.getServer(serverID); - if (server == null) { + ServerWrapper server = m_serverMgr.getServer(serverID); + if (server == null || server.getServer() == null) { m_message = new p(); m_message.addElement(new strong("Server not found: ")); m_message.addElement(serverID); @@ -317,7 +316,7 @@ public class AdempiereMonitor extends HttpServlet ok = m_serverMgr.start(serverID); else ok = m_serverMgr.stop(serverID); - m_message.addElement(server.getName()); + m_message.addElement(server.getServer().getName()); } } } @@ -622,13 +621,13 @@ public class AdempiereMonitor extends HttpServlet // ***** Server Links ***** bb.addElement(new hr()); para = new p(); - AdempiereServer[] servers = m_serverMgr.getAll(); + ServerWrapper[] servers = m_serverMgr.getAll(); for (int i = 0; i < servers.length; i++) { if (i > 0) para.addElement(new br()); - AdempiereServer server = servers[i]; - link = new a ("#" + server.getServerID(), server.getName()); + ServerWrapper server = servers[i]; + link = new a ("#" + server.getServer().getServerID(), server.getServer().getName()); para.addElement(link); font status = null; if (server.isAlive()) @@ -646,10 +645,10 @@ public class AdempiereMonitor extends HttpServlet bb.removeEndEndModifier(); for (int i = 0; i < servers.length; i++) { - AdempiereServer server = servers[i]; + ServerWrapper server = servers[i]; bb.addElement(new hr()); - bb.addElement(new a().setName(server.getServerID())); - bb.addElement(new h2(server.getName())); + bb.addElement(new a().setName(server.getServer().getServerID())); + bb.addElement(new h2(server.getServer().getName())); // table = new table(); table.setBorder(1); @@ -660,10 +659,8 @@ public class AdempiereMonitor extends HttpServlet if (server.isAlive()) { String msg = "Stop"; - if (server.isInterrupted()) - msg += " (Interrupted)"; - link = new a ("adempiereMonitor?Action=Stop_" + server.getServerID(), msg); - if (server.isSleeping()) + link = new a ("adempiereMonitor?Action=Stop_" + server.getServer().getServerID(), msg); + if (server.getServer().isSleeping()) { line.addElement(new th().addElement("Sleeping")); line.addElement(new td().addElement(link)); @@ -676,47 +673,45 @@ public class AdempiereMonitor extends HttpServlet table.addElement(line); line = new tr(); line.addElement(new th().addElement("Start - Elapsed")); - line.addElement(new td().addElement(WebEnv.getCellContent(server.getStartTime()) - + " - " + TimeUtil.formatElapsed(server.getStartTime()))); + line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getStartTime()) + + " - " + TimeUtil.formatElapsed(server.getServer().getStartTime()))); } else { String msg = "Start"; - if (server.isInterrupted()) - msg += " (Interrupted)"; line.addElement(new th().addElement("Not Started")); - link = new a ("adempiereMonitor?Action=Start_" + server.getServerID(), msg); + link = new a ("adempiereMonitor?Action=Start_" + server.getServer().getServerID(), msg); line.addElement(new td().addElement(link)); } table.addElement(line); // line = new tr(); line.addElement(new th().addElement("Description")); - line.addElement(new td().addElement(WebEnv.getCellContent(server.getDescription()))); + line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getDescription()))); table.addElement(line); // line = new tr(); line.addElement(new th().addElement("Last Run")); - line.addElement(new td().addElement(WebEnv.getCellContent(server.getDateLastRun()))); + line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getDateLastRun()))); table.addElement(line); line = new tr(); line.addElement(new th().addElement("Info")); - line.addElement(new td().addElement(WebEnv.getCellContent(server.getServerInfo()))); + line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getServerInfo()))); table.addElement(line); // line = new tr(); line.addElement(new th().addElement("Next Run")); td td = new td(); - td.addElement(WebEnv.getCellContent(server.getDateNextRun(false))); + td.addElement(WebEnv.getCellContent(server.getServer().getDateNextRun(false))); td.addElement(" - "); - link = new a ("adempiereMonitor?RunNow=" + server.getServerID(), "(Run Now)"); + link = new a ("adempiereMonitor?RunNow=" + server.getServer().getServerID(), "(Run Now)"); td.addElement(link); line.addElement(td); table.addElement(line); // line = new tr(); line.addElement(new th().addElement("Statistics")); - line.addElement(new td().addElement(server.getStatistics())); + line.addElement(new td().addElement(server.getServer().getStatistics())); table.addElement(line); // @@ -725,7 +720,7 @@ public class AdempiereMonitor extends HttpServlet link = new a ("#top", "Top"); bb.addElement(link); bb.addElement(" - "); - link = new a ("adempiereMonitor?Log=" + server.getServerID(), "Log"); + link = new a ("adempiereMonitor?Log=" + server.getServer().getServerID(), "Log"); bb.addElement(link); bb.addElement(" - "); link = new a ("adempiereMonitor", "Refresh"); @@ -787,29 +782,29 @@ public class AdempiereMonitor extends HttpServlet writer.print(m_serverMgr.getServerCount()); writer.println(""); - AdempiereServer[] servers = m_serverMgr.getAll(); + ServerWrapper[] servers = m_serverMgr.getAll(); for (int i = 0; i < servers.length; i++) { - AdempiereServer server = servers[i]; + ServerWrapper server = servers[i]; writer.println("\t\t"); writer.print("\t\t\t"); - writer.print(server.getServerID()); + writer.print(server.getServer().getServerID()); writer.println(""); writer.print("\t\t\t"); - writer.print(server.getName()); + writer.print(server.getServer().getName()); writer.println(""); writer.print("\t\t\t"); - writer.print(server.getDescription()); + writer.print(server.getServer().getDescription()); writer.println(""); writer.print("\t\t\t"); - writer.print(server.getServerInfo()); + writer.print(server.getServer().getServerInfo()); writer.println(""); writer.print("\t\t\t"); if (server.isAlive()) { if (server.isInterrupted()) writer.print("Interrupted"); - else if (server.isSleeping()) + else if (server.getServer().isSleeping()) writer.print("Sleeping"); else writer.print("Running"); @@ -818,16 +813,16 @@ public class AdempiereMonitor extends HttpServlet writer.print("Stopped"); writer.println(""); writer.print("\t\t\t"); - writer.print(server.getStartTime()); + writer.print(server.getServer().getStartTime()); writer.println(""); writer.print("\t\t\t"); - writer.print(server.getDateLastRun()); + writer.print(server.getServer().getDateLastRun()); writer.println(""); writer.print("\t\t\t"); - writer.print(server.getDateNextRun(false)); + writer.print(server.getServer().getDateNextRun(false)); writer.println(""); writer.print("\t\t\t"); - writer.print(server.getStatistics()); + writer.print(server.getServer().getStatistics()); writer.println(""); writer.println("\t\t"); } diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/dashboard/DashboardRunnable.java b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/dashboard/DashboardRunnable.java index eabe8e82a6..eab74678a7 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/dashboard/DashboardRunnable.java +++ b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/dashboard/DashboardRunnable.java @@ -21,15 +21,12 @@ import java.util.Properties; import java.util.logging.Level; import org.adempiere.util.ServerContext; -import org.adempiere.webui.AdempiereWebUI; import org.adempiere.webui.desktop.IDesktop; import org.adempiere.webui.session.SessionContextListener; import org.adempiere.webui.util.ServerPushTemplate; -import org.compiere.model.MSysConfig; import org.compiere.util.CLogger; import org.zkoss.util.Locales; import org.zkoss.zk.ui.Desktop; -import org.zkoss.zk.ui.DesktopUnavailableException; import org.zkoss.zk.ui.event.Events; /** @@ -44,7 +41,6 @@ public class DashboardRunnable implements Runnable, Serializable private static final long serialVersionUID = 5995227773511788894L; private Desktop desktop; - private boolean stop = false; private List dashboardPanels; private IDesktop appDesktop; private Locale locale; @@ -71,78 +67,12 @@ public class DashboardRunnable implements Runnable, Serializable } public void run() - { - // default Update every one minutes - int interval = MSysConfig.getIntValue(MSysConfig.ZK_DASHBOARD_REFRESH_INTERVAL, 60000); - int cumulativeFailure = 0; - while(!stop) { - try { - Thread.sleep(interval); - } catch (InterruptedException e1) { - if (stop) break; - } - - if (desktop.isAlive()) { - Locales.setThreadLocal(locale); - try { - refreshDashboard(); - cumulativeFailure = 0; - } catch (DesktopUnavailableException de) { - cumulativeFailure++; - } catch (Exception e) { - logger.log(Level.INFO, e.getLocalizedMessage(), (e.getCause() != null ? e.getCause() : e)); - cumulativeFailure++; - } - if (cumulativeFailure > 3) - break; - } else { - logger.log(Level.INFO, "Desktop destroy, will kill session."); - killSession(); - break; - } - } - } - - private void killSession() { - if (desktop.getSession() != null && desktop.getSession().getNativeSession() != null) - { - //differentiate between real destroy and refresh - try - { - Thread.sleep(90000); - } - catch (InterruptedException e) - { - try - { - desktop.getSession().getAttributes().clear(); - desktop.getSession().invalidate(); - } - catch (Exception e1) {} - return; - } - - try - { - Object sessionObj = desktop.getSession().getAttribute(AdempiereWebUI.ZK_DESKTOP_SESSION_KEY); - if (sessionObj != null && sessionObj instanceof Desktop) - { - Desktop sessionDesktop = (Desktop) sessionObj; - - //don't destroy session if it have been attached to another desktop ( refresh will do that ) - if (sessionDesktop == desktop) - { - desktop.getSession().getAttributes().clear(); - desktop.getSession().invalidate(); - } - } - else - { - desktop.getSession().getAttributes().clear(); - desktop.getSession().invalidate(); - } - } - catch (Exception e1) {} + { + Locales.setThreadLocal(locale); + try { + refreshDashboard(); + } catch (Exception e) { + logger.log(Level.INFO, e.getLocalizedMessage(), (e.getCause() != null ? e.getCause() : e)); } } @@ -184,10 +114,6 @@ public class DashboardRunnable implements Runnable, Serializable } } - public void stop() { - stop = true; - } - /** * Add DashboardPanel to the auto refresh list * @param dashboardPanel diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DashboardController.java b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DashboardController.java index 5bdc7a0179..744e1b2168 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DashboardController.java +++ b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DashboardController.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.adempiere.webui.apps.AEnv; @@ -38,6 +40,7 @@ import org.adempiere.webui.report.HTMLExtension; import org.adempiere.webui.session.SessionManager; import org.adempiere.webui.window.FDialog; import org.adempiere.webui.window.ZkReportViewerProvider; +import org.compiere.Adempiere; import org.compiere.model.I_AD_Menu; import org.compiere.model.MDashboardContent; import org.compiere.model.MDashboardPreference; @@ -48,6 +51,7 @@ import org.compiere.model.MPInstancePara; import org.compiere.model.MProcess; import org.compiere.model.MQuery; import org.compiere.model.MRole; +import org.compiere.model.MSysConfig; import org.compiere.model.MTable; import org.compiere.print.ReportEngine; import org.compiere.process.ProcessInfo; @@ -91,8 +95,8 @@ public class DashboardController implements EventListener { private List columnList = new ArrayList(); private Anchorlayout dashboardLayout; private Anchorchildren maximizedHolder; - private Thread dashboardThread; private DashboardRunnable dashboardRunnable; + private ScheduledFuture dashboardFuture; public DashboardController() { dashboardLayout = new Anchorlayout(); @@ -406,9 +410,9 @@ public class DashboardController implements EventListener { { dashboardRunnable.refreshDashboard(); - dashboardThread = new Thread(dashboardRunnable, "UpdateInfo"); - dashboardThread.setDaemon(true); - dashboardThread.start(); + // default Update every one minutes + int interval = MSysConfig.getIntValue(MSysConfig.ZK_DASHBOARD_REFRESH_INTERVAL, 60000); + dashboardFuture = Adempiere.getThreadPoolExecutor().scheduleWithFixedDelay(dashboardRunnable, interval, interval, TimeUnit.MILLISECONDS); } } @@ -620,15 +624,14 @@ public class DashboardController implements EventListener { * @param appDesktop */ public void onSetPage(Page page, Desktop desktop, IDesktop appDesktop) { - if (dashboardThread != null && dashboardThread.isAlive()) { - dashboardRunnable.stop(); - dashboardThread.interrupt(); + if (dashboardFuture != null && !dashboardFuture.isDone()) { + dashboardFuture.cancel(true); DashboardRunnable tmp = dashboardRunnable; dashboardRunnable = new DashboardRunnable(tmp, desktop, appDesktop); - dashboardThread = new Thread(dashboardRunnable, "UpdateInfo"); - dashboardThread.setDaemon(true); - dashboardThread.start(); + // default Update every one minutes + int interval = MSysConfig.getIntValue(MSysConfig.ZK_DASHBOARD_REFRESH_INTERVAL, 60000); + dashboardFuture = Adempiere.getThreadPoolExecutor().scheduleWithFixedDelay(dashboardRunnable, interval, interval, TimeUnit.MILLISECONDS); } } @@ -636,9 +639,8 @@ public class DashboardController implements EventListener { * clean up for logout */ public void onLogOut() { - if (dashboardThread != null && dashboardThread.isAlive()) { - dashboardRunnable.stop(); - dashboardThread.interrupt(); + if (dashboardFuture != null && !dashboardFuture.isDone()) { + dashboardFuture.cancel(true); } }