Cookies Psst! Do you accept cookies?

We use cookies to enhance and personalise your experience.
Please accept our cookies. Checkout our Cookie Policy for more information.

(Part 11)Golang Framework Hands-on - Adaptive Registration of FaaS Parameter Types Based on Reflection

8729d750-897c-4ba3-98b4-c346188d034e

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki

Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection

To be continued.

Next, we will enhance the Function in KisFlow to better focus on processing business data. We will change the previous Function implementation:

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName3Handler ----")
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}

In this implementation, raw data is obtained from flow.Input(). We will modify it so that the business can directly obtain the specific data structure type it wants without assertions and type conversions. The modified Function extended parameter usage is roughly as follows:

proto

type StuScores struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

type StuAvgScore struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

FaaS

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    proto.StuScores
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    proto.StuAvgScore
}

// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {
        avgScore := proto.StuAvgScore{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // Commit the result data
        _ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
    }

    return nil
}

In this way, we can directly obtain the desired target output structure data through the third parameter rows, without needing assertions and conversions, thereby focusing more on the business-side development efficiency.

Of course, if you want to obtain the raw data, you can still get it from flow.Input().

This chapter will implement the above functionality in KisFlow.

11.1 Self-Describing FaaS Business Callback Functions

In this section, we will complete the conceptual transformation for self-describing FaaS. Previously, the FaaS callback was defined as:

type FaaS func(context.Context, Flow) error

We need a structure to describe this function's properties, including its name, address, number of parameters, parameter types, return type, etc.

11.1.1 FaaSDesc: Self-Describing Callback Type

Create a new file faas.go under kis-flow/kis/ and define the following structure:

kis-flow/kis/faas.go

// FaaS: Function as a Service

// Change
// type FaaS func(context.Context, Flow) error
// to
// type FaaS func(context.Context, Flow, ...interface{}) error
// to allow data transmission through arbitrary input types in variadic parameters
type FaaS interface{}

// FaaSDesc: Description of the FaaS callback business function
type FaaSDesc struct {
    FnName    string         // Function name
    f         interface{}    // FaaS function
    fName     string         // Function name
    ArgsType  []reflect.Type // List of function parameter types
    ArgNum    int            // Number of function parameters
    FuncType  reflect.Type   // Function type
    FuncValue reflect.Value  // Function value (function address)
}

The previous FaaS type is improved to interface{}, and FaaSDesc now has some attributes.

  • FnName: Indicates the name of the current function, such as "funcDemo1" in our previous examples. This is used to identify the function in KisFlow.
  • f: The defined FaaS function.
  • fName: The name of the function defined by f. ArgsType: The list of parameter types of the defined f function, which is a slice.
  • ArgNum: The number of input parameters of the defined f function.
  • FuncType: The data type of the defined f function.
  • FuncValue: The value of the defined f function (address of the schedulable function).

11.1.2 Create a New FaaSDesc Object

Below is a constructor function for creating a FaaSDesc object. The parameter types are the FunctionName in KisFlow and the defined FaaS function:

kis-flow/kis/faas.go

// NewFaaSDesc creates a FaaSDesc instance based on the registered FnName and FaaS callback function
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
    // The callback function FaaS, function value (function address)
    funcValue := reflect.ValueOf(f)

    // The type of the callback function FaaS
    funcType := funcValue.Type()

    // Check if the provided FaaS pointer is a function type
    if !isFuncType(funcType) {
        return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
    }

    // Check if the provided FaaS function has exactly one return value of type error
    if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
        return nil, errors.New("function must have exactly one return value of type error")
    }

    // The list of parameter types of the FaaS function
    argsType := make([]reflect.Type, funcType.NumIn())

    // Get the name of the FaaS function
    fullName := runtime.FuncForPC(funcValue.Pointer()).Name()

    // Ensure the parameter list of FaaS func(context.Context, Flow, ...interface{}) error includes context.Context and kis.Flow

    // Check if the parameter list includes kis.Flow
    containsKisFlow := false
    // Check if the parameter list includes context.Context
    containsCtx := false

    // Iterate over the parameter types of the FaaS function
    for i := 0; i < funcType.NumIn(); i++ {

        // Get the type of the i-th parameter
        paramType := funcType.In(i)

        if isFlowType(paramType) {
            // Check if the parameter list includes kis.Flow
            containsKisFlow = true

        } else if isContextType(paramType) {
            // Check if the parameter list includes context.Context
            containsCtx = true

        } else if isSliceType(paramType) {
            // Check if the parameter list includes a slice type

            // Get the element type of the current slice parameter
            itemType := paramType.Elem()

            // If the current parameter is a pointer type, get the type of the structure it points to
            if itemType.Kind() == reflect.Ptr {
                itemType = itemType.Elem() // Get the type of the structure it points to
            }
        } else {
            // Other types are not supported...
        }

        // Append the current parameter type to the argsType list
        argsType[i] = paramType
    }

    if !containsKisFlow {
        // If the parameter list does not include kis.Flow, return an error
        return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
    }

    if !containsCtx {
        // If the parameter list does not include context.Context, return an error
        return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
    }

    // Return the FaaSDesc instance
    return &FaaSDesc{
        FnName:    fnName,
        f:         f,
        fName:     fullName,
        ArgsType:  argsType,
        ArgNum:    len(argsType),
        FuncType:  funcType,
        FuncValue: funcValue,
    }, nil
}

