一个goroutine数据流任务的暂停⏸️与恢复⏯

发布于 2018-07-13 作者 超级苦工 421次 浏览 版块 分享

熟悉go编程的同学,肯定都用过time.Sleep来暂停goroutine的执行,但是time.Sleep无法实现按照事件暂停和恢复。换句话说,你一旦设定了暂停时间,那后面的事情就由不得你了,你设了暂停10秒就是10秒,设了1分钟就是1分钟,而且你没法“永远暂停”下去。

那么现在问题就来了,我有一个数据流的播放任务,希望做到用户点击暂停⏸️按钮(发出暂停信号)的时候,播放任务被暂停(不再输出数据),而用户点击恢复⏯按钮(发出恢复信号)的时候,播放任务从被暂停的地方继续。这个暂停和恢复可以按照用户的意愿进行无数多次,暂停多久不能预先设定,而是看用户的心情😄

关于这个问题,我在网上找了挺久,并没有找到特别好的例子,直到看到《Go并发编程实战》这本书才得到了启示。

Go并发编程实战

select 语句

select语句是一个仅能被用于发送和接收通道中的元素值的专用语句,一个select语句在被执行的时候会根据通道的值选择执行其中的某一个分支,通道里没有值的时候就走default分支(前提是你写了这么一个default分支)

现在利用select语句,我们可以写出控制数据流的关键代码(为方便起见,在此我们把数据流简化为一个循环输出)

// 略去一些声明
循环总次数 := 10000
运行信号 := make(chan struct{})
for i := 0; i < 循环总次数; {
    select {
    case <-运行信号:
        fmt.Printf("数据流播放到:%d\n", i)
        time.Sleep(1 * time.Second)    // 让播放显得慢一点,每次停1秒
        i++
    default:
        continue   // 暂停时保持空转
    }
}

我们要暂停数据流播放的时候,只需要让

运行信号 = nil

即可。

此时,名为“运行信号”的通道里没有值,所以select语句只会走default分支,那么整个for循环就进入到了一个无限空转的过程中,直到下一次“运行信号”里再有值才会继续数据流的播放。


Task 数据流的模拟

package task   // task.go
import "log"

// Task 任务结构体
type Task struct {
    任务编号      string
    循环总次数     int
    是否完成      bool
    工作信号       <-chan struct{}
    工作信号备份   <-chan struct{}
}

// NewTask 新任务
func NewTask(任务编号 string, 循环总次数 int) *Task {
    ch := make(chan struct{})
    defer close(ch)

    return &Task{
        任务编号:       任务编号,
        循环总次数:    循环总次数,
        工作信号:       ch,
        工作信号备份:   ch,
    }
}

// Play 开始运行
func (t *Task) Play() {
    log.Printf("启动任务:%s , 次数: %d\n", t.任务编号, t.循环总次数)
    for i := 0; i < t.循环总次数; {      // 用循环模拟数据流
        select {
        case <-t.工作信号:
            log.Printf("任务 %s @ %d\n", t.任务编号, i)
            time.Sleep(1 * time.Second)    // 让模拟数据流走得慢点        
            i++
        default:    
            continue // ⚠️ 这个 continue 是Pause时运行 空转的
        }
    }
    t.是否完成 = true
}

// Pause 暂停
func (t *Task) Pause() {
    if !t.是否完成 {
        t.工作信号 = nil            
        log.Printf("%s # 暂停 \n", t.任务编号)
    } else {
        log.Printf("%s # 已运行完毕,不能暂停 \n", t.任务编号)
    }
}

// Resume 恢复执行
func (t *Task) Resume() {
    if !t.是否完成 {
        t.工作信号 = t.工作信号备份           
        log.Printf("%s # 恢复执行 \n", t.任务编号)
    } else {
        log.Printf("%s # 已运行完毕,不能恢复 \n", t.任务编号)
    }
}

主程序 模拟多次暂停与恢复

package main

import (
    "(...路径)/task"
    "log"
    "sync"
    "time"
)

func main() {
    任务1名称 := "task #15"
    t1 := task.NewTask(任务1名称, 15)    // 创建一个数据流任务

    var wg sync.WaitGroup
    wg.Add(5)

    go func() {
        defer wg.Done()
        log.Println("任务启动")
        t1.Play()  
        log.Println("播放结束")         
    }()

    go func() {
        defer wg.Done()
        time.Sleep(4 * time.Second)
        log.Printf("%s 启动4秒后,试图暂停\n", 任务1名称)
        t1.Pause()
    }()

    go func() {
        defer wg.Done()
        time.Sleep(8 * time.Second)
        log.Printf("%s 启动暂停8秒后,试图恢复\n", 任务1名称)
        t1.Resume()
    }()

    go func() {
        defer wg.Done()
        time.Sleep(12*time.Second)
        log.Printf("%s 启动12秒后,再次暂停 \n", 任务1名称)
        t1.Pause()
    }()

    go func() {
        defer wg.Done()
        time.Sleep(15 * time.Second)
        log.Printf("%s 启动15秒后,再次恢复\n", 任务1名称)
        t1.Resume()
    }()

    wg.Wait()
}

小结

以上模拟了一个最简单的数据流的多次暂停与恢复,我们可以根据实际项目中的需要来扩展Task以及其调用。

Go并发编程实战》写的非常细致,Go并发的原理讲得很清楚,也有很多贴近实战的代码,对于想深入学习Go语言的同学来说是本很好的教程。

收藏
暂无回复