Create a custom input component
The job of a Baker input is to fetch blobs of data, each containing one or more serialized records, and send them to Baker.
The input isn’t in charge of splitting or parsing the input data into Records (that is done by Baker).
It only retrieves the data in raw format as fast as possible, attaches metadata to it if there is any,
and then sends those values to Baker through a *Data channel. The channel size is
customizable in the topology TOML with [input] chansize=<value> (defaults to 1024).
To create an input and make it available to Baker, one must:
- Implement the Input interface
- Fill an InputDesc structure and register it within Baker via Components.
Daemon vs Batch
The input component determines whether Baker behaves as a batch processor or as a long-lived daemon.
If the input exits when its data processing has completed, then Baker waits for the topology to end and then exits.
If the input never exits, then Baker acts as a daemon.
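The difference can be sketched with two Run-like functions. This is only an illustration: Data is a local stand-in for baker.Data, and batchRun, daemonRun and the stop channel are hypothetical names, not part of Baker.

```go
package main

import (
	"fmt"
	"time"
)

// Data is a local stand-in for baker.Data, reduced to the field used here.
type Data struct{ Bytes []byte }

// batchRun sends a fixed set of blobs and then returns. An input written
// this way lets Baker finish the topology and exit.
func batchRun(blobs [][]byte, output chan<- *Data) error {
	for _, b := range blobs {
		output <- &Data{Bytes: b}
	}
	return nil
}

// daemonRun keeps producing data until stop is closed. An input written
// this way keeps Baker running as a daemon.
func daemonRun(stop <-chan struct{}, output chan<- *Data) error {
	tick := time.NewTicker(time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-stop:
			return nil
		case <-tick.C:
			select {
			case output <- &Data{Bytes: []byte("heartbeat\n")}:
			case <-stop:
				return nil
			}
		}
	}
}

func main() {
	out := make(chan *Data, 1024)
	if err := batchRun([][]byte{[]byte("a\n"), []byte("b\n")}, out); err != nil {
		fmt.Println("batch failed:", err)
		return
	}
	fmt.Println("blobs queued by the batch input:", len(out))

	stop := make(chan struct{})
	done := make(chan error, 1)
	go func() { done <- daemonRun(stop, out) }()
	time.Sleep(5 * time.Millisecond)
	close(stop) // without this, daemonRun would never return
	fmt.Println("daemon stopped, err:", <-done)
}
```

In the daemon variant, the only way out of the loop is the stop signal, which is what the Stop method of a real input would typically trigger.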
Data
The Data object that the input must fill in
with the data it reads has two fields: Bytes, which must contain the raw bytes read (possibly
holding several records separated by \n), and Meta.
Meta can carry additional
information that Baker associates with each serialized Record contained in Data.
Typical information could be the time of retrieval, the filename (in case Records come from a file), etc.
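As a rough sketch of how an input might fill such an object, the snippet below uses local stand-ins for Data and its metadata map. The Metadata definition, the newFileData helper and the "filename" and "retrieved_at" keys are assumptions made for illustration; check the baker package for the real types.

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// Metadata and Data are local stand-ins mirroring the two fields described
// above; the real types live in the baker package and may differ.
type Metadata map[string]interface{}

type Data struct {
	Bytes []byte
	Meta  Metadata
}

// newFileData wraps a raw chunk read from a file and tags it with the
// hypothetical keys "filename" and "retrieved_at".
func newFileData(filename string, chunk []byte) *Data {
	return &Data{
		Bytes: chunk,
		Meta: Metadata{
			"filename":     filename,
			"retrieved_at": time.Now(),
		},
	}
}

func main() {
	d := newFileData("events.log", []byte("rec1\nrec2\nrec3\n"))
	// The input never splits records itself. Counting newlines here only
	// shows that a single Data can carry several of them.
	fmt.Println("records in blob:", bytes.Count(d.Bytes, []byte("\n")))
	fmt.Println("source file:", d.Meta["filename"])
}
```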
The Input interface
New Input components need to implement the Input interface.
type Input interface {
	Run(output chan<- *Data) error
	Stop()
	Stats() InputStats
	FreeMem(data *Data)
}
The Run function implements the component logic and receives a channel where it sends the
raw data it processes.
FreeMem(data *Data) is called by Baker when data is no longer needed. This is an occasion
for the input to recycle memory, for example if the input uses a sync.Pool to create new
instances of baker.Data.
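A minimal sketch of an implementation that recycles memory through a sync.Pool could look like the following. Data, InputStats and Input are local stand-ins so the example compiles on its own; InputStats is reduced to a single assumed counter field, and chunkInput is a hypothetical input that just emits pre-loaded chunks.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
)

// Local stand-ins for the baker types named above.
type Data struct{ Bytes []byte }

// InputStats is reduced to one assumed counter field.
type InputStats struct{ NumProcessedLines int64 }

type Input interface {
	Run(output chan<- *Data) error
	Stop()
	Stats() InputStats
	FreeMem(data *Data)
}

// chunkInput is a hypothetical input that emits pre-loaded chunks.
type chunkInput struct {
	chunks [][]byte
	pool   sync.Pool
	stop   chan struct{}
	once   sync.Once
	lines  int64
}

func newChunkInput(chunks [][]byte) *chunkInput {
	return &chunkInput{
		chunks: chunks,
		pool:   sync.Pool{New: func() interface{} { return new(Data) }},
		stop:   make(chan struct{}),
	}
}

// Run sends every chunk as one Data and returns when done or stopped.
func (in *chunkInput) Run(output chan<- *Data) error {
	for _, c := range in.chunks {
		d := in.pool.Get().(*Data)
		d.Bytes = append(d.Bytes[:0], c...) // reuse the recycled buffer
		select {
		case output <- d:
			atomic.AddInt64(&in.lines, int64(bytes.Count(c, []byte("\n"))))
		case <-in.stop:
			return nil
		}
	}
	return nil
}

// Stop is safe to call more than once.
func (in *chunkInput) Stop() { in.once.Do(func() { close(in.stop) }) }

func (in *chunkInput) Stats() InputStats {
	return InputStats{NumProcessedLines: atomic.LoadInt64(&in.lines)}
}

// FreeMem returns a Data that is no longer needed to the pool.
func (in *chunkInput) FreeMem(data *Data) { in.pool.Put(data) }

func main() {
	var in Input = newChunkInput([][]byte{[]byte("a\nb\n"), []byte("c\n")})
	out := make(chan *Data, 2)
	if err := in.Run(out); err != nil {
		fmt.Println("run failed:", err)
		return
	}
	close(out)
	for d := range out {
		fmt.Printf("%q\n", d.Bytes)
		in.FreeMem(d) // in a topology, Baker makes this call
	}
	fmt.Println("lines sent:", in.Stats().NumProcessedLines)
}
```

Here main plays the role of Baker: it drains the channel and hands each Data back through FreeMem once it is done with it.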
InputDesc
var MyInputDesc = baker.InputDesc{
	Name:   "MyInput",
	New:    NewMyInput,
	Config: &MyInputConfig{},
	Help:   "High-level description of MyInput",
}
This object has a Name, which is used in the Baker configuration file to identify the input,
a constructor-like function (New), a config object (where the parsed input configuration from the
TOML file is stored) and a help text that should explain to users how to use the component and its
configuration parameters.
The New function
The New field in the InputDesc object should be assigned a function that returns a new Input.
The function receives an InputParams object and returns an instance of Input.
The function should validate the configuration parameters found in InputParams.DecodedConfig and
initialize the component.
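One possible shape for such a function is sketched below. The types are local stand-ins reduced to what the example needs (in Baker they are baker.Data, baker.Input and baker.InputParams), and the Path field and its validation rule are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins, reduced to what this sketch needs.
type Data struct{ Bytes []byte }

type Input interface {
	Run(output chan<- *Data) error
}

// InputParams only models the DecodedConfig field mentioned above.
type InputParams struct {
	DecodedConfig interface{}
}

// MyInputConfig is a hypothetical configuration with a single field.
type MyInputConfig struct {
	Path string
}

type myInput struct{ path string }

func (in *myInput) Run(output chan<- *Data) error { return nil }

// NewMyInput validates the decoded configuration and builds the input.
func NewMyInput(cfg InputParams) (Input, error) {
	dcfg, ok := cfg.DecodedConfig.(*MyInputConfig)
	if !ok || dcfg == nil {
		return nil, fmt.Errorf("unexpected config type %T", cfg.DecodedConfig)
	}
	if dcfg.Path == "" {
		return nil, errors.New("path must not be empty")
	}
	return &myInput{path: dcfg.Path}, nil
}

func main() {
	_, err := NewMyInput(InputParams{DecodedConfig: &MyInputConfig{}})
	fmt.Println("empty path rejected:", err != nil)

	in, err := NewMyInput(InputParams{DecodedConfig: &MyInputConfig{Path: "/tmp/data"}})
	fmt.Println("valid config accepted:", in != nil && err == nil)
}
```

Returning an error for every invalid configuration makes the constructor easy to cover with unit tests.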
Input configuration and help
The input configuration object (MyInputConfig in the previous example) must export all
configuration parameters that the user can set in the TOML topology file.
Each field in the struct must include a help string tag (mandatory) and a required boolean tag
(defaults to false).
All these parameters appear in the generated help. help should describe the parameter's role and/or
its possible values; required tells Baker to refuse configurations in which that field
is not defined.
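As an illustration, a configuration struct carrying both tags might look like the one below. The field names and help texts are hypothetical, and describe is a small reflection helper written for this example; it is not Baker's own help generator.

```go
package main

import (
	"fmt"
	"reflect"
)

// MyInputConfig is a hypothetical configuration; field names are illustrative.
type MyInputConfig struct {
	Path     string `help:"Path of the file to read" required:"true"`
	PollSecs int    `help:"Seconds between two reads; 0 reads the file once" required:"false"`
}

// describe lists each field with its required flag and help text by
// reading the struct tags through reflection.
func describe(cfg interface{}) []string {
	t := reflect.TypeOf(cfg)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	var lines []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		required := f.Tag.Get("required") == "true"
		lines = append(lines, fmt.Sprintf("%s (required=%t): %s", f.Name, required, f.Tag.Get("help")))
	}
	return lines
}

func main() {
	for _, l := range describe(&MyInputConfig{}) {
		fmt.Println(l)
	}
}
```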
Write tests
To test an input component we suggest two main paths:
- test the component in isolation, calling the Run function
- write a higher-level test by running a complete Baker topology
Regardless of the chosen path, two additional unit tests are always suggested:
- test the New() (constructor-like) function, to check that it correctly instantiates the component with valid configurations and rejects invalid ones (where that’s possible)
- create small and isolated functions where possible and unit-test them
Test calling Run()
To test the component by calling Run directly, start a goroutine that plays the role of Baker and
checks each Data the input produces, then, after some initialization, call input.Run from the
test function:
func TestMyInput(t *testing.T) {
    ch := make(chan *baker.Data)
    done := make(chan struct{})
    defer func() {
        close(ch) // stop the consumer goroutine...
        <-done    // ...and wait for its checks to complete
    }()
    // start a goroutine that acts as Baker, consuming the baker.Data produced by the input
    go func() {
        defer close(done)
        for data := range ch {
            // test `data`, that comes from the component,
            // like checking its content, parsing the records, metadata, etc
            if something_is_wrong(data) {
                // t.Fatal must only be called from the test goroutine, so use t.Errorf here
                t.Errorf("error!")
            }
        }
    }()
    // Configure the input.
    cfg := ...
    input, err := NewMyInput(cfg) // use the constructor-like New function
    if err != nil {
        t.Fatal(err)
    }
    // run the input
    if err := input.Run(ch); err != nil {
        t.Fatal(err)
    }
}
The List input has an example
of this testing strategy (look for the TestListBasic test).
Test the component running a topology
To test the component inside a running topology, build the topology from a TOML configuration
by calling NewConfigFromToml and NewTopologyFromConfig, then run it.
The Base, Recorder and RawRecorder outputs included in the
outputtest package can be
helpful here to capture the output and check it:
func TestMyInput(t *testing.T) {
    toml := `
    [input]
    name = "MyInput"
    [output]
    name="RawRecorder"
    procs=1
    `
    // Add the input to be tested and a testing output
    c := baker.Components{
        Inputs:  []baker.InputDesc{MyInputDesc},
        Outputs: []baker.OutputDesc{outputtest.RawRecorderDesc},
    }
    // Create and start the topology
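    cfg, err := baker.NewConfigFromToml(strings.NewReader(toml), c)
    if err != nil {
        t.Fatal(err) // cfg is unusable after an error, so stop here
    }
    topology, err := baker.NewTopologyFromConfig(cfg)
    if err != nil {
        t.Fatal(err) // topology is unusable after an error, so stop here
    }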
    topology.Start()
    
    // In this goroutine we should provide some inputs to the component
    // The format and how to send them to the component, depends on
    // the component itself
    go func() {
        defer topology.Stop()
        sendDataToMyInput() // fake function, you need to implement your logic here
    }()
    topology.Wait() // wait for Baker to quit after `topology.Stop()`
    if err := topology.Error(); err != nil {
        t.Fatalf("topology error: %v", err)
    }
    // retrieve the output and test the records
    out := topology.Output[0].(*outputtest.RawRecorder) // matches the RawRecorder output configured above
    if len(out.Records) != want {
        t.Errorf("want %d log lines, got %d", want, len(out.Records))
    }
    // more testing on out.Records...
}
The TCP input includes an example
of this testing strategy.