Create a custom input component

The job of a Baker input is to fetch blobs of data, each containing one or more serialized records, and send them to Baker.

The input isn’t in charge of splitting or parsing the input data into Records (Baker does that). Its only job is to retrieve the data in raw format as fast as possible, attach metadata to it if there is any, and send the resulting values to Baker through a *Data channel. The channel size can be customized in the topology TOML with [input] chansize=<value> (defaults to 1024).
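To make this division of labor concrete, here is a minimal, self-contained sketch of an input forwarding raw blobs on a buffered channel. Data is a local stand-in for baker.Data (only Bytes is modeled), and we assume the chansize value ends up as the buffer capacity of that channel, so a send blocks once that many values are waiting to be consumed. The blobs are forwarded as-is, newlines included; splitting them into records is left to the consumer.

```go
package main

import "fmt"

// Data is a simplified stand-in for baker.Data, used only in this sketch.
type Data struct {
	Bytes []byte
}

// defaultChanSize mirrors the documented default of [input] chansize.
const defaultChanSize = 1024

// produce plays the role of an input: it sends raw blobs, unsplit, on out.
// A send blocks once the channel buffer is full, until the consumer catches up.
func produce(blobs [][]byte, out chan<- *Data) {
	for _, b := range blobs {
		out <- &Data{Bytes: b}
	}
}

func main() {
	// Assumption: the channel is buffered with the chansize capacity.
	ch := make(chan *Data, defaultChanSize)
	blobs := [][]byte{[]byte("a,b\nc,d\n"), []byte("e,f\n")}

	go func() {
		produce(blobs, ch)
		close(ch)
	}()

	total := 0
	for d := range ch {
		total += len(d.Bytes)
	}
	fmt.Println(total) // 12
}
```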

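To create an input and make it available to Baker, one must:

  • implement the Input interface
  • fill in an InputDesc structure and register it within Baker via Components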

Daemon vs Batch

The input component determines whether Baker behaves as a batch processor or as a long-running daemon.

If the input exits when its data processing has completed, then Baker waits for the topology to end and then exits.

If the input never exits, then Baker acts as a daemon.
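The two behaviors can be sketched with two toy inputs, using a local stand-in Data type instead of baker.Data. batchInput's Run returns once everything has been sent, while daemonInput's Run only returns after Stop is called. The types, the "tick" payload and the use of sync.Once to make Stop safe to call twice are illustrative choices, not Baker's own code.

```go
package main

import (
	"fmt"
	"sync"
)

// Data is a simplified stand-in for baker.Data.
type Data struct{ Bytes []byte }

// batchInput sends a fixed set of blobs and then returns from Run:
// this is what lets the topology drain and Baker exit.
type batchInput struct{ blobs [][]byte }

func (in *batchInput) Run(out chan<- *Data) error {
	for _, b := range in.blobs {
		out <- &Data{Bytes: b}
	}
	return nil
}

// daemonInput keeps producing until Stop is called.
type daemonInput struct {
	stop     chan struct{}
	stopOnce sync.Once
}

func newDaemonInput() *daemonInput {
	return &daemonInput{stop: make(chan struct{})}
}

func (in *daemonInput) Run(out chan<- *Data) error {
	for {
		select {
		case <-in.stop:
			return nil
		case out <- &Data{Bytes: []byte("tick\n")}:
			// keep producing; a full channel blocks this case, not the stop case
		}
	}
}

// Stop closes the stop channel exactly once, even if called repeatedly.
func (in *daemonInput) Stop() {
	in.stopOnce.Do(func() { close(in.stop) })
}

func main() {
	ch := make(chan *Data, 10)

	b := &batchInput{blobs: [][]byte{[]byte("r1\n"), []byte("r2\n")}}
	_ = b.Run(ch)        // returns once all data has been sent
	fmt.Println(len(ch)) // 2

	d := newDaemonInput()
	done := make(chan error)
	go func() { done <- d.Run(ch) }()
	d.Stop()            // without this call, Run would never return
	fmt.Println(<-done) // <nil>
}
```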

Data

The Data object that the input must fill with the data it reads has two fields: Bytes, which must contain the raw bytes read (possibly several records separated by \n), and Meta.

Metadata can contain additional information that Baker associates with each serialized Record contained in Data.
Typical information includes the time of retrieval, the filename (if Records come from a file), etc.

The Input interface

New Input components need to implement the Input interface.

type Input interface {
	Run(output chan<- *Data) error
	Stop()
	Stats() InputStats
	FreeMem(data *Data)
}

The Run function implements the component logic: it receives the channel on which it sends the raw data it reads, and returns an error if it fails.

FreeMem(data *Data) is called by Baker when data is no longer needed. This gives the input an opportunity to recycle memory, for example if it uses a sync.Pool to create new instances of baker.Data.
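As an illustration of the sync.Pool approach, here is a hypothetical pooledInput. Data is a local stand-in with only the Bytes field, and newData is an assumed helper that Run would call instead of allocating a fresh Data; copying the raw bytes into the recycled buffer leaves the input free to reuse its own read buffer.

```go
package main

import (
	"fmt"
	"sync"
)

// Data is a simplified stand-in for baker.Data.
type Data struct {
	Bytes []byte
}

// pooledInput is a hypothetical input that recycles Data values.
type pooledInput struct {
	pool sync.Pool
}

func newPooledInput() *pooledInput {
	in := &pooledInput{}
	in.pool.New = func() interface{} {
		// Pre-allocate a buffer so later reads can reuse its capacity.
		return &Data{Bytes: make([]byte, 0, 4096)}
	}
	return in
}

// newData is what Run would call instead of allocating a new Data.
func (in *pooledInput) newData(raw []byte) *Data {
	d := in.pool.Get().(*Data)
	d.Bytes = append(d.Bytes[:0], raw...) // copy into the recycled buffer
	return d
}

// FreeMem is called by Baker once data is no longer needed:
// reset the buffer and hand the Data back to the pool.
func (in *pooledInput) FreeMem(data *Data) {
	data.Bytes = data.Bytes[:0]
	in.pool.Put(data)
}

func main() {
	in := newPooledInput()
	d := in.newData([]byte("a,b\n"))
	fmt.Printf("%q %v\n", d.Bytes, cap(d.Bytes) >= 4096)
	in.FreeMem(d)
}
```

Once FreeMem has put a Data back in the pool, the input must not touch it again, since a later newData call may hand it out holding different bytes.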

InputDesc

var MyInputDesc = baker.InputDesc{
	Name:   "MyInput",
	New:    NewMyInput,
	Config: &MyInputConfig{},
	Help:   "High-level description of MyInput",
}

This object has a Name, used in the Baker configuration file to identify the input, a constructor-like function (New), a config object (where the input configuration parsed from the TOML file is stored) and a help text that should explain to users how to use the component and its configuration parameters.

The New function

The New field in the InputDesc object should be assigned to a function that returns a new Input.

The function receives an InputParams object and returns an instance of Input, along with an error.

The function should validate the configuration parameters in InputParams.DecodedConfig and initialize the component.

Input configuration and help

The input configuration object (MyInputConfig in the previous example) must export all configuration parameters that the user can set in the TOML topology file.

Each field in the struct must include a help string tag (mandatory) and can include a required boolean tag (defaults to false).

All these parameters appear in the generated help. The help tag should describe the parameter's role and/or its possible values; required tells Baker to refuse configurations in which that field is not defined.

Write tests

To test an input component we suggest two main paths:

  • test the component in isolation, calling the Run function
  • write a higher-level test by running a complete Baker topology

Regardless of the chosen path, two additional unit tests are always suggested:

  • test the New() (constructor-like) function, to check that it correctly instantiates the component with valid configurations and rejects incorrect ones (where possible)
  • create small and isolated functions where possible and unit-test them

Test calling Run()

To test the component by calling its Run function, here is an example where, after some initialization, input.Run is called and the produced Data is checked in a goroutine:

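import (
    "testing"

    "github.com/AdRoll/baker"
)

func TestMyInput(t *testing.T) {
    ch := make(chan *baker.Data)
    done := make(chan struct{})

    // start a goroutine that acts as Baker, consuming the baker.Data produced by the input
    go func() {
        defer close(done)
        for data := range ch {
            // test `data`, which comes from the component,
            // e.g. check its content, parse the records, metadata, etc.
            // somethingIsWrong is a placeholder for your own checks.
            if somethingIsWrong(data) {
                // t.Fatal must only be called from the test goroutine
                t.Errorf("unexpected data: %v", data)
            }
        }
    }()

    // Configure the input.
    var params baker.InputParams
    params.DecodedConfig = &MyInputConfig{ /* ... */ }

    input, err := NewMyInput(params) // use the constructor-like New function
    if err != nil {
        t.Fatal(err)
    }

    // run the input: for a batch input, Run returns once all data has been sent
    if err := input.Run(ch); err != nil {
        t.Fatal(err)
    }

    // tell the consumer there is no more data and wait for its checks to finish
    close(ch)
    <-done
}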

The List input has an example of this testing strategy (look for the TestListBasic test).

Test the component running a topology

If we want to test the component by creating and running a topology, we need to build one from a TOML configuration by calling NewConfigFromToml and NewTopologyFromConfig, and then start it with Start.

The Base, Recorder and RawRecorder outputs included in the outputtest package can be helpful here to obtain the output and check it:

import (
    "strings"
    "testing"

    "github.com/AdRoll/baker"
    "github.com/AdRoll/baker/output/outputtest"
)

func TestMyInput(t *testing.T) {
    toml := `
    [input]
    name = "MyInput"

    [output]
    name="RawRecorder"
    procs=1
    `
    // Add the input to be tested and a testing output
    c := baker.Components{
        Inputs:  []baker.InputDesc{MyInputDesc},
        Outputs: []baker.OutputDesc{outputtest.RawRecorderDesc},
    }

    // Create and start the topology (stop immediately on failure,
    // since the following steps would dereference nil values)
    cfg, err := baker.NewConfigFromToml(strings.NewReader(toml), c)
    if err != nil {
        t.Fatal(err)
    }
    topology, err := baker.NewTopologyFromConfig(cfg)
    if err != nil {
        t.Fatal(err)
    }
    topology.Start()

    // In this goroutine we should provide some inputs to the component.
    // The format and how to send them to the component depend on
    // the component itself.
    go func() {
        defer topology.Stop()
        sendDataToMyInput() // fake function, you need to implement your logic here
    }()

    topology.Wait() // wait for Baker to quit after `topology.Stop()`
    if err := topology.Error(); err != nil {
        t.Fatalf("topology error: %v", err)
    }

    // retrieve the output and test the records
    const want = 10 // hypothetical: the number of records sendDataToMyInput sends
    out := topology.Output[0].(*outputtest.Recorder)
    if len(out.Records) != want {
        t.Errorf("want %d log lines, got %d", want, len(out.Records))
    }

    // more testing on out.Records...
}

The TCP input includes an example of this testing strategy.

Last modified November 2, 2020