How to synchronize Aggregate results across multiple nodes

Article ID: KB0073360

Product: TIBCO Streaming
Version: 10

Description

The Aggregate operator's internal state is private and cannot be shared with other nodes. Its state is also opaque to the local EventFlow application. That state is entirely determined by the tuples that have been submitted to the operator, the order in which they were submitted, and in some cases exactly when they arrived.
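To illustrate why the order of arrival matters, here is a minimal sketch in plain Python (not EventFlow, and not any TIBCO API). It assumes a hypothetical tuple-based tumbling window of size 2 with a sum aggregate; the function name and the sample values are made up for the example.

```python
def tumbling_sums(values, size):
    """Sum each complete, non-overlapping window of `size` values."""
    return [sum(values[i:i + size])
            for i in range(0, len(values) - size + 1, size)]

# Two nodes receive the same four values, but in a different order.
node_a = tumbling_sums([1, 2, 3, 4], 2)
node_b = tumbling_sums([1, 3, 2, 4], 2)

print(node_a)  # [3, 7]
print(node_b)  # [4, 6]
```

Both nodes saw identical data, yet their window results differ, which is why two independent Aggregate operators cannot be assumed to agree.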

How can this be made to work in a cluster with multiple nodes?

Resolution

This is a general discussion of the technologies available. The specific application design will depend upon the considerations listed here and upon your project's other functional requirements.

All of the Aggregate operator's limitations are avoided in a multiple-node cluster if the Aggregate operator is replaced with a Query Table ("the table") in Transactional Memory (TM). The table's state is shared with other nodes and is available for both local and cluster-wide queries. It may also be managed by all nodes, so that every node participates in creating a single consistent view. This is the preferred solution.

Please see product documentation page:
  TIBCO Streaming > Authoring Guide > Using Query Tables > Using the Query Table Data Construct
  "Properties: Table Settings Tab"


The main Aggregate operator functions are replaced by Query operators that interact with the table. The Query operator supports aggregate functions, Group By, rolling windows through use of Range Specification, and individual window selection using the Matches configuration. Window evacuation (deleting rows) is simulated by deleting rows by timestamp. Triggering output based on input data, which the Aggregate operator controls through the Dimension, is moved to Filter operators that use the "close" expression to trigger suitable queries. If there are no overlapping windows (where a single tuple or row is used for multiple reports), then the Query Read operator may be replaced with a Query Delete operator set to emit the deleted rows, resulting in a table that clears itself as soon as the rows are used in a report.

See page:
  TIBCO Streaming > Authoring Guide > Using StreamBase Operators > Using the Query Operator
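
The division of work described above can be modeled outside the product. The following is a minimal sketch in plain Python, not EventFlow and not the TIBCO Streaming API: the class SharedTable and its methods are hypothetical stand-ins for a Query Table, a Query Read with Group By and a timestamp range, and a Query Delete that emits the rows it deletes.

```python
from collections import defaultdict


class SharedTable:
    """In-memory stand-in for a Query Table (hypothetical, for illustration)."""

    def __init__(self):
        self.rows = []  # each row is (timestamp, key, value)

    def insert(self, timestamp, key, value):
        self.rows.append((timestamp, key, value))

    def read_sum_by_key(self, start, end):
        """Model of a read with Group By over timestamps in [start, end)."""
        totals = defaultdict(float)
        for ts, key, value in self.rows:
            if start <= ts < end:
                totals[key] += value
        return dict(totals)

    def delete_before(self, cutoff):
        """Model of a delete by timestamp that emits the deleted rows."""
        deleted = [row for row in self.rows if row[0] < cutoff]
        self.rows = [row for row in self.rows if row[0] >= cutoff]
        return deleted


table = SharedTable()
for ts, key, value in [(1, "A", 10.0), (2, "B", 5.0), (3, "A", 2.5), (11, "A", 1.0)]:
    table.insert(ts, key, value)

# A "close" condition for the window [0, 10) would trigger this query.
window_report = table.read_sum_by_key(0, 10)

# With no overlapping windows, the rows just reported can be removed.
evicted = table.delete_before(10)

print(window_report)  # {'A': 12.5, 'B': 5.0}
print(table.rows)     # [(11, 'A', 1.0)]
```

Because the rows live in the table rather than inside an operator, any node that can query the table would compute the same report.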

Cluster Types

High Availability

In a High Availability cluster configured for failover there is one "hot" (primary, or active) node and one or more "warm" secondary nodes. At failover there are two considerations. The first is to ensure that the Aggregate operator in the newly promoted node has consistent state. To manage this, the design must allow a period during which the Aggregate operator starts empty and fills to current levels. The second is that reporting should be suppressed during this period.
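
One way to picture the suppression period is a gate that blocks reports until a full window has elapsed since promotion. This is a minimal sketch in plain Python under the assumption of a time-based window of fixed length; the class WarmUpGate and its parameters are hypothetical and are not part of the product.

```python
class WarmUpGate:
    """Suppress reports until one full window has passed since promotion."""

    def __init__(self, promoted_at, window_seconds):
        # Earliest time at which the refilled window holds complete data.
        self.ready_at = promoted_at + window_seconds

    def should_report(self, now):
        return now >= self.ready_at


# Node promoted at t=100 with a 60-second window (illustrative values).
gate = WarmUpGate(promoted_at=100.0, window_seconds=60.0)
decisions = [gate.should_report(t) for t in (100.0, 130.0, 160.0, 200.0)]

print(decisions)  # [False, False, True, True]
```

For a tuple-based window the same idea would count tuples received since promotion instead of elapsed time.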

If the Aggregate operator is replaced with a Query Table, there is no transition time required since the state is already consistent. All that is required is that data flow into the new primary node so that updates continue as expected. This may be automated by having the input adapters become active based on cluster state.

See page:
  TIBCO Streaming > StreamBase Admin Guide > Using Cluster Awareness


Load Balanced

In a Load Balanced cluster for distributed high-throughput processing, all nodes are active and each processes a subset of the incoming data. The data is distributed to each node by a key value that ensures related data are processed together on a single node. The data in the Aggregate operator in this case is implicitly grouped.
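
Key-based distribution can be sketched as follows in plain Python. This is an illustration only: in practice the routing is done by the messaging layer or the cluster, and the node names, the sample symbols, and the helper node_for_key are hypothetical. A CRC32 checksum is used because it is stable across processes, unlike Python's built-in hash() for strings.

```python
import zlib


def node_for_key(key, nodes):
    """Pick a node deterministically from the key's CRC32 checksum."""
    return nodes[zlib.crc32(key.encode("utf-8")) % len(nodes)]


nodes = ["A.cluster", "B.cluster", "C.cluster"]
tuples = [("IBM", 1), ("TIBX", 2), ("IBM", 3), ("MSFT", 4), ("TIBX", 5)]

# Group the incoming tuples by the node that would process them.
assignments = {}
for symbol, seq in tuples:
    assignments.setdefault(node_for_key(symbol, nodes), []).append((symbol, seq))

for node, items in sorted(assignments.items()):
    print(node, items)
```

Every tuple with the same key lands on the same node, so each node's aggregation only ever sees complete groups.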

When a node joins the cluster, administrative action is needed to tell it which key values it is responsible for, and its Aggregate operators will need a period of time to fill. Similarly, those key values will need to be removed from the other nodes' queries. This may be managed automatically depending on the input adapters in use. For example, both JMS and Kafka support distribution by a key field.

When a node leaves the cluster, initially no node is responsible for its assigned key values. Administrative action is needed to assign those keys to the remaining nodes, and the Aggregate operators on those nodes will need a period of time to fill. Again, this may be assisted by configuration depending on the specific types of input adapters used.

If the key field distribution is associated with a Streaming cluster partition, then manual reconfiguration may be unnecessary, because as nodes join and leave the cluster the StreamBase Runtime ensures every partition has an active node and several secondaries. The time to fill the Aggregate operator may be eliminated if it is replaced with a Query Table in Transactional Memory. All rows previously processed by any one node are already present on its secondaries. The orphaned key values will follow their partition to any of the remaining nodes.
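
The way orphaned keys follow their partition can be modeled with a simple replica list per partition. This is a minimal sketch in plain Python that simplifies the behavior described above; it does not reproduce how the StreamBase Runtime actually selects nodes, and the partition names, node names, and the helper active_node are hypothetical.

```python
def active_node(replicas, live_nodes):
    """Return the first replica that is still live, or None if none remain."""
    for node in replicas:
        if node in live_nodes:
            return node
    return None


# Each partition lists its preferred active node first, then its secondaries.
partitions = {
    "P0": ["A", "B", "C"],
    "P1": ["B", "C", "A"],
    "P2": ["C", "A", "B"],
}

live = {"A", "B", "C"}
before = {name: active_node(reps, live) for name, reps in partitions.items()}

live.discard("B")  # node B leaves the cluster
after = {name: active_node(reps, live) for name, reps in partitions.items()}

print(before)  # {'P0': 'A', 'P1': 'B', 'P2': 'C'}
print(after)   # {'P0': 'A', 'P1': 'C', 'P2': 'C'}
```

Only the partition that was active on the departed node moves, and because its rows were already replicated to the secondary, no refill period is needed.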

Please also see the considerations for High Availability, above, since those apply in this case as well.

Issue/Introduction

Design options

Additional Information

TIBCO Streaming 10.6 Documentation