Skip to content

Message Queue

A channel-based message queue implementation with basic queue operations.

Installation

go
import "github.com/leoheung/go-patterns/container/msgQueue"

API Reference

Create a Message Queue

go
// Create a new message queue with specified capacity and device ID
mq := msgqueue.NewChanMQ(100, "device-1")

Enqueue

go
// Enqueue a message
err := mq.Enq([]byte("hello world"))

Dequeue

go
// Dequeue a message with context
ctx := context.Background()
msg, err := mq.Deq(ctx)

Queue Operations

go
// Get current queue length
length := mq.Len()

// Clear all messages
err := mq.Clear()

// Check if queue is live
isLive := mq.IsLive()

// Renew a dead queue
mq.Renew()

// Destroy the queue
mq.Destroy()

Complete Example

go
package main

import (
    "context"
    "fmt"
    "time"
    "github.com/leoheung/go-patterns/container/msgQueue"
)

func main() {
    // Create a message queue with capacity 10
    mq := msgqueue.NewChanMQ(10, "test-device")
    defer mq.Destroy()

    // Start a goroutine to consume messages
    go func() {
        ctx := context.Background()
        for i := 0; i < 5; i++ {
            msg, err := mq.Deq(ctx)
            if err != nil {
                fmt.Printf("Error dequeuing: %v\n", err)
                return
            }
            fmt.Printf("Received message: %s\n", string(msg))
            time.Sleep(500 * time.Millisecond)
        }
    }()

    // Enqueue messages
    for i := 0; i < 5; i++ {
        msg := fmt.Sprintf("Message %d", i)
        if err := mq.Enq([]byte(msg)); err != nil {
            fmt.Printf("Error enqueuing: %v\n", err)
            return
        }
        fmt.Printf("Sent message: %s\n", msg)
    }

    // Wait for all messages to be processed
    time.Sleep(3 * time.Second)
}

Features

  • Channel-based: Built on Go channels for efficient concurrent operations
  • Context support: Supports cancellation via context
  • Lifecycle management: Create, renew, and destroy queues
  • Thread-safe: Safe for concurrent use by multiple goroutines

Released under the MIT License.