aboutsummaryrefslogtreecommitdiffstats
path: root/skate
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2021-04-27 10:09:02 +0200
committerMartin Czygan <martin.czygan@gmail.com>2021-04-27 10:09:02 +0200
commit87ff9eb5dd33719c3947034ea4695c3e27ba6f9d (patch)
treebd7940b6d7b2ee16a0a9ead9ad9611f7318ee90b /skate
parent6fe4bf4f15432e5078faf60de23ff8cdee637035 (diff)
downloadrefcat-87ff9eb5dd33719c3947034ea4695c3e27ba6f9d.tar.gz
refcat-87ff9eb5dd33719c3947034ea4695c3e27ba6f9d.zip
add prefix wrapper
Diffstat (limited to 'skate')
-rw-r--r--skate/cmd/skate-map/main.go12
-rw-r--r--skate/map.go29
2 files changed, 33 insertions, 8 deletions
diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go
index ffe1017..94cf606 100644
--- a/skate/cmd/skate-map/main.go
+++ b/skate/cmd/skate-map/main.go
@@ -27,18 +27,18 @@ func main() {
// XXX: introduce prefixes
availableMappers := map[string]skate.Mapper{
"id": skate.Identity,
- "ff": skate.CreateFixedMapper(*extraValue),
- "title": skate.MapperTitle,
- "tnorm": skate.MapperTitleNormalized,
- "tnysi": skate.MapperTitleNysiis,
- "tsand": skate.MapperTitleSandcrawler,
+ "field": skate.WithPrefix(skate.CreateFixedMapper(*extraValue), "field"),
+ "title": skate.WithPrefix(skate.MapperTitle, "title"),
+ "tnorm": skate.WithPrefix(skate.MapperTitleNormalized, "tnorm"),
+ "tnysi": skate.WithPrefix(skate.MapperTitleNysiis, "tnysi"),
+ "tsand": skate.WithPrefix(skate.MapperTitleSandcrawler, "tsand"),
}
switch {
case *mapperName != "":
if f, ok := availableMappers[*mapperName]; !ok {
log.Fatalf("unknown mapper name: %v", *mapperName)
} else {
- pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.TSV)
+ pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.AsTSV)
pp.NumWorkers = *numWorkers
pp.BatchSize = *batchSize
pp.Verbose = *verbose
diff --git a/skate/map.go b/skate/map.go
index 8d8094b..b06bf96 100644
--- a/skate/map.go
+++ b/skate/map.go
@@ -2,6 +2,7 @@ package skate
import (
"bytes"
+ "errors"
"reflect"
"runtime"
"strconv"
@@ -13,8 +14,12 @@ import (
var (
bTab = []byte("\t")
bNewline = []byte("\n")
+
+ ErrZeroFields = errors.New("zero fields")
+ ErrMissingFieldName = errors.New("missing field name")
)
+// TitleDoc is a document with a title.
type TitleDoc struct {
Title string `json:"title"`
}
@@ -35,9 +40,9 @@ type PartialDoc struct {
// doc). We want fields, but we do not want to bake in TSV into each function.
type Mapper func([]byte) ([][]byte, error)
-// TSV serialized the result of a field mapper as TSV. This is a slim adapter,
+// AsTSV serializes the result of a field mapper as TSV. This is a slim adapter,
// e.g. to parallel.Processor, which expects this function signature.
-func (f Mapper) TSV(p []byte) ([]byte, error) {
+func (f Mapper) AsTSV(p []byte) ([]byte, error) {
fields, err := f(p)
if err != nil {
return nil, err
@@ -45,6 +50,21 @@ func (f Mapper) TSV(p []byte) ([]byte, error) {
return append(bytes.Join(fields, bTab), bNewline...), nil
}
+// WithPrefix wraps f, prepending "prefix:" to the first field; it returns ErrZeroFields if f yields no fields.
+func WithPrefix(f Mapper, prefix string) Mapper {
+ return func(p []byte) ([][]byte, error) {
+ fields, err := f(p)
+ if err != nil {
+ return fields, err
+ }
+ if len(fields) == 0 {
+ return nil, ErrZeroFields
+ }
+ fields[0] = append([]byte(prefix+":"), fields[0]...)
+ return fields, err
+ }
+}
+
// NameOf returns name of value, e.g. the name of a function.
func NameOf(f interface{}) string {
v := reflect.ValueOf(f)
@@ -64,6 +84,11 @@ func Identity(p []byte) ([][]byte, error) {
// CreateFixedMapper extracts the value from a given fixed top level json key.
// Returns a function that maps doc to (v, doc).
func CreateFixedMapper(field string) Mapper {
+ if field == "" {
+ return func(p []byte) ([][]byte, error) {
+ return nil, ErrMissingFieldName
+ }
+ }
f := func(p []byte) ([][]byte, error) {
var (
doc map[string]interface{}