Products | Versions |
---|---|
TIBCO Streaming | - |
How can I tell if there is queuing in a StreamBase server, or when the server has become quiet after data input has stopped?
2018-01-04 10:14:44.105-0500 [main] INFO com.sb.support.QueueZero - new snapshot
2018-01-04 10:14:44.106-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 20445
2018-01-04 10:14:45.110-0500 [main] INFO com.sb.support.QueueZero - new snapshot
2018-01-04 10:14:45.111-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 1545
2018-01-04 10:14:46.106-0500 [main] INFO com.sb.support.QueueZero - new snapshot
2018-01-04 10:14:46.106-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl.DataRefresh:ParallelSequence 1
2018-01-04 10:14:46.107-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 3
2018-01-04 10:14:46.107-0500 [main] INFO com.sb.support.QueueZero - All queues low, exiting.
package com.sb.support; import java.util.*; import org.slf4j.*; import com.streambase.sb.StreamBaseException; import com.streambase.sb.client.*; import com.streambase.sb.monitor.*; /* * QueueZero monitors all concurrent region queues for the SB instance running at the provided URL. * When all monitored queues have fewer than 20 elements, the server is expected to quiet in the next second. * When the server is sufficiently quiet (caught up with all data) then QueueZero exits normally. * This allows QueueZero to pause a script until the target SB server is quiet, then continue the script. * * Run as: java -jar QueueZero.jar * * This is provided AS IS by TIBCO Support as an example with no guarantee of correctness for any specific application. * Use at your own risk. * * Author: Greg Buhtz, TIBCO Software, 2018 */ class MyListener implements MonitorListener { StreamBaseMonitor mon; final static private Logger logger = LoggerFactory .getLogger(QueueZero.class); @Override public void snapshotReceived(Snapshot monSnap) { // write to confirm continued function logger.info("new snapshot"); Iterator<ModuleInfo> mods = monSnap.moduleInfos(); boolean quit = true; // loop through reported modules (containers and concurrent regions) while (mods.hasNext()) { ModuleInfo mi = monSnap.getModuleInfo(mods.next().getName()); // loop through queues for (ModuleInfo.QueueInfo q: mi.queues()) { // filter for main region queue (ignores aliases) // note: add additional filters here if specific queues are to be ignored if (q.getName().contains("ParallelSequence")) { int len = q.getCurrentQueueLength(); // report queue size for non-zero queues to assist diagnostics if (len>0) { logger.info(String.format("%s %d",q.getName(),q.getCurrentQueueLength())); } // Do not exit if any queue is large enough that continuous processing will continue // Expectation is that any queue with fewer than 20 elements will be drawn down to zero in under one second // resulting in less than 100% cpu use and that it 
is safe to proceed with 'sbadmin' commands. if (len > 20) { quit = false; } } } } if (quit) { logger.info("All queues low, exiting."); mon.terminate(); } } MyListener(StreamBaseMonitor mon) { this.mon = mon; } } public class QueueZero { private final static String SB_URI = "sb://localhost:10000"; private static StreamBaseMonitor mon; public static void main(String[] args) { MyListener ml; try { StreamBaseURI uri = null; if (args.length > 0) { uri = new StreamBaseURI(new String(args[0])); } else uri = new StreamBaseURI(SB_URI); mon = new StreamBaseMonitor(uri); ml = new MyListener(mon); mon.addMonitorListener(ml); mon.run(); } catch (StreamBaseException e) { System.out.println("QueueZero: Exception: " + e.getMessage()); //e.printStackTrace(); } finally { if (mon != null) { mon.close(); } } } }