Here, we use reflection to get the related attribute values from the f function and store them in FaaSDesc. To ensure that the provided FaaS function meets the following format:

type FaaS func(context.Context, Flow, ...interface{}) error

We perform strict type checks on the context.Context and Flow parameters. The checking methods are as follows:

kis-flow/kis/faas.go

// isFuncType checks if the provided paramType is a function type
func isFuncType(paramType reflect.Type) bool {
    return paramType.Kind() == reflect.Func
}

// isFlowType checks if the provided paramType is of type kis.Flow
func isFlowType(paramType reflect.Type) bool {
    var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()

    return paramType.Implements(flowInterfaceType)
}

// isContextType checks if the provided paramType is of type context.Context
func isContextType(paramType reflect.Type) bool {
    typeName := paramType.Name()

    return strings.Contains(typeName, "Context")
}

// isSliceType checks if the provided paramType is a slice type
func isSliceType(paramType reflect.Type) bool {
    return paramType.Kind() == reflect.Slice
}

In NewFaaSDesc(), we use two boolean variables, containsKisFlow and containsCtx, to check whether the parameter list includes Context and Flow types. The following code ensures compatibility when the parameter type is a structure pointer:

            // ... ... 

            // Get the current parameter type
            itemType := paramType.Elem()

            // If the current parameter is a pointer type, get the type of the structure it points to
            if itemType.Kind() == reflect.Ptr {
                itemType = itemType.Elem() // Get the type of the structure it points to
            }

            // ... ... 

For example, the developer might define the FaaS function prototype as follows:

func MyFaaSDemo(ctx context.Context, flow Flow, []*A) error

or:

func MyFaaSDemo(ctx context.Context, flow Flow, []A) error

11.1.3 Registering FaaS Functions

Next, we will modify the method for registering FaaS functions in the kisPool module to register a FaaSDesc description. The modified registration method is as follows:

kis-flow/kis/pool.go

