diff --git a/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java index 8ae86a59c2..5bdc567d72 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java +++ b/org.adempiere.ui.zk/WEB-INF/src/fi/jawsy/jawwa/zk/atmosphere/AtmosphereServerPush.java @@ -9,8 +9,11 @@ import org.zkoss.lang.Library; import org.zkoss.zk.au.out.AuScript; import org.zkoss.zk.ui.Desktop; import org.zkoss.zk.ui.DesktopUnavailableException; +import org.zkoss.zk.ui.Executions; +import org.zkoss.zk.ui.UiException; import org.zkoss.zk.ui.event.Event; import org.zkoss.zk.ui.event.EventListener; +import org.zkoss.zk.ui.impl.ExecutionCarryOver; import org.zkoss.zk.ui.sys.DesktopCtrl; import org.zkoss.zk.ui.sys.Scheduler; import org.zkoss.zk.ui.sys.ServerPush; @@ -31,6 +34,10 @@ public class AtmosphereServerPush implements ServerPush { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final AtomicReference resource = new AtomicReference(); private final int timeout; + + private ThreadInfo _active; + private ExecutionCarryOver _carryOver; + private final Object _mutex = new Object(); public AtmosphereServerPush() { String timeoutString = Library.getProperty("fi.jawsy.jawwa.zk.atmosphere.timeout"); @@ -43,7 +50,47 @@ public class AtmosphereServerPush implements ServerPush { @Override public boolean activate(long timeout) throws InterruptedException, DesktopUnavailableException { - throw new UnsupportedOperationException("activate is not supported by AtmosphereServerPush"); + final Thread curr = Thread.currentThread(); + if (_active != null && _active.thread.equals(curr)) { //re-activate + ++_active.nActive; + return true; + } + + final ThreadInfo info = new ThreadInfo(curr); + + EventListener task = new EventListener() { + @Override + public void onEvent(Event event) throws Exception { + if (event.getName().equals("onNewData")) + { + synchronized (_mutex) { + _carryOver = new ExecutionCarryOver(desktop.get()); + + synchronized (info) { + info.nActive = 1; //granted + info.notify(); + } + + try { + _mutex.wait(); //wait until the server push is done + } catch (InterruptedException ex) { + throw UiException.Aide.wrap(ex); + } + } + } + } + }; + + synchronized (info) { + Executions.schedule(desktop.get(), task, new Event("onNewData")); + if (info.nActive == 0) + info.wait(timeout <= 0 ? 10*60*1000: timeout); + } + + _carryOver.carryOver(); + _active = info; + + return true; } public void clearResource(AtmosphereResource resource) { @@ -59,7 +106,26 @@ public class AtmosphereServerPush implements ServerPush { @Override public boolean deactivate(boolean stop) { - throw new UnsupportedOperationException("deactivate is not supported by AtmosphereServerPush"); + boolean stopped = false; + if (_active != null && Thread.currentThread().equals(_active.thread)) { + if (--_active.nActive <= 0) { + if (stop) + { + stop(); + stopped = true; + } + + _carryOver.cleanup(); + _carryOver = null; + _active.nActive = 0; //just in case + _active = null; + + synchronized (_mutex) { + _mutex.notify(); + } + } + } + return stopped; } @Override @@ -70,6 +136,13 @@ public class AtmosphereServerPush implements ServerPush { @Override public void onPiggyback() { + Desktop desktop = this.desktop.get(); + if (desktop == null) { + return; + } + + if (Executions.getCurrent() != null && _carryOver == null) + _carryOver = new ExecutionCarryOver(desktop); } @Override @@ -127,4 +200,16 @@ public class AtmosphereServerPush implements ServerPush { this.resource.set(null); } } + + private static class ThreadInfo { + private final Thread thread; + /** # of activate() was called. */ + private int nActive; + private ThreadInfo(Thread thread) { + this.thread = thread; + } + public String toString() { + return "[" + thread + ',' + nActive + ']'; + } + } }