diff --git a/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java index b6cdf89995..8ae86a59c2 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java +++ b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java @@ -1,26 +1,20 @@ package fi.jawsy.jawwa.zk.atmosphere; -import java.util.LinkedList; -import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.atmosphere.cpr.AtmosphereResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.zkoss.lang.Library; -import org.zkoss.util.logging.Log; import org.zkoss.zk.au.out.AuScript; import org.zkoss.zk.ui.Desktop; import org.zkoss.zk.ui.DesktopUnavailableException; -import org.zkoss.zk.ui.Execution; -import org.zkoss.zk.ui.Executions; -import org.zkoss.zk.ui.UiException; import org.zkoss.zk.ui.event.Event; import org.zkoss.zk.ui.event.EventListener; -import org.zkoss.zk.ui.impl.ExecutionCarryOver; import org.zkoss.zk.ui.sys.DesktopCtrl; import org.zkoss.zk.ui.sys.Scheduler; import org.zkoss.zk.ui.sys.ServerPush; import org.zkoss.zk.ui.util.Clients; -import org.zkoss.zk.ui.util.Configuration; /** * ZK server push implementation based on Atmosphere. @@ -29,318 +23,101 @@ import org.zkoss.zk.ui.util.Configuration; * (Executions.activate/deactivate) is attempted. */ public class AtmosphereServerPush implements ServerPush { - private static final Log log = Log.lookup(AtmosphereServerPush.class); - /** Denote a server-push thread gives up the activation (timeout). */ - private static final int GIVEUP = -99; - private Desktop _desktop; - /** List of ThreadInfo. */ - private final List _pending = new LinkedList(); - /** The active thread. */ - private ThreadInfo _active; - /** The info to carray over from onPiggyback to the server-push thread. */ - private ExecutionCarryOver _carryOver; -// private final int _min, _max, _factor; - /** A mutex that is used by this object to wait for the server-push thread - * to complete. - */ - private final Object _mutex = new Object(); - - public static final int DEFAULT_TIMEOUT = 1000 * 60 * 5; - private final AtomicReference resource = new AtomicReference(); - private final int timeout; - - public AtmosphereServerPush() { -// this(-1, -1, -1); -// } -// -// public AtmosphereServerPush(int min, int max, int factor) { -// _min = min; -// _max = max; -// _factor = factor; - - String timeoutString = Library.getProperty("fi.jawsy.jawwa.zk.atmosphere.timeout"); - if (timeoutString == null || timeoutString.length() == 0) { + public static final int DEFAULT_TIMEOUT = 1000 * 60 * 5; + + private final AtomicReference desktop = new AtomicReference(); + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + private final AtomicReference resource = new AtomicReference(); + private final int timeout; + + public AtmosphereServerPush() { + String timeoutString = Library.getProperty("fi.jawsy.jawwa.zk.atmosphere.timeout"); + if (timeoutString == null || timeoutString.trim().length() == 0) { timeout = DEFAULT_TIMEOUT; } else { timeout = Integer.valueOf(timeoutString); } - } - - protected void startClientPush() { -// Clients.response("zk.clientpush", new AuScript(null, getStartScript())); - Clients.response("jawwa.atmosphere.serverpush", new AuScript(null, getStartScript())); - } - - protected void stopClientPush() { -// Clients.response("zk.clientpush", new AuScript(null, getStopScript())); - Clients.response("jawwa.atmosphere.serverpush", new AuScript(null, getStopScript())); - } - - protected String getStartScript() { -// final String start = _desktop.getWebApp().getConfiguration() -// .getPreference("PollingServerPush.start", null); -// if (start != null) -// return start; -// -// final StringBuffer sb = new StringBuffer(128) -// .append("zk.load('zk.cpsp');zk.afterLoad(function(){zk.cpsp.start('") -// .append(_desktop.getId()).append('\''); -// -// final int min = _min > 0 ? _min: getIntPref("PollingServerPush.delay.min"), -// max = _max > 0 ? _max: getIntPref("PollingServerPush.delay.max"), -// factor = _factor > 0 ? _factor: getIntPref("PollingServerPush.delay.factor"); -// if (min > 0 || max > 0 || factor > 0) -// sb.append(',').append(min).append(',').append(max) -// .append(',').append(factor); -// -// return sb.append(");});").toString(); - - int clientTimeout = timeout + 1000 * 60; - return "jawwa.atmosphere.startServerPush('" + _desktop.getId() + "', " + clientTimeout + ");"; - } - -// private int getIntPref(String key) { -// final String s = _desktop.getWebApp().getConfiguration() -// .getPreference(key, null); -// if (s != null) { -// try { -// return Integer.parseInt(s); -// } catch (NumberFormatException ex) { -// log.warning("Not a number specified at "+key); -// } -// } -// return -1; -// } - - protected String getStopScript() { -// final String stop = _desktop.getWebApp().getConfiguration() -// .getPreference("PollingServerPush.stop", null); -// return stop != null ? stop: -// "zk.cpsp.stop('" + _desktop.getId() + "');"; - return "jawwa.atmosphere.stopServerPush('" + _desktop.getId() + "');"; - } - - @Override - public boolean isActive() { - return _active != null && _active.nActive > 0; - } - - @Override - public void start(Desktop desktop) { - if (_desktop != null) { - log.warning("Ignored: Sever-push already started"); - return; - } + } - _desktop = desktop; - startClientPush(); - } - - @Override - public void stop() { - if (_desktop == null) { - log.warning("Ignored: Sever-push not started"); - return; - } - - final Execution exec = Executions.getCurrent(); - final boolean inexec = exec != null && exec.getDesktop() == _desktop; - //it might be caused by DesktopCache expunge (when creating another desktop) - try { - if (inexec && _desktop.isAlive()) //Bug 1815480: don't send if timeout - stopClientPush(); - commitResponse(); - } finally { - _desktop = null; //to cause DesktopUnavailableException being thrown - wakePending(); - - //if inexec, either in working thread, or other event listener - //if in working thread, we cannot notify here (too early to wake). - //if other listener, no need notify (since onPiggyback not running) - if (!inexec) { - synchronized (_mutex) { - _mutex.notify(); //wake up onPiggyback - } - } - } - } - - private void wakePending() { - synchronized (_pending) { - for (ThreadInfo info: _pending) { - synchronized (info) { - info.notify(); - } - } - _pending.clear(); - } - } - - @Override - public void onPiggyback() { - final Configuration config = _desktop.getWebApp().getConfiguration(); - long tmexpired = 0; - for (int cnt = 0; !_pending.isEmpty();) { - //Don't hold the client too long. - //In addition, an ill-written code might activate again - //before onPiggyback returns. It causes dead-loop in this case. - if (tmexpired == 0) { //first time - tmexpired = System.currentTimeMillis() - + (config.getMaxProcessTime() >> 1); - cnt = _pending.size() + 3; - } else if (--cnt < 0 || System.currentTimeMillis() > tmexpired) { - break; - } - - final ThreadInfo info; - synchronized (_pending) { - if (_pending.isEmpty()) - return; //nothing to do - info = _pending.remove(0); - } - - //Note: we have to sync _mutex before info. Otherwise, - //sync(info) might cause deactivate() to run before _mutex.wait - synchronized (_mutex) { - _carryOver = new ExecutionCarryOver(_desktop); - - synchronized (info) { - if (info.nActive == GIVEUP) - continue; //give up and try next - info.nActive = 1; //granted - info.notify(); - } - - if (_desktop == null) //just in case - break; - - try { - _mutex.wait(); //wait until the server push is done - } catch (InterruptedException ex) { - throw UiException.Aide.wrap(ex); - } - } - } - } - - @Override - public - void schedule(EventListener listener, T event, Scheduler scheduler) { - scheduler.schedule(listener, event); //delegate back - commitResponse(); - } - - @Override - public boolean activate(long timeout) throws InterruptedException, DesktopUnavailableException { - final Thread curr = Thread.currentThread(); - if (_active != null && _active.thread.equals(curr)) { //re-activate - ++_active.nActive; - return true; - } - - final ThreadInfo info = new ThreadInfo(curr); - synchronized (_pending) { - if (_desktop != null) - _pending.add(info); - } - - boolean loop; - do { - loop = false; - synchronized (info) { - if (_desktop != null) { - if (info.nActive == 0) //not granted yet - info.wait(timeout <= 0 ? 10*60*1000: timeout); - - if (info.nActive <= 0) { //not granted - boolean bTimeout = timeout > 0; - boolean bDead = _desktop == null || !_desktop.isAlive(); - if (bTimeout || bDead) { //not timeout - info.nActive = GIVEUP; //denote timeout (and give up) - synchronized (_pending) { //undo pending - _pending.remove(info); - } - - if (bDead) - throw new DesktopUnavailableException("Stopped"); - return false; //timeout - } - - log.debug("Executions.activate() took more than 10 minutes"); - loop = true; //try again - } - } - } - } while (loop); - - if (_desktop == null) - throw new DesktopUnavailableException("Stopped"); - - _carryOver.carryOver(); - _active = info; - return true; - - //Note: we don't mimic inEventListener since 1) ZK doesn't assume it - //2) Window depends on it - } - @Override - public boolean deactivate(boolean stop) { - boolean stopped = false; - if (_active != null && - Thread.currentThread().equals(_active.thread)) { - if (--_active.nActive <= 0) { - if (stop) - stopClientPush(); + public boolean activate(long timeout) throws InterruptedException, DesktopUnavailableException { + throw new UnsupportedOperationException("activate is not supported by AtmosphereServerPush"); + } - _carryOver.cleanup(); - _carryOver = null; - _active.nActive = 0; //just in case - _active = null; - - if (stop) { - wakePending(); - _desktop = null; - stopped = true; - } - - //wake up onPiggyback - synchronized (_mutex) { - _mutex.notify(); - } - - try {Thread.sleep(100);} catch (Throwable ex) {} - //to minimize the chance that the server-push thread - //activate again, before onPiggback polls next _pending - } - } - return stopped; - } - public void clearResource(AtmosphereResource resource) { this.resource.compareAndSet(resource, null); } - + private void commitResponse() { - AtmosphereResource resource = this.resource.getAndSet(null); + AtmosphereResource resource = this.resource.getAndSet(null); if (resource != null) { resource.resume(); } } - + + @Override + public boolean deactivate(boolean stop) { + throw new UnsupportedOperationException("deactivate is not supported by AtmosphereServerPush"); + } + + @Override + public boolean isActive() { +// throw new UnsupportedOperationException("isActive is not supported by AtmosphereServerPush"); + return true; + } + + @Override + public void onPiggyback() { + } + + @Override + public void schedule(EventListener task, T event, + Scheduler scheduler) { + scheduler.schedule(task, event); + commitResponse(); + } + + @Override + public void start(Desktop desktop) { + Desktop oldDesktop = this.desktop.getAndSet(desktop); + if (oldDesktop != null) { + log.warn("Server push already started for desktop " + desktop.getId()); + return; + } + + log.debug("Starting server push for " + desktop); + int clientTimeout = timeout + 1000 * 60; + Clients.response("jawwa.atmosphere.serverpush", new AuScript(null, "jawwa.atmosphere.startServerPush('" + desktop.getId() + "', " + clientTimeout + + ");")); + } + + @Override + public void stop() { + Desktop desktop = this.desktop.getAndSet(null); + if (desktop == null) { + log.warn("Server push hasn't been started or has already stopped"); + return; + } + + log.debug("Stopping server push for " + desktop); + Clients.response("jawwa.atmosphere.serverpush", new AuScript(null, "jawwa.atmosphere.stopServerPush('" + desktop.getId() + "');")); + commitResponse(); + } + public void updateResource(AtmosphereResource resource) { commitResponse(); boolean shouldSuspend = true; - if (_desktop == null) { + Desktop desktop = this.desktop.get(); + if (desktop == null) { return; } - - if (_desktop instanceof DesktopCtrl) - { - DesktopCtrl desktopCtrl = (DesktopCtrl) _desktop; - shouldSuspend = !desktopCtrl.scheduledServerPush(); + + if (desktop instanceof DesktopCtrl) { + DesktopCtrl desktopCtrl = (DesktopCtrl) desktop; + shouldSuspend = !desktopCtrl.scheduledServerPush(); } if (shouldSuspend) { @@ -350,17 +127,4 @@ public class AtmosphereServerPush implements ServerPush { this.resource.set(null); } } - - private static class ThreadInfo { - private final Thread thread; - /** # of activate() was called. */ - private int nActive; - private ThreadInfo(Thread thread) { - this.thread = thread; - } - public String toString() { - return "[" + thread + ',' + nActive + ']'; - } - } - } diff --git a/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/Either.java b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/Either.java new file mode 100644 index 0000000000..55260cdcf4 --- /dev/null +++ b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/Either.java @@ -0,0 +1,19 @@ +package fi.jawsy.jawwa.zk.atmosphere; + +public class Either { + private L left; + private R right; + + public Either(L l, R r) { + left = l; + right = r; + } + + public L getLeftValue() { + return left; + } + + public R getRightValue() { + return right; + } +} diff --git a/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/ZkAtmosphereHandler.java b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/ZkAtmosphereHandler.java index d93d3b6a1b..5d0a7b2399 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/ZkAtmosphereHandler.java +++ b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/ZkAtmosphereHandler.java @@ -2,6 +2,7 @@ package fi.jawsy.jawwa.zk.atmosphere; import java.io.IOException; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.atmosphere.cpr.AtmosphereHandler; @@ -11,110 +12,109 @@ import org.atmosphere.cpr.AtmosphereResourceEvent; import org.atmosphere.cpr.AtmosphereResponse; import org.zkoss.zk.ui.Desktop; import org.zkoss.zk.ui.Session; -import org.zkoss.zk.ui.WebApp; import org.zkoss.zk.ui.http.WebManager; import org.zkoss.zk.ui.sys.DesktopCtrl; -import org.zkoss.zk.ui.sys.ServerPush; import org.zkoss.zk.ui.sys.WebAppCtrl; /** * Atmosphere handler that integrates Atmosphere with ZK server push. */ public class ZkAtmosphereHandler implements AtmosphereHandler { - private String err; - - private AtmosphereServerPush getServerPush(AtmosphereResource resource) { - AtmosphereRequest request = resource.getRequest(); - Session session = WebManager.getSession(resource.getAtmosphereConfig().getServletContext(), request, false); - - if (session == null) - { - err = "Could not find session"; - return null; - } - else + @Override + public void destroy() { + } + + private Either getDesktop(Session session, String dtid) { + if (session.getWebApp() instanceof WebAppCtrl) { + WebAppCtrl webAppCtrl = (WebAppCtrl) session.getWebApp(); + Desktop desktop = webAppCtrl.getDesktopCache(session).getDesktopIfAny(dtid); + return new Either("Could not find desktop", desktop); + } + return new Either("Webapp does not implement WebAppCtrl", null); + } + + private Either getDesktopId(HttpServletRequest request) { + String dtid = request.getParameter("dtid"); + return new Either(dtid, "Could not find desktop id"); + } + + private Either getServerPush(AtmosphereResource resource) { + AtmosphereRequest request = resource.getRequest(); + + Either sessionEither = getSession(resource, request); + if (sessionEither.getRightValue() == null) { + return new Either(sessionEither.getLeftValue(), null); + } + Session session = sessionEither.getRightValue(); { - String desktopId = request.getParameter("dtid"); - if (desktopId == null || desktopId.length() == 0) - { - err = "Could not find desktop id"; - return null; - } - - WebApp webApp = session.getWebApp(); - if (webApp instanceof WebAppCtrl) - { - WebAppCtrl webAppCtrl = (WebAppCtrl) webApp; - Desktop desktop = webAppCtrl.getDesktopCache(session).getDesktopIfAny(desktopId); - if (desktop == null) - { - err = "Could not find desktop"; - return null; - } - - if (desktop instanceof DesktopCtrl) - { - DesktopCtrl desktopCtrl = (DesktopCtrl) desktop; - - ServerPush serverPush = desktopCtrl.getServerPush(); - if (serverPush == null) - { - err = "Server push is not enabled"; - return null; - } - - if (desktopCtrl.getServerPush() instanceof AtmosphereServerPush) - { - AtmosphereServerPush atmosphereServerPush = (AtmosphereServerPush) serverPush; - if (atmosphereServerPush != null) - return atmosphereServerPush; - } - - err = "Server push implementation is not AtmosphereServerPush"; - return null; - } - - err = "Desktop does not implement DesktopCtrl"; - return null; + Either dtidEither = getDesktopId(request); + if (dtidEither.getLeftValue() == null || dtidEither.getLeftValue().trim().length() == 0) { + return new Either(dtidEither.getRightValue(), null); + } + + String dtid = dtidEither.getLeftValue(); + { + Either desktopEither = getDesktop(session, dtid); + if (desktopEither.getRightValue() == null) { + return new Either (desktopEither.getLeftValue(), null); + } + + Desktop desktop = desktopEither.getRightValue(); + return getServerPush(desktop); } - - err = "Webapp does not implement WebAppCtrl"; - return null; } } - - @Override - public void onRequest(AtmosphereResource resource) throws IOException { - AtmosphereResponse response = resource.getResponse(); + + private Either getServerPush(Desktop desktop) { + if (desktop instanceof DesktopCtrl) { + DesktopCtrl desktopCtrl = (DesktopCtrl) desktop; + if (desktopCtrl.getServerPush() == null) + return new Either("Server push is not enabled", null); + if (desktopCtrl.getServerPush() instanceof AtmosphereServerPush) { + return new Either(null, (AtmosphereServerPush) desktopCtrl.getServerPush()); + } + return new Either("Server push implementation is not AtmosphereServerPush", null); + } + return new Either("Desktop does not implement DesktopCtrl", null); + } + + private Either getSession(AtmosphereResource resource, HttpServletRequest request) { + Session session = WebManager.getSession(resource.getAtmosphereConfig().getServletContext(), request, false); + if (session == null) + return new Either("Could not find session", null); + else + return new Either(null, session); + } + + @Override + public void onRequest(AtmosphereResource resource) throws IOException { + AtmosphereResponse response = resource.getResponse(); response.setContentType("text/plain"); - AtmosphereServerPush serverPush = getServerPush(resource); - if (serverPush == null) - { + Either serverPushEither = getServerPush(resource); + String error = serverPushEither.getLeftValue(); + if (error != null && serverPushEither.getRightValue() == null) { response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - response.getWriter().write(err); - return; + response.getWriter().write(error); + return; } - - serverPush.updateResource(resource); - } - @Override - public void onStateChange(AtmosphereResourceEvent event) throws IOException { - AtmosphereResource resource = event.getResource(); + AtmosphereServerPush serverPush = serverPushEither.getRightValue(); + serverPush.updateResource(resource); + } + + @Override + public void onStateChange(AtmosphereResourceEvent event) throws IOException { + AtmosphereResource resource = event.getResource(); if (event.isCancelled() || event.isResumedOnTimeout()) { - AtmosphereServerPush serverPush = getServerPush(resource); - if (serverPush != null) + AtmosphereServerPush serverPush = getServerPush(resource).getRightValue(); + if (serverPush != null) { serverPush.clearResource(resource); + } } - } - - @Override - public void destroy() { - - } + } } diff --git a/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml b/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml index ac27fe4f25..533f8f9d21 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml +++ b/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml @@ -38,6 +38,6 @@ Copyright (C) 2007 Ashley G Ramdass (ADempiere WebUI). - + diff --git a/org.adempiere.ui.zk/WEB-INF/src/web/js/jawwa/atmosphere/serverpush.js b/org.adempiere.ui.zk/WEB-INF/src/web/js/jawwa/atmosphere/serverpush.js index 0f24ca8cab..fb21c3b571 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/web/js/jawwa/atmosphere/serverpush.js +++ b/org.adempiere.ui.zk/WEB-INF/src/web/js/jawwa/atmosphere/serverpush.js @@ -47,9 +47,9 @@ data: { dtid: this.desktop.id }, - accepts: "text/plain", - dataType: "text/plain", + dataType: "", timeout: me.timeout, + transport : 'long-polling', error: function(jqxhr, textStatus, errorThrown) { me.failures += 1; me._schedule(); diff --git a/org.adempiere.ui.zk/WEB-INF/web.xml b/org.adempiere.ui.zk/WEB-INF/web.xml index 50bef1871a..04b7e432a7 100644 --- a/org.adempiere.ui.zk/WEB-INF/web.xml +++ b/org.adempiere.ui.zk/WEB-INF/web.xml @@ -1,6 +1,9 @@ - - ADempiere WebUI + + iDempiere Web Client AtmosphereServlet