// FaaS registers a Function's business logic, indexed and registered by Function Name
func (pool *kisPool) FaaS(fnName string, f FaaS) {

    // When registering a FaaS logic callback, create a FaaSDesc description object
    faaSDesc, err := NewFaaSDesc(fnName, f)
    if err != nil {
        panic(err)
    }

    pool.fnLock.Lock() // Write lock
    defer pool.fnLock.Unlock()

    if _, ok := pool.fnRouter[fnName]; !ok {
        // Register the FaaSDesc description object into fnRouter
        pool.fnRouter[fnName] = faaSDesc
    } else {
        errString := fmt.Sprintf("KisPool FaaS Repeat FuncName=%s", fnName)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

Now, the key in fnRouter remains the FunctionName, but the value is the FaaSDesc description object for the current FaaS function.

11.1.4 Dispatching FaaSDesc in kisPool

Finally, when scheduling a function, use FaaSDesc to retrieve the function address and parameter list for scheduling. The modified CallFunction() is as follows:

kis-flow/kis/pool.go

// CallFunction dispatches a Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

    if funcDesc, ok := pool.fnRouter[fnName]; ok {

        // Parameter list for the scheduled function
        params := make([]reflect.Value, 0, funcDesc.ArgNum)

        for _, argType := range funcDesc.ArgsType {

            // If it's a Flow type parameter, pass the value of flow
            if isFlowType(argType) {
                params = append(params, reflect.ValueOf(flow))
                continue
            }

            // If it's a Context type parameter, pass the value of ctx
            if isContextType(argType) {
                params = append(params, reflect.ValueOf(ctx))
                continue
            }

            // If it's a Slice type parameter, pass the value of flow.Input()
            if isSliceType(argType) {
                params = append(params, value)
                continue
            }

            // If the parameter is neither Flow, Context, nor Slice type, give it the zero value
            params = append(params, reflect.Zero(argType))
        }

        // Invoke the logic of the current function
        retValues := funcDesc.FuncValue.Call(params)

        // Retrieve the first return value; if nil, return nil
        ret := retValues[0].Interface()
        if ret == nil {
            return nil
        }

        // If the return value is of error type, return the error
        return retValues[0].Interface().(error)

    }

    log.Logger().ErrorFX(ctx, "FuncName: %s Cannot find in KisPool, Not Added.\n", fnName)

    return errors.New("FuncName: " + fnName + " Cannot find in KisPool, Not Added.")
}

The overall scheduling logic of the function is roughly as follows:
First, use fnName to route to the corresponding FaaSDesc from fnRouter. Iterate over the parameter list of FaaSDesc:
Extract the Context and Flow objects, extract the custom slice parameters passed in, and if the parameter is neither Flow, Context, nor Slice type, give it the zero value as shown below:

            params = append(params, reflect.Zero(argType))

Finally, execute the function scheduling:

        retValues := funcDesc.FuncValue.Call(params)

Obtain the value of the first return value error; if it is nil, return nil, otherwise return the error type.

In this way, we have successfully established the self-describing scheduling mode for FaaS. With this functionality, what can KisFlow do? In the next section, we can serialize the custom parameter data types passed in when scheduling FaaSDesc to obtain the data types expected by the developer.

11.2 Custom Data Type Serialization for FaaS Parameters

11.2.1 Serialize Interface

First, let's define a data serialization interface. Create a file named serialize.go under kis-flow/kis/ as follows:

kis-flow/kis/serialize.go

// Serialize Data Serialization Interface
type Serialize interface {
    // UnMarshal is used to deserialize KisRowArr into a specified type value.
    UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
    // Marshal is used to serialize a specified type value into KisRowArr.
    Marshal(interface{}) (common.KisRowArr, error)
}

Here, KisRowArr is the data slice that we pass to each function in KisFlow, previously defined in kis-flow/common/data_type.go:

package common

// KisRow represents a row of data
type KisRow interface{}

// KisRowArr represents a batch of data for a single business process
type KisRowArr []KisRow

/*
    KisDataMap holds all data carried by the current flow
    key :  the Function ID where the data resides
    value: the corresponding KisRow
*/
type KisDataMap map[string]KisRowArr

The Serialize interface provides two methods:

  • UnMarshal: Used to deserialize KisRowArr into a specified type value.
  • Marshal: Used to serialize a specified type value into KisRowArr. KisFlow will provide a default Serialize implementation for each FaaS function, but developers can also customize their own Serialize implementations to perform custom data serialization actions on FaaS parameters.

11.2.2 Default Serialization in KisFlow

KisFlow provides a default Serialize implementation, primarily in JSON format. Create a serialize folder under kis-flow/, and then create a file named serialize_default.go under kis-flow/serialize/ with the following code for serialization and deserialization:

kis-flow/serialize/serialize_default.go

package serialize

import (
    "encoding/json"
    "fmt"
    "kis-flow/common"
    "reflect"
)

type DefaultSerialize struct{}

// UnMarshal is used to deserialize KisRowArr into a specified type value.
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
    // Ensure the input type is a slice
    if r.Kind() != reflect.Slice {
        return reflect.Value{}, fmt.Errorf("r must be a slice")
    }

    slice := reflect.MakeSlice(r, 0, len(arr))

    // Iterate over each element and attempt deserialization
    for _, row := range arr {
        var elem reflect.Value
        var err error

        // Attempt to assert as a struct or pointer
        elem, err = unMarshalStruct(row, r.Elem())
        if err == nil {
            slice = reflect.Append(slice, elem)
            continue
        }

        // Attempt to directly deserialize a string
        elem, err = unMarshalJsonString(row, r.Elem())
        if err == nil {
            slice = reflect.Append(slice, elem)
            continue
        }

        // Attempt to serialize to JSON and then deserialize
        elem, err = unMarshalJsonStruct(row, r.Elem())
        if err == nil {
            slice = reflect.Append(slice, elem)
        } else {
            return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
        }
    }

    return slice, nil
}

