Monitor StreamBase Queues from Java Client

Article ID: KB0082781

Updated On:

Products: TIBCO Streaming
Versions: -

Description

How can I tell if there is queuing in a StreamBase server, or when the server has become quiet after data input has stopped?

Resolution

The following Java application connects to a StreamBase server and continues to run until no internal queue has more than 20 tuples remaining. It can be called from scripts that need to wait for all tuple data to be processed before performing the next action.

Steps to Create:

In Studio, to create the jar file:
1. Open a new StreamBase project and enable the Java Build Path option "Enable support for compiling StreamBase Client API classes".
2. Right-click the java-src folder and choose New > (Other) > Java > Package, with the name "com.sb.support".
3. Right-click the com.sb.support folder and choose New > File, with the name "QueueZero.java".
4. Paste in the code below and Save (there should be no errors).
5. Run QueueZero as a Java application from StreamBase Studio. This creates a default Launch Configuration. Stop it manually if it does not stop by itself.
Note: If there is no StreamBase server to connect to, the application reports the following error and quits:
QueueZero: Exception: Unable to connect to server(s) [sb://localhost/]
This is expected.

6. Right-click QueueZero.java and choose Export > Java > Runnable Jar File, choose the Launch configuration "QueueZero - QueueZero", set an Export destination, set Library handling to "Extract required libraries into generated JAR", and click Finish.
Note: You may see the dialog message "This operation repacks referenced libraries". Please review it and make sure the application is used only by authorized users, in accordance with your software license agreement.

7. Run the resultant jar from the command-line as:
QueueZero.jar [sb://hostname:port]
or
java -jar QueueZero.jar [sb://hostname:port]

Output will appear as (example):
2018-01-04 10:14:44.105-0500 [main] INFO  com.sb.support.QueueZero - new snapshot
2018-01-04 10:14:44.106-0500 [main] INFO  com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 20445
2018-01-04 10:14:45.110-0500 [main] INFO  com.sb.support.QueueZero - new snapshot
2018-01-04 10:14:45.111-0500 [main] INFO  com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 1545
2018-01-04 10:14:46.106-0500 [main] INFO  com.sb.support.QueueZero - new snapshot
2018-01-04 10:14:46.106-0500 [main] INFO  com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl.DataRefresh:ParallelSequence 1
2018-01-04 10:14:46.107-0500 [main] INFO  com.sb.support.QueueZero - ItemsSales.EngineFactory.EngineImpl:ParallelSequence 3
2018-01-04 10:14:46.107-0500 [main] INFO  com.sb.support.QueueZero - All queues low, exiting.
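
A calling script may want to read the queue sizes out of this output. The following is a minimal sketch, assuming the log layout shown in the example above (the message follows " - " and a queue report is a name followed by an integer). The class name QueueLogLine and its parse method are hypothetical and are not part of QueueZero or the StreamBase API.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts "<queueName> <length>" from one QueueZero log line.
class QueueLogLine {
    // Assumes the message part is " - <queueName> <length>" at the end of the line.
    private static final Pattern QUEUE_REPORT = Pattern.compile(" - (\\S+) (\\d+)$");

    // Returns the queue name and length, or empty for lines such as "new snapshot".
    static Optional<Map.Entry<String, Integer>> parse(String line) {
        Matcher m = QUEUE_REPORT.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        Map.Entry<String, Integer> entry =
                new AbstractMap.SimpleImmutableEntry<>(m.group(1), Integer.parseInt(m.group(2)));
        return Optional.of(entry);
    }

    public static void main(String[] args) {
        String line = "2018-01-04 10:14:45.111-0500 [main] INFO  com.sb.support.QueueZero - "
                + "ItemsSales.EngineFactory.EngineImpl:ParallelSequence 1545";
        parse(line).ifPresent(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```

If the logging pattern in your environment differs from the example, the regular expression would need to be adjusted.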
 

Discussion:

As provided, this application connects to the StreamBase server and reports queue information once per second for every queue that still contains tuples. It quits when no queue has more than 20 tuples, which indicates a server that is nearly quiet. This is useful for scripts that need to wait for the server to become quiet before initiating new activity, for example to let processing complete before shutdown, or before starting a new step that will place high demand on the CPU.
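
When the calling script is itself a Java program, the wait can be sketched as launching the jar as a child process and blocking until it exits. This is only an illustration: the class WaitForQuiet and its methods are hypothetical, and the jar path and URI are assumed values taken from the run command above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper: runs QueueZero.jar and blocks until it exits.
class WaitForQuiet {
    // Builds "java -jar <jarPath> [sbUri]"; the URI argument is optional.
    static List<String> buildCommand(String jarPath, String sbUri) {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-jar");
        cmd.add(jarPath);
        if (sbUri != null && !sbUri.isEmpty()) {
            cmd.add(sbUri);
        }
        return cmd;
    }

    // Starts the process, passes its output through, and waits for it to finish.
    static int runAndWait(List<String> cmd) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(cmd).inheritIO().start();
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Assumed jar location and server URI; adjust for your environment.
        int exit = runAndWait(buildCommand("QueueZero.jar", "sb://localhost:10000"));
        System.out.println("QueueZero exited with code " + exit);
        // The next step of the script would start here.
    }
}
```

Note that QueueZero, as written, catches the connection exception and returns from main, so its exit code is probably the same whether the server became quiet or could not be reached. A script that must tell the two cases apart would need to inspect the output as well.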

If you find that the queues never all drain to 20 tuples or fewer at the same time, you can increase the threshold value or exclude some queues from consideration (by editing the Java source and re-exporting the jar).
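
The effect of raising the threshold or excluding queues can be sketched independently of the StreamBase API. The example below is illustrative only: the class QuietCheck, the method allQueuesLow, and the use of a plain map of queue names to lengths are assumptions made for this sketch, and the queue names are borrowed from the sample output.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone version of the "is the server quiet?" decision.
class QuietCheck {
    // True when no queue outside the ignored set holds more than 'threshold' tuples.
    static boolean allQueuesLow(Map<String, Integer> queueLengths, int threshold, Set<String> ignored) {
        for (Map.Entry<String, Integer> e : queueLengths.entrySet()) {
            if (ignored.contains(e.getKey())) {
                continue; // excluded from consideration
            }
            if (e.getValue() > threshold) {
                return false; // this queue is still busy
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> lengths = new LinkedHashMap<>();
        lengths.put("ItemsSales.EngineFactory.EngineImpl:ParallelSequence", 1545);
        lengths.put("ItemsSales.EngineFactory.EngineImpl.DataRefresh:ParallelSequence", 1);

        Set<String> none = Collections.emptySet();
        Set<String> skipMain = Collections.singleton("ItemsSales.EngineFactory.EngineImpl:ParallelSequence");

        System.out.println(allQueuesLow(lengths, 20, none));     // false: one queue holds 1545
        System.out.println(allQueuesLow(lengths, 2000, none));   // true: threshold raised
        System.out.println(allQueuesLow(lengths, 20, skipMain)); // true: busy queue excluded
    }
}
```

In QueueZero itself, the corresponding changes would be to the literal 20 in the comparison and to the name filter inside the queue loop.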

Source Code:

package com.sb.support;

import java.util.*;
import org.slf4j.*;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.client.*;
import com.streambase.sb.monitor.*;

/*
 * QueueZero monitors all concurrent region queues for the SB instance running at the provided URL.
 * When no monitored queue holds more than 20 elements, the server is expected to become quiet within the next second.
 * When the server is sufficiently quiet (caught up with all data), QueueZero exits normally.
 * This allows QueueZero to pause a script until the target SB server is quiet, then continue the script.
 * 
 * Run as: java -jar QueueZero.jar [sb://hostname:port]
 * 
 * This is provided AS IS by TIBCO Support as an example with no guarantee of correctness for any specific application. 
 * Use at your own risk.
 * 
 * Author: Greg Buhtz, TIBCO Software, 2018
 */

class MyListener implements MonitorListener {
	private final StreamBaseMonitor mon;
	private static final Logger logger = LoggerFactory
			.getLogger(QueueZero.class);
	
	@Override
	public void snapshotReceived(Snapshot monSnap) {
		// write to confirm continued function
		logger.info("new snapshot");
		Iterator<ModuleInfo> mods = monSnap.moduleInfos();
		boolean quit = true;
		// loop through reported modules (containers and concurrent regions)
		while (mods.hasNext()) {
			ModuleInfo mi = mods.next();
			// loop through queues
			for (ModuleInfo.QueueInfo q: mi.queues()) {
				// filter for main region queue (ignores aliases)
				// note: add additional filters here if specific queues are to be ignored
				if (q.getName().contains("ParallelSequence")) {
					int len = q.getCurrentQueueLength();
					// report queue size for non-zero queues to assist diagnostics
					if (len > 0) {
						logger.info(String.format("%s %d", q.getName(), len));
					}
					// Do not exit while any queue is large enough that processing will continue.
					// Expectation: any queue with 20 or fewer elements will be drawn down to zero in under one second,
					// resulting in less than 100% CPU use, so it is safe to proceed with 'sbadmin' commands.
					if (len > 20) { quit = false; }
				}
			}
		}
		if (quit) {
			logger.info("All queues low, exiting.");
			mon.terminate(); 
		}
	}
	
	MyListener(StreamBaseMonitor mon) {
		this.mon = mon;
	}
}

public class QueueZero {

	private final static String SB_URI = "sb://localhost:10000";
	private static StreamBaseMonitor mon;

	public static void main(String[] args) {
		MyListener ml;
		try {
			// use the URI given on the command line, otherwise the default
			StreamBaseURI uri;
			if (args.length > 0) {
				uri = new StreamBaseURI(args[0]);
			} else {
				uri = new StreamBaseURI(SB_URI);
			}
			mon = new StreamBaseMonitor(uri);
			ml = new MyListener(mon);
			mon.addMonitorListener(ml);
			mon.run();
		} catch (StreamBaseException e) {
			System.out.println("QueueZero: Exception: " + e.getMessage());
			//e.printStackTrace();
		} finally {
			if (mon != null) {
				mon.close();
			}
		}
	}
}

 

Issue/Introduction

Java source code example provided