Create a custom output component

Output components in Baker receive records at the end of the filter chain and are in charge of storing them, possibly sending the result (such as a temporary file on disk) to an Upload component.

To create an output and make it available to Baker, one must:

  • implement the Output interface
  • fill an OutputDesc structure describing the component and register it within Baker

The Output interface

New Output components need to implement the Output interface.

type Output interface {
	Run(in <-chan OutputRecord, upch chan<- string) error
	Stats() OutputStats
	CanShard() bool
}

The Run function implements the component logic: it receives a channel from which it reads OutputRecord objects and a channel on which it tells the Upload component what to upload.

CanShard reports whether the output is able to manage sharding. Read the page dedicated to sharding to go deeper into the topic.

Stats is used to report metrics; see the dedicated page.
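
A bare-bones type satisfying the interface could look like the following sketch; MyOutput and its fields are placeholder names used as a running example on this page, not part of Baker:

// MyOutput is a hypothetical output, used as the running example on this page.
type MyOutput struct {
	cfg    *MyOutputConfig    // decoded configuration (described further down)
	fields []baker.FieldIndex // field indexes chosen in the TOML [output] section
}

func (o *MyOutput) Run(in <-chan baker.OutputRecord, upch chan<- string) error {
	for lldata := range in {
		_ = lldata // process lldata here (a fuller sketch appears below)
	}
	return nil
}

func (o *MyOutput) Stats() baker.OutputStats {
	return baker.OutputStats{} // fill with the counters the output keeps
}

func (o *MyOutput) CanShard() bool {
	return false // this example does not support sharding
}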

OutputDesc

var MyOutputDesc = baker.OutputDesc{
	Name:   "MyOutput",
	New:    NewMyOutput,
	Config: &MyOutputConfig{},
	Raw:    true,
	Help:   "High-level description of MyOutput",
}

This object has a Name, which is used in the Baker configuration file to identify the output, a constructor-like function (New), a config object (used to parse the output configuration in the TOML file) and a help text meant to guide users through the component and its configuration parameters. The Raw field tells Baker whether it should send raw records to the output in addition to the single fields (see below for details).
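
The descriptor is then registered with Baker by adding it to the set of components the program hands over when loading the topology. A minimal sketch, assuming the program builds its own baker.Components value:

comps := baker.Components{
	Outputs: []baker.OutputDesc{MyOutputDesc},
	// plus the inputs, filters and uploads the topology may use
}
// comps is then passed to Baker when the TOML topology is parsed.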

The New function

The New field in the OutputDesc object should be assigned a function that returns a new Output.

The function receives an OutputParams object and returns an instance of Output.

Through OutputParams, an Output receives its index (in case there are multiple output processes) and a list of field indexes, in addition to the fields inherited from ComponentParams.

OutputParams.Index is the unique index of this output process among the concurrent output processes spawned by Baker. The procs configuration key can be used to tune the total number of concurrent processes; see Pipeline configuration for details.
Note that the output should clearly state in OutputDesc.Help whether it can handle concurrent processing or whether the user should configure it with a single process (procs=1). Read Tuning concurrency for an in-depth guide to the subject.

OutputParams.Fields is the list of FieldIndex the output will receive, ordered as they appear in the TOML. They are also in the same order as the fields in OutputRecord.Fields; see below for details.
If, for any reason, the output needs to retrieve the field names (as the SQLite output does to get the column names), OutputParams.FieldName can be used.
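
As a sketch, the constructor could look like this; it assumes the decoded configuration object is reachable through the DecodedConfig field inherited from ComponentParams, and that New also returns an error so invalid configurations can be rejected:

func NewMyOutput(cfg baker.OutputParams) (baker.Output, error) {
	dcfg := cfg.DecodedConfig.(*MyOutputConfig)

	// Validate dcfg and apply defaults here, returning an error for
	// configurations the output cannot work with.

	return &MyOutput{
		cfg:    dcfg,
		fields: cfg.Fields, // same order as the fields key in the TOML
	}, nil
}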

Output configuration and help

The output configuration object (MyOutputConfig in the previous example) must export all configuration parameters that the user can set in the TOML topology file.

Each field in the struct must include a help string tag (mandatory) and may also include a required boolean tag (false by default).

All these parameters appear in the generated help. help should describe the parameter's role and/or its possible values; required tells Baker to refuse configurations in which that field is not set.
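
For illustration only (the parameter names below are made up), a configuration struct using those tags could look like:

type MyOutputConfig struct {
	// PathString and RotateSize are hypothetical parameters, shown only
	// to illustrate the help and required tags.
	PathString string `help:"Path of the file the output writes to" required:"true"`
	RotateSize int64  `help:"Rotate the file once it reaches this size, in bytes (0 means never)"`
}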

OutputRecord

The OutputRecord channel received by the output component can be closed by Baker at any time; when this happens, the output should return from the Run function as soon as possible.

Until that moment, the output component must keep reading and processing incoming records.

The OutputRecord.Fields slice contains the string values of the fields that the user chose to send to the output by configuring the fields key in the [output] section of the TOML topology file.

Fields are ordered in the same way as the slice of FieldIndex received in OutputParams.Fields.

For a raw output (Raw set to true in OutputDesc), OutputRecord.Record also contains the whole serialized record as a byte slice, in addition to the field values in OutputRecord.Fields.
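
As a sketch (the CSV-like formatting and the printing are only placeholders for whatever storage the output performs), a Run loop honoring these rules could look like:

func (o *MyOutput) Run(in <-chan baker.OutputRecord, upch chan<- string) error {
	for lldata := range in {
		// lldata.Fields is ordered as OutputParams.Fields, i.e. as the
		// fields key in the TOML; for a raw output, lldata.Record also
		// holds the whole serialized record.
		line := strings.Join(lldata.Fields, ",")
		fmt.Println(line) // a real output would store the record instead
	}
	// The channel has been closed by Baker: flush, clean up and return.
	return nil
}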

Prepare data for uploading

If the output component produces files on the local filesystem, it should send their paths to the upload component (using the shared string channel) regardless of whether an upload component is actually configured, something the output cannot know. If no upload is present, Baker simply ignores those messages.

The output can send a single message at the end of its job (think of a SQLite database that should only be uploaded before Baker exits) or can upload files periodically, as the FileWriter component does when it rotates (i.e. it stops writing to a file, sends its path to the upload component, and then creates a new file).
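
In code, the hand-off is just a send on the upload channel once a file is complete. A sketch, assuming curFile is the *os.File the output has been writing to:

// The current file is complete (rotation, or Run about to return):
// close it and hand its path over to the upload component.
if err := curFile.Close(); err != nil {
	return err
}
upch <- curFile.Name() // Baker ignores this message if no upload is configured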

Write tests

Tests for output components often require either mocking external resources/dependencies (think of an output writing to DynamoDB) or creating temporary files. How to test a component is strictly tied to its implementation.

For these reasons there isn’t a single golden rule for testing outputs, but some common guidelines can be identified:

  • test the New() (constructor-like) function, to check that it correctly instantiates the component with valid configurations and rejects invalid ones
  • create small and isolated functions where possible and unit-test them
  • test the whole component at integration level

The last point is where we can go a bit deeper. A possible strategy is to create a new output instance using the New function, then call Run, passing it the records channel (from Baker to the component) and the upload channel (from the component to the upload), and use those channels to interact with the output.

func TestMyOutput(t *testing.T) {
    cfg := ... // define cfg with component configuration
    output := NewMyOutput(cfg) // use the constructor-like New function
    
    outch := make(chan baker.OutputRecord)
    upch := make(chan string)

    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        outch <- baker.OutputRecord{Fields: []string{"a", "b", "c"}, Record: []byte("rawrecord")}
        // add more records to outch
        close(outch)
        for upchpath := range upch {
            // check upchpath and set some vars/objs
            if upchpath ... { // e.g. check the path or open the file
                checkVar = "something"
            }
        }
        wg.Done()
    }()
    // run the output, consuming outch and sending results to upch
    if err := output.Run(outch, upch); err != nil {
        t.Fatalf("Run() error: %v", err)
    }
    close(upch)

    wg.Wait() // wait for the job to end
    
    // now we can check the vars/objs created in the goroutine
    if checkVar != wantVar {
        t.Fatalf("checkVar = %q, want %q", checkVar, wantVar)
    }
}

The SQLite component has a good example of this strategy.
