博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
golang常见的几种并发模型框架
阅读量:5892 次
发布时间:2019-06-19

本文共 6577 字,大约阅读时间需要 21 分钟。

在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。

package mainimport (    "fmt"    "math/rand"    "os"    "runtime"    "sync"    "sync/atomic"    "time")type Scenario struct {    Name        string    Description []string    Examples    []string    RunExample  func()}var s1 = &Scenario{    Name: "s1",    Description: []string{        "简单并发执行任务",    },    Examples: []string{        "比如并发的请求后端某个接口",    },    RunExample: RunScenario1,}var s2 = &Scenario{    Name: "s2",    Description: []string{        "持续一定时间的高并发模型",    },    Examples: []string{        "在规定时间内,持续的高并发请求后端服务, 防止服务死循环",    },    RunExample: RunScenario2,}var s3 = &Scenario{    Name: "s3",    Description: []string{        "基于大数据量的并发任务模型, goroutine worker pool",    },    Examples: []string{        "比如技术支持要给某个客户删除几个TB/GB的文件",    },    RunExample: RunScenario3,}var s4 = &Scenario{    Name: "s4",    Description: []string{        "等待异步任务执行结果(goroutine+select+channel)",    },    Examples: []string{        "",    },    RunExample: RunScenario4,}var s5 = &Scenario{    Name: "s5",    Description: []string{        "定时的反馈结果(Ticker)",    },    Examples: []string{        "比如测试上传接口的性能,要实时给出指标: 吞吐率,IOPS,成功率等",    },    RunExample: RunScenario5,}var Scenarios []*Scenariofunc init() {    Scenarios = append(Scenarios, s1)    Scenarios = append(Scenarios, s2)    Scenarios = append(Scenarios, s3)    Scenarios = append(Scenarios, s4)    Scenarios = append(Scenarios, s5)}// 常用的并发与同步场景func main() {    if len(os.Args) == 1 {        fmt.Println("请选择使用场景 ==> ")        for _, sc := range Scenarios {            fmt.Printf("场景: %s ,", sc.Name)            printDescription(sc.Description)        }        return    }    for _, arg := range os.Args[1:] {        sc := matchScenario(arg)        if sc != nil {            printDescription(sc.Description)            printExamples(sc.Examples)            sc.RunExample()        }    }}func printDescription(str []string) {    fmt.Printf("场景描述: %s \n", str)}func printExamples(str []string) {    fmt.Printf("场景举例: %s \n", str)}func matchScenario(name string) *Scenario {    for _, sc := range Scenarios {        if sc.Name == name {            return sc        }    }    return nil}var doSomething = func(i int) string {    time.Sleep(time.Millisecond * time.Duration(10))    fmt.Printf("Goroutine %d do things .... \n", i)    return fmt.Sprintf("Goroutine %d", i)}var takeSomthing = func(res string) string {    time.Sleep(time.Millisecond * time.Duration(10))    tmp := fmt.Sprintf("Take result from %s.... \n", res)    fmt.Println(tmp)    return tmp}// 场景1: 简单并发任务func RunScenario1() {    count := 10    var wg sync.WaitGroup    for i := 0; i < count; i++ {        wg.Add(1)        go func(index int) {            defer wg.Done()            doSomething(index)        }(i)    }    wg.Wait()}// 场景2: 按时间来持续并发func RunScenario2() {    timeout := time.Now().Add(time.Second * time.Duration(10))    n := runtime.NumCPU()    waitForAll := make(chan struct{})    done := make(chan struct{})    concurrentCount := make(chan struct{}, n)    for i := 0; i < n; i++ {        concurrentCount <- struct{}{}    }    go func() {        for time.Now().Before(timeout) {            <-done            concurrentCount <- struct{}{}        }        waitForAll <- struct{}{}    }()    go func() {        for {            <-concurrentCount            go func() {                doSomething(rand.Intn(n))                done <- struct{}{}            }()        }    }()    <-waitForAll}// 场景3:以 worker pool 方式 并发做事/发送请求func RunScenario3() {    numOfConcurrency := runtime.NumCPU()    taskTool := 10    jobs := make(chan int, taskTool)    results := make(chan int, taskTool)    var wg sync.WaitGroup    // workExample    workExampleFunc := func(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {        defer wg.Done()        for job := range jobs {            res := job * 2            fmt.Printf("Worker %d do things, produce result %d \n", id, res)            time.Sleep(time.Millisecond * time.Duration(100))            results <- res        }    }    for i := 0; i < numOfConcurrency; i++ {        wg.Add(1)        go workExampleFunc(i, jobs, results, &wg)    }    totalTasks := 100    wg.Add(1)    go func() {        defer wg.Done()        for i := 0; i < totalTasks; i++ {            n := <-results            fmt.Printf("Got results %d \n", n)        }        close(results)    }()    for i := 0; i < totalTasks; i++ {        jobs <- i    }    close(jobs)    wg.Wait()}// 场景4: 等待异步任务执行结果(goroutine+select+channel)func RunScenario4() {    sth := make(chan string)    result := make(chan string)    go func() {        id := rand.Intn(100)        for {            sth <- doSomething(id)        }    }()    go func() {        for {            result <- takeSomthing(<-sth)        }    }()    select {    case c := <-result:        fmt.Printf("Got result %s ", c)    case <-time.After(time.Duration(30 * time.Second)):        fmt.Errorf("指定时间内都没有得到结果")    }}var doUploadMock = func() bool {    time.Sleep(time.Millisecond * time.Duration(100))    n := rand.Intn(100)    if n > 50 {        return true    } else {        return false    }}// 场景5: 定时的反馈结果(Ticker)// 测试上传接口的性能,要实时给出指标: 吞吐率,成功率等func RunScenario5() {    totalSize := int64(0)    totalCount := int64(0)    totalErr := int64(0)    concurrencyCount := runtime.NumCPU()    stop := make(chan struct{})    fileSizeExample := int64(10)    timeout := 10 // seconds to stop    go func() {        for i := 0; i < concurrencyCount; i++ {            go func(index int) {                for {                    select {                    case <-stop:                        return                    default:                        break                    }                    res := doUploadMock()                    if res {                        atomic.AddInt64(&totalCount, 1)                        atomic.AddInt64(&totalSize, fileSizeExample)                    } else {                        atomic.AddInt64(&totalErr, 1)                    }                }            }(i)        }    }()    t := time.NewTicker(time.Second)    index := 0    for {        select {        case <-t.C:            index++            tmpCount := atomic.LoadInt64(&totalCount)            tmpSize := atomic.LoadInt64(&totalSize)            tmpErr := atomic.LoadInt64(&totalErr)            fmt.Printf("吞吐率: %d,成功率: %d \n", tmpSize/int64(index), tmpCount*100/(tmpCount+tmpErr))            if index > timeout {                t.Stop()                close(stop)                return            }        }    }}

转载于:https://blog.51cto.com/11140372/2354901

你可能感兴趣的文章
从Oracle Public Yum为Oracle Linux建立本地的Yum源
查看>>
spring4+mybaits3整合—项目Demo
查看>>
PHP数据类型
查看>>
MySQL utf8mb4 字符集:支持 emoji 表情符号
查看>>
IOS开发-如何debug及处理闪退,My App Crashed,Now What? - P...
查看>>
马云_拥抱敏捷-----华为的SDN实践
查看>>
Cocos2d-x 3移动游戏编程
查看>>
Android开发——09Google I/O之让Android UI性能更高效(1)
查看>>
广度优先搜索知识总结
查看>>
Java多线程机制详解(转)
查看>>
在 SELECT 查询中使用表表达式
查看>>
我的友情链接
查看>>
(二) php if语句,switch语句,continue语句,return语句,for 、while、do while 循环
查看>>
Hadoop集群(第7期)_Eclipse开发环境设置
查看>>
ARC 下两种释放对象的方法
查看>>
scala中的continue和break
查看>>
edx 获取当前request
查看>>
算法导论-分治、最大子序列问题
查看>>
安卓中如何实现滑动导航
查看>>
Java-金额小数转换成中文大写金额
查看>>