Skip to content

消息隊列

基於 Channel 的消息隊列實現,具備基本隊列操作。

安裝

go
import "github.com/leoheung/go-patterns/container/msgQueue"

API 參考

建立消息隊列

go
// 以指定容量及裝置 ID 建立新的消息隊列
mq := msgqueue.NewChanMQ(100, "device-1")

入隊

go
// 將消息加入隊列
err := mq.Enq([]byte("hello world"))

出隊

go
// 以 Context 從隊列取出消息
ctx := context.Background()
msg, err := mq.Deq(ctx)

隊列操作

go
// 取得目前隊列長度
length := mq.Len()

// 清空所有消息
err := mq.Clear()

// 檢查隊列是否運作中
isLive := mq.IsLive()

// 重新啟動已停止的隊列
mq.Renew()

// 銷毀隊列
mq.Destroy()

完整範例

go
package main

import (
    "context"
    "fmt"
    "time"
    "github.com/leoheung/go-patterns/container/msgQueue"
)

func main() {
    // 建立容量為 10 的消息隊列
    mq := msgqueue.NewChanMQ(10, "test-device")
    defer mq.Destroy()

    // 啟動 Goroutine 消費消息
    go func() {
        ctx := context.Background()
        for i := 0; i < 5; i++ {
            msg, err := mq.Deq(ctx)
            if err != nil {
                fmt.Printf("出隊錯誤: %v\n", err)
                return
            }
            fmt.Printf("收到消息: %s\n", string(msg))
            time.Sleep(500 * time.Millisecond)
        }
    }()

    // 入隊消息
    for i := 0; i < 5; i++ {
        msg := fmt.Sprintf("消息 %d", i)
        if err := mq.Enq([]byte(msg)); err != nil {
            fmt.Printf("入隊錯誤: %v\n", err)
            return
        }
        fmt.Printf("發送消息: %s\n", msg)
    }

    // 等待所有消息處理完成
    time.Sleep(3 * time.Second)
}

特性

  • 基於 Channel: 以 Go Channel 建立,實現高效並發操作
  • 支援 Context: 透過 Context 支援取消操作
  • 生命週期管理: 建立、重新啟動及銷毀隊列
  • 線程安全: 可安全地由多個 Goroutine 並發使用

Released under the MIT License.