// Marshal is used to serialize a specified type value into KisRowArr (JSON serialization).
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
    var arr common.KisRowArr

    switch reflect.TypeOf(i).Kind() {
    case reflect.Slice, reflect.Array:
        slice := reflect.ValueOf(i)
        for i := 0; i < slice.Len(); i++ {
            // Serialize each element to a JSON string and add it to the slice.
            jsonBytes, err := json.Marshal(slice.Index(i).Interface())
            if err != nil {
                return nil, fmt.Errorf("failed to marshal element to JSON: %v", err)
            }
            arr = append(arr, string(jsonBytes))
        }
    default:
        // If not a slice or array type, serialize the entire structure to a JSON string.
        jsonBytes, err := json.Marshal(i)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal element to JSON: %v", err)
        }
        arr = append(arr, string(jsonBytes))
    }

    return arr, nil
}

Some helper functions are defined as follows:

kis-flow/serialize/serialize_default.go

// Attempt to assert as a struct or pointer
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
    // Check if row is a struct or struct pointer type
    rowType := reflect.TypeOf(row)
    if rowType == nil {
        return reflect.Value{}, fmt.Errorf("row is nil pointer")
    }
    if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
        return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
    }

    // If row is a pointer type, get its underlying type
    if rowType.Kind() == reflect.Ptr {
        // Null pointer
        if reflect.ValueOf(row).IsNil() {
            return reflect.Value{}, fmt.Errorf("row is nil pointer")
        }

        // Dereference
        row = reflect.ValueOf(row).Elem().Interface()

        // Get the type after dereferencing
        rowType = reflect.TypeOf(row)
    }

    // Check if row can be asserted to elemType (target type)
    if !rowType.AssignableTo(elemType) {
        return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
    }

    // Convert row to reflect.Value and return
    return reflect.ValueOf(row), nil
}

// Attempt to directly deserialize a string (deserialize JSON string to struct)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
    // Check if the source data can be asserted as a string
    str, ok := row.(string)
    if !ok {
        return reflect.Value{}, fmt.Errorf("not a string")
    }

    // Create a new struct instance to store the deserialized value
    elem := reflect.New(elemType).Elem()

    // Attempt to deserialize the JSON string into the struct.
    if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
        return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
    }

    return elem, nil
}

// Attempt to serialize to JSON and then deserialize (convert struct to JSON string, then deserialize JSON string to struct)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
    // Serialize row to JSON string
    jsonBytes, err := json.Marshal(row)
    if err != nil {
        return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v", err)
    }

    // Create a new struct instance to store the deserialized value
    elem := reflect.New(elemType).Interface()

    // Deserialize the JSON string into the struct
    if err := json.Unmarshal(jsonBytes, elem); err != nil {
        return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v", err)
    }

    return reflect.ValueOf(elem).Elem(), nil
}
  • UnMarshal(): First checks if the parameter is a slice. If it is, it serializes each element in the slice. It first tries to deserialize using unMarshalStruct(), then unMarshalJsonString(), and finally unMarshalJsonStruct() if the previous attempts fail.
  • Marshal(): Serializes any type into a JSON binary string stored in KisRowArr.

Note: The current default serialization in KisFlow only implements JSON serialization. Developers can refer to DefaultSerialize{} to implement their own serialization and deserialization for other formats.

11.2.3 Default Serialize Instance

Define a global default serialization instance, defaultSerialize, in the serialize interface definition.

kis-flow/kis/serialize.go

// defaultSerialize is the default serialization implementation provided by KisFlow (developers can customize)
var defaultSerialize = &serialize.DefaultSerialize{}

Also, provide a method to check if a data type implements the Serialize interface:

kis-flow/kis/serialize.go

// isSerialize checks if the passed paramType implements the Serialize interface
func isSerialize(paramType reflect.Type) bool {
    return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}

11.2.4 Implementing the Serialize Interface for FaaSDesc

Next, we will extend FaaSDesc to implement the Serialize interface. When scheduling a FaaSDesc, the input parameters passed to it will be serialized to obtain the corresponding specific type parameters. The definition is as follows:

kis-flow/kis/faas.go

