Create a custom output component
Output components in Baker receive records at the end of the filter chain and are in charge of storing them, optionally sending the result (such as a temporary file on disk) to an Upload component.
To create an output and make it available to Baker, one must:
- Implement the Output interface
- Fill in an OutputDesc struct and register it within Baker via Components
The Output interface
New Output components need to implement the Output interface.
type Output interface {
	Run(in <-chan OutputRecord, upch chan<- string) error
	Stats() OutputStats
	CanShard() bool
}
The Run function implements the component logic. It receives a channel from which it reads OutputRecord objects and a channel through which it tells the Upload components what to upload.
CanShard reports whether the output is able to manage sharding. See the page dedicated to sharding for more details.
Stats is used to report metrics; see the dedicated page.
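As a rough illustration of how the three methods fit together, the sketch below defines local stand-ins for the Baker types (so that it compiles without importing Baker) and a hypothetical `countingOutput` that only counts the records it receives. The `NumProcessedLines` field on the stand-in `OutputStats` is an assumption of this sketch; check the Baker package for the actual definition.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Local stand-ins for the Baker types, so the sketch compiles on its own.
// A real component would use baker.OutputRecord and baker.OutputStats.
type OutputRecord struct {
	Fields []string
	Record []byte
}

// OutputStats is a simplified assumption; the real struct may differ.
type OutputStats struct {
	NumProcessedLines int64
}

// Output mirrors the interface shown above.
type Output interface {
	Run(in <-chan OutputRecord, upch chan<- string) error
	Stats() OutputStats
	CanShard() bool
}

// countingOutput is a hypothetical output that only counts records.
type countingOutput struct {
	processed int64 // updated atomically, in case Stats is called while Run is active
}

// Run consumes records until the input channel is closed.
func (o *countingOutput) Run(in <-chan OutputRecord, upch chan<- string) error {
	for range in {
		atomic.AddInt64(&o.processed, 1)
	}
	return nil
}

// Stats reports how many records have been consumed so far.
func (o *countingOutput) Stats() OutputStats {
	return OutputStats{NumProcessedLines: atomic.LoadInt64(&o.processed)}
}

// CanShard returns false: this output does not manage sharding.
func (o *countingOutput) CanShard() bool { return false }

// runDemo feeds n records through the output and returns its stats.
func runDemo(n int) OutputStats {
	var out Output = &countingOutput{}
	in := make(chan OutputRecord)
	upch := make(chan string)
	done := make(chan error)
	go func() { done <- out.Run(in, upch) }()
	for i := 0; i < n; i++ {
		in <- OutputRecord{Fields: []string{"a", "b"}}
	}
	close(in) // Run returns once the channel is closed and drained
	<-done
	return out.Stats()
}

func main() {
	fmt.Println(runDemo(3).NumProcessedLines)
}
```

The counter is read and written with `sync/atomic` so that reading the stats does not race with the processing loop.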
OutputDesc
var MyOutputDesc = baker.OutputDesc{
	Name:   "MyOutput",
	New:    NewMyOutput,
	Config: &MyOutputConfig{},
	Raw:    true,
	Help:   "High-level description of MyOutput",
}
This object has a Name, which is used in the Baker configuration file to identify the output, a constructor-like function (New), a config object (used to parse the output configuration in the TOML file) and a help text that should explain to users how to use the component and its configuration parameters. The Raw field tells Baker whether it should send raw records in addition to single fields (see below for details).
The New function
The New field in the OutputDesc object should be assigned a function that returns a new Output.
The function receives an OutputParams object and returns an instance of Output.
Through OutputParams, an Output receives its index (in case there are multiple output processes) and a list of field indexes, in addition to the fields inherited from ComponentParams.
OutputParams.Index is the unique index of this output process among the concurrent output processes created by Baker. The procs configuration can be used to tune the total number of concurrent processes; see Pipeline configuration for details.
Note that the output should clearly document in OutputDesc.Help whether it is able to manage concurrent processing or whether the user should run it as a single process (procs=1).
Read Tuning concurrency for an in-depth guide to the subject.
OutputParams.Fields is a list of FieldIndex that the output will receive, ordered as they are in the TOML. They’re also in the same order as the fields in OutputRecord.Fields; see below for details.
If, for any reason, the output needs to retrieve the field names (as the SQLite output does to get the column names), then OutputParams.FieldName can be used.
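The following is a minimal sketch of what a constructor-like function might look like, using local stand-ins for the Baker types so that it compiles alone. Several details are assumptions rather than Baker's confirmed API: the `DecodedConfig` field carrying the parsed configuration, the signature of `FieldName`, the extra error return value, and the `Path` parameter of the hypothetical `MyOutputConfig`.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the Baker types, simplified so the sketch compiles alone.
type FieldIndex int

type OutputParams struct {
	DecodedConfig interface{}             // assumption: the parsed config object
	FieldName     func(FieldIndex) string // assumption: index-to-name lookup
	Index         int
	Fields        []FieldIndex
}

// MyOutputConfig is a hypothetical configuration object.
type MyOutputConfig struct {
	Path string
}

// MyOutput is a hypothetical output; Run, Stats and CanShard are omitted.
type MyOutput struct {
	index   int
	path    string
	columns []string
}

// NewMyOutput validates the configuration and resolves the field names once,
// so that the processing loop does not have to look them up per record.
func NewMyOutput(cfg OutputParams) (*MyOutput, error) {
	dcfg, ok := cfg.DecodedConfig.(*MyOutputConfig)
	if !ok || dcfg == nil {
		return nil, errors.New("myoutput: unexpected configuration type")
	}
	if dcfg.Path == "" {
		return nil, errors.New("myoutput: Path must be set")
	}
	columns := make([]string, len(cfg.Fields))
	for i, f := range cfg.Fields {
		columns[i] = cfg.FieldName(f)
	}
	return &MyOutput{index: cfg.Index, path: dcfg.Path, columns: columns}, nil
}

// demoParams builds the parameters that Baker would normally provide.
func demoParams() OutputParams {
	names := []string{"timestamp", "user", "url"}
	return OutputParams{
		DecodedConfig: &MyOutputConfig{Path: "/tmp/out.db"},
		FieldName:     func(f FieldIndex) string { return names[f] },
		Index:         0,
		Fields:        []FieldIndex{2, 0}, // same order as the TOML fields key
	}
}

func main() {
	out, err := NewMyOutput(demoParams())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.index, out.columns)
}
```

Rejecting a bad configuration in the constructor means the problem surfaces before any record is processed.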
Output configuration and help
The output configuration object (MyOutputConfig in the previous example) must export all configuration parameters that the user can set in the TOML topology file.
Each field in the struct must include a help string tag (mandatory) and may include a required boolean tag (defaults to false).
All these parameters appear in the generated help. help should describe the parameter's role and/or its possible values; required tells Baker to refuse configurations in which that field is not defined.
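To make the tags concrete, here is a small sketch of a configuration struct carrying `help` and `required` tags, together with a helper that reads them back through reflection. The `Path` and `Compress` parameters and the `describe` helper are hypothetical; the helper only illustrates how such tags can be inspected and is not Baker's own help generator.

```go
package main

import (
	"fmt"
	"reflect"
)

// MyOutputConfig is a hypothetical configuration object.
type MyOutputConfig struct {
	Path     string `help:"Destination file path" required:"true"`
	Compress bool   `help:"Gzip the output file (true or false)"`
}

// paramHelp describes one configuration parameter.
type paramHelp struct {
	Name     string
	Help     string
	Required bool
}

// describe lists the parameters of a config struct by reading its tags.
// A missing required tag is treated as false.
func describe(cfg interface{}) []paramHelp {
	t := reflect.TypeOf(cfg)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		return nil
	}
	var out []paramHelp
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		out = append(out, paramHelp{
			Name:     f.Name,
			Help:     f.Tag.Get("help"),
			Required: f.Tag.Get("required") == "true",
		})
	}
	return out
}

func main() {
	for _, p := range describe(&MyOutputConfig{}) {
		fmt.Printf("%-8s required=%-5t %s\n", p.Name, p.Required, p.Help)
	}
}
```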
OutputRecord
The OutputRecord channel received by the output component can be closed by Baker at any time, and the output should return from the Run function as soon as possible when this happens.
Until that moment, the output component must keep reading and processing new records.
The OutputRecord.Fields slice contains the string values of the fields that the user chose to send to the output by configuring the fields key in the [output] section of the TOML topology file.
Fields are in the same order as the slice of FieldIndex received in OutputParams.Fields.
In case of a raw output, OutputRecord.Record contains both the serialized record as a byte slice and the field values.
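The ordering guarantee can be sketched as follows: since the values in `OutputRecord.Fields` arrive in the same order as the configured fields, a list of names resolved once at construction time can be paired with each record by position. The `OutputRecord` type below is a local stand-in, and `formatLine` and `collect` are hypothetical helpers written for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// OutputRecord is a local stand-in for baker.OutputRecord.
type OutputRecord struct {
	Fields []string
	Record []byte
}

// formatLine pairs each value with its field name by position.
func formatLine(names []string, rec OutputRecord) string {
	pairs := make([]string, 0, len(rec.Fields))
	for i, v := range rec.Fields {
		if i >= len(names) {
			break // more values than names: ignore the extras
		}
		pairs = append(pairs, names[i]+"="+v)
	}
	return strings.Join(pairs, " ")
}

// collect reads records until the channel is closed, then returns.
func collect(names []string, in <-chan OutputRecord) (lines []string, rawBytes int) {
	for rec := range in {
		lines = append(lines, formatLine(names, rec))
		rawBytes += len(rec.Record) // expected to be empty unless the output is raw
	}
	return lines, rawBytes
}

func main() {
	in := make(chan OutputRecord, 2)
	in <- OutputRecord{Fields: []string{"/home", "alice"}, Record: []byte("raw1")}
	in <- OutputRecord{Fields: []string{"/cart", "bob"}, Record: []byte("raw2")}
	close(in)

	lines, rawBytes := collect([]string{"url", "user"}, in)
	for _, l := range lines {
		fmt.Println(l)
	}
	fmt.Println("raw bytes:", rawBytes)
}
```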
Prepare data for uploading
If the output component produces files on the local filesystem, then it should send their paths to the upload component (using the shared string channel), regardless of whether an upload is actually configured (the output has no way to know). If the upload is absent, Baker ignores those messages.
The output can send a single message at the end of its job (think of a SQLite database that should only be uploaded before Baker exits) or can upload files periodically, like the FileWriter component does when it rotates (i.e. it stops writing to a file, sends its path to the upload, and then creates a new file).
Write tests
Tests for output components often require either mocking external resources/dependencies (think of an output writing to DynamoDB) or creating temporary files. How to test a component is strictly tied to its implementation.
For these reasons there isn’t a single golden rule for testing outputs, but some common rules can be identified:
- test the New() (constructor-like) function, to check that it correctly instantiates the component with valid configurations and rejects invalid ones
- create small and isolated functions where possible and unit-test them
- test the whole component at integration level
The last point is where we can go a bit deeper. A possible strategy is to create a new output instance using the New function, then pass the in (from Baker to the component) and out (from the component to the upload) channels to its Run method and use those channels to interact with the output.
func TestMyOutput(t *testing.T) {
	cfg := ... // define cfg with component configuration
	output := NewMyOutput(cfg) // use the constructor-like New function

	outch := make(chan baker.OutputRecord)
	upch := make(chan string)

	var checkVar string // set by the goroutine below, read after wg.Wait()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		outch <- baker.OutputRecord{Fields: []string{"a", "b", "c"}, Record: []byte("rawrecord")}
		// add more records to outch
		close(outch)
		for upchpath := range upch {
			// check upchpath and set some vars/objs
			if upchpath ... { // check the path or open the file or whatever...
				checkVar = "something"
			}
		}
	}()

	// run the output, consuming the outch and sending results to upch
	err := output.Run(outch, upch)
	close(upch) // lets the goroutine leave its range loop
	wg.Wait()   // wait for the job to end
	if err != nil {
		t.Fatalf("Run returned an error: %v", err)
	}

	// now we can check the vars/objs created in the goroutine
	// (wantVar is the expected value, defined by the test)
	if checkVar != wantVar {
		t.Fatalf("checkVar = %q, want %q", checkVar, wantVar)
	}
}
The SQLite component has a good example of this strategy.