Original post

I am getting emails from a database and then checking their spam score. There are duplicate emails in the database. I can group the results by the email hash to get a list of unique emails. This way I don’t check the spam score of the same email more than once. After I have the spam score for an email, I want to update all the rows in the database that have that hash.

So, to do this, I thought I could have a few workers querying the API, another goroutine to query the database to get all the records I need. Then have a function to save the records as the API returns results. It doesn’t matter what order I do this, so I thought this would be a good opportunity to learn about concurrency.

I’ve tried several variations of the code below. The output is also below. The program never quits. I think I’m having a hard time figuring out where to close which channel.

Thank you :slight_smile:

package main

import (
        "log"
        "strconv"

        "github.com/fatih/color"
        "github.com/golevi/comint/models"
        "github.com/golevi/spamcheck"
)

func worker(in, out chan models.Email) {
        for email := range in {
                color.Yellow("Working %sn", email.Hash)
                scr := spamcheck.NewRequest(email.Mail)
                resp, err := scr.CheckScore()
                if err != nil {
                        log.Println(err)
                }
                score, err := strconv.ParseFloat(resp.Score, 64)
                email.SpamScore = score
                out <- email
                // log.Println(email.Hash, email.SpamScore)
        }
        color.Yellow("Worker quitting")
}

func sendWorkIn(in chan models.Email, limit int) {
        var emails []models.Email
        models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(limit).Find(&emails)
        color.Red("Found %d hashes", len(emails))

        for _, email := range emails {
                color.Cyan("Sending %sn", email.Hash)
                in <- email
        }
        close(in)
        color.Red("Finished sending emails")
}

func receiveResults(out chan models.Email, limit int) {
        color.Red("Receiving results...")
        tx := models.GetDB().Begin()
        for i := 0; i < limit; i++ {
                email := <-out
                tx.Model(models.Email{}).Where("hash = ?", email.Hash).Updates(models.Email{SpamScore: email.SpamScore})
                color.Green("Updated %sn", email.Hash)
        }
        tx.Commit()
        color.Red("Finished updating database")
}

func main() {
        in, out := make(chan models.Email), make(chan models.Email)
        color.Red("Starting...")

        workers := 10
        for i := 0; i < workers; i++ {
                 worker(in, out)
        }
        limit := 100
         sendWorkIn(in, limit)
        // wait for all the work to get done
        receiveResults(out, limit)
}