// FaaSDesc describes the FaaS callback computation business function
type FaaSDesc struct {
    // +++++++
    Serialize                // Serialization implementation for the data input and output of the current function
    // +++++++
    FnName    string         // Function name
    f         interface{}    // FaaS function
    fName     string         // Function name
    ArgsType  []reflect.Type // Collection of function parameter types
    ArgNum    int            // Number of function parameters
    FuncType  reflect.Type   // Function type
    FuncValue reflect.Value  // Function value (function address)
}

Then, in the constructor method NewFaaSDesc(), add a check for custom parameters. Determine whether the passed custom parameters implement the two serialization interfaces of Serialize. If they do, use the custom serialization interface; if not, use the default DefaultSerialize{} instance.

kis-flow/kis/faas.go

// NewFaaSDesc creates an FaaSDesc description instance based on the registered FnName and FaaS callback function
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {

    // ++++++++++
    // Input/output serialization instance
    var serializeImpl Serialize
    // ++++++++++

    // ... ...
    // ... ...

    // Iterate over the parameter types of the FaaS
    for i := 0; i < funcType.NumIn(); i++ {

        // Get the type of the i-th formal parameter
        paramType := funcType.In(i)

        if isFlowType(paramType) {
            // Check if it contains a parameter of type kis.Flow
            containsKisFlow = true

        } else if isContextType(paramType) {
            // Check if it contains a parameter of type context.Context
            containsCtx = true

        } else if isSliceType(paramType) {

            // Get the element type of the current parameter slice
            itemType := paramType.Elem()

            // If the current parameter is a pointer type, get the type pointed to by the pointer
            if itemType.Kind() == reflect.Ptr {
                itemType = itemType.Elem() // Get the type pointed to by the pointer
            }


            // +++++++++++++++++++++++++++++

            // Check if f implements the Serialize interface
            if isSerialize(itemType) {
                // If the current parameter implements the Serialize interface, use the serialization implementation of the current parameter
                serializeImpl = reflect.New(itemType).Interface().(Serialize)

            } else {
                // If the current parameter does not implement the Serialize interface, use the default serialization implementation
                serializeImpl = defaultSerialize // Use global default implementation
            }
            // +++++++++++++++++++++++++++++++

        } else {
            // Other types are not supported
        }

        // Append the current parameter type to the argsType collection
        argsType[i] = paramType
    }

    // ... ...
    // ... ...

    // Return the FaaSDesc description instance
    return &FaaSDesc{
        Serialize: serializeImpl,
        FnName:    fnName,
        f:         f,
        fName:     fullName,
        ArgsType:  argsType,
        ArgNum:    len(argsType),
        FuncType:  funcType,
        FuncValue: funcValue,
    }, nil
}

11.2.5 Completing FaaS Data Serialization During Scheduling

Finally, when scheduling FaaSDesc, if it is a custom slice parameter, deserialize the raw data of flow.Input() into the structure data required by the developer. Implement it as follows:

kis-flow/kis/pool.go

// CallFunction schedules the function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

    if funcDesc, ok := pool.fnRouter[fnName]; ok {

        // List of parameters for the scheduled function
        params := make([]reflect.Value, 0, funcDesc.ArgNum)

        for _, argType := range funcDesc.ArgsType {

            // If it is a Flow type parameter, pass the value of flow
            if isFlowType(argType) {
                params = append(params, reflect.ValueOf(flow))
                continue
            }

            // If it is a Context type parameter, pass the value of ctx
            if isContextType(argType) {
                params = append(params, reflect.ValueOf(ctx))
                continue
            }

            // If it is a Slice type parameter, pass the value of flow.Input()
            if isSliceType(argType) {

                // +++++++++++++++++++
                // Deserialize the raw data in flow.Input() into the data of argType type
                value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
                if err != nil {
                    log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
                } else {
                    params = append(params, value)
                    continue
                }
                // +++++++++++++++++++
            }

            // If the passed parameter is neither Flow type, nor Context type, nor Slice type, give the default zero value
            params = append(params, reflect.Zero(argType))
        }

        // Call the computation logic of the current function
        retValues := funcDesc.FuncValue.Call(params)

        // Get the first return value; if it is nil, return nil
        ret := retValues[0].Interface()
        if ret == nil {
            return nil
        }

        // If the return value is of type error, return the error
        return retValues[0].Interface().(error)

    }

    log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

    return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}

This completes the integration of data serialization with the FaaSDesc module. Next, we will write a unit test to test this capability.

11.3 Unit Test for Custom Parameter Serialization

11.3.1 Definition of Flow and Function Configuration Files

