Method list
Preparation and requirements
For prerequisites and requirements to use the Pub/Sub Go SDK, refer to the SDK overview documentation.
Topic
Retrieve topic
You can retrieve a topic using the methods below.
Method
| Type | Method | Description | 
|---|---|---|
| Client | Topic(name string) *Topic | Returns a topic object regardless of existence; allows configuration such as PublishSettings | 
| Topic | Exists(ctx context.Context) (bool, error) | Checks from the server whether the topic actually exists | 
| Topic | Config(ctx context.Context) (*TopicConfig, error) | Retrieves information about the topic | 
Parameters
No parameters.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// topicGet obtains a handle for the topic called name, confirms on the
// server that the topic exists, and prints the handle followed by its
// configuration.
func topicGet(domainID, projectID, name string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    t := c.Topic(name)

    // A handle is returned whether or not the topic exists, so existence
    // has to be checked explicitly before reading the configuration.
    switch found, err := t.Exists(ctx); {
    case err != nil:
        return fmt.Errorf("topic.Exists: %v", err)
    case !found:
        return fmt.Errorf("not found topic")
    }
    fmt.Printf("Topic: %v\n", t)

    cfg, err := t.Config(ctx)
    if err != nil {
        return fmt.Errorf("topic.Config: %v", err)
    }
    fmt.Printf("Topic config : %+v\n", cfg)
    return nil
}
Retrieve topic list
Retrieves the list of topics in a project.
Method
| Type | Method | 
|---|---|
| Client | Topics(ctx context.Context) *TopicIterator | 
Parameters
No parameters.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "google.golang.org/api/iterator"
)
// topicList prints every topic returned by the client's topic iterator for
// the given domain and project.
func topicList(domainID, projectID string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    topics := c.Topics(ctx)
    for {
        t, err := topics.Next()
        switch {
        case err == iterator.Done:
            // The iterator is exhausted; every topic has been printed.
            return nil
        case err != nil:
            return fmt.Errorf("Next: %v", err)
        }
        fmt.Printf("Topic: %v\n", t)
    }
}
Retrieve subscription list of topic
Retrieves the list of subscriptions for a specific topic.
Method
| Type | Method | 
|---|---|
| Topic | Subscriptions(ctx context.Context) *SubscriptionIterator | 
Parameters
No parameters.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "google.golang.org/api/iterator"
)
// topicSubscriptions prints every subscription returned by the subscription
// iterator of the topic called name.
func topicSubscriptions(domainID, projectID, name string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    subs := c.Topic(name).Subscriptions(ctx)
    for {
        s, err := subs.Next()
        if err != nil {
            // iterator.Done marks normal exhaustion; anything else is a failure.
            if err == iterator.Done {
                return nil
            }
            return fmt.Errorf("Next: %v", err)
        }
        fmt.Printf("Subscription: %v\n", s)
    }
}
Modify topic
You can update the retention period and description of a specific topic.
Method
| Type | Method | 
|---|---|
| Topic | Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error) | 
Parameters
| Type | Field | Description | 
|---|---|---|
| TopicConfigToUpdate | cfg | Object containing information to update the topic | 
pubsub.TopicConfigToUpdate
| Type | Field | Description | 
|---|---|---|
| time.Duration | RetentionDuration | Message retention period - Default: 604,800s(7 days) | 
| string | Description | Topic description | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "time"
)
// topicUpdate sets the retention period of the topic called name to duration
// and replaces its description, then prints the retention period reported in
// the updated configuration.
//
// It returns an error if the client cannot be created or the update fails.
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    topic := client.Topic(name)
    cfg := &pubsub.TopicConfigToUpdate{
        RetentionDuration: duration,
        Description:       "new description",
    }
    tc, err := topic.Update(ctx, cfg)
    if err != nil {
        // Error strings must not end with a newline: callers wrap or log them.
        return fmt.Errorf("Update: %v", err)
    }
    fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
    return nil
}
Create topic
You can create a new topic using the methods below.
Method
| Type | Method | Description | 
|---|---|---|
| Client | CreateTopic(ctx context.Context, name string) (*Topic, error) | Retention period is set to default (7 days) | 
| Client | CreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error) | Allows specifying topic configuration | 
Parameters
| Type | Field | Description | 
|---|---|---|
| TopicConfig | cfg | Object containing information about the topic to be created | 
pubsub.TopicConfig
| Type | Field | Description | 
|---|---|---|
| time.Duration | RetentionDuration | Message retention period - Default: 604,800s(7 days) | 
| string | Description | Topic description | 
Example
import (
   "context"
   "fmt"
   "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// topicCreate creates a topic called name without an explicit configuration
// and prints the returned topic handle.
func topicCreate(domainID, projectID, name string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    created, err := c.CreateTopic(ctx, name)
    if err != nil {
        return fmt.Errorf("CreateTopic: %v", err)
    }

    fmt.Printf("Topic created: %v\n", created)
    return nil
}
import (
   "context"
   "fmt"
   "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
   "time"
)
// topicCreateWithConfig creates a topic called name whose retention period is
// duration and whose description is a fixed sample text, then prints the
// returned topic handle.
func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    cfg := &pubsub.TopicConfig{
        RetentionDuration: duration,
        Description:       "topic description",
    }

    created, err := c.CreateTopicWithConfig(ctx, name, cfg)
    if err != nil {
        return fmt.Errorf("CreateTopicWithConfig: %v", err)
    }

    fmt.Printf("Topic created: %v\n", created)
    return nil
}
Delete topic
You can delete a topic using the method below.
When a topic is deleted, all associated subscriptions are also deleted.
Method
| Type | Method | 
|---|---|
| Topic | Delete(ctx context.Context) error | 
Parameters
No parameters.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// topicDelete deletes the topic called name.
//
// It returns an error if the client cannot be created or the delete request
// fails.
func topicDelete(domainID, projectID, name string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    topic := client.Topic(name)
    if err := topic.Delete(ctx); err != nil {
        // Error strings must not end with a newline: callers wrap or log them.
        return fmt.Errorf("Delete: %v", err)
    }
    return nil
}
Subscription
Retrieve subscription
Check whether a subscription exists.
Method
| Type | Method | Description | 
|---|---|---|
| Client | Subscription(name string) *Subscription | Returns a subscription object regardless of existence - Enables configuration such as ReceiveSettings | 
| Subscription | Exists(ctx context.Context) (bool, error) | Checks from the server whether the subscription actually exists | 
| Subscription | Config(ctx context.Context) (*SubscriptionConfig, error) | Retrieves information about the subscription | 
Parameters
No parameters.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// subscriptionGet obtains a handle for the subscription called subName,
// confirms on the server that it exists, and prints the handle followed by
// its configuration.
func subscriptionGet(domainID, projectID, subName string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    sub := c.Subscription(subName)

    // A handle is returned whether or not the subscription exists, so
    // existence has to be checked explicitly before reading the configuration.
    found, err := sub.Exists(ctx)
    switch {
    case err != nil:
        return fmt.Errorf("subscription.Exists: %v", err)
    case !found:
        return fmt.Errorf("not found subscription")
    }
    fmt.Printf("Subscription: %v\n", sub)

    cfg, err := sub.Config(ctx)
    if err != nil {
        return fmt.Errorf("subscription.Config: %v", err)
    }
    fmt.Printf("Subscription config : %+v\n", cfg)
    return nil
}
Retrieve subscription list
Retrieves the list of subscriptions belonging to the domain and project set in the client.
Method
| Type | Method | Description | 
|---|---|---|
| Client | Subscriptions(ctx context.Context) *SubscriptionIterator | Returns an iterator for retrieving actual subscription info | 
Parameters
No parameters.
Example
import (
    "context"
    "fmt"
    "google.golang.org/api/iterator"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// subscriptionList prints every subscription returned by the client's
// subscription iterator for the given domain and project.
func subscriptionList(domainID, projectID string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    subs := c.Subscriptions(ctx)
    for {
        s, err := subs.Next()
        switch {
        case err == iterator.Done:
            // The iterator is exhausted; every subscription has been printed.
            return nil
        case err != nil:
            return fmt.Errorf("Next: %v", err)
        }
        fmt.Printf("Subscription: %v\n", s)
    }
}
Modify subscription
Updates a subscription. At least one field must be provided for modification.
Method
| Type | Method | Description | 
|---|---|---|
| Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult | Updates the subscription based on the SubscriptionConfigToUpdate values | 
Parameters
| Type | Field | Description | 
|---|---|---|
| SubscriptionConfigToUpdate | cfg | Configuration values for the update | 
pubsub.SubscriptionConfigToUpdate
| Type | Field | Description | 
|---|---|---|
| time.Duration | AckDeadline | Acknowledgment deadline | 
| time.Duration | RetentionDuration | Message retention duration - Default: 604,800s(7 days) | 
| int | MaxDeliveryAttempt | Max delivery attempts - Min: 1, Max: infinite (1~100) - Default: -1 | 
| PushConfig | PushConfig | Configuration for Push subscription | 
| ObjectStorageConfig | ObjectStorageConfig | Configuration for Object Storage subscription | 
pubsub.PushConfig
| Type | Field | Description | 
|---|---|---|
| string | Endpoint | Target endpoint to deliver messages | 
| int | BatchSize | Number of messages sent per batch - Default: 1 | 
pubsub.ObjectStorageConfig
| Type | Field | Description | 
|---|---|---|
| string | Bucket | Name of Object Storage bucket | 
| int | ExportIntervalMinutes | Export interval to Object Storage - Default: 5 | 
| string | FilePrefix | Prefix for exported file | 
| string | FileSuffix | Suffix for exported file | 
pubsub.SubscriptionResult
| Method | Description | 
|---|---|
| Get(ctx context.Context) (string, error) | Retrieves the final status of the update request (blocks until complete). ⚠️ Enabled only with option.WithWaitStatus() | 
| Name() string | Returns the name of the requested subscription | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
    "time"
)
// subscriptionUpdate updates the subscription called subName with a sample
// push configuration, ack deadline, retention period and delivery-attempt
// limit, waits for the request to finish, and prints the final status.
//
// It returns an error if the client cannot be created or the update fails.
func subscriptionUpdate(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    subscription := client.Subscription(subName)
    updateConfig := &pubsub.SubscriptionConfigToUpdate{
        PushConfig: &pubsub.PushConfig{
            Endpoint:  "http://localhost:8888/push",
            BatchSize: 100,
        },
        AckDeadline:        30 * time.Second,
        RetentionDuration:  48 * time.Hour,
        MaxDeliveryAttempt: 20,
    }

    // WithWaitStatus is passed so that result.Get can report the final status.
    result := subscription.Update(ctx, updateConfig, option.WithWaitStatus())
    status, err := result.Get(ctx)
    if err != nil {
        // Error strings must not end with a newline: callers wrap or log them.
        return fmt.Errorf("Update: %v", err)
    }
    fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)
    return nil
}
Rewind subscription time
Rewind a subscription to a specified point in time and retrieve all messages published after that time.
Method
| Type | Method | Description | 
|---|---|---|
| Subscription | SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult | Rewinds the subscription to the specified time | 
Parameters
| Type | Field | Description | 
|---|---|---|
| time.Time | t | Time to which the subscription should be rewound | 
pubsub.SubscriptionResult
| Type | Method | Description | 
|---|---|---|
| SubscriptionResult | Get(ctx context.Context) (string, error) | Returns the final state of the subscription request (blocking until complete). ⚠️ Requires option.WithWaitStatus() | 
| SubscriptionResult | Name() string | Returns the name of the requested subscription | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
    "time"
)
// subscriptionSeek rewinds the subscription called subName to one hour before
// the current time and waits for the request to finish.
//
// It returns an error if the client cannot be created or the seek fails.
func subscriptionSeek(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    subscription := client.Subscription(subName)

    // WithWaitStatus is passed so that result.Get can report the final status.
    result := subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour), option.WithWaitStatus())
    if _, err := result.Get(ctx); err != nil {
        // Error strings must not end with a newline: callers wrap or log them.
        return fmt.Errorf("Seek: %v", err)
    }
    return nil
}
Create subscription
You can create a subscription using the following method.
Method
| Type | Method | Description | 
|---|---|---|
| Client | CreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult | Creates a subscription (default message retention: 7 days) | 
Parameters
| Type | Field | Description | 
|---|---|---|
| SubscriptionConfig | cfg | Object containing subscription settings | 
pubsub.SubscriptionConfig
| Type | Field | Description | 
|---|---|---|
| string | Topic | Topic name | 
| time.Duration | AckDeadline | Acknowledgment deadline | 
| time.Duration | RetentionDuration | Message retention duration - Default: 604,800s(7 days) | 
| PushConfig | PushConfig | Settings for push subscription | 
| ObjectStorageConfig | ObjectStorageConfig | Settings for object storage subscription | 
| int | MaxDeliveryAttempt | Max delivery attempts - Default: -1 (unlimited) | 
pubsub.PushConfig
| Type | Field | Description | 
|---|---|---|
| string | Endpoint | Destination endpoint for messages | 
| int | BatchSize | Number of messages per batch - Default: 1 | 
pubsub.ObjectStorageConfig
| Type | Field | Description | 
|---|---|---|
| string | Bucket | Object Storage bucket name | 
| int | ExportIntervalMinutes | Export interval (min: 1, max: 10, default: 5) ⚠️ At least one file is created per interval. Files may also be created before the interval ends | 
| string | FilePrefix | File prefix | 
| string | FileSuffix | File suffix | 
pubsub.SubscriptionResult
| Type | Method | Description | 
|---|---|---|
| SubscriptionResult | Get(ctx context.Context) (string, error) | Returns the final state of the subscription request (blocking) ⚠️ Requires option.WithWaitStatus() | 
| SubscriptionResult | Name() string | Name of the created subscription | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
    "time"
)
// subscriptionCreate creates three subscriptions on topicName, in order: a
// pull subscription (pullSubName), a push subscription (pushSubName) and an
// Object Storage subscription (objectStorageSubName). Each creation is
// awaited and its final status printed before the next one starts.
//
// It returns an error if the client cannot be created or as soon as any
// creation fails; subscriptions created before the failure are left in place.
func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    pullSubConfig := &pubsub.SubscriptionConfig{
        Topic:              topicName,
        AckDeadline:        15 * time.Second,
        RetentionDuration:  24 * time.Hour,
        MaxDeliveryAttempt: 10,
    }
    pushSubConfig := &pubsub.SubscriptionConfig{
        Topic:              topicName,
        AckDeadline:        15 * time.Second,
        RetentionDuration:  24 * time.Hour,
        MaxDeliveryAttempt: 10,
        PushConfig: &pubsub.PushConfig{
            Endpoint:  "http://localhost:9999/push",
            BatchSize: 1,
        },
    }
    objectStorageSubConfig := &pubsub.SubscriptionConfig{
        Topic:             topicName,
        RetentionDuration: 24 * time.Hour,
        ObjectStorageConfig: &pubsub.ObjectStorageConfig{
            Bucket:                "bucket",
            ExportIntervalMinutes: 2,
            FilePrefix:            "prefix",
            FileSuffix:            "suffix",
        },
    }

    // The create-and-wait sequence is identical for every subscription type,
    // so it is driven from a table instead of being repeated three times.
    requests := []struct {
        name string
        cfg  *pubsub.SubscriptionConfig
    }{
        {pullSubName, pullSubConfig},
        {pushSubName, pushSubConfig},
        {objectStorageSubName, objectStorageSubConfig},
    }
    for _, req := range requests {
        // WithWaitStatus is passed so that result.Get can report the final status.
        result := client.CreateSubscription(ctx, req.name, req.cfg, option.WithWaitStatus())
        status, err := result.Get(ctx)
        if err != nil {
            // Error strings must not end with a newline: callers wrap or log them.
            return fmt.Errorf("Create: %v", err)
        }
        fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
    }
    return nil
}
Delete subscription
Delete a subscription using the method below.
Method
| Type | Method | Description | 
|---|---|---|
| Subscription | Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult | Deletes subscription | 
Parameters
pubsub.SubscriptionResult
| Type | Method | Description | 
|---|---|---|
| SubscriptionResult | Get(ctx context.Context) (string, error) | Returns final subscription status (blocking). ⚠️ Requires option.WithWaitStatus() | 
| SubscriptionResult | Name() string | Name of the subscription request | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
)
// subscriptionDelete deletes the subscription called subName, waits for the
// request to finish, and prints the name of the deleted subscription.
//
// It returns an error if the client cannot be created or the delete fails.
func subscriptionDelete(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    subscription := client.Subscription(subName)

    // WithWaitStatus is passed so that result.Get can report the final status.
    result := subscription.Delete(ctx, option.WithWaitStatus())
    if _, err := result.Get(ctx); err != nil {
        // Error strings must not end with a newline: callers wrap or log them.
        return fmt.Errorf("Delete: %v", err)
    }
    fmt.Printf("Subscription deleted %s\n", result.Name())
    return nil
}
Message
Publish message
You can publish messages to a topic.
Method
| Type | Method | 
|---|---|
| Topic | Publish(ctx context.Context, msg *Message) *PublishResult | 
Parameters
| Type | Field | Description | 
|---|---|---|
| Message | msg | Message to be published | 
pubsub.Message
| Type | Field | Description | 
|---|---|---|
| string | Data | Message body ⚠️ Must be Base64-encoded | 
| map[string]string | Attributes | Message attributes in key-value map format ⚠️ Keys starting with kakaoc are not allowed | 
pubsub.PublishSettings
| Type | Field | Description | 
|---|---|---|
| time.Duration | DelayThreshold | Send batched messages to server at this interval - Default: 100ms | 
| int | CountThreshold | Batch this many messages before sending - Default: 100 | 
| int | NumGoroutines | Max number of goroutines - Default: 2, Max:10 | 
| time.Duration | Timeout | Timeout for server requests - Default: 60s | 
| FlowControlSettings | FlowControlSettings | Controls message flow to prevent overload | 
pubsub.FlowControlSettings
| Type | Field | Description | 
|---|---|---|
| int | MaxOutstandingMessages | Max number of unprocessed messages - Default: 1000 | 
| int | MaxOutstandingBytes | Max size of unprocessed messages - Default: 1G | 
| LimitExceededBehavior | LimitExceededBehavior | Behavior when limit exceeded: - ignore: No flow control - block (default): Wait - signal error: Return error | 
pubsub.PublishResult
| Type | Method | Description | 
|---|---|---|
| PublishResult | Get(ctx context.Context) (string, error) | Retrieves message ID from server - Blocks until message is published | 
Example
import (
	"context"
	"encoding/base64"
	"fmt"
	"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// topicPublish Base64-encodes msg, publishes it with the given attributes to
// the topic called name, waits for the publish result, and prints the
// returned message ID.
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %v", err)
	}
	defer c.Close()

	// The message body is sent Base64-encoded.
	payload := &pubsub.Message{
		Data:       base64.StdEncoding.EncodeToString([]byte(msg)),
		Attributes: attributes,
	}

	// Get is called on the publish result to obtain the message ID.
	msgID, err := c.Topic(name).Publish(ctx, payload).Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %v", err)
	}

	fmt.Printf("Published a message; msg ID: %v\n", msgID)
	return nil
}
Receive message asynchronously
Receive subscription messages asynchronously.
Available only for subscriptions of type Pull.
Method
| Type | Method | Description | 
|---|---|---|
| Subscription | Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error | Receives messages via streaming and executes callback function | 
Parameters
| Type | Field | Description | 
|---|---|---|
| func(ctx2 context.Context, message *Message) | callbackFunc | Callback function triggered per message | 
pubsub.ReceiveSettings
| Type | Field | Description | 
|---|---|---|
| int | NumGoroutines | Max number of concurrent goroutines - Default: 2, Max:10 | 
| FlowControlSettings | FlowControlSettings | Controls message flow | 
pubsub.FlowControlSettings
| Type | Field | Description | 
|---|---|---|
| int | MaxOutstandingMessages | Max number of unacknowledged messages - Default: 1000 | 
| int | MaxOutstandingBytes | Max size of unacknowledged messages - Default: 1G | 
| LimitExceededBehavior | LimitExceededBehavior | Behavior when limits exceeded: - ignore: No flow control - block (default): Wait - signal error: Return error | 
pubsub.Message
| Type | Function | Description | 
|---|---|---|
| string | ID | Unique message ID | 
| string | Data | Message content | 
| map[string]string | Attributes | Message attributes | 
| time.Time | PublishTime | Publish timestamp | 
| *int | DeliveryAttempt | Delivery attempt count | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// subscriptionReceive receives messages from the subscription called subName
// with NumGoroutines set to 10 and acknowledges each message in the callback.
// It returns whatever Receive returns.
func subscriptionReceive(domainID, projectID, subName string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    sub := c.Subscription(subName)
    sub.ReceiveSettings.NumGoroutines = 10

    // onMessage is invoked by Receive for each delivered message.
    onMessage := func(ctx2 context.Context, message *pubsub.Message) {
        // Process message
        message.Ack()
    }
    return sub.Receive(ctx, onMessage)
}
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// subscriptionReceive receives messages from the subscription called subName
// with NumGoroutines set to 10. Each message is acknowledged with
// AckWithResult, and the callback waits for the acknowledgment to finish so
// that its outcome can be printed.
//
// It returns an error if the client cannot be created, otherwise whatever
// Receive returns.
func subscriptionReceive(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    subscription := client.Subscription(subName)
    subscription.ReceiveSettings.NumGoroutines = 10
    return subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
        // Process message
        ackResult := message.AckWithResult()
        status, err := ackResult.Get(ctx2) // blocks until acknowledge is complete
        if err != nil {
            fmt.Printf("Ack error: %v\n", err)
            // The status is not meaningful when the acknowledgment failed,
            // so do not report it.
            return
        }
        fmt.Printf("Ack status: %s\n", status)
    })
}
Receive message synchronously
Receive messages from a subscription synchronously.
Method
| Type | Method | Description | 
|---|---|---|
| Subscription | Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error) | Retrieves messages in batch from subscription synchronously | 
Parameters
| Type | Field | Description | 
|---|---|---|
| int | maxMessages | Number of messages to pull in a batch | 
pubsub.PulledMessage
Read-only version of pubsub.Message.
| Type | Function | Description | 
|---|---|---|
| string | ID | Message ID | 
| string | Data | Message content | 
| map[string]string | Attributes | Message attributes | 
| time.Time | PublishTime | Time when published | 
| *int | DeliveryAttempt | Number of deliveries | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// subscriptionPull pulls up to ten messages from the subscription called
// subName in a single call and prints each one with its index.
func subscriptionPull(domainID, projectID, subName string) error {
    // batchSize is the maxMessages argument passed to Pull.
    const batchSize = 10

    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    pulled, err := c.Subscription(subName).Pull(ctx, batchSize)
    if err != nil {
        return fmt.Errorf("subscription.Pull: %v", err)
    }

    for idx := range pulled {
        fmt.Printf("message [%v] : %v\n", idx, pulled[idx])
    }
    return nil
}
Acknowledge message reception
Send acknowledgment for received messages using their AckIDs.
Method
| Type | Method | Description | 
|---|---|---|
| Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | Informs the system that the messages have been handled | 
Parameters
| Type | Field | Description | 
|---|---|---|
| []string | ackIDs | AckIDs list | 
pubsub.AcknowledgeResponse
| Type | Field | Description | 
|---|---|---|
| []*AcknowledgeFailureResult | Failure | Failed acknowledgment | 
pubsub.AcknowledgeFailureResult
| Type | Field | Description | 
|---|---|---|
| string | AckID | Failed AckID | 
| error | Error | Reason for failure | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// subscriptionAck acknowledges the messages identified by ackIDs on the
// subscription called subName and prints every entry of the response's
// Failure list.
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
    ctx := context.Background()

    c, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer c.Close()

    ackResp, err := c.Subscription(subName).Acknowledge(ctx, ackIDs)
    if err != nil {
        return fmt.Errorf("Acknowledge: %v", err)
    }

    // Nothing to report when no response object was returned.
    if ackResp == nil {
        return nil
    }
    for _, failed := range ackResp.Failure {
        fmt.Printf("ack fail : %v\n", failed)
    }
    return nil
}
Modify message ack deadline
Change the message acknowledgment deadline.
- Extend: Extends ack deadline by the subscription’s configured AckDeadline.
- Skip: Sets ack deadline to zero, making the message immediately available for redelivery.
Method
| Type | Method | Description | 
|---|---|---|
| Subscription | ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error) | Modifies AckDeadline based on action specified | 
Parameters
| Type | Field | Description | 
|---|---|---|
| []string | ackIDs | AckIDs of the messages | 
| AckAction | action | pubsub.ActionExtend or pubsub.ActionSkip | 
pubsub.ModifyAckDeadlineResponse
| Type | Field | Description | 
|---|---|---|
| []*ModifyAckDeadlineSuccessResult | Success | Successful results | 
| []*ModifyAckDeadlineFailureResult | Failure | Failed results | 
pubsub.ModifyAckDeadlineSuccessResult
| Type | Field | Description | 
|---|---|---|
| string | AckID | Original AckID | 
| string | ReissuedAckID | New AckID after extend | 
pubsub.ModifyAckDeadlineFailureResult
| Type | Field | Description | 
|---|---|---|
| string | AckID | Failed AckID | 
| error | Error | Reason for failure | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
// subscriptionModifyAckDeadline demonstrates both ack-deadline actions on the
// subscription called subName: it first calls ModifyAckDeadline with
// pubsub.ActionSkip and then with pubsub.ActionExtend, using the same ackIDs
// for both calls, and prints the per-AckID results of each call.
//
// It returns an error if the client cannot be created or if either call
// fails as a whole; per-AckID failures are printed, not returned.
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    subscription := client.Subscription(subName)

    // Skip example
    resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
    if err != nil {
        return fmt.Errorf("ModifyAckDeadline (skip): %v", err)
    }
    printModifyAckDeadlineResponse("skip", resp)

    // Extend example
    resp, err = subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
    if err != nil {
        return fmt.Errorf("ModifyAckDeadline (extend): %v", err)
    }
    printModifyAckDeadlineResponse("extend", resp)

    return nil
}

// printModifyAckDeadlineResponse prints every Success and Failure entry of
// resp, labelled with the action that produced it. A nil resp prints nothing.
func printModifyAckDeadlineResponse(action string, resp *pubsub.ModifyAckDeadlineResponse) {
    if resp == nil {
        return
    }
    for _, s := range resp.Success {
        fmt.Printf("%s success: ackID=%s reissuedAckID=%s\n", action, s.AckID, s.ReissuedAckID)
    }
    for _, f := range resp.Failure {
        fmt.Printf("%s fail: ackID=%s error=%v\n", action, f.AckID, f.Error)
    }
}