Golang: read a CSV file of 144513 records concurrently


I am using the following file from the GeoNames database: link

The geographic database GeoNames covers all countries and contains more than eleven million place names that are available for download for free.

The file has 144513 records.

And I am writing a program in Go that reads it concurrently and dynamically generates curl commands that send each record to the Elasticsearch API so the documents get loaded. That is, I am converting the records of the CSV file to JSON and loading them into Elasticsearch with curl.
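For instance, one record ends up as a command roughly like the following (the field values here are invented for illustration; $ELASTICSEARCH_HOSTS and $ELASTICSEARCH_PORT are environment variables, and mx/postal_code come from the constants in the script):

curl -u elastic:changeme -X PUT "http://$ELASTICSEARCH_HOSTS:$ELASTICSEARCH_PORT/mx/postal_code/<uuid>" -d "
{
    \"cp\"         : \"01000\",
    \"colonia\"    : \"San Angel\",
    \"ciudad\"     : \"Ciudad de Mexico\",
    \"delegacion\" : \"Alvaro Obregon\",
    \"location\"   : { \"lat\": 19.34, \"lon\": -99.19 }
}"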

The script that I am executing is the following:

package main

import (
    "encoding/csv"
    "fmt"
    "os"
    "os/exec"
    "runtime"

    "github.com/satori/go.uuid"
)

const (
    FILE                       = "./MX.txt"
    TOTAL_ROWS                 = 144513
    //TOTAL_ROWS                 = 5
    ELASTICSEARCH_INDEX string = "mx"
    ELASTICSEARCH_TYPE  string = "postal_code"
    FILE_DELIMITER             = '\t'
)

type Document struct {
    cp, colonia, ciudad, delegacion, lat, lon string
}

func main() {

    runtime.GOMAXPROCS(4)

    var c chan Document = make(chan Document)

    go readFile(c)
    go toCurl(c)

    var input string
    fmt.Scanln(&input)

}

// readFile loads the whole CSV into memory and streams each row into the channel.
func readFile(c chan Document) {

    file, err := os.Open(FILE)
    printError(err)
    defer file.Close()

    reader := csv.NewReader(file)
    reader.Comma = FILE_DELIMITER

    rows, err := reader.ReadAll()
    printError(err)

    // Send at most TOTAL_ROWS rows into the channel.
    for n, each := range rows {
        if n < TOTAL_ROWS {
            c <- Document{
                cp:         each[1],
                colonia:    each[2],
                ciudad:     each[3],
                delegacion: each[5],
                lat:        each[9],
                lon:        each[10],
            }
        }
    }

}

// toCurl consumes documents from the channel and indexes each one via curl.
func toCurl(c chan Document) {

    for {

        id := uuid.NewV4().String()

        d := <-c

        // Build the shell command; a backquoted raw string keeps
        // the \" escapes intact for the shell to turn into JSON quotes.
        curl := `
            curl -u elastic:changeme -X PUT "http://$ELASTICSEARCH_HOSTS:$ELASTICSEARCH_PORT/` + ELASTICSEARCH_INDEX + `/` + ELASTICSEARCH_TYPE + `/` + id + `" -d "
            {
                \"cp\"         : \"` + d.cp + `\",
                \"colonia\"    : \"` + d.colonia + `\",
                \"ciudad\"     : \"` + d.ciudad + `\",
                \"delegacion\" : \"` + d.delegacion + `\",
                \"location\": {
                    \"lat\": ` + d.lat + `,
                    \"lon\": ` + d.lon + `
                }
            }"
        `
        //fmt.Println(curl)
        out, err := exec.Command("sh", "-c", curl).Output()
        printError(err)
        fmt.Printf("%v\n\n", string(out))
    }

}

func printError(err error) {
    if err != nil {
        fmt.Printf("\nError: %v \n ", err.Error())
        os.Exit(1)
    }
}

And it works. But my question is: am I really making good use of channels and goroutines to make the process better and faster? Or is there any suggestion you would make to improve the program or the process?

asked by Sanx 29.08.2017 at 19:04

1 answer


We can say that what you have implemented with channels is exactly this: a single producer goroutine (readFile) pushing every row through one unbuffered channel to a single consumer goroutine (toCurl), so the documents go up to Elasticsearch one at a time.

As you mention in the comments, this is the kind of load that bulk-type calls are made for. But since the same pattern shows up in cases where no bulk operation is available, you can use what is called a queue with a buffer, that is, a buffered channel, together with a pool of consumers (a connection pool), and the processing will go much faster. For the case you describe above, a design like the following is possible:
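A minimal sketch of that design, reusing the Document type from your program, with a hypothetical upload function standing in for the curl call:

package main

import (
    "fmt"
    "sync"
)

type Document struct {
    cp, colonia, ciudad, delegacion, lat, lon string
}

const WORKERS = 4 // size of the consumer pool

// upload stands in for the real call to Elasticsearch
// (your exec.Command curl or, better, an HTTP client).
func upload(worker int, d Document) {
    fmt.Printf("worker %d indexing cp=%s\n", worker, d.cp)
}

func main() {
    // Buffered channel: the producer can read ahead while
    // the workers are still busy uploading.
    queue := make(chan Document, 100)

    var wg sync.WaitGroup
    for i := 0; i < WORKERS; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for d := range queue { // stops when the channel is closed
                upload(id, d)
            }
        }(i)
    }

    // Producer: in your program this is the loop inside readFile.
    for i := 0; i < 10; i++ {
        queue <- Document{cp: fmt.Sprintf("%05d", i)}
    }
    close(queue) // tell the workers no more rows are coming

    wg.Wait() // block until every queued document has been processed
}

Closing the channel and waiting on the sync.WaitGroup also removes the need for the fmt.Scanln at the end of your main, which only waits for a keystroke and can end the program before all rows have been indexed.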

Having two workers uploading to Elastic could, on paper, process 50% faster.
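And since Elasticsearch does have bulk operations, each worker could group several documents per request through the _bulk endpoint, cutting the per-document HTTP overhead even further. A sketch of such a request (values invented; newline-delimited JSON with one action line plus one document line per record):

curl -u elastic:changeme -X POST "http://$ELASTICSEARCH_HOSTS:$ELASTICSEARCH_PORT/_bulk" -H "Content-Type: application/x-ndjson" --data-binary @batch.ndjson

where batch.ndjson contains:

{ "index": { "_index": "mx", "_type": "postal_code", "_id": "<uuid-1>" } }
{ "cp": "01000", "colonia": "San Angel", "ciudad": "Ciudad de Mexico", "delegacion": "Alvaro Obregon", "location": { "lat": 19.34, "lon": -99.19 } }
{ "index": { "_index": "mx", "_type": "postal_code", "_id": "<uuid-2>" } }
{ "cp": "01020", "colonia": "Guadalupe Inn", "ciudad": "Ciudad de Mexico", "delegacion": "Alvaro Obregon", "location": { "lat": 19.36, "lon": -99.18 } }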

answered on 17.01.2018 at 15:37