For unit testing, we create two Function configurations as follows:

kis-flow/test/load_conf/func/func-avgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
    name: Student Average Score
    must:
        - stu_id

kis-flow/test/load_conf/func/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
    name: Student Average Score
    must:
        - stu_id

Next, we define a Flow to link the two functions together:

kis-flow/test/load_conf/flow/flow-StuAvg.yml

kistype: flow
status: 1
flow_name: StuAvg
flows:
    - fname: AvgStuScore
    - fname: PrintStuAvgScore

11.3.2 Definition of Custom Base Data Proto

In the kis-flow/test/ directory, create a proto/ folder and a custom base data proto for future data protocol reuse:

kis-flow/test/proto/stu_score.go

package proto

// Student Scores
type StuScores struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

// Student's Average Score
type StuAvgScore struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

11.3.3 Define Two FaaS Callback Functions

Define two FaaS functions: one to calculate a student's average score and one to print the student's average score:

kis-flow/test/faas/faas_stu_score_avg.go

package faas

import (
    "context"
    "kis-flow/kis"
    "kis-flow/serialize"
    "kis-flow/test/proto"
)

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    proto.StuScores
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    proto.StuAvgScore
}

// AvgStuScore(FaaS) calculates the student's average score
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {
        avgScore := proto.StuAvgScore{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // Submit the result data
        _ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
    }
    return nil
}

The AvgStuScore() function is our improved FaaS function, where the third parameter rows []*AvgStuScoreIn is a custom serialized parameter. Previously, we used flow.Input() to get the raw data and then traversed it. Although this method still works, it requires developers to manually assert and judge in the FaaS function, which increases development costs. Now, developers can describe a parameter's data through AvgStuScoreIn and use rows to get the already serialized structure, improving code readability and reducing development costs.

The implementation for printing the average score FaaS is as follows:

kis-flow/test/faas/faas_stu_score_avg_print.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
    "kis-flow/serialize"
    "kis-flow/test/proto"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    proto.StuAvgScore
}

type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }
    return nil
}

Similar to the previous function, we use custom input parameters for logic development.

11.3.4 Unit Test Case

Next, we write a test case for the above Flow:

kis-flow/test/kis_auto_inject_param_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/file"
    "kis-flow/flow"
    "kis-flow/kis"
    "kis-flow/test/faas"
    "kis-flow/test/proto"
    "testing"
)

func TestAutoInjectParamWithConfig(t *testing.T) {
    ctx := context.Background()

    kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)

    // 1. Load the configuration files and build the Flow
    if err := file.ConfigImportYaml("load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get the Flow
    flow1 := kis.Pool().GetFlow("StuAvg")
    if flow1 == nil {
        panic("flow1 is nil")
    }

    // 3. Submit raw data
    _ = flow1.CommitRow(&faas.AvgStuScoreIn{
        StuScores: proto.StuScores{
            StuId:  100,
            Score1: 1,
            Score2: 2,
            Score3: 3,
        },
    })
    _ = flow1.CommitRow(faas.AvgStuScoreIn{
        StuScores: proto.StuScores{
            StuId:  100,
            Score1: 1,
            Score2: 2,
            Score3: 3,
        },
    })

    // Submit raw data (JSON string)
    _ = flow1.CommitRow(`{"stu_id":101}`)

    // 4. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

When submitting raw data, we use the default serialization method, which supports JSON deserialization. In CommitRow(), we submit three pieces of data: the first two are structure data, and the last one is a JSON string. All of them are supported.

Navigate to kis-flow/test/ and execute:

$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig

The result is as follows:

$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
...
...
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
...
...
Add FlowRouter FlowName=StuAvg
context.Background
====> After CommitSrcData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]]

KisFunctionC, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023af80 ThisFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]] inPut:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

context.Background
 ====> After commitCurData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]]

KisFunctionE, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023b000 ThisFunctionId:func-7f308d00f4fa49488760ff1dfb85dc46 PrevFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]] inPut:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

stuid: [100], avg score: [2]
stuid: [100], avg score: [2]
stuid: [101], avg score: [0]
--- PASS: TestAutoInjectParamWithConfig (0.01s)
PASS
ok      kis-flow/test   0.030s

11.4 [V1.0] Source Code

https://github.com/aceld/kis-flow/releases/tag/v1.0

Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki

Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection

Last Stories

What's your thoughts?

Please Register or Login to your account to be able to submit your comment.