Products | Versions |
---|---|
TIBCO Streaming | 7.x |
I have input and output adapters and often the input adapter has connected and starts receiving data before the output adapter has connected. Tuples sent to the output adapter while disconnected are lost. How can this loss be prevented?
There are two design approaches:
A. Make the input connection dependent on the output connection state.
B. Cache all messages and emit from the cache only when the output adapter is connected.
For A: Use the Status port of the output adapter to determine when it is connected or disconnected, and use this as input to the input adapter's Control port to block new input data while output is prevented. There is still a little bit of a race condition where a message has been received before the output status changes. If this is a concern, perhaps use option B. If using a message bus for input, simply to not acknowledge the received message and discard it.
For B: Use a Query Table as a cache, and further limit data loss by using on-disk or in-transactional-memory (SB10) storage. Periodically query the table for available messages, read a few, send them, and delete them.
See CacheDemo.sbapp linked from this article. It uses the general pattern of always writing to the Cache, and removing from the Cache only when the output adapter is connected. The Query "Remove" operator reports what it deletes combining the read and delete operations into a single step. The output is ordered by input order, sending oldest to newest. The number of rows deleted is limited to limit the output load during each 1-second poll period to not overload downstream operations and not block upstream activity after recovering from a disconnect.
CacheDemo.sbapp:
To use, send in status changes to the AdapterStatus input stream and send data into the DataIn stream. Tuples will only be emitted when connected=true.
1. The AdapterStatus input stream is used to set the isConnected Dynamic Variable.
2. The CheckConnected Filter blocks removing from the Cache whether triggered by a new update or the 1-second poll.
3. The Remove operator reads all rows, in Ascending (lowest to highest id value) but only operates on up to ten (10) to not dump the entire table downstream in a single operation, which would block other activity.
4. If the Cache is empty, there is no output from the Remove query.
5. In order to send removed rows out, the Old Table Fields must be reported from the delete query.