Here are examples for validating and returning representative complex types:
A. Validating a tuple or list or list(tuple) types in a custom function
B. Return tuple or list or list(tuple) types in a custom function
C. Return a list of ints or a list of tuples from a custom Java aggregate function
A. Validating a tuple or list or list(tuple) types in a custom function
The custom function API performs type checking through the CustomFunctionResolver method, which returns a non-null return type if the input types are acceptable. This is a one-time check made when the application is compiled. Because data types do not change during execution, the one-time check is all that is needed.
The complex types are list(simple), tuple(), and list(tuple); these require additional work to validate within the Resolver.
For a simple type, no Resolver is needed because the types are known and typechecking can proceed without assistance.
For a list input parameter, the Java type is List<?> and the inner type to be handled by the function is opaque (for example, the function may be able to handle multiple list types), so the Resolver will be called to make the decision. For example, to check whether a List<int> was supplied, do:
public static CompleteDataType MyFunctionCustomFunctionResolver0(CompleteDataType arg0) {
if (arg0.getElementType()==CompleteDataType.forInt()) {
return CompleteDataType.forInt();
}
return null;
}
When handling multiple input parameters, make certain that if any single parameter has the wrong type, the entire Resolver returns null.
For a tuple input parameter, the Java type is Tuple, but the required schema is opaque. For example, to check whether "field1" exists and is an int, do:
public static CompleteDataType MogrifyCustomFunctionResolver0(CompleteDataType arg0) {
try {
if (arg0.getSchema().getField("field1").getCompleteDataType() == CompleteDataType.forInt()) {
return CompleteDataType.forInt();
}
} catch (TupleException e) {
e.printStackTrace();
}
return null;
}
For a list of tuple, the pattern is a combination of the above patterns. For example, to check whether the list is of type tuple with a specific int field, do:
public static CompleteDataType DoThisCustomFunctionResolver0(CompleteDataType arg0) {
try {
if (arg0.getElementType().getSchema().getField("field1").getCompleteDataType()==CompleteDataType.forInt()) {
return CompleteDataType.forInt();
}
} catch (StreamBaseException e) {
return null;
}
return null;
}
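The rule that a Resolver must return null when any one parameter has the wrong type can be sketched in plain Java. To keep the sketch runnable without the StreamBase jar, it uses a hypothetical stand-in class, SketchType, in place of CompleteDataType. The class, its methods, and the chosen parameter types (an int followed by a list of double) are assumptions made for illustration, not part of the StreamBase API.

```java
// Hypothetical stand-in for a type descriptor, used only so this sketch runs
// without the StreamBase jar. It is NOT com.streambase.sb.CompleteDataType.
final class SketchType {
    final String kind;         // "int", "double", "list", ...
    final SketchType element;  // element type when kind is "list", otherwise null

    private SketchType(String kind, SketchType element) {
        this.kind = kind;
        this.element = element;
    }

    static SketchType simple(String kind) { return new SketchType(kind, null); }
    static SketchType listOf(SketchType element) { return new SketchType("list", element); }

    boolean isSimple(String expected) {
        return element == null && kind.equals(expected);
    }

    boolean isListOf(String expectedElement) {
        return kind.equals("list") && element != null && element.isSimple(expectedElement);
    }
}

final class MultiArgResolverSketch {
    // Mirrors the shape of a two-argument Resolver: every argument must pass
    // its own check, and a single failure makes the whole result null.
    static SketchType resolve(SketchType arg0, SketchType arg1) {
        if (arg0 == null || !arg0.isSimple("int")) {
            return null; // first parameter must be an int
        }
        if (arg1 == null || !arg1.isListOf("double")) {
            return null; // second parameter must be a list of double
        }
        return SketchType.simple("double"); // stands for the function's return type
    }
}
```

Checking each parameter in its own early-return branch keeps the all-or-nothing rule visible: the non-null return is reached only when every check has passed.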
B. Return tuple or list or list(tuple) types in a custom function
The CompleteDataType.forX value returned by the Resolver must match the return type in the function declaration. The supported Java return types (as of SB 7.3) and the matching Resolver return statements are:
int and Integer | return CompleteDataType.forInt(); |
long and Long | return CompleteDataType.forLong(); |
double and Double | return CompleteDataType.forDouble(); |
boolean and Boolean | return CompleteDataType.forBool(); |
byte[] and String | return CompleteDataType.forString(); |
com.streambase.sb.Timestamp | return CompleteDataType.forTimestamp(); |
com.streambase.sb.ByteArrayView | return CompleteDataType.forBlob(); |
com.streambase.sb.Tuple | return CompleteDataType.forTuple(com.streambase.sb.Schema); |
List<?> | return CompleteDataType.forList(com.streambase.sb.CompleteDataType); |
For example, the declaration:
public static Tuple ReturnAn(int arg0){...}
needs a Custom Function Resolver which returns:
return CompleteDataType.forTuple(TupleSchema);
where TupleSchema is a schema obtained from one of the input arguments, constructed within the function, or defined statically within the containing class. Getting the schema from an argument is easiest, for example:
return CompleteDataType.forTuple(arg3.getSchema());
The argument's schema should be checked in the Resolver to make sure it meets the requirements of the function, as shown above.
If the function is to return a Tuple with a new schema, or with additions or changes to an input argument's schema, the best approach is to create a List<Schema.Field>, add the fields from the input argument as needed along with any additional Schema.Field entries, and then use the Schema constructor:
Schema OutSchema = new Schema(name, fields); // Schema(String name, List<Schema.Field> fields)
Example:
static Schema TupleSchema;
@CustomFunctionResolver("ReturnAnExtraFieldCustomFunctionResolver0")
public static Tuple ReturnAnExtraField(Tuple arg0){
// Create output tuple with new schema
Tuple out = TupleSchema.createTuple();
try {
// Copy fields as needed
for (int fi=0; fi<arg0.getSchema().getFields().length; fi++) {
out.setField(fi, arg0.getField(fi));
}
// Set fields as needed
out.setField("TheInt", 10);
} catch (TupleException e) {
e.printStackTrace();
}
return out;
}
public static CompleteDataType ReturnAnExtraFieldCustomFunctionResolver0(CompleteDataType arg0) {
// Reject non-tuple arguments before reading the schema
if (arg0.getDataType()!=DataType.TUPLE) {
return null;
}
// Check input fields for correct types here
// Construct the new schema: copy the input fields, then append "TheInt"
List<Schema.Field> NewSchemaFields = new ArrayList<Schema.Field>();
for (int fi=0; fi<arg0.getSchema().getFields().length; fi++) {
NewSchemaFields.add(arg0.getSchema().getField(fi));
}
Schema.Field AddField = Schema.createField(DataType.INT, "TheInt");
NewSchemaFields.add(AddField);
TupleSchema = new Schema(null,NewSchemaFields);
return CompleteDataType.forTuple(TupleSchema);
}
The List<?> return type is typically used for a list of a simple type or a list of Tuple.
This example returns a list of tuples with specific fields required in the inner tuple:
static Schema listSch;
// Return a list with elements matching the supplied ID (arg0)
@CustomFunctionResolver("ProcessListCustomFunctionResolver0")
public static List<?> ProcessList(Integer arg0, List<?> arg1){
ArrayList<Tuple> out = new ArrayList<Tuple>();
for (int fi=0; fi<arg1.size(); fi++) {
Tuple item = (Tuple) arg1.get(fi);
try {
if (item.getInt("id")==arg0 ) {
// Create a new tuple per match so list entries do not share one object
Tuple tmp = listSch.createTuple();
tmp.setInt("id", item.getInt("id"));
tmp.setDouble("val", item.getDouble("val"));
out.add(tmp);
}
} catch (NullValueException e) {
System.out.println("Function - ProcessList - "+e.toString()+" in parameter:"+arg1.toString());
} catch (TupleException e) {
System.out.println("Function - ProcessList - "+e.toString()+" in parameter:"+arg1.toString());
}
}
return out;
}
public static CompleteDataType ProcessListCustomFunctionResolver0(CompleteDataType arg0, CompleteDataType arg1) {
// Create the schema for the tuple in the returned list
List<Schema.Field> NewSchemaFields = new ArrayList<Schema.Field>();
Schema.Field AddIDField = Schema.createField(DataType.INT, "id");
Schema.Field AddValField = Schema.createField(DataType.DOUBLE, "val");
NewSchemaFields.add(AddIDField);
NewSchemaFields.add(AddValField);
listSch = new Schema(null,NewSchemaFields);
// Check the input parameters
if (arg1.getElementType() == null) {
return null; // not a list
}
if (!(arg1.getElementType().getDataType()==DataType.TUPLE)) {
return null; // element type not a tuple
}
try {
if ( arg0.getDataType()==DataType.INT &&
arg1.getElementType().getSchema().getField("id").getCompleteDataType()==CompleteDataType.forInt() &&
arg1.getElementType().getSchema().getField("val").getCompleteDataType()==CompleteDataType.forDouble()
) {
return CompleteDataType.forList(CompleteDataType.forTuple(listSch));
}
} catch (StreamBaseException e) {
e.printStackTrace();
}
return null;
}
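The error reported if the Resolver returns null is:
"None of the Java methods named "ProcessList" in class com.tb.sb.support.MyFunctions are compatible with parameter(s): int, list((id int, val long))"
In this message the val field is a long, whereas the Resolver above requires a double, so the Resolver returned null.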
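When a function builds a list of tuples in a loop, as ProcessList does, each entry added to the list should be a separate object. A Java list stores references, so adding one mutable object repeatedly leaves every entry showing the values written last. The sketch below shows the effect in plain Java, with a HashMap standing in for a Tuple so that it runs without the StreamBase jar. The class and method names are hypothetical and chosen only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FreshElementSketch {
    // Reuses ONE mutable map for every entry: all list entries refer to the
    // same object, so they all end up holding the last id written.
    static List<Map<String, Integer>> reuseOneObject(int[] ids) {
        List<Map<String, Integer>> out = new ArrayList<>();
        Map<String, Integer> tmp = new HashMap<>(); // created once, outside the loop
        for (int id : ids) {
            tmp.put("id", id);
            out.add(tmp);
        }
        return out;
    }

    // Creates a new map per entry, so each list entry keeps its own id.
    static List<Map<String, Integer>> freshObjectPerElement(int[] ids) {
        List<Map<String, Integer>> out = new ArrayList<>();
        for (int id : ids) {
            Map<String, Integer> tmp = new HashMap<>(); // created inside the loop
            tmp.put("id", id);
            out.add(tmp);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3};
        System.out.println(reuseOneObject(ids));        // every entry shows the same id
        System.out.println(freshObjectPerElement(ids)); // entries show 1, 2 and 3
    }
}
```

In a custom function, the equivalent of the second method is to call createTuple() on the output schema inside the loop, once for each tuple that is added to the result list.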
C. Return a list of ints or a list of tuples from a custom Java aggregate function
A custom resolver function is necessary to return complex data types from a custom Java aggregate function, just as it is for a custom Java simple function. The resolver function is developed in the same way for an aggregate function as for a simple function. However, there are a couple of requirements to adhere to:
- When declaring the <custom-function> element for the function, the name attribute must have the value "accumulate" and the args attribute must have the value "auto".
<custom-function alias="IntList" args="auto" class="aggFuncsReturningList.IntList" language="java" name="accumulate" type="aggregate"/>
- The CompleteDataType returned by the resolver is the return type of the calculate() method of the aggregate function class, not the accumulate() method.
package aggFuncsReturningList;
import com.streambase.sb.CompleteDataType;
import com.streambase.sb.client.CustomFunctionResolver;
import com.streambase.sb.operator.AggregateWindow;
/**
* Using an array is more efficient than List because StreamBase uses an array to back the list type,
* though it only works with a primitive element type.
*
*/
public class IntList extends AggregateWindow {
public static final long serialVersionUID = 1413838591887L;
private static final int[] EMPTY = new int[0];
private int[] state;
private int i;
public void init() {
state = EMPTY;
i =0;
}
public int[] calculate() {
return state;
}
@CustomFunctionResolver("accumulateResolver")
public void accumulate(int arg) {
if (state == EMPTY) {
state = new int[10];
}
state[i] = arg;
i = (i + 1) % state.length;
}
public static CompleteDataType accumulateResolver(CompleteDataType arg) {
return CompleteDataType.forIntList();
}
public void release() {
state = null;
}
}
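The state handling in IntList has two consequences that are easy to miss: once any value has been accumulated, calculate() returns an array of exactly 10 entries (unused slots hold 0), and from the eleventh value onward the oldest entries are overwritten. The sketch below copies only that state logic into a plain Java class, without the AggregateWindow base class or the resolver, so the behavior can be run in isolation. The class name IntListStateSketch is hypothetical.

```java
import java.util.Arrays;

// Plain-Java copy of the IntList state handling, without the StreamBase
// AggregateWindow base class, so the buffer behavior can be run on its own.
final class IntListStateSketch {
    private static final int[] EMPTY = new int[0];
    private int[] state = EMPTY;
    private int i = 0;

    void accumulate(int arg) {
        if (state == EMPTY) {
            state = new int[10]; // fixed capacity, zero-filled
        }
        state[i] = arg;
        i = (i + 1) % state.length; // wrap to slot 0 after the tenth value
    }

    int[] calculate() {
        return state;
    }

    public static void main(String[] args) {
        IntListStateSketch window = new IntListStateSketch();
        for (int v = 1; v <= 12; v++) {
            window.accumulate(v);
        }
        // Values 11 and 12 have overwritten slots 0 and 1.
        System.out.println(Arrays.toString(window.calculate()));
        // prints [11, 12, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

If the window can hold more than 10 values, or the trailing zeros are not wanted in the output list, the buffer would need to grow or be trimmed before it is returned; the example as written does neither.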
Attached is an importable StreamBase Studio 7.5 project with the above example as well as an example returning a list of tuples:
17760258_aggFuncsReturningList.tar.gz (attachment)
To use this project in TIBCO Streaming 10, import it normally, then right-click the project in the Project Explorer and choose the Upgrade StreamBase Project menu item.