| Products | Versions |
|---|---|
| TIBCO Streaming | - |
How can I tell if there is queuing in a StreamBase server, or when the server has become quiet after data input has stopped?
2018-01-04 10:14:44.105-0500 [main] INFO com.sb.support.QueueZero - new snapshot 2018-01-04 10:14:44.106-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 20445 2018-01-04 10:14:45.110-0500 [main] INFO com.sb.support.QueueZero - new snapshot 2018-01-04 10:14:45.111-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 1545 2018-01-04 10:14:46.106-0500 [main] INFO com.sb.support.QueueZero - new snapshot 2018-01-04 10:14:46.106-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl.DataRefresh:ParallelSequence 1 2018-01-04 10:14:46.107-0500 [main] INFO com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 3 2018-01-04 10:14:46.107-0500 [main] INFO com.sb.support.QueueZero - All queues low, exiting.
package com.sb.support;
import java.util.*;
import org.slf4j.*;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.client.*;
import com.streambase.sb.monitor.*;
/*
* QueueZero monitors all concurrent region queues for the SB instance running at the provided URL.
* When all monitored queues have fewer than 20 elements, the server is expected to quiet in the next second.
* When the server is sufficiently quiet (caught up with all data) then QueueZero exits normally.
* This allows QueueZero to pause a script until the target SB server is quiet, then continue the script.
*
* Run as: java -jar QueueZero.jar
*
* This is provided AS IS by TIBCO Support as an example with no guarantee of correctness for any specific application.
* Use at your own risk.
*
* Author: Greg Buhtz, TIBCO Software, 2018
*/
class MyListener implements MonitorListener {
StreamBaseMonitor mon;
final static private Logger logger = LoggerFactory
.getLogger(QueueZero.class);
@Override
public void snapshotReceived(Snapshot monSnap) {
// write to confirm continued function
logger.info("new snapshot");
Iterator<ModuleInfo> mods = monSnap.moduleInfos();
boolean quit = true;
// loop through reported modules (containers and concurrent regions)
while (mods.hasNext()) {
ModuleInfo mi = monSnap.getModuleInfo(mods.next().getName());
// loop through queues
for (ModuleInfo.QueueInfo q: mi.queues()) {
// filter for main region queue (ignores aliases)
// note: add additional filters here if specific queues are to be ignored
if (q.getName().contains("ParallelSequence")) {
int len = q.getCurrentQueueLength();
// report queue size for non-zero queues to assist diagnostics
if (len>0) {
logger.info(String.format("%s %d",q.getName(),q.getCurrentQueueLength()));
}
// Do not exit if any queue is large enough that continuous processing will continue
// Expectation is that any queue with fewer than 20 elements will be drawn down to zero in under one second
// resulting in less than 100% cpu use and that it is safe to proceed with 'sbadmin' commands.
if (len > 20) { quit = false; }
}
}
}
if (quit) {
logger.info("All queues low, exiting.");
mon.terminate();
}
}
MyListener(StreamBaseMonitor mon) {
this.mon = mon;
}
}
public class QueueZero {
private final static String SB_URI = "sb://localhost:10000";
private static StreamBaseMonitor mon;
public static void main(String[] args) {
MyListener ml;
try {
StreamBaseURI uri = null;
if (args.length > 0) {
uri = new StreamBaseURI(new String(args[0]));
} else
uri = new StreamBaseURI(SB_URI);
mon = new StreamBaseMonitor(uri);
ml = new MyListener(mon);
mon.addMonitorListener(ml);
mon.run();
} catch (StreamBaseException e) {
System.out.println("QueueZero: Exception: " + e.getMessage());
//e.printStackTrace();
} finally {
if (mon != null) {
mon.close();
}
}
}
}