Original post

I am getting emails from a database and then checking their spam score. There are duplicate emails in the database. I can group the results by the email hash to get a list of unique emails. This way I don’t check the spam score of the same email more than once. After I have the spam score for an email, I want to update all the rows in the database that have that hash.

So, to do this, I thought I could have a few workers querying the API, another goroutine to query the database to get all the records I need. Then have a function to save the records as the API returns results. It doesn’t matter what order I do this, so I thought this would be a good opportunity to learn about concurrency.

I’ve tried several variations of the code below. The output is also below. The program never quits. I think I’m having a hard time figuring out where to close which channel.

Thank you :slight_smile:

package main

import (
        "log"
        "strconv"

        "github.com/fatih/color"
        "github.com/golevi/comint/models"
        "github.com/golevi/spamcheck"
)

func worker(in, out chan models.Email) {
        for {
                select {
                case <-in:
                        email := <-in
                        color.Yellow("Working %sn", email.Hash)
                        scr := spamcheck.NewRequest(email.Mail)
                        resp, err := scr.CheckScore()
                        if err != nil {
                                log.Println(err)
                        }
                        score, err := strconv.ParseFloat(resp.Score, 64)
                        email.SpamScore = score
                        out <- email
                }
        }
}

func sendWorkIn(in chan models.Email) {
        var emails []models.Email
        models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(10).Find(&emails)
        // 10 emails, only five workers, it should block after five, until there is
        // a free worker.
        for _, email := range emails {
                color.Cyan("Sending %sn", email.Hash)
                in <- email
        }
}

func receiveResults(out chan models.Email) {
        for {
                select {
                case <-out:
                        email := <-out
                        models.GetDB().Model(&models.Email{}).UpdateColumn("spam_score = ?", email.SpamScore).Where("hash = ?", email.Hash)
                        color.Green("Updated %sn", email.Hash)
                }
        }
}

func main() {
        in, out := make(chan models.Email), make(chan models.Email)

        // create 5 gophers that are ready to work independently of each other
        for i := 0; i < 5; i++ {
                 worker(in, out)
        }
        // create 1 gopher to give everyone work
         sendWorkIn(in)
        // wait for all the work to get done
        receiveResults(out)
}

Output

Sending 05b82a690cad5cd31794be967fb55164
Sending 11d69cbcd9fd336453c6c4f45d6e803a
Sending 11fda4cd339296d0054a7495aa52243b
Sending 123035d472b801baf7a5f0b647cbcc7e
Sending 124dd24230899487d5563c4e999d0751
Sending 1254f9026e115647496ab32596c85235
Sending 125fc7e8ba9d6010e3ff462e2bcbffd8
Sending 12638ac494441f6fae4d20381973208a
Sending 12841b34d33cd686be207ef4bab691e7
Sending 1290fc5decda316bf853afd2816cdc0d
Working 12638ac494441f6fae4d20381973208a
Working 1254f9026e115647496ab32596c85235
Working 125fc7e8ba9d6010e3ff462e2bcbffd8
Working 12841b34d33cd686be207ef4bab691e7
Working 1290fc5decda316bf853afd2816cdc0d
Updated 1254f9026e115647496ab32596c85235
Updated 12841b34d33cd686be207ef4bab691e7