Converting Streaming Heap-based Query Tables to use Cluster Transactional Memory

Article ID: KB0070934

Products

TIBCO Streaming

Description

In a Streaming cluster, the primary way active processing state is shared with other nodes to support High Availability when a node fails is through common Query Tables that use Transactional Memory for storage.

The questions this article answers are:
1. How can one convert a Query Table using Heap storage to one using Transactional Memory storage?
2. Are there any performance or other considerations?

Issue/Introduction

Steps to configure a Query Table to support High Availability, plus performance considerations.

Resolution

Configuration Steps

In SB Studio, make the following manual changes to move a Query Table from Type "In heap" (QT-in-Heap) to "In transactional memory" (QT-in-TM):
  1. Select the Query Table in the open EventFlow editor
  2. On the Table Settings tab in the StreamBase Properties view, change Type to "In transactional memory"
  3. On the Data Distribution tab, set Distribute Data to checked/true
  4. Set Policy Name to "default-dynamic-data-distribution-policy"
  5. Set Mapper Type to "Distributed hash"
  6. On the Schema tab, select the hash partitioning key fields using the "Partition Field or Partition Key selection" control, clicking each field to select it.
If you are using a Named Schema for the table schema, the control to choose the partitioning key fields may be inactive (gray) after the above changes.

To make the control active: 
  1. Set the schema to "<Private Schema>". 
  2. Click off of the selected table in the EventFlow editor (click on whitespace to select nothing).
  3. Select the table again, and on the Schema tab the partitioning key control is now active.
  4. Use it to choose the partitioning mode and the key fields.
  5. Set the schema back to the Named Schema.
  6. Click off of the selected table in the EventFlow editor (click on whitespace to select nothing) to register all changes.
  7. Save the changes.

Performance Considerations

Reading from and writing to a Query Table in Transactional Memory is slower than performing the same operations on a Query Table in Heap memory. A Transactional Memory table's contents may be distributed across multiple nodes rather than all being local, so reads may involve network communication. Similarly, a write requires the completion of a transaction with all replica nodes before it is considered complete. If any node is running on Windows, or is not using System V Shared Memory (Linux RAM-based storage), then reads and writes also go to disk, which further slows the transaction.
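
As a rough mental model only (not a product formula), the per-write cost can be thought of as local work, plus one transactional commit exchange per replica node, plus disk I/O when shared memory is file-backed. The sketch below uses purely illustrative component costs:

    # Illustrative-only latency model for a single QT-in-TM write.
    # None of these component costs are product measurements; they are
    # placeholder assumptions to show how the pieces add up.

    def write_latency_us(replicas: int,
                         local_us: float = 50.0,
                         replica_commit_us: float = 500.0,
                         disk_us: float = 0.0) -> float:
        """Local work, plus a transactional commit per replica, plus optional disk I/O."""
        return local_us + replicas * replica_commit_us + disk_us

    print(write_latency_us(replicas=0))                   # in-Heap-like: local work only
    print(write_latency_us(replicas=1))                   # one replica node in the cluster
    print(write_latency_us(replicas=1, disk_us=2_000.0))  # file-backed (non-SysV) shared memory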

We compared performance writing 1 million rows to a Query Table on a consumer-grade, 2.7GHz Intel i7, 4-CPU system. Most use cases require far fewer rows, but the larger row count made for stronger comparisons and demonstrated that performance is affected by total row count: in every case, a table with fewer rows handled each write faster than a table with more rows.

These are the time comparisons for writing 1 million rows to a Query Table on consumer PC hardware:
a. Windows, QT-in-Heap: 52 seconds (19K inserts per second)
b. Windows, QT-in-TM, single active node in cluster: 2 minutes 29 seconds (2.8x slower than in-Heap, 6.7K inserts per second)
c. Windows, QT-in-TM, two active nodes, one node loading data into the table: 18 minutes 23 seconds (21x slower, 900 inserts per second)
d. Linux, QT-in-TM using SYSTEM_V_SHARED_MEMORY, two active nodes, one loading data: 10 minutes 42 seconds (12x slower, 1.5K inserts per second)
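
For reference, the inserts-per-second and slowdown figures above follow directly from the elapsed times. The short sketch below reproduces that arithmetic from the measured timings:

    # Reproduce the inserts-per-second and slowdown figures from the timings above.
    ROWS = 1_000_000

    timings = {                              # elapsed seconds for 1M inserts
        "Windows, QT-in-Heap": 52,
        "Windows, QT-in-TM, 1 node": 2 * 60 + 29,
        "Windows, QT-in-TM, 2 nodes": 18 * 60 + 23,
        "Linux, QT-in-TM (SysV shm), 2 nodes": 10 * 60 + 42,
    }

    baseline = timings["Windows, QT-in-Heap"]
    for name, seconds in timings.items():
        rate = ROWS / seconds
        slowdown = seconds / baseline
        print(f"{name}: {rate:,.0f} inserts/s, {slowdown:.1f}x the in-Heap time")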

The 1 million rows in our test is not typical; most Query Tables are much smaller. Performance will be better when maintaining tables with fewer than 100,000 rows.

When a new node is added to an existing cluster, replication to that node begins and takes roughly as long as the "two active node" load results above. The original node or nodes continue processing data without delay, but the new node takes time to become fully functional after starting. The application design should take this replication time into account before initiating new connections to data inputs.
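
As a rough planning aid, that wait can be estimated from the table size and an observed replicated-write rate. The sketch below is only an illustration; the default rate and safety margin are assumptions that should be replaced with measured values:

    # Rough planning aid: estimate how long a newly added node needs to catch up,
    # given the table size and an observed replicated-write rate. The default rate
    # and safety margin are illustrative assumptions; substitute measured values.

    def estimated_replication_seconds(row_count: int,
                                      replicated_writes_per_sec: float = 900.0,
                                      safety_margin: float = 1.5) -> float:
        """Return a conservative wait before opening new data inputs."""
        return (row_count / replicated_writes_per_sec) * safety_margin

    # Example: a 100,000-row table at roughly 900 replicated writes per second.
    print(f"Allow about {estimated_replication_seconds(100_000):.0f} seconds")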

Similarly, the application design should accommodate tables that may sustain fewer than 1,000 writes per second and, with more active replicas, configuration, and other system limitations, may be limited to fewer than 300 writes per second. For critical flows where low latency and high throughput are required to meet specific targets, test the impact of placing a Query Table into Transactional Memory before relying on a simple configuration change.
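
A simple way to run such a test is to time a burst of writes through the same path the application uses and compute the sustained rate. The sketch below shows the shape of that measurement; send_row() is a hypothetical stand-in for whatever mechanism the test uses to write a tuple into the table (for example, a test client enqueue or a feed simulation):

    import time

    def send_row(i: int) -> None:
        """Hypothetical stand-in for writing one tuple into the Query Table,
        for example via a test client enqueue or a feed simulation."""
        pass

    def measure_write_throughput(rows: int = 100_000) -> float:
        """Time a burst of writes and return the sustained writes per second."""
        start = time.perf_counter()
        for i in range(rows):
            send_row(i)
        elapsed = time.perf_counter() - start
        return rows / elapsed

    if __name__ == "__main__":
        rate = measure_write_throughput()
        print(f"Sustained about {rate:,.0f} writes per second")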
 

Application Design Recommendations

From above:
  • Take replication time into consideration before starting new inputs that would write to the tables being replicated.
  • Test the impact of placing a Query Table into Transactional Memory to make sure performance targets are maintained.

We recommend that a QT-in-TM store only the minimal information needed to enable high-availability failover for load-balanced and hot/warm cluster configurations. This typically excludes partial computation results, which represent in-flight transient data. Good candidates for storage in a Query Table in Transactional Memory are the final, complete computation results remaining at the end of a flow, plus only the remembered state needed to process the next tuple correctly.

The primary cause of additional latency is the communication to ensure data is copied correctly and completely between nodes. For more information about transaction communication and handling, see product documentation topics:
  Spotfire Streaming > Architecture Guide > Transactions
  Spotfire Streaming > Architecture Guide > Distributed Computing
  Spotfire Streaming > Architecture Guide > High Availability
Note that the Distributed Router operator also uses transactions to ensure correctness when communicating between nodes, so it has similar throughput limitations.

For the fastest communication between nodes without transactional guarantees, send data using a Stream-to-Stream "Remote Container Connection" (which uses StreamBase Client connections by SB URI, sb:// protocol), as described in these documentation topics:
  Spotfire Streaming > StreamBase Admin Guide > Using StreamBase Containers > Container Connections
  Spotfire Streaming > StreamBase References > Command Reference > sburi
  Spotfire Streaming > Authoring Guide > Using Streams > Defining Output Streams, "Properties: Advanced Tab"
The StreamBase Client protocol will drop data without warning during disconnection and reconnection periods; otherwise, while the connection is stable, no data loss is expected. The protocol makes no delivery guarantees and is not intended for long-distance connections, only for stable connections within a single sub-network.

For transaction tuning, the built-in "default-dynamic-data-distribution-policy" is optimal for most applications. In our testing, customizing this policy by changing the partition or thread count did not significantly change the performance times unless the counts were extremely restrictive, which made performance worse.
 

Deadlock Considerations

When writing to a specific row in a QT-in-TM, make sure the same row is not updated by two nodes at the same time. This can occur when multiple nodes are started at the same time and the application loads data into the Query Table at startup. Similarly, if two nodes receive data from the same source, they may update the same tables with the same data at nearly the same time.

NOTE: One way to make sure only one node is responsible for loading the table is to read the table first, and only proceed with the load if the table has no rows in it. The quickest check is to Read All Rows sorted by the index with a Limit of one (1), which returns a single data row if one exists. If a row is returned with real data rather than the fallback values, the reading node knows some other node is already writing to the table and it does not have to. Do not use the count() aggregate expression for this check: if the table has many thousands of rows, count() takes a table lock that may take a while to resolve and can cause deadlocks while it is processing. This guard is sketched below.
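
The guard amounts to one limited read followed by a conditional load. The sketch below illustrates the decision; read_one_row() and load_table() are hypothetical stand-ins for the EventFlow Query Read (Read All Rows, sorted by index, Limit 1) and the table-loading flow:

    # Illustration of the "check before load" guard described above.
    # read_one_row() and load_table() are hypothetical stand-ins for the
    # EventFlow Query Read (Read All Rows, sorted by index, Limit 1) and
    # the flow that loads the initial data set.

    def read_one_row():
        """Return one row from the table, or None if the table is empty."""
        return None

    def load_table():
        """Load the initial data set into the Query Table."""
        pass

    def maybe_load_table():
        row = read_one_row()        # a cheap single-row indexed read
        if row is None:             # the table is empty; this node loads it
            load_table()
        # Otherwise another node has already started loading the table,
        # so this node skips the load. Avoid count() here: it takes a
        # table lock and can cause deadlocks on large tables.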

If two nodes write to the same row at the same time, this can cause a Distributed Deadlock. A normal Transactional Memory deadlock between two local threads on a single node is typically resolved in under one second. A Distributed Deadlock has a 60-second timeout for the deadlock to be resolved through activity on either node, and because of per-row locking it may block other table readers and writers for several minutes.

An EventFlow pattern that makes a table more prone to deadlocks (normal and distributed), because it increases the time a table remains held by a thread, is a Query Read followed in the same flow by a Query Write of the same row. This causes a read promotion, where the read lock becomes a write lock, blocking other readers and writers that may be reading concurrently or waiting to write. This is a race, and whether a deadlock occurs is unpredictable. The rate of writes to the table also matters: the more frequently a table is written, the more opportunities there are for a simple or distributed deadlock to occur.

Deadlocks are reported in the node's logs/deadlock.log file after they are resolved; active deadlocks are not reported until they have been resolved. These reports look like this:
 
2023-10-16 11:33:59.923000-0400 [20900:15056]: (osstmgr.cpp:6506) 2023-10-16 11:33:59.914000 Promotion deadlock detected in transaction 351:9
by engine application::default-engine-for-com.example.QTSpeed running on node A.qts.
Deadlock promoting read lock on object default.DataTable:109872 (1544710558:1915874064:120233758350:109872)
which was modified and committed by another transaction.

2023-10-16 11:55:04.233000-0400 [20900:15056]: (osstmgr.cpp:6506) 2023-10-16 11:55:04.228000 Global transaction deadlock processed on
by engine application::default-engine-for-com.example.QTSpeed running on node A.qts in transaction 351:14

The table that was involved in the deadlock is identified in the list that looks like this:
 
Locks held by transaction 351:9:
    default.DataTable:110171 (1544710558:1915874064:120233758350:110171) write lock
    default.DataTable:109959 (1544710558:1915874064:120233758350:109959) write lock

If a table is frequently involved in deadlocks, consider making one node in the cluster responsible for all updates to the table using Cluster Aware operator settings, and consider issuing all writes to that table from a single parallel region in your application design. Also consider whether the data in that table is necessary to support failover, or whether the data is available elsewhere (a JMS bus, a Kafka journal, a CSV file on a shared file system, or a JDBC database, for example). If the data is easily recoverable by the node that takes over processing after another node's failure, then that table may not need to be in Transactional Memory at all; it can simply be reloaded quickly from external sources.